Podstawowe funkcje WebFlux – tworzymy sekwencje, pierwszy Flux i Mono

30-09-2021

W dzisiejszym wpisie kontynuujemy tematykę, którą rozpoczęliśmy w materiale „Czym jest programowanie reaktywne?”. Dosyć szczegółowo wyjaśniliśmy wtedy ideę tytułowego programowania, opowiedzieliśmy czym jest Spring WebFlux, jak zintegrować go z bazą danych i co to jest R2DBC. Wyjaśniliśmy także takie pojęcia  jak WebClient, Publisher i Subscriber. W tym artykule więcej o Flux i Mono – podejmiemy pierwsze kroki w tworzeniu sekwencji za pomocą Project Reactor’a, biblioteki reaktywnej czwartej generacji, opartej na specyfikacji Reactive Streams.

Flux, asynchroniczna sekwencja (strumień) przetwarzająca 0-N elementów

Flux

Flux<T> to standardowy publisher, który przedstawia asynchroniczną sekwencję od 0 do N elementów. Finalizowana jest ona sygnałem zakończania, bądź błędu. Przedstawiają to trzy wywołania:

  • doOnNext -> dla sekwencji
  • doOnComplete -> dla zakończenia
  • doOnError -> dla błędu

Z tak dużym zakresem możliwości, Flux jest typem reaktywnym, ogólnego przeznaczenia. Warto zauważyć, że każdy typ zdarzenia jest opcjonalny, co sprawia, że brak któregokolwiek z nich nie przerywa działania strumienia. Zatem bez zdarzenia onNext, ale ze zdarzeniem onComplete, sekwencja będzie skończona. W układzie odwrotnym powstanie nam nieskończona sekwencja. Na przykład Flux.interval(Duration) utworzy Flux<Long>, który nieustannie będzie emitował  „tyknięcia” zegara.

Mono, asynchroniczny 0-1 wynik

Mono

Mono<T> to publisher, który zwraca jeden element poprzez sygnał onNext, następnie kończy sygnałem onComplete. Może też emitować pojedynczy sygnał onError.

Mono oferuje tylko podzbiór operatorów, które są dostępne dla Fluxa, a niektóre operatory (zwłaszcza te, które łączą Mono z innym publisherem) przełączają się na tego ostatniego. Na przykład, Mono.concatWith(Publisher) zwraca Flux, podczas gdy Mono.then(Mono) zwraca inne Mono.

Tworzymy pierwszego Fluxa i Mono

flux.just

Jednym z najprostszych sposobów na stworzenie Fluxa jest metoda Flux.just(). Możemy go także stworzyć za pomocą już istniejącej Listy. Mono posiada analogiczną metodę (możemy stworzyć też puste Mono).

Subskrypcja synchroniczna – przykład

Przykład dla metody subscribe pokazuje jeden ze sposobów, aby wartości się pojawiły za pomocą lambdy.

Mamy teraz dwa wyrażenia lambda: jedno dla zawartości, której oczekujemy i jedno dla błędów.

Generowanie sekwencji synchronicznej generate

sekwencja synchroniczna generate

Najprostszą formą tworzenia Fluxa jest metoda generate.

Dotyczy to emisji synchronicznych i jeden po drugim. Metodę można wywołać tylko raz na wywołanie zwrotne. Możemy wtedy dodatkowo wywołać error(Throwable)  lub  complete(), ale jest to opcjonalne.

Generowanie sekwencji asynchronicznej create

sekwencja synchroniczna create

Createto bardziej zaawansowana forma tworzenia Fluxa, która nadaje się do wielu emisji na rundę, nawet z wielu wątków.

W przeciwieństwie do generate, nie posiada ona wariantu opartego na stanie. Z drugiej strony może wywoływać zdarzenia wielowątkowe w wywołaniu zwrotnym. 

Create może być bardzo przydatne do połączenia istniejącego interfejsu API ze światem reaktywnym — na przykład asynchronicznego interfejsu API opartego na listenerach.