
Before diving into the reactive concept, consider an everyday example. You go to a restaurant for your favorite dish and find a long queue at the counter, because the staff take one order, prepare it, and serve it before taking the next one. In other words, they work on a single order at a time. How would you feel standing in that queue? The same thing happens when an application accepts a request, processes it to completion, and only then accepts the next request.
The solution is what good restaurants already do: the waiter takes an order, passes it to the kitchen, and takes the next order in the meantime, telling the kitchen, "Let us know when it is ready. We have another order to take." Inside our application, the same thing is achieved by reactive programming, which promotes asynchronous, non-blocking processing. A thread hands off a task, does not wait for its completion, and simply moves on to other work; when the task finishes, the code that asked for it is notified of the result.
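To make the restaurant analogy concrete, here is a minimal sketch in plain Java that uses CompletableFuture rather than WebFlux itself. The class RestaurantDemo and the methods cook and serveAll are hypothetical names made up for this illustration, and the 200 ms cooking time is arbitrary. It only shows the idea of handing work off and moving on; WebFlux itself relies on a small number of event-loop threads rather than one thread per task.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// Hypothetical demo class, not part of WebFlux or Reactor.
class RestaurantDemo {

    // Simulates the kitchen: preparing one dish takes a fixed amount of time.
    static String cook(String order) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order + " is ready";
    }

    // The waiter hands every order to the kitchen and immediately takes the next one.
    static List<String> serveAll(List<String> orders) {
        ExecutorService kitchen = Executors.newFixedThreadPool(Math.max(1, orders.size()));
        try {
            // All orders are submitted first; nothing waits for a dish here.
            List<CompletableFuture<String>> pending = orders.stream()
                    .map(order -> CompletableFuture.supplyAsync(() -> cook(order), kitchen))
                    .collect(Collectors.toList());
            // join() is used only so the demo can collect the results before returning.
            return pending.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            kitchen.shutdown();
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        List<String> served = serveAll(List.of("pasta", "pizza", "salad"));
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        served.forEach(System.out::println);
        System.out.println("elapsed ms: " + elapsedMs);
    }
}
```

Because the three orders are cooked at the same time, the whole batch should take roughly as long as one dish rather than three.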
If you want to implement this paradigm in Spring, the framework to use is “WebFlux”, which is built on Project Reactor (https://projectreactor.io/). By default, it runs on a Netty server. Now let us check the internal processing of WebFlux. It works on the Publisher and Subscriber model. Let us have a look at how they communicate:
Step 1: The subscriber calls the subscribe() method on the publisher.
Step 2: The publisher acknowledges by calling onSubscribe() on the subscriber and handing it a Subscription.
Step 3: The subscriber requests data by calling request(n) on that Subscription.
Step 4: The publisher sends the data by calling onNext() once per entry/object, up to the n items that were requested.
Step 5: After all items have been published successfully, the publisher calls onComplete(); if an error occurs, it calls onError() instead.
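The five steps above can be sketched with the JDK's own java.util.concurrent.Flow interfaces (Java 9+), which mirror the Reactive Streams interfaces that Reactor implements. This is not WebFlux code: HandshakeDemo and RecordingSubscriber are hypothetical names used only for this illustration, and SubmissionPublisher stands in for a real data source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it uses only JDK types, not Reactor.
class HandshakeDemo {

    // Records every signal it receives so the order of the handshake is visible.
    static class RecordingSubscriber implements Flow.Subscriber<String> {
        final List<String> events = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) { // Step 2
            this.subscription = subscription;
            events.add("onSubscribe");
            subscription.request(1);                              // Step 3
        }

        @Override
        public void onNext(String item) {                         // Step 4
            events.add("onNext(" + item + ")");
            subscription.request(1); // ask for one more item
        }

        @Override
        public void onError(Throwable error) {                    // Step 5, failure
            events.add("onError(" + error.getMessage() + ")");
            done.countDown();
        }

        @Override
        public void onComplete() {                                // Step 5, success
            events.add("onComplete");
            done.countDown();
        }
    }

    // Publishes the items and returns the signals the subscriber saw, in order.
    static List<String> run(List<String> items) {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);                      // Step 1
            items.forEach(publisher::submit);
        } // close() signals onComplete once the submitted items are delivered
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("no terminal signal received");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.events;
    }

    public static void main(String[] args) {
        // prints [onSubscribe, onNext(Foo), onNext(Bar), onComplete]
        System.out.println(run(List.of("Foo", "Bar")));
    }
}
```

The subscriber asks for one item at a time here, so the publisher never sends more than the subscriber is ready to handle.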
WebFlux uses two types of publisher, which are as below:
Mono: It is used for handling 0 to 1 results.
Flux: It is used for handling 0 to N results.
Now let us look at an example of Flux. To use WebFlux we need a few dependencies: the WebFlux starter, which brings in Reactor, and Reactor's test support:
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
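Let us now run a test case to see how Flux works:
    // Needs: import reactor.core.publisher.Flux;
    @Test
    public void testFlux() {
        // log() prints every signal that passes through the pipeline
        Flux<String> fluxString = Flux.just("Foo", "Bar", "FooBar", "BarFoo").log();
        // First lambda handles each item, second handles an error signal
        fluxString.subscribe(System.out::println, e -> System.out.println(e.getMessage()));
    }
Here we use log() to trace what happens internally. The output looks like this: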
12:43:40.467 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
12:43:40.476 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
12:43:40.478 [main] INFO reactor.Flux.Array.1 - | request(unbounded)
12:43:40.478 [main] INFO reactor.Flux.Array.1 - | onNext(Foo)
Foo
12:43:40.478 [main] INFO reactor.Flux.Array.1 - | onNext(Bar)
Bar
12:43:40.478 [main] INFO reactor.Flux.Array.1 - | onNext(FooBar)
FooBar
12:43:40.478 [main] INFO reactor.Flux.Array.1 - | onNext(BarFoo)
BarFoo
12:43:40.478 [main] INFO reactor.Flux.Array.1 - | onComplete()
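These logs show clearly how it works internally: onSubscribe, then a request (unbounded here, because subscribe() with lambdas asks for everything at once), then one onNext per item, and finally onComplete. Now it is time to see the serving behavior we discussed earlier in this article in a web application. For that, let us create a controller: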
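    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.http.MediaType;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    import reactor.core.publisher.Flux;

    @RestController
    @RequestMapping("/student")
    public class StudentController {
        @Autowired
        private StudentService service;

        // Each Student is written to the response as soon as it is emitted
        @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
        public Flux<Student> getAllStudentsWebFlux() {
            return service.getAllStudentsWebFlux();
        }
    }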
The produces attribute, set to MediaType.TEXT_EVENT_STREAM_VALUE, makes the endpoint send the result as a stream of server-sent events. Now let's create its service class:
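    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    import reactor.core.publisher.Flux;

    @Service
    public class StudentService {
        @Autowired
        private StudentDao studentDao;

        // Passes the Flux through unchanged; nothing is emitted until a subscriber requests data
        public Flux<Student> getAllStudentsWebFlux() {
            return studentDao.getStudentsWebFlux();
        }
    }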
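The DAO class will look like this:
    import java.time.Duration;
    import org.springframework.stereotype.Component;
    import reactor.core.publisher.Flux;

    @Component
    public class StudentDao {
        // Blocking helper; the reactive method below does not call it
        private static void sleepExecution(int i) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public Flux<Student> getStudentsWebFlux() {
            return Flux.range(1, 30)
                    // Emit one element per second without blocking a thread
                    .delayElements(Duration.ofSeconds(1))
                    .doOnNext(i -> System.out.println("count: " + i))
                    .map(i -> new Student(i, "name" + i, "city" + i));
        }
    }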
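Here each element is delayed by one second with delayElements(), the non-blocking counterpart of putting the thread to sleep, so that the streaming is easy to observe. Apart from the annotated controller, WebFlux also supports a functional style of endpoint, so let us add a router and a handler as well: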
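    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.web.reactive.function.server.RouterFunction;
    import org.springframework.web.reactive.function.server.RouterFunctions;
    import org.springframework.web.reactive.function.server.ServerResponse;

    @Configuration
    public class RouterConfig {
        @Autowired
        private StudentStreamHandler streamHandler;

        // Maps GET requests on the path below to the handler method
        @Bean
        public RouterFunction<ServerResponse> routerFunction() {
            return RouterFunctions.route()
                    .GET("/router/customers/stream", streamHandler::getStudents)
                    .build();
        }
    }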
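    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.http.MediaType;
    import org.springframework.stereotype.Service;
    import org.springframework.web.reactive.function.server.ServerRequest;
    import org.springframework.web.reactive.function.server.ServerResponse;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    @Service
    public class StudentStreamHandler {
        @Autowired
        private StudentDao studentDao;

        // Named getStudents so that it matches streamHandler::getStudents in RouterConfig
        public Mono<ServerResponse> getStudents(ServerRequest request) {
            Flux<Student> students = studentDao.getStudentsWebFlux();
            return ServerResponse.ok()
                    .contentType(MediaType.TEXT_EVENT_STREAM)
                    .body(students, Student.class);
        }
    }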
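And last, our Student entity looks like this:
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;

    // Lombok generates the getters, setters, and both constructors
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class Student {
        private int id;
        private String name;
        private String city;
    }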
Now we are ready to run the application and see the magic of reactive programming. When we hit the “student” endpoint, the output appears in the browser piece by piece, as if it were being streamed from the application, rather than all the data arriving in one shot. We do not need to wait for the application to complete its processing before we see the first results. This is the magic of Reactive Programming.
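If you would rather watch the stream from code than from a browser, the following is a rough sketch of a client built on the JDK's java.net.http.HttpClient (Java 11+). StudentStreamClient, forEachEvent, and printStream are hypothetical names, and a URL such as http://localhost:8080/student assumes the application runs on Spring Boot's default port. The sketch also assumes that each event arrives as a single data: line, which is a simplification of the server-sent events format.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.function.Consumer;
import java.util.stream.Stream;

// Hypothetical client class for illustration only.
class StudentStreamClient {

    // Passes the payload of every "data:" line to the callback and skips all other lines.
    static void forEachEvent(Stream<String> lines, Consumer<String> onEvent) {
        lines.filter(line -> line.startsWith("data:"))
             .map(line -> line.substring("data:".length()).trim())
             .forEach(onEvent);
    }

    // Reads the response line by line, so each event is printed when it arrives.
    static void printStream(String url) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .header("Accept", "text/event-stream")
                .GET()
                .build();
        HttpResponse<Stream<String>> response =
                client.send(request, HttpResponse.BodyHandlers.ofLines());
        try (Stream<String> lines = response.body()) {
            forEachEvent(lines, event ->
                    System.out.println(System.currentTimeMillis() + " " + event));
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.out.println("usage: java StudentStreamClient.java <url>");
            return;
        }
        printStream(args[0]);
    }
}
```

Run against the running application, the timestamps in front of each line should be about one second apart, matching the delay configured in the DAO.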
Please comment in case of any mistake in this article.