
Spring Cloud Stream with Solace

Han Solo
Deepak Samria
1 Feb 2022

Spring Cloud Stream (SCS) is a framework for building highly scalable event-driven microservices connected by shared messaging systems. Its main benefit is loose coupling between the message broker and the application: we can switch brokers without changing application code, because only the binder dependency and the properties in the configuration file (application.properties or application.yml) need to change. Without Spring Cloud Stream we have to wire the broker ourselves. With RabbitMQ, for example, that means a configuration class declaring the following beans: the Queue, the TopicExchange, the Binding that ties the two together with a routing key, a message converter, and the AmqpTemplate.

// QUEUE, EXCHANGE and ROUTING_KEY are assumed to be String constants
// declared in the enclosing configuration class.

@Bean
public Queue queue() {
    return new Queue(QUEUE);
}

@Bean
public TopicExchange topicExchange() {
    return new TopicExchange(EXCHANGE);
}

// Route messages from the exchange to the queue by routing key.
@Bean
public Binding binding(Queue queue, TopicExchange topicExchange) {
    return BindingBuilder.bind(queue).to(topicExchange).with(ROUTING_KEY);
}

// Serialize message payloads as JSON.
@Bean
public MessageConverter converter() {
    return new Jackson2JsonMessageConverter();
}

@Bean
public AmqpTemplate template(ConnectionFactory connectionFactory) {
    final RabbitTemplate rabbitTemplate = new RabbitTemplate();
    rabbitTemplate.setConnectionFactory(connectionFactory);
    rabbitTemplate.setMessageConverter(converter());
    return rabbitTemplate;
}



If we use Spring Cloud Stream, all of the above work is handled by the framework. Let us look at a sample project. Our pom.xml file looks like this; the main requirements are the Spring Cloud Stream dependency and the binder dependency for the message broker (Solace in this case).


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.7</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.spring.cloud.stream.solace</groupId>
    <artifactId>solace-integration-spring-boot</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>solace-integration-spring-boot</name>
    <description>Project for Spring Integration and Solace</description>
    <properties>
        <java.version>11</java.version>
        <solace-spring-cloud.version>2.1.0</solace-spring-cloud.version>
        <spring-cloud.version>2020.0.4</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.solace.spring.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-solace</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
            <scope>test</scope>
            <classifier>test-binder</classifier>
            <type>test-jar</type>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>com.solace.spring.cloud</groupId>
                <artifactId>solace-spring-cloud-bom</artifactId>
                <version>${solace-spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

Our application.yml file looks like this:
server:
  port: 8080
spring:
  cloud:
    function:
      definition: processor;consumer
    stream:
      bindings:
        producer-out-0:
          destination: sample.queue.processor
          group: nonexclusive
        processor-in-0:
          destination: sample.queue.processor
          group: nonexclusive  
        processor-out-0:
          destination: sample.queue.consumer
          group: nonexclusive 
        consumer-in-0:
          destination: sample.queue.consumer
          group: nonexclusive   
        
      solace:
        bindings:
          consumer-in-0:
            consumer:
              provisionSubscriptionsToDurableQueue: false
              provisionDurableQueue: false
              queueNamePrefix: ""
              useFamiliarityInQueueName: false
              useDestinationEncodingInQueueName: false
              useGroupNameInQueueName: false
              content-type: "application/json"
          processor-in-0:
            consumer:
              provisionSubscriptionsToDurableQueue: false
              provisionDurableQueue: false
              queueNamePrefix: ""
              useFamiliarityInQueueName: false
              useDestinationEncodingInQueueName: false
              useGroupNameInQueueName: false
              content-type: "application/json" 
          producer-out-0:
            producer:
              provisionSubscriptionsToDurableQueue: false
              provisionDurableQueue: false
              queueNamePrefix: ""
              useFamiliarityInQueueName: false
              useDestinationEncodingInQueueName: false
              useGroupNameInQueueName: false
              content-type: "application/json"     
          processor-out-0:
            producer:
              provisionSubscriptionsToDurableQueue: false
              provisionDurableQueue: false
              queueNamePrefix: ""
              useFamiliarityInQueueName: false
              useDestinationEncodingInQueueName: false
              useGroupNameInQueueName: false
              content-type: "application/json"    
    
      binders:
        solace:
          type: solace
          environment:
            solace:
              java:
                host: tcps://mr-0hnblucrjm6.messaging.solace.cloud:55443 
                msgVpn: demoservice 
                clientUsername: solace-cloud-client 
                clientPassword: qtc6uqufdg5b990crlsibra09i 
                connectRetries: 3 
                connectRetriesPerHost: 0 
                reconnectRetries: 3 

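The point to note here is the binding naming convention: each binding is named <function_name>-in-<index> or <function_name>-out-<index>. Here in marks an input binding (messages flow from the broker into the application), out marks an output binding (messages flow from the application to the broker), and index is the position of that input or output on the function, which is 0 for a function with a single input and a single output. By following this convention we can bind as many function beans as we want.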
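To make the naming convention concrete, here is a minimal plain-Java sketch that builds binding names the way application.yml spells them. BindingNames, inputBinding and outputBinding are hypothetical helpers written only for this illustration; they are not part of Spring Cloud Stream, which derives these names internally.

```java
// Hypothetical helper for illustration only; not a Spring Cloud Stream API.
class BindingNames {

    // Name of an input binding: <functionName>-in-<index>
    static String inputBinding(String functionName, int index) {
        return build(functionName, "in", index);
    }

    // Name of an output binding: <functionName>-out-<index>
    static String outputBinding(String functionName, int index) {
        return build(functionName, "out", index);
    }

    private static String build(String functionName, String direction, int index) {
        if (functionName == null || functionName.isEmpty()) {
            throw new IllegalArgumentException("functionName must not be empty");
        }
        if (index < 0) {
            throw new IllegalArgumentException("index must not be negative");
        }
        return functionName + "-" + direction + "-" + index;
    }

    public static void main(String[] args) {
        System.out.println(inputBinding("processor", 0));  // processor-in-0
        System.out.println(outputBinding("processor", 0)); // processor-out-0
        System.out.println(inputBinding("consumer", 0));   // consumer-in-0
    }
}
```

The three names printed are the ones configured for the processor and consumer beans in application.yml. The producer-out-0 binding follows the same pattern, although its name is passed to StreamBridge directly rather than derived from a function bean.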

The message producer looks like this:

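import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageProducer {

    @Autowired
    private StreamBridge streamBridge;

    @PostMapping("/msg")
    public ResponseEntity<String> sendMessage(@RequestBody String message) {
        // send() returns false when the message could not be handed to the binding.
        boolean sent = streamBridge.send("producer-out-0", message);
        if (!sent) {
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Message could not be sent");
        }
        return ResponseEntity.ok("Message sent");
    }
}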


Here StreamBridge sends the message to the “producer-out-0” binding, which is bound to the queue sample.queue.processor (see application.yml).

Our Processor looks like this:

package com.spring.cloud.stream.solace.processor;
import java.util.function.Function;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MessageProcessor {

    @Bean
    public Function<String, String> processor() {
        return (msg) -> {
            System.out.println("Printing from Processor");
            return msg.toUpperCase()+" : Message is processed";
        };
    }
}

Here our processor listens to sample.queue.processor, applies our business logic, and sends the result to sample.queue.consumer.
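Because the processor is an ordinary java.util.function.Function, its logic can be exercised without a broker. The sketch below copies the transformation from the processor bean into a hypothetical ProcessorSketch class (the class name and the main method are assumptions made for this illustration, and the log statement is left out) and applies it to a sample message.

```java
import java.util.function.Function;

// Hypothetical stand-alone copy of the processor logic, without Spring wiring.
class ProcessorSketch {

    // Same transformation as the processor() bean: upper-case and append a marker.
    static Function<String, String> processor() {
        return msg -> msg.toUpperCase() + " : Message is processed";
    }

    public static void main(String[] args) {
        String result = processor().apply("hello");
        System.out.println(result); // HELLO : Message is processed
    }
}
```

In the running application Spring Cloud Stream calls the same function for every message that arrives on processor-in-0 and publishes the return value to processor-out-0.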

Last but not least our consumer looks like this:

package com.spring.cloud.stream.solace.consumer;

import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MessageConsumer {
    
    @Bean
    public Consumer<String> consumer() {
        return (msg) -> {
            System.out.println("Received the message in Consumer :" + msg);
        };
    }
}

It listens to the queue sample.queue.consumer for any message.

In this project we have not hardcoded any broker configuration in the code; it is handled entirely by Spring Cloud Stream. When required, we can change the message broker by swapping the binder dependency and changing some properties in the application.yml file.

Points to note:

  • There are some issues in Solace when working with application.properties, so it is better to use application.yml instead.
  • No batch facility is provided by Solace in this setup, so if you need one you have to implement it yourself.
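As one possible way to implement batching yourself, the sketch below wraps a batch handler in a Consumer that buffers messages and hands them over once a fixed number has arrived. BatchingConsumer, its constructor parameters and flush() are hypothetical names introduced for this illustration; nothing here is part of the Solace binder or Spring Cloud Stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical size-based batching wrapper; not a Solace or Spring API.
class BatchingConsumer<T> implements Consumer<T> {

    private final int batchSize;
    private final Consumer<List<T>> batchHandler;
    private final List<T> buffer = new ArrayList<>();

    BatchingConsumer(int batchSize, Consumer<List<T>> batchHandler) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1");
        }
        this.batchSize = batchSize;
        this.batchHandler = batchHandler;
    }

    // Buffer each message and hand over a batch once it is full.
    @Override
    public synchronized void accept(T message) {
        buffer.add(message);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Hand over whatever is buffered, for example on shutdown or from a timer.
    public synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        batchHandler.accept(batch);
    }

    public static void main(String[] args) {
        BatchingConsumer<String> consumer =
                new BatchingConsumer<>(2, batch -> System.out.println("Batch: " + batch));
        consumer.accept("a");
        consumer.accept("b"); // prints Batch: [a, b]
        consumer.accept("c");
        consumer.flush();     // prints Batch: [c]
    }
}
```

An instance could be returned from a Consumer bean such as consumer(). Note the trade-off: buffered messages live only in memory, so a real implementation would likely also need a time-based flush and a decision about what happens to unflushed messages if the application stops.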

If you spot any mistakes or errors in the above article, feel free to reach out to me.

