In today’s blog post, we continue the topic we started in the entry „What is reactive programming?”. We explained the idea of reactive programming in detail. What Spring WebFlux is. How to integrate it with the database and what is R2DBC. We also explained such concepts as WebClient, Publisher, and Subscriber. In this article – more about Flux and Mono – we’ll take the first steps in creating sequences using Project Reactor, a fourth-generation reactive library based on the Reactive Streams specification.

Flux is an asynchronous sequence (stream) 

that processes 0-N components

Flux<T>

Flux<T> is a standard publisher which shows an asynchronous sequence from 0 to N elements. It is finalized by a completion or error signal. They show that by three calls:

  • doOnNext -> for the sequence
  • doOnComplete -> for completion
  • doOnError -> for an error

With such a comprehensive range of possibilities, Flux is a reactive general-purpose type. It’s worth noting that each event type is optional, which means that the absence of any of them doesn’t interrupt Flux. Without the onNext event, but with the onComplete event – the sequence will be finished. The reverse arrangement will give us an infinite sequence. For example, Flux.interval(Duration) will create a Flux<Long> that continuously emits clock „ticks”.

Mono, asynchronous 0-1 result

Mono<T>

Mono<T> is a publisher that returns one item via the onNext signal, then terminates with the onComplete signal. It can also emit a single onError signal.

Mono offers only a subset of the operators available to Flux. Some operators (especially those that combine Mono with another publisher) switch to the latter. For example, Mono.concatWith(Publisher) returns Flux, while Mono.then(Mono) returns another Mono.

We create the first Flux and Mono

Mono

One of the easiest ways to create a Flux is with the Flux.just() method. We can also create it using an already existing List. Mono has an analogous method (we can also create an empty Mono).

Synchronous subscription – example

The example for the subscribe method shows one way to make values appear using a lambda.

We now have two lambda expressions: one for the content we expect, and one for errors.

Generating a synchronous generate sequence

Flux-generate

The simplest form of Flux creation is the generate method.

This includes synchronous and one-by-one emissions. The method can be called only once per callback. We can then additionally call error(Throwable) or complete(), but this is optional.

Generating an asynchronous create sequence

Flux-create

Create is a more advanced form of Flux creation that is suitable for multiple issues per round, even from multiple threads.

Unlike generate, it does not have a state-based variant. On the other hand, it can call multithreaded events in the callback. 

Create can be very useful for connecting an existing API to the reactive world – for example, an asynchronous listener-based API.

Image

Launch your rocket!

Leave your email - we'll get back to you as soon as possible!