Course – LS – All

Get started with Spring and Spring Boot, through the Learn Spring course:

>> CHECK OUT THE COURSE

1. Overview

With the introduction of Spring WebFlux, we got another powerful tool for writing reactive, non-blocking applications. While the technology is now much easier to use than before, debugging reactive sequences in Spring WebFlux can still be quite cumbersome.

In this quick tutorial, we’ll see how to easily log events in asynchronous sequences and how to avoid some simple mistakes.

2. Maven Dependency

Let’s add the Spring WebFlux dependency to our project so we can create reactive streams:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

We can get the latest spring-boot-starter-webflux dependency from Maven Central.

3. Creating a Reactive Stream

To begin, let’s create a reactive stream using Flux and call the log() method to enable logging:

Flux<Integer> reactiveStream = Flux.range(1, 5).log();

Next, we’ll subscribe to it to consume the generated values:

reactiveStream.subscribe();

4. Logging Reactive Stream

After running the above application, we see our logger in action:

2018-11-11 22:37:04 INFO | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
2018-11-11 22:37:04 INFO | request(unbounded)
2018-11-11 22:37:04 INFO | onNext(1)
2018-11-11 22:37:04 INFO | onNext(2)
2018-11-11 22:37:04 INFO | onNext(3)
2018-11-11 22:37:04 INFO | onNext(4)
2018-11-11 22:37:04 INFO | onNext(5)
2018-11-11 22:37:04 INFO | onComplete()

The log shows every event that occurred on our stream: the subscription, the unbounded request issued by subscribe(), five onNext() values, and finally an onComplete() event that closed the stream.
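Each line in this log corresponds to a Reactive Streams signal. To make that order concrete without Reactor on the classpath, here is a minimal sketch that uses the JDK's own java.util.concurrent.Flow API and SubmissionPublisher instead of Flux. The class name SignalOrderSketch and the method recordSignals() are hypothetical, and the recorded strings only imitate the format of log(); this illustrates the protocol, not how Reactor implements log():

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class SignalOrderSketch {

    // Hypothetical helper: publishes 1..5 and records every signal the subscriber receives.
    static List<String> recordSignals() {
        List<String> signals = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);

        // SubmissionPublisher delivers signals asynchronously on its default executor.
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    signals.add("onSubscribe");
                    // Ask for everything, like a plain subscribe() does in the log above.
                    signals.add("request(unbounded)");
                    subscription.request(Long.MAX_VALUE);
                }

                @Override
                public void onNext(Integer item) {
                    signals.add("onNext(" + item + ")");
                }

                @Override
                public void onError(Throwable error) {
                    signals.add("onError(" + error + ")");
                    done.countDown();
                }

                @Override
                public void onComplete() {
                    signals.add("onComplete()");
                    done.countDown();
                }
            });

            for (int i = 1; i <= 5; i++) {
                publisher.submit(i);
            }
        } // close() issues onComplete once the submitted items have been delivered

        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the terminal signal");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(signals);
    }

    public static void main(String[] args) {
        recordSignals().forEach(System.out::println);
    }
}
```

Calling SignalOrderSketch.recordSignals() should return the signals in the same order as the log above: onSubscribe, request(unbounded), onNext(1) to onNext(5), and then onComplete().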

5. Advanced Logging Scenario

We can modify our application to see a more interesting scenario. Let’s add take() to the Flux, which instructs the stream to provide only a specific number of events:

Flux<Integer> reactiveStream = Flux.range(1, 5).log().take(3);

After executing the code, we’ll see the following output:

2018-11-11 22:45:35 INFO | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
2018-11-11 22:45:35 INFO | request(unbounded)
2018-11-11 22:45:35 INFO | onNext(1)
2018-11-11 22:45:35 INFO | onNext(2)
2018-11-11 22:45:35 INFO | onNext(3)
2018-11-11 22:45:35 INFO | cancel()

As we can see, take() cancelled the stream once three events had been emitted, so the values 4 and 5 were never emitted.

The placement of log() in our stream is crucial. Let’s see how placing log() after take() produces different output:

Flux<Integer> reactiveStream = Flux.range(1, 5).take(3).log();

And the output:

2018-11-11 22:49:23 INFO | onSubscribe([Fuseable] FluxTake.TakeFuseableSubscriber)
2018-11-11 22:49:23 INFO | request(unbounded)
2018-11-11 22:49:23 INFO | onNext(1)
2018-11-11 22:49:23 INFO | onNext(2)
2018-11-11 22:49:23 INFO | onNext(3)
2018-11-11 22:49:23 INFO | onComplete()

As we can see, changing the point of observation changed the output. The stream still produced three events, but instead of cancel(), we now see onComplete(). This is because log() now observes what take() emits downstream, rather than the signals that take() sends upstream to the source.
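One way to see why the position matters is to model the pipeline by hand. The following is a rough sketch built on the JDK's java.util.concurrent.Flow interfaces rather than on Reactor. The range(), log() and take() methods here are simplified, hypothetical stand-ins written for this illustration, and they assume a single unbounded request as in the logs above; they are not Reactor's implementations:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class LogPlacementSketch {

    // Hypothetical source: emits 1..count synchronously on the first request, stops if cancelled.
    static Flow.Publisher<Integer> range(int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean started;
            private boolean cancelled;

            @Override
            public void request(long n) {
                if (started) {
                    return; // assumption: one unbounded request, so demand is not tracked
                }
                started = true;
                for (int i = 1; i <= count && !cancelled; i++) {
                    subscriber.onNext(i);
                }
                if (!cancelled) {
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() { cancelled = true; }
        });
    }

    // Hypothetical log(): records every signal that passes through this point, in both directions.
    static <T> Flow.Publisher<T> log(Flow.Publisher<T> source, List<String> events) {
        return downstream -> source.subscribe(new Flow.Subscriber<T>() {
            @Override
            public void onSubscribe(Flow.Subscription upstream) {
                events.add("onSubscribe");
                downstream.onSubscribe(new Flow.Subscription() {
                    @Override
                    public void request(long n) {
                        events.add(n == Long.MAX_VALUE ? "request(unbounded)" : "request(" + n + ")");
                        upstream.request(n);
                    }

                    @Override
                    public void cancel() { events.add("cancel()"); upstream.cancel(); }
                });
            }

            @Override
            public void onNext(T item) { events.add("onNext(" + item + ")"); downstream.onNext(item); }

            @Override
            public void onError(Throwable t) { events.add("onError(" + t + ")"); downstream.onError(t); }

            @Override
            public void onComplete() { events.add("onComplete()"); downstream.onComplete(); }
        });
    }

    // Hypothetical take(): after `limit` items it cancels upstream and completes downstream.
    static <T> Flow.Publisher<T> take(Flow.Publisher<T> source, long limit) {
        return downstream -> source.subscribe(new Flow.Subscriber<T>() {
            private Flow.Subscription upstream;
            private long remaining = limit;

            @Override
            public void onSubscribe(Flow.Subscription s) { upstream = s; downstream.onSubscribe(s); }

            @Override
            public void onNext(T item) {
                if (remaining <= 0) {
                    return;
                }
                remaining--;
                downstream.onNext(item);
                if (remaining == 0) {
                    upstream.cancel();       // travels towards the source
                    downstream.onComplete(); // travels towards the subscriber
                }
            }

            @Override
            public void onError(Throwable t) { if (remaining > 0) downstream.onError(t); }

            @Override
            public void onComplete() { if (remaining > 0) downstream.onComplete(); }
        });
    }

    // Builds range(5) with log() either before or after take(3) and returns the recorded events.
    static List<String> run(boolean logBeforeTake) {
        List<String> events = new ArrayList<>();
        Flow.Publisher<Integer> pipeline;
        if (logBeforeTake) {
            pipeline = take(log(range(5), events), 3);
        } else {
            pipeline = log(take(range(5), 3), events);
        }
        pipeline.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer item) { }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // ends with cancel()
        System.out.println(run(false)); // ends with onComplete()
    }
}
```

In this model, the cancel() call issued by take() only travels towards the source, and onComplete() only travels towards the subscriber. A log() placed between the source and take() therefore records cancel(), while a log() placed after take() records onComplete().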

6. Conclusion

In this quick article, we saw how to log reactive streams using the built-in log() method, and how its position in the stream determines which events we observe.

And as always, the source code for the above example can be found over on GitHub.
