
Spring Cloud Stream is a framework for building highly scalable, event-driven microservices connected by shared messaging systems. Its main benefit is loose coupling between the application and the message broker: we can switch brokers without touching the business code, changing only the binder dependency and a few properties in the application.properties file. Without Spring Cloud Stream, for example, we would need a configuration class that declares beans like the following for the Queue, the TopicExchange (bound together with a routing key), the message converter, and the AmqpTemplate.
@Bean
public Queue queue() {
    return new Queue(QUEUE);
}

@Bean
public TopicExchange topicExchange() {
    return new TopicExchange(EXCHANGE);
}

@Bean
public Binding binding(Queue queue, TopicExchange topicExchange) {
    return BindingBuilder.bind(queue).to(topicExchange).with(ROUTING_KEY);
}

@Bean
public MessageConverter converter() {
    return new Jackson2JsonMessageConverter();
}

@Bean
public AmqpTemplate template(ConnectionFactory connectionFactory) {
    final RabbitTemplate rabbitTemplate = new RabbitTemplate();
    rabbitTemplate.setConnectionFactory(connectionFactory);
    rabbitTemplate.setMessageConverter(converter());
    return rabbitTemplate;
}
With Spring Cloud Stream, all of the above is handled by the framework. Let us look at a sample project. In the pom.xml file, the main requirements are the Spring Cloud Stream dependency and a binder for the message broker (RabbitMQ in this case).
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.1</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.spring.cloud.stream.rabbitmq</groupId>
    <artifactId>rabbitmq-integration-spring-boot</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rabbitmq-integration-spring-boot</name>
    <description>Project for Spring Integration and RabbitMQ</description>
    <properties>
        <java.version>11</java.version>
        <spring-cloud.version>2021.0.0</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
            <scope>test</scope>
            <classifier>test-binder</classifier>
            <type>test-jar</type>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Our application.properties file looks like this:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.cloud.stream.function.definition=processor;consumer
spring.cloud.stream.bindings.processor-in-0.destination=sample.queue
spring.cloud.stream.bindings.processor-in-0.group=processor
spring.cloud.stream.bindings.processor-out-0.destination=sample.queue
spring.cloud.stream.bindings.processor-out-0.group=consumer
spring.cloud.stream.bindings.consumer-in-0.destination=sample.queue
spring.cloud.stream.bindings.consumer-in-0.group=consumer
spring.cloud.stream.bindings.producer-out-0.destination=sample.queue
spring.cloud.stream.bindings.producer-out-0.group=processor
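Note the naming convention for bindings: <function-name>-in-<index> or <function-name>-out-<index>. Here "in" marks an input binding (messages flowing from the broker into the function), "out" marks an output binding (messages flowing from the function to the broker), and the index numbers the function's inputs or outputs, starting at 0. Using this convention we can bind as many function beans as we want. The group property applies to input bindings: with the RabbitMQ binder, each group gets its own queue named <destination>.<group>.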
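To make the naming convention concrete, here is a minimal sketch in plain Java. The BindingNames class and its methods are hypothetical helpers written for this article, not part of Spring Cloud Stream; they only assemble the strings that the framework expects as property keys.

```java
// Hypothetical helper (not part of Spring Cloud Stream) that spells out
// the functional binding-name convention: <functionName>-in|out-<index>.
final class BindingNames {

    private BindingNames() {
    }

    // Name of the input binding for the given function bean and input index.
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Name of the output binding for the given function bean and output index.
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    // Full property key that sets the destination of a binding.
    static String destinationKey(String bindingName) {
        return "spring.cloud.stream.bindings." + bindingName + ".destination";
    }

    public static void main(String[] args) {
        System.out.println(destinationKey(input("processor", 0)));
        System.out.println(destinationKey(output("processor", 0)));
        System.out.println(destinationKey(input("consumer", 0)));
    }
}
```

Running it prints the same keys that appear in the application.properties file above for the processor and consumer beans.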
The message producer looks like this:
package com.spring.cloud.stream.rabbitmq.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageProducer {

    @Autowired
    private StreamBridge streamBridge;

    @PostMapping("/msg")
    public ResponseEntity<String> sendMessage(@RequestBody String message) {
        streamBridge.send("producer-out-0", message);
        return ResponseEntity.ok("Message sent");
    }
}
Here StreamBridge sends the message to the "producer-out-0" binding, which publishes to the sample.queue destination; the processor's queue, sample.queue.processor, is bound to that destination (refer to application.properties).
Our Processor looks like this:
package com.spring.cloud.stream.rabbitmq.processor;

import java.util.function.Function;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MessageProcessor {

    @Bean
    public Function<String, String> processor() {
        return (msg) -> {
            System.out.println("Printing from Processor");
            return msg.toUpperCase() + " : Message is processed";
        };
    }
}
Here our processor listens to the queue sample.queue.processor, applies our business logic, and publishes the result to the destination configured for processor-out-0, where the consumer's queue sample.queue.consumer picks it up.
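Because the processor is just a java.util.function.Function, its logic can be exercised without a broker or a Spring context. The sketch below repeats the same transformation in a standalone class; ProcessorSketch is a hypothetical name used only for this illustration, and the logging line is left out.

```java
import java.util.function.Function;

// Standalone copy of the processor logic for a quick check outside Spring.
final class ProcessorSketch {

    private ProcessorSketch() {
    }

    // Same transformation as the processor bean, minus the logging line.
    static Function<String, String> processor() {
        return msg -> msg.toUpperCase() + " : Message is processed";
    }

    public static void main(String[] args) {
        // prints "HELLO : Message is processed"
        System.out.println(processor().apply("hello"));
    }
}
```

Keeping the business logic in a plain function like this is what lets Spring Cloud Stream take care of the broker wiring for us.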
Last but not least our consumer looks like this:
package com.spring.cloud.stream.rabbitmq.consumer;

import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MessageConsumer {

    @Bean
    public Consumer<String> consumer() {
        return (msg) -> {
            System.out.println("Received the message in Consumer :" + msg);
        };
    }
}
It listens to the queue sample.queue.consumer for any message.
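To picture how destinations and groups turn into queues, here is a toy in-memory model in plain Java. It is only a sketch under my understanding of the RabbitMQ binder's default behaviour (one exchange per destination, one queue per consumer group named <destination>.<group>, each queue receiving a copy of every message published to the exchange). DestinationModel and its methods are hypothetical and have nothing to do with the binder's real implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy in-memory model (an assumption for illustration, not the binder's code):
// one destination acts like an exchange, and each consumer group owns a queue
// named <destination>.<group> that receives a copy of every published message.
final class DestinationModel {

    private final String destination;
    private final Map<String, Deque<String>> queues = new LinkedHashMap<>();

    DestinationModel(String destination) {
        this.destination = destination;
    }

    // Registers a consumer group and returns the name of its queue.
    String bindGroup(String group) {
        String queueName = destination + "." + group;
        queues.putIfAbsent(queueName, new ArrayDeque<>());
        return queueName;
    }

    // Publishing delivers one copy to every bound group queue.
    void publish(String message) {
        for (Deque<String> queue : queues.values()) {
            queue.addLast(message);
        }
    }

    // Removes and returns the next message of a queue, or null if it is empty.
    String poll(String queueName) {
        Deque<String> queue = queues.get(queueName);
        return queue == null ? null : queue.pollFirst();
    }

    public static void main(String[] args) {
        DestinationModel model = new DestinationModel("sample.queue");
        String processorQueue = model.bindGroup("processor");
        String consumerQueue = model.bindGroup("consumer");
        model.publish("hello");
        System.out.println(processorQueue + " -> " + model.poll(processorQueue));
        System.out.println(consumerQueue + " -> " + model.poll(consumerQueue));
    }
}
```

If this model matches the real broker topology, then with every binding pointing at the same sample.queue destination both queues see every published message, including the processor's own output. It is worth checking the queues in the RabbitMQ management console, and giving the processor's output a destination of its own if the messages should flow strictly from processor to consumer.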
In this example we haven't hardcoded any broker configuration; it is handled entirely by Spring Cloud Stream. When required, we can change the message broker by swapping the binder dependency and changing a few properties in the application.properties file.
Here are a few more properties for ready reference, which may save someone some time:
To customize the consumer tag prefix of a binding:
spring.cloud.stream.rabbit.bindings.<function-name>-in-0.consumer.consumer-tag-prefix=articleTag
To enable batching in the application:
spring.cloud.stream.rabbit.default.batchingEnabled=true
spring.cloud.stream.rabbit.bindings.<function-name>-in-0.consumer.batch-size=<any integer value>
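When batching is enabled on the consumer side, my understanding is that the function can be declared to take a whole list of payloads instead of a single message. This usually also requires batch mode to be switched on for the binding, which is an assumption not shown in the properties above. The sketch below shows only the shape of such a function in plain Java; BatchConsumerSketch and its received list are hypothetical names used for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Shape of a batch-style consumer, checked here without any broker.
final class BatchConsumerSketch {

    // Collects what the consumer saw so the example can be verified.
    static final List<String> received = new ArrayList<>();

    private BatchConsumerSketch() {
    }

    // One invocation handles a whole list of payloads.
    static Consumer<List<String>> batchConsumer() {
        return batch -> {
            System.out.println("Received a batch of " + batch.size() + " messages");
            received.addAll(batch);
        };
    }

    public static void main(String[] args) {
        batchConsumer().accept(List.of("a", "b", "c"));
        System.out.println(received);
    }
}
```

In a real application the same lambda would be exposed as a @Bean, exactly like the single-message consumer shown earlier.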
If you spot a mistake in this article, feel free to point it out in the comments.