Spring Cloud Stream with RabbitMQ: Message-Driven Microservices
In this tutorial, we introduce Spring Cloud Stream, a framework for building message-driven microservice applications that are connected by messaging brokers such as RabbitMQ and Apache Kafka.
Note: The full source code for the Spring Cloud Stream with RabbitMQ example can be downloaded at the end of this article.
Spring Cloud Stream builds on Spring Boot to develop stand-alone, production-grade Spring applications and uses Spring Integration to connect to message brokers.
In this example, we will use RabbitMQ as the message broker to communicate between microservices. Let's review some basic broker concepts.
How messaging works in microservices
In a microservices architecture, services often need to communicate with each other. Rather than calling each other directly over HTTP, they can use RabbitMQ as a message broker. In the diagram above, Microservice A publishes its events to the message broker, and the subscribers (Microservices B and C) subscribe to those events.
RabbitMQ is an open-source message broker. It uses the Advanced Message Queuing Protocol (AMQP), an open standard application-layer protocol that transfers data across the network as a byte stream.
Now let's start with the example.
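The publish/subscribe flow described above can be sketched in plain Java. This is only an in-memory illustration: the `InMemoryBroker` class, the `events` destination name, and the `employee-created` message are hypothetical, and nothing here talks to RabbitMQ or uses AMQP.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a message broker; not RabbitMQ or AMQP.
class InMemoryBroker {
    // Destination name -> subscribers registered for it
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    // Register a subscriber (for example Microservice B or C) for a destination.
    void subscribe(String destination, Consumer<String> subscriber) {
        subscribers.computeIfAbsent(destination, d -> new ArrayList<>()).add(subscriber);
    }

    // Deliver the message to every subscriber of the destination and return the delivery count.
    int publish(String destination, String message) {
        List<Consumer<String>> targets =
                subscribers.getOrDefault(destination, Collections.emptyList());
        for (Consumer<String> target : targets) {
            target.accept(message);
        }
        return targets.size();
    }

    public static void main(String[] args) {
        InMemoryBroker broker = new InMemoryBroker();
        List<String> receivedByB = new ArrayList<>();
        List<String> receivedByC = new ArrayList<>();
        broker.subscribe("events", receivedByB::add);
        broker.subscribe("events", receivedByC::add);

        // Microservice A publishes once; both subscribers receive a copy.
        int delivered = broker.publish("events", "employee-created");
        System.out.println(delivered + " " + receivedByB + " " + receivedByC);
    }
}
```

The publisher only knows the destination name, not who is subscribed. That decoupling is what a real broker such as RabbitMQ provides across process and network boundaries.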
RabbitMQ Setup
You can set up RabbitMQ locally to test this example.
In this example, we will use the free cloud-based RabbitMQ service provided by CloudAMQP.
Sign up.
Create a new instance.
Provide a name and click Select Region.
Select a data center and click Review.
Click Create instance.
The instance is created successfully.
Project Setup
Create Producer Spring Boot Project from Spring Initializr
Producer Project Structure
Add the following dependencies to pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.5.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.tech.geek.next.emp</groupId>
    <artifactId>spring-cloud-stream-amqp-rabbitmq-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-cloud-stream-amqp-rabbitmq-producer</name>
    <description>Demo project for Spring Boot Producer</description>
    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR1</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Create Message Channel
Let's create an interface that defines the message channel.
package com.tech.geek.next.emp.springcloudstreamamqprabbitmqproducer;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface EmployeeBinding {

    // Output channel named "empChannel", used to publish messages
    @Output("empChannel")
    MessageChannel publishMessage();
}
Create Controller
package com.tech.geek.next.emp.springcloudstreamamqprabbitmqproducer;

import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class EmployeeProducerController {

    // Output channel obtained from the binding interface
    private final MessageChannel messageChannel;

    public EmployeeProducerController(EmployeeBinding empBinding) {
        messageChannel = empBinding.publishMessage();
    }

    // Sends the path variable as the message payload
    @GetMapping("/publish/{msg}")
    public void publish(@PathVariable String msg) {
        this.messageChannel.send(MessageBuilder.withPayload(msg).build());
    }
}
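When calling the `/publish/{msg}` endpoint from code, a message that contains spaces has to be percent-encoded in the path. The following is a minimal sketch using only `java.net.URI`; the `PublishUrl` class is hypothetical, and the host and port are assumptions that match the local producer configuration used in this tutorial.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper for building request URIs; not part of the tutorial's projects.
class PublishUrl {
    // Build the request URI for the producer's /publish/{msg} endpoint.
    // Host "localhost" and port 8080 are assumptions for a local run.
    static URI forMessage(String msg) {
        try {
            // The multi-argument constructor percent-encodes characters that are
            // illegal in a path, such as spaces.
            return new URI("http", null, "localhost", 8080, "/publish/" + msg, null, null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot build URI for message: " + msg, e);
        }
    }

    public static void main(String[] args) {
        // prints http://localhost:8080/publish/Welcome%20techgeeknext
        System.out.println(forMessage("Welcome techgeeknext"));
    }
}
```

Note that a `/` inside the message is not encoded by this approach and would be read as an extra path segment, so this sketch only suits simple text messages.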
Producer Main Application
Add @EnableBinding to enable the RabbitMQ binding.
package com.tech.geek.next.emp.springcloudstreamamqprabbitmqproducer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

@EnableBinding(EmployeeBinding.class)
@SpringBootApplication
public class SpringCloudStreamAmqpRabbitmqProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringCloudStreamAmqpRabbitmqProducerApplication.class, args);
    }
}
application.properties
Add the properties below so that the application connects to the CloudAMQP instance. Use the AMQP URL shown in the instance details:
spring.rabbitmq.addresses=amqp://cnecwrfz@chimpanzee.rmq.cloudamqp.com/cnecwrfz
spring.cloud.stream.bindings.empChannel.destination = publishMessage
server.port=8080
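The address above packs several connection details into one URL. As a rough illustration, the sketch below splits it into its parts with `java.net.URI`. The `AmqpAddress` class is hypothetical (Spring Boot parses `spring.rabbitmq.addresses` itself), and falling back to port 5672 when none is given is an assumption based on the port that appears in the startup logs.

```java
import java.net.URI;

// Hypothetical helper for illustration; Spring Boot does its own parsing.
class AmqpAddress {
    // Port used for plain (non-TLS) AMQP when the URL does not specify one.
    static final int DEFAULT_PORT = 5672;

    final String userInfo;
    final String host;
    final int port;
    final String virtualHost;

    AmqpAddress(String address) {
        URI uri = URI.create(address);
        this.userInfo = uri.getUserInfo();
        this.host = uri.getHost();
        // URI.getPort() returns -1 when the URL has no explicit port.
        this.port = uri.getPort() == -1 ? DEFAULT_PORT : uri.getPort();
        String path = uri.getPath();
        // Simplification: an absent path is treated as the default virtual host "/".
        this.virtualHost = path.isEmpty() ? "/" : path.substring(1);
    }

    public static void main(String[] args) {
        AmqpAddress a = new AmqpAddress("amqp://cnecwrfz@chimpanzee.rmq.cloudamqp.com/cnecwrfz");
        System.out.println("user info:    " + a.userInfo);
        System.out.println("host:         " + a.host);
        System.out.println("port:         " + a.port);
        System.out.println("virtual host: " + a.virtualHost);
    }
}
```

Read this way, the last path segment of the URL is the RabbitMQ virtual host, which on this instance has the same name as the user.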
Create Consumer Spring Boot Project from Spring Initializr
Consumer Project Structure
Add the following dependencies to pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.5.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.tech.geek.next.emp</groupId>
    <artifactId>spring-cloud-stream-amqp-rabbitmq-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-cloud-stream-amqp-rabbitmq-consumer</name>
    <description>Demo project for Spring Boot Consumer</description>
    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR1</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
    <repositories>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
        </repository>
    </repositories>
</project>
Create Binding
package com.tech.geek.next.emp.springcloudstreamamqprabbitmqconsumer;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface EmployeeBinding {

    String EMP_CHANNEL = "empChannel";

    @Input(EMP_CHANNEL)
    SubscribableChannel empMsg();
}
Create Message Listener
package com.tech.geek.next.emp.springcloudstreamamqprabbitmqconsumer;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

@EnableBinding(EmployeeBinding.class)
public class EmployeeListener {

    // Called for every message that arrives on the empChannel input
    @StreamListener(target = EmployeeBinding.EMP_CHANNEL)
    public void processEmpNameChannel(String msg) {
        System.out.println(msg);
    }
}
application.properties
Add the properties below so that the application connects to the CloudAMQP instance.
spring.rabbitmq.addresses=amqp://cnecwrfz@chimpanzee.rmq.cloudamqp.com/cnecwrfz
spring.cloud.stream.bindings.empChannel.destination = publishMessage
server.port=9090
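Both applications map the channel name `empChannel` to the same destination, `publishMessage`, and that shared destination is what connects them through the broker. The sketch below is a simplified check of that idea using `java.util.Properties`. The `BindingDestinations` class is hypothetical and this is not how Spring Cloud Stream resolves bindings internally; it only reads the same property key the tutorial uses.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper for illustration; Spring Cloud Stream resolves bindings itself.
class BindingDestinations {
    // Look up the destination configured for a channel, using the tutorial's key format.
    static String destinationFor(String propertiesText, String channel) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty("spring.cloud.stream.bindings." + channel + ".destination");
    }

    public static void main(String[] args) {
        // The binding line is the same in the producer's and the consumer's application.properties.
        String producerProps = "spring.cloud.stream.bindings.empChannel.destination = publishMessage\n";
        String consumerProps = "spring.cloud.stream.bindings.empChannel.destination = publishMessage\n";

        String out = destinationFor(producerProps, "empChannel");
        String in = destinationFor(consumerProps, "empChannel");
        System.out.println("producer -> " + out + ", consumer <- " + in
                + ", connected: " + out.equals(in));
    }
}
```

If the two destinations differ, both applications start normally but the consumer never receives the producer's messages, so this property is worth checking first when nothing arrives.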
Testing
Start the Producer and check its logs:
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.1.5.RELEASE)
2019-10-31 16:02:19.140 INFO 16212 --- [ main] oudStreamAmqpRabbitmqProducerApplication : Starting SpringCloudStreamAmqpRabbitmqProducerApplication
2019-10-31 16:02:21.233 INFO 16212 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'mbeanServer' of type [com.sun.jmx.mbeanserver.JmxMBeanServer] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-10-31 16:02:22.216 INFO 16212 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8080 (http)
2019-10-31 16:02:22.281 INFO 16212 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2019-10-31 16:02:22.282 INFO 16212 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.19]
2019-10-31 16:02:22.508 INFO 16212 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2019-10-31 16:02:22.508 INFO 16212 --- [ main] o.s.web.context.ContextLoader : Root WebApplicationContext: initialization completed in 3254 ms
2019-10-31 16:02:23.900 INFO 16212 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2019-10-31 16:02:24.721 INFO 16212 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService 'taskScheduler'
2019-10-31 16:02:25.053 INFO 16212 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel empChannel
2019-10-31 16:02:25.191 INFO 16212 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel errorChannel
2019-10-31 16:02:25.274 INFO 16212 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel nullChannel
2019-10-31 16:02:25.312 INFO 16212 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageHandler errorLogger
2019-10-31 16:02:25.375 INFO 16212 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2019-10-31 16:02:25.376 INFO 16212 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 1 subscriber(s).
2019-10-31 16:02:25.376 INFO 16212 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started _org.springframework.integration.errorLogger
2019-10-31 16:02:25.778 INFO 16212 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [chimpanzee.rmq.cloudamqp.com:5672]
2019-10-31 16:02:28.081 INFO 16212 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#11dfd79:0/SimpleConnection@18af6fa [delegate=amqp://cnecwrfz@54.17.199.170:5672/cnecwrfz, localPort= 53402]
2019-10-31 16:02:28.752 INFO 16212 --- [ main] o.s.c.s.m.DirectWithAttributesChannel : Channel 'application.empChannel' has 1 subscriber(s).
2019-10-31 16:02:28.826 INFO 16212 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2019-10-31 16:02:28.830 INFO 16212 --- [ main] oudStreamAmqpRabbitmqProducerApplication : Started SpringCloudStreamAmqpRabbitmqProducerApplication in 10.574 seconds (JVM running for 22.928)
2019-10-31 16:03:18.024 INFO 16212 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2019-10-31 16:03:18.024 INFO 16212 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2019-10-31 16:03:18.044 INFO 16212 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 16 ms
2019-10-31 16:03:18.142 INFO 16212 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [chimpanzee.rmq.cloudamqp.com:5672]
2019-10-31 16:03:19.710 INFO 16212 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory.publisher#1a39ed9:0/SimpleConnection@1ff4b7e [delegate=amqp://cnecwrfz@54.17.199.170:5672/cnecwrfz, localPort= 53419]
Now start the Consumer, publish a message through the producer endpoint localhost:8080/publish/Welcome techgeeknext, and check the Consumer logs.
As the logs show, the Producer publishes the message and the Consumer consumes it:
Welcome techgeeknext
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.1.5.RELEASE)
2019-10-31 16:02:41.099 INFO 19424 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration' of type [org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration$$EnhancerBySpringCGLIB$$49062570] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-10-31 16:02:41.126 INFO 19424 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'mbeanServer' of type [com.sun.jmx.mbeanserver.JmxMBeanServer] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-10-31 16:02:42.033 INFO 19424 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 9090 (http)
2019-10-31 16:02:42.081 INFO 19424 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2019-10-31 16:02:42.082 INFO 19424 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.19]
2019-10-31 16:02:42.271 INFO 19424 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2019-10-31 16:02:42.272 INFO 19424 --- [ main] o.s.web.context.ContextLoader : Root WebApplicationContext: initialization completed in 3090 ms
2019-10-31 16:02:43.514 INFO 19424 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2019-10-31 16:02:44.226 INFO 19424 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService 'taskScheduler'
2019-10-31 16:02:44.567 INFO 19424 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel empChannel
2019-10-31 16:02:44.714 INFO 19424 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel nullChannel
2019-10-31 16:02:44.752 INFO 19424 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel errorChannel
2019-10-31 16:02:44.813 INFO 19424 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageHandler errorLogger
2019-10-31 16:02:44.880 INFO 19424 --- [ main] o.s.c.s.m.DirectWithAttributesChannel : Channel 'application.empChannel' has 1 subscriber(s).
2019-10-31 16:02:44.893 INFO 19424 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2019-10-31 16:02:44.894 INFO 19424 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 1 subscriber(s).
2019-10-31 16:02:44.894 INFO 19424 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started _org.springframework.integration.errorLogger
2019-10-31 16:02:45.198 INFO 19424 --- [ main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: publishMessage.anonymous.62k5RQVATBGYxVtWf1uJiw, bound to: publishMessage
2019-10-31 16:02:45.205 INFO 19424 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [chimpanzee.rmq.cloudamqp.com:5672]
2019-10-31 16:02:46.540 INFO 19424 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#17163a7:0/SimpleConnection@145dc8f [delegate=amqp://cnecwrfz@54.17.199.170:5672/cnecwrfz, localPort= 53410]
2019-10-31 16:02:47.682 INFO 19424 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel publishMessage.anonymous.62k5RQVATBGYxVtWf1uJiw.errors
2019-10-31 16:02:47.864 INFO 19424 --- [ main] o.s.c.stream.binder.BinderErrorChannel : Channel 'application.publishMessage.anonymous.62k5RQVATBGYxVtWf1uJiw.errors' has 1 subscriber(s).
2019-10-31 16:02:47.864 INFO 19424 --- [ main] o.s.c.stream.binder.BinderErrorChannel : Channel 'application.publishMessage.anonymous.62k5RQVATBGYxVtWf1uJiw.errors' has 2 subscriber(s).
2019-10-31 16:02:48.786 INFO 19424 --- [ main] o.s.i.a.i.AmqpInboundChannelAdapter : started inbound.publishMessage.anonymous.62k5RQVATBGYxVtWf1uJiw
2019-10-31 16:02:48.866 INFO 19424 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 9090 (http) with context path ''
2019-10-31 16:02:48.866 INFO 19424 --- [ main] oudStreamAmqpRabbitmqConsumerApplication : Started SpringCloudStreamAmqpRabbitmqConsumerApplication in 10.557 seconds (JVM running for 21.898)
Welcome techgeeknext
In this tutorial, we have seen how microservices communicate with each other by means of the RabbitMQ message broker. We can also use other message brokers such as Kafka by changing the binder dependency and the configuration properties.
Download Source Code
The full source code for this article can be found below.
Download it here - Spring Cloud Stream with RabbitMQ example