E2EHIRING Logo
search

Search blogs by title

Jobs
Jobs
internships
Internships
Company
Assessment
mentorship
Mentorship
more
Moredropdown
Login
HomeSepratorIconBlogsSepratorIconSpring Cloud Stream with Solace SepratorIcon

Spring Cloud Stream with Solace

Han SoloDeepak Samria
calendar1 Feb 2022
poster

SCS is a framework for building highly scalable event-driven microservices connected with shared messaging systems. The benefit of using Spring Cloud Stream is loose coupling between message broker and application. We can change the message broker without doing any changes in the code, the only thing we to change properties in application.properties file. For example, if we don't use Spring Cloud Stream then we need to create one config class where we should declare the following beans as below to declare Queue, Topic Exchange (Binding both of these with routing key), Amqp Template.

@Bean
public Queue queue() {
 return new 
Queue(QUEUE);
}


@Bean
public TopicExchange topicExchange() {
 return new TopicExchange(EXCHANGE);
}

@Bean
public Binding binding(Queue queue, TopicExchange topicExchange) {
 return BindingBuilder.bind(queue).to(topicExchange).with(ROUTING_KEY);
}

@Bean
 public MessageConverter converter() {
 return new Jackson2JsonMessageConverter();
 }

@Bean
public AmqpTemplate template(ConnectionFactory connectionFactory) {
 final RabbitTemplate rabbitTemplate = new RabbitTemplate();
 rabbitTemplate.setConnectionFactory(connectionFactory);
 rabbitTemplate.setMessageConverter(converter());
 return rabbitTemplate;
}



If we use Spring Cloud Stream then the entire above work will be handled by Spring Cloud Stream. Let us check the sample project for that. Our POM.xml file looks like this, majorly we require Spring cloud Stream and Message Broker dependency (in this case it is Solace).


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.7</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.spring.cloud.stream.solace</groupId>
    <artifactId>solace-integration-spring-boot</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>solace-integration-spring-boot</name>
    <description>Project for Spring Integration and Solace</description>
    <properties>
        <java.version>11</java.version>
        <solace-spring-cloud.version>2.1.0</solace-spring-cloud.version>
        <spring-cloud.version>2020.0.4</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.solace.spring.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-solace</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
            <scope>test</scope>
            <classifier>test-binder</classifier>
            <type>test-jar</type>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>com.solace.spring.cloud</groupId>
                <artifactId>solace-spring-cloud-bom</artifactId>
                <version>${solace-spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

Our application.yml file looks like this:
server:
  port: 8080
spring:
  cloud:
    function:
      definition: processor;consumer
    stream:
      bindings:
        producer-out-0:
          destination: sample.queue.processor
          group: nonexclusive
        processor-in-0:
          destination: sample.queue.processor
          group: nonexclusive  
        processor-out-0:
          destination: sample.queue.consumer
          group: nonexclusive 
        consumer-in-0:
          destination: sample.queue.consumer
          group: nonexclusive   
        
      solace:
        bindings:
          consumer-in-0:
            consumer:
              provisionSubscriptionsToDurableQueue: false
              provisionDurableQueue: false
              queueNamePrefix: ""
              useFamiliarityInQueueName: false
              useDestinationEncodingInQueueName: false
              useGroupNameInQueueName: false
              content-type: "application/json"
          processor-in-0:
            consumer:
              provisionSubscriptionsToDurableQueue: false
              provisionDurableQueue: false
              queueNamePrefix: ""
              useFamiliarityInQueueName: false
              useDestinationEncodingInQueueName: false
              useGroupNameInQueueName: false
              content-type: "application/json" 
          producer-out-0:
            producer:
              provisionSubscriptionsToDurableQueue: false
              provisionDurableQueue: false
              queueNamePrefix: ""
              useFamiliarityInQueueName: false
              useDestinationEncodingInQueueName: false
              useGroupNameInQueueName: false
              content-type: "application/json"     
          processor-out-0:
            producer:
              provisionSubscriptionsToDurableQueue: false
              provisionDurableQueue: false
              queueNamePrefix: ""
              useFamiliarityInQueueName: false
              useDestinationEncodingInQueueName: false
              useGroupNameInQueueName: false
              content-type: "application/json"    
    
      binders:
        solace:
          type: solace
          environment:
            solace:
              java:
                host: tcps://mr-0hnblucrjm6.messaging.solace.cloud:55443 
                msgVpn: demoservice 
                clientUsername: solace-cloud-client 
                clientPassword: qtc6uqufdg5b990crlsibra09i 
                connectRetries: 3 
                connectRetriesPerHost: 0 
                reconnectRetries: 3 

Here point to be noted is while binding any binder we should use <binder_name>-in or out-index no., where in or out means flow from application to broker and index is used for the same binder. By using this we can bind any number of beans as we want.

Message Producer is looking like this:

@RestController
public class MessageProducer {
    
    @Autowired
    private StreamBridge streamBridge;
    
    @PostMapping("/msg")
    public ResponseEntity<String> sendMessage(@RequestBody String message){
        boolean send = streamBridge.send("producer-out-0", message);
        return ResponseEntity.ok("Message sent");
    }
}


Here StreamBridge will send the message to “producer-out-0” binding which is bound with queue sample.queue.processor refer application.properties.

Our Processor looks like this:

package com.spring.cloud.stream.solace.processor;
import java.util.function.Function;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MessageProcessor {

    @Bean
    public Function<String, String> processor() {
        return (msg) -> {
            System.out.println("Prining from Processor");
            return msg.toUpperCase()+" : Message is processed";
        };
    }
}

Here our processor will listen to sample.queue.processor and send the response to sample.queue.consumer after performing our business logic. 

Last but not least our consumer looks like this:

package com.spring.cloud.stream.solace.consumer;

import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MessageConsumer {
    
    @Bean
    public Consumer<String> consumer() {
        return (msg) -> {
          System.out.println("Received the message in Consumer :"+msg);
        };
    }
}

It will listen to queue sample.queue.consumer for any message. 

In this, we haven't hardcode any configuration which is entirely handled by Spring Cloud Stream. When we require we can change the message broker by changing some properties in application.yml file.

Bullet Points: 

  • There is some issue in Solace while working with application.properties, so it is better to use application.yml instead.
  • There is no batch facility is provided by Solace, so if you required you need to implement it yourself.

If you think there are some mistakes or some errors in the above article, feel free to reach out to me.


Recent Posts

How is Technology impacting the HR practices in India in the post Covid Era?

How is Technology impacting the HR practices in India in the post Covid Era?

Collection of 500+ ML projects

Collection of 500+ ML projects

How to Retain Your Tech Employees amid Economic Recession?

How to Retain Your Tech Employees amid Economic Recession?

User enumeration - vulnerability and mitigation

User enumeration - vulnerability and mitigation

LabelImg for image tagging - Image processing

LabelImg for image tagging - Image processing

copycopycopycopy

Han Solo

Recent Posts

How is Technology impacting the HR practices in India in the post Covid Era?

How is Technology impacting the HR practices in India in the post Covid Era?

Collection of 500+ ML projects

Collection of 500+ ML projects

How to Retain Your Tech Employees amid Economic Recession?

How to Retain Your Tech Employees amid Economic Recession?

User enumeration - vulnerability and mitigation

User enumeration - vulnerability and mitigation

LabelImg for image tagging - Image processing

LabelImg for image tagging - Image processing