Spring Boot Apache Kafka JSON Message Example (2024)
Overview
In this article, we'll explore how to publish and consume JSON messages in Apache Kafka using Spring Boot.
Apache Kafka Tutorial :
- Apache Kafka Architecture
- Install Apache Kafka (Zookeeper+Broker) / Kafka Tool
- Spring Boot Apache Kafka String Message Example
- Spring Boot Apache Kafka JSON Message Example
- Apache Kafka Interview Questions and Answers
Create Spring Boot Kafka Example
Let's get started on the project now.
Project Structure
Kafka Server Port
Define the Kafka server address and port in application.properties as given below.
techgeeknext.kafka.bootstrap-servers: localhost:9092
- Create Spring Boot Maven project
- Define the pom.xml as follows and add the spring-kafka dependency.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.9.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.techgeeknext</groupId>
    <artifactId>kafaExample</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafaExample</name>
    <description>Spring Boot Kafka</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
- Define the configuration class to produce the messages.
package com.techgeeknext.config;

import com.techgeeknext.model.User;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    private final Logger LOG = LoggerFactory.getLogger(getClass());

    // Broker address read from application.properties
    @Value("${techgeeknext.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, User> userProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, User> userKafkaTemplate() {
        return new KafkaTemplate<>(userProducerFactory());
    }
}

In the above example, ProducerFactory is responsible for creating Kafka Producer instances. In userProducerFactory(), we configure the following properties:
- BOOTSTRAP_SERVERS_CONFIG - Host and port on which Kafka is running.
- KEY_SERIALIZER_CLASS_CONFIG - Serializer class to be used for the key. We are using StringSerializer for keys.
- VALUE_SERIALIZER_CLASS_CONFIG - Serializer class to be used for the value. We are using JsonSerializer for values, which serializes objects of the model class (User) to JSON.
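The User model class that JsonSerializer serializes is imported from com.techgeeknext.model but is not listed in this article. The following is only a minimal sketch of what such a class might look like. The single name field is an assumption (the article only mentions sending users with different user names), and the real class in the downloadable source may have other fields.

```java
// Hypothetical sketch of the User model; the actual class is not shown in the article.
// In the project it would be declared public and placed in package com.techgeeknext.model.
class User {

    // Assumed field; further fields would follow the same getter/setter pattern.
    private String name;

    // JSON mappers such as Jackson typically need a no-argument constructor
    // to create the object when deserializing on the consumer side.
    User() {
    }

    User(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    // Used when the producer service logs the object and the listener prints it.
    @Override
    public String toString() {
        return "User{name='" + name + "'}";
    }
}
```

The getters and setters matter here because the JSON mapper reads and writes the object's properties through them by default.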
- Define the service class, which autowires the KafkaTemplate object and publishes the JSON message to techgeeknext-topic, as shown.
package com.techgeeknext.service;

import com.techgeeknext.model.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    private final Logger LOG = LoggerFactory.getLogger(KafkaProducerService.class);

    @Autowired
    private KafkaTemplate<String, User> kafkaTemplate;

    String kafkaTopic = "techgeeknext-topic";

    // Publishes the User object; JsonSerializer converts it to JSON
    public void send(User user) {
        LOG.info("Sending User Json Serializer : {}", user);
        kafkaTemplate.send(kafkaTopic, user);
    }
}
- Now define the controller class that publishes the JSON object to the Kafka topic.
package com.techgeeknext.controller;

import com.techgeeknext.model.User;
import com.techgeeknext.service.KafkaProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping(value = "/techgeeknext-kafka/")
public class KafkaExampleController {

    @Autowired
    KafkaProducerService kafkaProducer;

    // Accepts a User as the JSON request body and forwards it to Kafka
    @PostMapping("/producer")
    public String sendMessage(@RequestBody User user) {
        kafkaProducer.send(user);
        return "Message sent successfully to the Kafka topic techgeeknext-topic";
    }
}
- Define the consumer configuration class to consume JSON messages.
package com.techgeeknext.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import com.techgeeknext.model.User;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value("${techgeeknext.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ConsumerFactory<String, User> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "techgeeknext-group");
        // Keys are read as strings; values are deserialized from JSON into User
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
                new JsonDeserializer<>(User.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, User> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, User> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
- Now write a listener method using the @KafkaListener annotation to listen for JSON messages arriving on techgeeknext-topic as part of the techgeeknext-group consumer group.
package com.techgeeknext;

import com.techgeeknext.model.User;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;

@SpringBootApplication
public class KafaExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafaExampleApplication.class, args);
    }

    // Invoked for every message received on techgeeknext-topic
    @KafkaListener(topics = "techgeeknext-topic", groupId = "techgeeknext-group")
    public void listen(User user) {
        System.out.println("Received User information : " + user);
    }
}
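- Next, start the Spring Boot application by running mvn spring-boot:run. The consumer can only connect once ZooKeeper and the Kafka broker from the following steps are running.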
- Start ZooKeeper: This Kafka installation comes with a built-in ZooKeeper. ZooKeeper is mainly used to track the status of the nodes in the Kafka cluster and to keep cluster metadata such as the list of Kafka topics and their configuration.
Refer to Install Apache Kafka for the steps to install ZooKeeper and Kafka.
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
- Start Apache Kafka: Use the command below to start the Apache Kafka broker.
.\bin\windows\kafka-server-start.bat .\config\server.properties
- Use Postman to POST the User object to Apache Kafka as follows: http://localhost:8080/techgeeknext-kafka/producer/
- Send another User object with a different user name:
http://localhost:8080/techgeeknext-kafka/producer/
Each of the above two POST requests triggers a JSON message to be sent to techgeeknext-topic, where it is consumed by the techgeeknext-group consumer group.
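If Postman is not at hand, the same POST request can be sent from plain Java. The following is a rough sketch using the JDK's HttpURLConnection, not part of the original project. The class name ProducerEndpointClient and its helper methods are made up for illustration, and the JSON body assumes the User model has a name field, which the article does not show.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Hypothetical helper class; not part of the article's source code.
class ProducerEndpointClient {

    // Endpoint URL as given in the article.
    static final String ENDPOINT = "http://localhost:8080/techgeeknext-kafka/producer/";

    // Builds a JSON body with a single "name" field (assumed field name).
    // Only backslashes and double quotes are escaped, which is enough for simple names.
    static String userJson(String name) {
        String escaped = name.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"name\":\"" + escaped + "\"}";
    }

    // Sends the JSON body as an HTTP POST and returns the response status code.
    static int post(String endpoint, String json) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(endpoint).openConnection();
        try {
            conn.setRequestMethod("POST");
            conn.setRequestProperty("Content-Type", "application/json");
            conn.setDoOutput(true);
            try (OutputStream out = conn.getOutputStream()) {
                out.write(json.getBytes(StandardCharsets.UTF_8));
            }
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) {
        try {
            int status = post(ENDPOINT, userJson("TechGeekNext"));
            System.out.println("HTTP status: " + status);
        } catch (IOException e) {
            // Typically means the Spring Boot application is not running on port 8080.
            System.out.println("Request failed: " + e.getMessage());
        }
    }
}
```

Running main while the application is up should make the controller publish one message; changing the name passed to userJson corresponds to sending the second user.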
- On the Spring Boot console, we can see that the consumer has started and that the User JSON messages are received by the consumer.
- You can also verify in Kafka Tool that techgeeknext-topic and the techgeeknext-group consumer group have been created, along with other details such as partitions, consumers, producers, etc.
Download Source Code
The full source code for this article can be found below. Download it here - Spring Boot Apache Kafka Json Message Example