Spring Boot Apache Kafka Example
Overview
In this article, we'll explore how to integrate Apache Kafka with Spring Boot.
- Overview of Apache Kafka
- Install Kafka
- Spring Boot Kafka project
Apache Kafka Tutorial:
- Apache Kafka Architecture
- Install Apache Kafka (Zookeeper+Broker) / Kafka Tool
- Spring Boot Apache Kafka String Message Example
- Spring Boot Apache Kafka JSON Message Example
- Apache Kafka Interview Questions and Answers
Overview of Apache Kafka
Apache Kafka is a high-throughput distributed streaming platform. It is publish-subscribe messaging rethought as a distributed commit log.
A streaming platform has three key capabilities:
- Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system
- Store streams of records in a fault-tolerant, durable way
- Process streams of records as they occur
Kafka is generally used for two broad classes of applications:
- Building real-time streaming data pipelines that reliably get data between systems or applications
- Building real-time streaming applications that transform or react to the streams of data
Create Spring Boot Kafka Example
Let's get started on the project now.
Project Structure
Kafka Server Port
Define the Kafka server port in application.properties as given below.

techgeeknext.kafka.bootstrap-servers: localhost:9092
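Note that this tutorial uses a custom property key (techgeeknext.kafka.bootstrap-servers) and injects it manually with @Value. As a rough alternative sketch, Spring Boot's Kafka auto-configuration can also build the producer and consumer beans from the standard spring.kafka.* properties, so the explicit configuration classes shown later are one way, not the only way, to wire things up. Property names below are Spring Boot's standard ones:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=group-id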
- Create a Spring Boot Maven project.
- Define the pom.xml as follows, adding the spring-kafka dependency.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.9.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.techgeeknext</groupId>
    <artifactId>kafaExample</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafaExample</name>
    <description>Spring Boot Kafka</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
- Define the configuration class to produce the messages.
package com.techgeeknext.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Value("${techgeeknext.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

In the above example, ProducerFactory is responsible for creating Kafka producer instances. In producerFactory(), we configure the following properties (an optional extension with extra producer settings is sketched after the list):
- BOOTSTRAP_SERVERS_CONFIG - Host and port on which Kafka is running.
- KEY_SERIALIZER_CLASS_CONFIG - Serializer class to be used for the key.
- VALUE_SERIALIZER_CLASS_CONFIG - Serializer class to be used for the value. We are using StringSerializer for both keys and values.
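As a small optional extension, not part of the original example, the same configProps map in producerFactory() can carry additional producer settings, for instance delivery guarantees and retries. The keys below are standard Kafka producer constants; whether you need them depends on your reliability requirements:

// Hypothetical additions inside producerFactory() in KafkaProducerConfig
configProps.put(ProducerConfig.ACKS_CONFIG, "all");   // wait for all in-sync replicas to acknowledge
configProps.put(ProducerConfig.RETRIES_CONFIG, 3);    // retry transient send failures a few times
configProps.put(ProducerConfig.LINGER_MS_CONFIG, 5);  // small batching delay in milliseconds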
- Define the service class that autowires the KafkaTemplate object and publishes messages to techgeeknext-topic, as shown below.
package com.techgeeknext.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    String kafkaTopic = "techgeeknext-topic";

    public void send(String message) {
        kafkaTemplate.send(kafkaTopic, message);
    }
}
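The send() method above is fire-and-forget. If you want to log whether the broker actually accepted the record, a rough sketch of an alternative service (not part of the original article; the class and method names are illustrative only) can attach a callback to the ListenableFuture that KafkaTemplate.send() returns in spring-kafka 2.x:

package com.techgeeknext.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

@Service
public class KafkaProducerCallbackService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    String kafkaTopic = "techgeeknext-topic";

    // Illustrative variant: log success or failure instead of fire-and-forget
    public void sendWithCallback(String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(kafkaTopic, message);
        future.addCallback(
                result -> System.out.println("Sent to partition " + result.getRecordMetadata().partition()
                        + " at offset " + result.getRecordMetadata().offset()),
                ex -> System.err.println("Failed to send message: " + ex.getMessage()));
    }
}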
- Now define the controller class for publishing messages through Kafka.
package com.techgeeknext.controller;

import com.techgeeknext.service.KafkaProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "/techgeeknext-kafka/")
public class KafkaExampleController {

    @Autowired
    KafkaProducerService kafkaProducer;

    @GetMapping(value = "/producer")
    public String sendMessage(@RequestParam("message") String message) {
        kafkaProducer.send(message);
        return "Message sent Successfully to the Kafka topic techgeeknext-topic";
    }
}
- Define the consumer configuration class to consume messages.
package com.techgeeknext.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value("${techgeeknext.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
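One optional tweak, not part of the original configuration: when a consumer group has no committed offset yet, the Kafka consumer starts at the latest offset by default, so records produced before the listener registered are skipped. Assuming you want the listener to read from the beginning in that case, you can add the standard auto.offset.reset property inside consumerFactory():

// Hypothetical addition inside consumerFactory(): start from the earliest
// available offset when the group has no committed offset yet
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");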
- Now, write a listener method using the @KafkaListener annotation to listen for messages arriving on techgeeknext-topic with the group-id consumer group.
package com.techgeeknext;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;

@SpringBootApplication
public class KafaExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafaExampleApplication.class, args);
    }

    @KafkaListener(topics = "techgeeknext-topic", groupId = "group-id")
    public void listen(String message) {
        System.out.println("Received Message : " + message);
    }
}
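As a side note, @KafkaListener methods are not limited to a plain String payload. A rough sketch, assuming you also want to log which topic, partition, and offset each record came from, can use the header annotations from spring-messaging. The class name below is illustrative only, and it would typically replace the simple listener rather than run alongside it, since two listeners in the same consumer group split the topic's partitions between them:

package com.techgeeknext;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class DetailedKafkaListener {

    // Same topic and group as the main example, but with record metadata
    @KafkaListener(topics = "techgeeknext-topic", groupId = "group-id")
    public void listenWithHeaders(@Payload String message,
                                  @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                                  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                                  @Header(KafkaHeaders.OFFSET) long offset) {
        System.out.println("Received '" + message + "' from " + topic
                + ", partition " + partition + ", offset " + offset);
    }
}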
- Next, start the Spring Boot application by running mvn spring-boot:run.
- Start ZooKeeper: this Kafka installation comes with a built-in ZooKeeper. ZooKeeper is mainly used to track the status of the nodes present in the Kafka cluster and to keep track of Kafka topics, messages, etc.
Refer to Install Apache Kafka for the steps to install ZooKeeper and Kafka.

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
- Start Apache Kafka: use the command below to start the Kafka broker.
.\bin\windows\kafka-server-start.bat .\config\server.properties
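Depending on the broker configuration, techgeeknext-topic may be auto-created on first use (auto.create.topics.enable defaults to true). If not, you can create it explicitly; the command below is a sketch that assumes Kafka 2.2 or newer, which accepts --bootstrap-server (older versions use --zookeeper localhost:2181 instead):

.\bin\windows\kafka-topics.bat --create --topic techgeeknext-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1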
- Finally, hit the URL with the Hello message as follows: http://localhost:8080/techgeeknext-kafka/producer?message=Hello
- Then, hit the URL with a different message, Next Message Testing, as follows (the browser will URL-encode the spaces in the query string):
http://localhost:8080/techgeeknext-kafka/producer?message=Next Message Testing
The above two URLs will trigger messages to be sent to techgeeknext-topic, which the group-id consumer group will then read.
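If you prefer the command line to a browser, the same endpoint can be exercised with curl; these invocations are just an illustration, with the spaces already percent-encoded:

curl "http://localhost:8080/techgeeknext-kafka/producer?message=Hello"
curl "http://localhost:8080/techgeeknext-kafka/producer?message=Next%20Message%20Testing"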
- On the Spring Boot console, we can see that the consumer has started and the messages are received by the consumer.
- Also, you can verify in Kafka Tool that techgeeknext-topic has been created for the group-id consumer group, along with details such as partitions, consumers, and producers.
Messages Producer
Message Consumer
Download Source Code
The full source code for this article can be found below. Download it here: Spring Boot Kafka Example