
Spring Cloud Stream (SCS) is a framework for building highly scalable, event-driven microservices connected through shared messaging systems. Its main benefit is loose coupling between the message broker and the application: we can switch brokers without changing any code, and only need to update the binder dependency and the properties in the application.yml (or application.properties) file. For example, with RabbitMQ and without Spring Cloud Stream, we would need a configuration class that declares beans for the Queue, the TopicExchange, the Binding that ties the two together with a routing key, the MessageConverter, and the AmqpTemplate:
@Bean
public Queue queue() {
    return new Queue(QUEUE);
}

@Bean
public TopicExchange topicExchange() {
    return new TopicExchange(EXCHANGE);
}

@Bean
public Binding binding(Queue queue, TopicExchange topicExchange) {
    return BindingBuilder.bind(queue).to(topicExchange).with(ROUTING_KEY);
}

@Bean
public MessageConverter converter() {
    return new Jackson2JsonMessageConverter();
}

@Bean
public AmqpTemplate template(ConnectionFactory connectionFactory) {
    final RabbitTemplate rabbitTemplate = new RabbitTemplate();
    rabbitTemplate.setConnectionFactory(connectionFactory);
    rabbitTemplate.setMessageConverter(converter());
    return rabbitTemplate;
}
With Spring Cloud Stream, all of the work above is handled by the framework. Let us look at a sample project. The pom.xml file is shown below; the main requirements are the Spring Cloud Stream dependency and the binder for the message broker (Solace in this case).
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.7</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.spring.cloud.stream.solace</groupId>
    <artifactId>solace-integration-spring-boot</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>solace-integration-spring-boot</name>
    <description>Project for Spring Integration and Solace</description>
    <properties>
        <java.version>11</java.version>
        <solace-spring-cloud.version>2.1.0</solace-spring-cloud.version>
        <spring-cloud.version>2020.0.4</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.solace.spring.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-solace</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
            <scope>test</scope>
            <classifier>test-binder</classifier>
            <type>test-jar</type>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>com.solace.spring.cloud</groupId>
                <artifactId>solace-spring-cloud-bom</artifactId>
                <version>${solace-spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Our application.yml file looks like this:
server:
  port: 8080
spring:
  cloud:
    function:
      definition: processor;consumer
    stream:
      bindings:
        producer-out-0:
          destination: sample.queue.processor
          group: nonexclusive
        processor-in-0:
          destination: sample.queue.processor
          group: nonexclusive
        processor-out-0:
          destination: sample.queue.consumer
          group: nonexclusive
        consumer-in-0:
          destination: sample.queue.consumer
          group: nonexclusive
      solace:
        bindings:
          consumer-in-0:
            consumer:
              provisionSubscriptionsToDurableQueue: false
              provisionDurableQueue: false
              queueNamePrefix: ""
              useFamiliarityInQueueName: false
              useDestinationEncodingInQueueName: false
              useGroupNameInQueueName: false
              content-type: "application/json"
          processor-in-0:
            consumer:
              provisionSubscriptionsToDurableQueue: false
              provisionDurableQueue: false
              queueNamePrefix: ""
              useFamiliarityInQueueName: false
              useDestinationEncodingInQueueName: false
              useGroupNameInQueueName: false
              content-type: "application/json"
          producer-out-0:
            producer:
              provisionSubscriptionsToDurableQueue: false
              provisionDurableQueue: false
              queueNamePrefix: ""
              useFamiliarityInQueueName: false
              useDestinationEncodingInQueueName: false
              useGroupNameInQueueName: false
              content-type: "application/json"
          processor-out-0:
            producer:
              provisionSubscriptionsToDurableQueue: false
              provisionDurableQueue: false
              queueNamePrefix: ""
              useFamiliarityInQueueName: false
              useDestinationEncodingInQueueName: false
              useGroupNameInQueueName: false
              content-type: "application/json"
      binders:
        solace:
          type: solace
          environment:
            solace:
              java:
                host: tcps://mr-0hnblucrjm6.messaging.solace.cloud:55443
                msgVpn: demoservice
                clientUsername: solace-cloud-client
                clientPassword: qtc6uqufdg5b990crlsibra09i
                connectRetries: 3
                connectRetriesPerHost: 0
                reconnectRetries: 3
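Note that for function beans the binding names follow the convention <functionName>-in-<index> or <functionName>-out-<index>. Here "in" marks an input binding (messages flowing from the broker into the application), "out" marks an output binding (messages flowing from the application to the broker), and the index distinguishes multiple inputs or outputs of the same function; it is 0 for a function with a single input and a single output. Following this convention, we can bind as many function beans as we need.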
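As a rough illustration of the binding names that appear in application.yml (processor-in-0, processor-out-0, consumer-in-0), the sketch below builds them from a function name, a direction, and an index. The BindingNames class and its methods are hypothetical helpers written only for this illustration; they are not part of Spring Cloud Stream, which derives these names internally.

```java
// Hypothetical helper (not a Spring Cloud Stream API): builds binding names
// following the <functionName>-in-<index> / <functionName>-out-<index> pattern.
final class BindingNames {

    private BindingNames() {
    }

    // Name of the binding that delivers messages from the broker into the function.
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Name of the binding that carries the function's result out to the broker.
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    public static void main(String[] args) {
        System.out.println(input("processor", 0));   // processor-in-0
        System.out.println(output("processor", 0));  // processor-out-0
        System.out.println(input("consumer", 0));    // consumer-in-0
    }
}
```

The three printed names match the keys used under spring.cloud.stream.bindings in the configuration above.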
The message producer looks like this:
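import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageProducer {
    @Autowired
    private StreamBridge streamBridge;

    @PostMapping("/msg")
    public ResponseEntity<String> sendMessage(@RequestBody String message) {
        // send() returns false if the message could not be handed to the binding
        boolean sent = streamBridge.send("producer-out-0", message);
        return sent ? ResponseEntity.ok("Message sent")
                : ResponseEntity.internalServerError().body("Message could not be sent");
    }
}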
Here StreamBridge sends the message to the “producer-out-0” binding, which is bound to the destination sample.queue.processor (see application.yml).
Our Processor looks like this:
package com.spring.cloud.stream.solace.processor;

import java.util.function.Function;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MessageProcessor {
    @Bean
    public Function<String, String> processor() {
        return (msg) -> {
            System.out.println("Printing from Processor");
            return msg.toUpperCase() + " : Message is processed";
        };
    }
}
Here our processor listens to sample.queue.processor and, after applying the business logic, sends the result to sample.queue.consumer.
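Because the processor is an ordinary java.util.function.Function, its business logic can be tried out without Spring or a broker. The following is a minimal sketch under that assumption; ProcessorSketch is a hypothetical class name, and the console logging from the bean is left out.

```java
import java.util.function.Function;

// Plain-Java copy of the processor's transformation, without the Spring wiring
// and without the console logging. ProcessorSketch is a hypothetical name.
final class ProcessorSketch {

    private ProcessorSketch() {
    }

    static Function<String, String> processor() {
        return msg -> msg.toUpperCase() + " : Message is processed";
    }

    public static void main(String[] args) {
        System.out.println(processor().apply("hello")); // HELLO : Message is processed
    }
}
```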
Last but not least, our consumer looks like this:
package com.spring.cloud.stream.solace.consumer;

import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MessageConsumer {
    @Bean
    public Consumer<String> consumer() {
        return (msg) -> {
            System.out.println("Received the message in Consumer :" + msg);
        };
    }
}
It listens to the queue sample.queue.consumer for incoming messages.
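To make the overall flow easier to picture, here is a rough in-memory sketch of how a message travels from the producer through the processor to the consumer. It is only an illustration: the two ArrayDeque instances stand in for the destinations sample.queue.processor and sample.queue.consumer, the FlowSketch class is hypothetical, and the consumer collects messages into a list instead of printing them. In the real application the binder and the broker do this routing.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;
import java.util.function.Function;

// In-memory stand-in for the two destinations; no broker or Spring context is involved.
final class FlowSketch {

    private FlowSketch() {
    }

    // Pushes one message through processor and consumer and returns what the consumer received.
    static List<String> run(String message) {
        Queue<String> processorQueue = new ArrayDeque<>(); // stands in for sample.queue.processor
        Queue<String> consumerQueue = new ArrayDeque<>();  // stands in for sample.queue.consumer
        List<String> received = new ArrayList<>();

        Function<String, String> processor = msg -> msg.toUpperCase() + " : Message is processed";
        Consumer<String> consumer = received::add;

        // producer-out-0: the REST endpoint publishes the raw message.
        processorQueue.add(message);

        // processor-in-0 -> processor-out-0: transform and forward each message.
        while (!processorQueue.isEmpty()) {
            consumerQueue.add(processor.apply(processorQueue.remove()));
        }

        // consumer-in-0: hand each processed message to the consumer.
        while (!consumerQueue.isEmpty()) {
            consumer.accept(consumerQueue.remove());
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run("hello")); // [HELLO : Message is processed]
    }
}
```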
In this project we have not hard-coded any broker configuration; it is handled entirely by Spring Cloud Stream. When required, we can switch the message broker by changing the binder dependency and a few properties in the application.yml file.
If you think there are some mistakes or some errors in the above article, feel free to reach out to me.