Kategorie
java

RxJava po prostu

RxJava to jedna z przełomowych bibliotek w Javie. Spopularyzowana mocno przez rozwiązania Netflix, urzekła wiele osób swoim podejściem do reaktywnego programowania. Pozwoliła na ucieczkę od problemu callback hell przez jasno zdefiniowany model obsługi strumieni zdarzeń. Temat niełatwy, ale wart zgłębienia.

Lecimy!

Junior Java Developer Handbook

Z newsletter o pytaniach o RxJava

Hey, chciałem podziękować wszystkim osobom, które odpowiedziały na maila newsletterowego z pytaniem, co Was interesuje w RxJava. Pytania zebrałem, a odpowiedzi znajdziecie w dalszej części artykułu. Ponieważ taka forma wpada po raz pierwszy na blog, to proszę o Wasze komentarze, czy Wam się podoba ❓ Mi się podoba bardzo, bo mogę odpowiedzieć dokładnie na Wasze potrzeby w danym temacie 😊

RxJava i ReactiveX po ludzku

Zacznijmy od tego, że RxJava jest implementacją ReactiveX, czyli rozszerzeń reaktywnych (ang. Reactive Extensions). Rozszerzenia reaktywne dostępne są dla wielu języków (np. RxJS dla Javascript).

Gdy wchodzimy na stronę reactive.io możemy przeczytać tam bardzo ładną definicję:

ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences.

Źródło: reactive.io

Jak każda definicja brzmi mądrze. Ale, żeby coś z niej konkretnego wynieść, warto zastanowić się, co w rzeczywistości znaczą te słowa. Tu przyda się trochę praktyka, bo bez niej może być to nieco ciężkie. Rozłóżmy tę definicję na czynniki pierwsze:

  • library for composing, na początek mówi nam, że nie jest to jakaś platforma, czy produkt, ale po prostu biblioteka, której możemy użyć w kodzie do budowy naszych systemów.
  • asynchronous and event-based, dotyczy asynchronicznego zachowania programu, czyli nie czekamy na zakończenie wybranej operacji przy uwzględnieniu jednak, że pojawi się zdarzenie mówiące o jej wyniku.
  • by using observable sequences, gdzie od razu widać nawiązanie do wzorca obserwator (ang. observer pattern). Czyli my, jako obserwatorzy (ang. observer), będziemy powiadamiani w sekwencji o zmianach zachodzących u podmiotu obserwowanego (ang. observable).

Mów do mnie po javowsku

Zobrazujmy sobie tradycyjne zachowanie w świecie Java. Weźmy na przykład kolekcje. Gdy popatrzymy na SDK, możemy zauważyć, że wszystkie implementują interfejs Iterable. Weźmy na przykład pewną listę użytkowników:

List<User> users = userRepository.find();

Teraz zastanówmy się, w jaki sposób możemy z tej listy skorzystać. Oczywiście mamy kilka możliwości:

user.forEach(...)
for(User user : users) { ... }
users.stream() ...

Jednak te wszystkie konstrukcje mają ze sobą wspólny jeden element. My, jako klient danej kolekcji, pobieramy z niej kolejne elementy. Jest to model, który możemy nazwać modelem pull. Czyli jest to podobne np. do zwykłych, tradycyjnych żądań HTTP. Czyli takich, gdzie to my mówimy, że chcemy ściągnąć pewne dane. Wysyłamy żądanie i sobie czekamy, czekamy i czekamy … na odpowiedź. Jak widać synchronicznie (bo jesteśmy tradycjonalistami 😊).

RxJava odwraca tę relację. Mamy tu do czynienia z tzw. modelem push. Czyli to nie my bierzemy kolejne elementy z kolekcji. Jesteśmy natomiast powiadamiani o kolejnym elemencie w strumieniu danych. I co jest istotne w kontekście asynchroniczności, nie masz gwarancji, co do kolejności tego, w jakiej pojawią się te elementy w naszej sekwencji. Ot tyle.

I mam teraz nadzieję, że po przeczytaniu paru powyższych akapitów, koncepcja ReactiveX będzie dla Ciebie już zrozumiała.

Mała uwaga, jeśli po przeczytaniu tego wstępu cały czas jest to dla Ciebie niejasna koncepcja, to napisz proszę do mnie. Chętnie pomogą w rozwianiu wątpliwości.

Parę typów observable

Ok. Zacznijmy od tego, że typów obiektów observable jest kilka. Oczywiście na pierwszy plan ze względu na nazwę wychodzi Observable, ale jest ich więcej! Tutaj lista z opisem, czym się charakteryzują.

  • Flowable reprezentuje strumień 0 – N elementów, implementuje standard Reactive Streams i ma wsparcie dla mechanizmu backpressure (czyli wspiera strategie radzenia sobie z przypadkami dużo szybszych producentów zdarzeń niż możliwości konsumentów tych zdarzeń).
  • Observable reprezentuje strumień 0 – N elementów, przy czym nie ma tutaj automatycznego wsparcia dla mechanizmu backpressure.
  • Single reprezentuje strumień dokładnie z jednym elementem lub błędem.
  • Completable reprezentuje strumień bez elementów, który jest w stanie zakończonym lub błędzie.
  • Maybe reprezentuje strumień bez elementów lub z dokładnie jednym elementem lub błędem.

W dalszej części będziemy posługiwać się główne type Observable, żeby pokazać, jak ta RxJava w ogóle działa.

RxJava podstawy

Zacznijmy od czegoś prostego. Utworzymy sobie obiekt Observable, który będzie emitował nam dokładnie jeden element typu String. Do tego celu wykorzystamy metodę just(), w świecie ReactiveX nazywaną operatorem.

    Observable<String> just = Observable.just("Lorem ipsum dolor");

Fajnie, prosto, ale po uruchomieniu na konsoli nie zobaczymy zupełnie nic. Dlaczego tak się dzieje?

Otóż musimy powiedzieć, że nas jesteśmy zainteresowani tym, co dany Observable będzie emitował. Nazywamy to subskrypcją.

Mechanizm subskrypcji oznacza, że chcemy dostawać powiadomienia o zdarzeniach, czyli nowych elementach w strumieniu. Czyli, tak jak pisałem wcześniej, nie bierzemy sami kolejnych elementów, a czekamy na informację o nich.

I takiej subskrypcji dokonujemy w taki sposób:

    Observable<String> just = Observable.just("Lorem ipsum dolor");
    just.subscribe(text -> log.info(text));

Nie mamy tutaj nic asynchronicznego. Nie dotykamy wątków, poza wątkiem, w którym wywoływany jest ten kod (co widać poniżej, wątek main). Po uruchomieniu tego kodu na konsoli powinniśmy dostać natychmiast mniej więcej taki efekt:

12:45:47.330 [main] INFO pl.itbrains.blog.samples.rxjava3.RxJava3Test - Lorem ipsum dolor

Gdybyśmy chcieli mieć źródło emitujące więcej wartości, to możemy zrobić to w ten sposób:

    Observable<String> just = Observable.just(
        "Lorem ipsum dolor 1",
        "Lorem ipsum dolor 2",
        "Lorem ipsum dolor 3");
    just.subscribe(log::info);

Czyli tym razem emitujemy trzy wartości. Przy okazji użyliśmy referencji do metody info. Efekt na konsoli:

13:45:24.549 [main] INFO pl.itbrains.blog.samples.rxjava3.RxJava3Test - Lorem ipsum dolor 1
13:45:24.562 [main] INFO pl.itbrains.blog.samples.rxjava3.RxJava3Test - Lorem ipsum dolor 2
13:45:24.562 [main] INFO pl.itbrains.blog.samples.rxjava3.RxJava3Test - Lorem ipsum dolor 3

Operator Observable.just(…) mamy przeciążony. Możemy do niej przekazać od 1 do 10 parametrów. I tyle też zdarzeń w efekcie zostanie wyemitowanych przez strumień.

Gdybyśmy chcieli mieć Observable, który nie emituje elementów, to możemy skorzystać z Observable.empty().

Warto tutaj zauważyć, że choć wszędzie w definicjacjach RxJavy pojawia się naturalnie słowo asynchronous, to RxJava by default jest jednowątkowa. A to oznacza tyle, że dopóki sami tego nie określimy, dopóty tej asynchroniczności nie dostaniemy.

Operatory

Podstawową koncepcją w ReactiveX są operatory. Czyli metody, które pozwalają nam działać na strumieniach. Możemy wyróżnić kilka grup takich operatorów:

  • Tworzące nowe obiekty observable, np. create, defer, from, interval, just.
  • Transformujące obiekty emitowane przez observable, np. map, flatMap.
  • Filtrujące, np. destinct, filter, first, skip.
  • Łączące, np. combineLatest, merge, switch, zip.
  • Obsługujące sytuacje wyjątkowe: catch i retry.
  • Wspomagające, np. delay, subscribeOn, subscribe, observeOn, timout.
  • Warunkowe, np. contains, skipUntil, shipWhile, takeUntil, takeWhile.
  • Matematyczne, np. avarage, count, reduce, sum.
  • Wspierająca strategie backpressure, np. onBackpressureBuffer.
  • Pomagające modelach podłączania, np. replay.
  • Konwertujące: to.

Jak widać samych operatorów jest dużo. A wymienionych została tylko część z nich. Widać też, że API dostarczane nam przez bibliotekę jest bardzo bogate. A to sprawia, że nasze możliwości rosną.

Przegląd podstawowych operatorów

Na pierwszy rzut oka widać, że API RxJavy jest trochę podobne do API Streams w Javie:

    Observable
        .range(1, 10)
        .filter(i -> i % 2 == 0)
        .map(i -> i * 10)
        .subscribe(i -> log.info(i.toString()));

Co tutaj zrobiliśmy. Przede wszystkim utworzyliśmy sobie observable poprzez operator range, który emituje 10 liczb począwszy od 1. Następnie filtrujemy filter taki strumień i bierzemy tylko liczby, które są parzyste. A dalej przekształcamy map każdą z tych parzystych liczb tak, żeby w strumieniu pojawiła się liczb pomnożona przez 10. Na końcu dokonujemy subskrypcji i na konsoli powinniśmy otrzymać wynik podobny do tego:

14:22:28.069 [main] INFO pl.itbrains.samples.rxjava3.RxJava3Test - 20
14:22:28.071 [main] INFO pl.itbrains.samples.rxjava3.RxJava3Test - 40
14:22:28.072 [main] INFO pl.itbrains.samples.rxjava3.RxJava3Test - 60
14:22:28.072 [main] INFO pl.itbrains.samples.rxjava3.RxJava3Test - 80
14:22:28.072 [main] INFO pl.itbrains.samples.rxjava3.RxJava3Test - 100

Zobaczmy teraz, jak observable mogą ze sobą współpracować. Utwórzmy dwa:

    String[] alphabet = {"A", "B", "C", "D"};
    Observable<String> letters = Observable
        .fromArray(alphabet);
    Observable<Integer> numbers = Observable
        .range(1, 6);

I sprawdźmy działanie operatora zipWith:

    letters.zipWith(numbers, (letter, number) -> letter + number.toString())
        .subscribe(text -> log.info(text));

Pierwsze, co możemy zauważyć, to to, że oba strumienie emitują skończoną liczbę elementów i że ich liczba będzie różna. Jak się zachowa operator zipWith? Otóż w momencie otrzymania informacji o zakończeniu strumienia letters zakończy on też nasz główny strumień. Sam operator zipWith łączy w pary elementy z obu strumieni. Stąd przekazana do niego lambda (letter, number) -> …, która mówi, co zrobić z tym połączeniem. Tutaj po prostu skleić jako napis. Efekt na konsoli:

14:50:20.623 [main] INFO pl.itbrains.samples.rxjava3.RxJava3Test - A1
14:50:20.624 [main] INFO pl.itbrains.samples.rxjava3.RxJava3Test - B2
14:50:20.624 [main] INFO pl.itbrains.samples.rxjava3.RxJava3Test - C3
14:50:20.624 [main] INFO pl.itbrains.samples.rxjava3.RxJava3Test - D4

Możemy również połączyć strumienie (przy okazji przekształcając jeszcze liczby w tekst):

    letters.mergeWith(
        numbers.map(Object::toString)
    ).subscribe(text -> log.info(text));

I dostajemy:

15:01:41.906 [main] INFO pl.itbrains.samples.rxjava3.RxJava3Test - A
15:01:41.906 [main] INFO pl.itbrains.samples.rxjava3.RxJava3Test - B
15:01:41.906 [main] INFO pl.itbrains.samples.rxjava3.RxJava3Test - C
15:01:41.906 [main] INFO pl.itbrains.samples.rxjava3.RxJava3Test - D
15:01:41.906 [main] INFO pl.itbrains.samples.rxjava3.RxJava3Test - 1
15:01:41.906 [main] INFO pl.itbrains.samples.rxjava3.RxJava3Test - 2
15:01:41.906 [main] INFO pl.itbrains.samples.rxjava3.RxJava3Test - 3
15:01:41.906 [main] INFO pl.itbrains.samples.rxjava3.RxJava3Test - 4
15:01:41.906 [main] INFO pl.itbrains.samples.rxjava3.RxJava3Test - 5
15:01:41.906 [main] INFO pl.itbrains.samples.rxjava3.RxJava3Test - 6

Własny Observable w RxJava

Możemy również utworzyć własny observable. Niech będzie to przykład, w którym chcielibyśmy emitować liczby od 100 do 200, przy czym chcielibyśmy, żeby ta emisja następowała, co 100 ms. Kod wyglądałby następująco.

    Observable<Integer> myObservable = Observable.create(emitter -> {
      AtomicInteger counter = new AtomicInteger(100);
      for (;;) {
        int current = counter.getAndIncrement();
        if (current > 200)
          emitter.onComplete();
        else {
          emitter.onNext(current);
          sleep(Duration.ofMillis(100));
        }
      }
    });

    myObservable
        .map(Object::toString)
        .subscribe(text -> log.info(text));

I otrzymalibyśmy taki log zgodnie z oczekiwaniami:

15:19:17.865 [main] INFO pl.itbrains.samples.rxjava3.RxJava3Test - 100
15:19:17.967 [main] INFO pl.itbrains.samples.rxjava3.RxJava3Test - 101
15:19:18.068 [main] INFO pl.itbrains.samples.rxjava3.RxJava3Test - 102
15:19:18.169 [main] INFO pl.itbrains.samples.rxjava3.RxJava3Test - 103
15:19:18.270 [main] INFO pl.itbrains.samples.rxjava3.RxJava3Test - 104
...
15:19:27.732 [main] INFO pl.itbrains.samples.rxjava3.RxJava3Test - 198
15:19:27.832 [main] INFO pl.itbrains.samples.rxjava3.RxJava3Test - 199
15:19:27.933 [main] INFO pl.itbrains.samples.rxjava3.RxJava3Test - 200

Przykładowy case

Zróbmy więc coś asynchronicznie. Założmy przez chwilę, że mamy usługę, która pobiera nam dane o cenach akcji.

Cena reprezentowana jest przez klasę:

@Data
@Builder
public class StockPrice {
  private String code;
  private int price;

  @Override
  public String toString() {
    return "StockPrice{" +
        "code='" + code + '\'' +
        ", price=" + price +
        '}';
  }
}

A nasz usługa:

  public StockPrice traditionalStockPrice(String code) {
    log.info("Getting stock price for {}", code);
    sleep(Duration.ofSeconds(1));
    return StockPrice.builder()
        .code(code)
        .price(397)
        .build();
  }

Nic wyszukanego. Tradycyjne symulowanie wywołania zdalnej usługi. Sleep opóźnia powrót z metody o jedną sekundę.

Chcielibyśmy jednak skorzystać z RxJava i mieć możliwość bardziej przejrzystego zarządzania asynchronicznością operacji. Przy okazji zamiast z Observable, tak czysto dla odmiany, skorzystamy z Flowable. Przy tym przykładzie nie będzie widać żadnych różnic, a zmiana ma charakter czysto przeglądu API.

Nasza metoda w wersji z RxJava byłaby następująca:

  public Flowable<StockPrice> rxStockPrice(String code) {
    return Flowable
        .fromCallable(() -> traditionalStockPrice(code));
  }

Co tu się dzieje. Przede wszystkim widzimy, że zwracamy observable typu Flowable, który utworzony jest poprzez operator fromCallable. Dla przypomnienia, Callable jest typem podobnym do Runnable. I reprezentuje operację, która może być wykonana na innym niż bieżący wątek. W przeciwieństwie do Runnable, Callable zwraca wynik lub wyjątek.

Czyli w tym przykładzie mówimy w ten sposób. Chcę utworzyć strumień Flowable, który wyemituje jeden element. Ważnym jest fakt, że element ten będzie rezultatem operacji potencjalnie asynchronicznej (stąd lambda implementująca interfejs funkcyjny Callable – wywołuje się leniwie).

Nie jest to najprostsze zagadnienie, więc nie martw się, jeśli coś jest teraz niejasne. Przyjrzymy się przykładowi i powinno się wyjaśnić.

Chcielibyśmy pobrać równocześnie ceny akcji Amazon i Netflix. Następnie zrobić z nich strumień. Wiemy, że opóźnienie operacji pobierania akcji ustaliliśmy na 1 sekundę. Spodziewamy się więc, że po sekundzie otrzymamy wyniki.

    Flowable<StockPrice> amzn = rxStockPrice("AMZN");
    Flowable<StockPrice> nflx = rxStockPrice("NFLX");

    amzn.mergeWith(nflx)
        .subscribe(sp -> log.info("Stock {} with price: {}", sp.getCode(), sp.getPrice()));

I ku naszemu zaskoczeniu tak się nie dzieje. Bo widzimy, że kod wykonuje się cały czas synchronicznie w głównym wątku programu:

15:37:54.776 [main] INFO pl.itbrains.samples.rxjava3.RxJava3Test - Getting stock price for AMZN
15:37:55.779 [main] INFO pl.itbrains.samples.rxjava3.RxJava3Test - Stock AMZN with price: 397
15:37:55.779 [main] INFO pl.itbrains.samples.rxjava3.RxJava3Test - Getting stock price for NFLX
15:37:56.780 [main] INFO pl.itbrains.samples.rxjava3.RxJava3Test - Stock NFLX with price: 397

Nie 1 sekunda, a 2 sekundy z małym hakiem. Nie równolegle, a jedna operacja występuje po drugie. No … nie o to nam chodziło. Być może trzeba coś zmienić.

Wprowadźmy nieco asynchroniczności

Pamiętasz, jak pisałem, że musimy sami ustawić, żeby RxJava działała w trybie asynchronicznym. To właśnie jest ten moment.

Bez wchodzenia w szczegóły, gdy w RxJava chcemy przetwarzać operacje w innych wątkach, to musimy skorzystać ze schedulera. To nic innego jak pula wątków. Możemy sobie utworzyć go np. w ten sposób:

private Scheduler scheduler = scheduler(2);

gdzie metoda scheduler wygląda następująco:

  Scheduler scheduler(int n) {
    return Schedulers.from(Executors.newFixedThreadPool(n));
  }

W tym momencie jesteśmy w stanie zmienić implementację rxStockPrice() z uwzględnieniem wykorzystania schedulera. W tym celu korzystamy z operatora subscribeOn, który pozwala nam określić scheduler, na którym będzie działał nasz observable. W rezultacie otrzymamy taki kawałek kodu:

  public Flowable<StockPrice> rxStockPrice(String code) {
    return Flowable
        .fromCallable(() -> traditionalStockPrice(code))
        .subscribeOn(scheduler);
  }

Gdy to mamy i ponownie uruchomimy nasz przykład, wynik będzie dokładnie odzwierciedlał nasze oczekiwania:

    Flowable<StockPrice> amzn = rxStockPrice("AMZN");
    Flowable<StockPrice> nflx = rxStockPrice("NFLX");

    Flowable.merge(amzn, nflx)
        .subscribe(sp -> log.info(sp.toString()));

    sleep(Duration.ofSeconds(5));
15:55:39.833 [pool-1-thread-1] INFO pl.itbrains.samples.rxjava3.RxJava3Test - Getting stock price for AMZN
15:55:39.833 [pool-1-thread-2] INFO pl.itbrains.samples.rxjava3.RxJava3Test - Getting stock price for NFLX
15:55:40.857 [pool-1-thread-2] INFO pl.itbrains.samples.rxjava3.RxJava3Test - StockPrice{code='NFLX', price=397}
15:55:40.857 [pool-1-thread-2] INFO pl.itbrains.samples.rxjava3.RxJava3Test - StockPrice{code='AMZN', price=397}

Jak widać operacje pobierania ceny akcji odpaliły się równolegle na różnych wątkach i po sekundzie z groszami strumień się zakończył emitując oba wyniki. Dokładnie o to nam chodziło 😊

Czy zauważyłaś lub zauważyłeś metodę sleep na końcu?
Ćwiczenie dla Ciebie – po co się ona tam znalazła?
W razie wątpliwości nasz do mnie na maila. Odpowiadam na wszystkie.

O testowaniu

O ile testowanie w pierwszej wersji RxJavy było dosyć uciążliwe, to od wersji 2 jest ono dużo prostsze. Mamy wbudowany TestObserver. A korzystać z niego można w ten sposób:

  @Test
  public void testTest() {
    Observable
        .just("A", "B", "C")
        .test()
        .assertValueCount(3)
        .assertValueAt(2, "C");
  }

Dzięki takiej konstrukcji możemy sprawnie przetestować nasze Observable.

Odpowiedzi na pytania czytelników z newslettera

Pytanie Moniki: Parę razy słyszałam już o RxJava. Czy w normalnej, domowej aplikacji jest mi to potrzebne?

Odpowiedź: Nie. Również i w większych projektach korporacyjnych, które nie wymagają dużego obciążenia RxJava nie jest zwykle obecna. Przydaje się ona, gdy zaczynamy widzieć drivery wydajnościowe, tam gdzie chcemy zapewnić maksymalną wydajność z poszczególnych maszyn. Tradycyjny model w Javie tj. wątek per żądanie tego nie daje.

Pytanie Eweliny, Tomka i Łukasza: Jaki jest próg wejścia do RxJavy?

Odpowiedź: Na pewno nie jest niski. Jest raczej wysoki. Trzeba zrozumieć modele zarządzania wątkami i poznać dobrze operatory, żeby programować efektywnie.

Pytanie Kasi i Eryka: Czym różni się RxJava od Reactora i kiedy, którego używać?

Odpowiedź: Obie biblioteki w zasadzie rozwiązują ten sam problem, a ich w gruncie rzeczy jest całkiem podobne. RxJava występuje w wielu projektach, bo jest starsza – także w takim przypadku należy korzystać z RxJavy. Z kolei dla nowych aplikacji w Springu naturalne będzie stosowanie Reactora. Przy programowaniu na Androida naturalnym wyborem będzie RxJava.

Pytanie Krzyśka: Czy lepiej uczyć się RxJavy, czy Reactora?

Odpowiedź: Odpowiem tak, gdy nauczysz się jednego to bez problemu będziesz rozumiał również drugi. Obecnie trendy jest uczenie się Spring Boota. Ja bym w takim razie wybrał Spring Webflux zamiast Spring MVC, tak żeby naukę obu połączyć. Choć nie musi być to na dzień dobry najprostsze.

RxJava podsumowanie

RxJava nie jest tematem prostym. Nie jedna osoba zjadła sobie na niej zęby. Do jej poznania kluczowe moim zdaniem jest podejście małymi kroczkami. Wprowadzanie powoli poszczególnych elementów. Tak, żeby na raz materiał tej biblioteki nas nie przytłaczał.

Na zakończenie, dla Ciebie do zapamiętania bez wątpienia będzie, że RxJava:

  • Jest bardzo efektywną abstrakcją do zarządzania strumieniami zdarzeń.
  • Działa w jednym wątku by default.
  • Aby działać w niej wielowątkowo, asynchronicznie, musimy sami to zadeklarować.
  • Jej bogate API stanowi ciekawą alternatywę dla API Stream w JDK.
  • Jeśli programujesz frontend to np. Angular uwielbia RxJS, czyli ReactiveX dla Javascript. Także ucząc się ReactiveX spotkasz te same mechanizmy w wielu językach programowania.

Daj znać, jak Tobie się pracuje z RxJava 😊❗❗❗

5 1 vote
Article Rating
Subscribe
Powiadom o
guest
2 komentarzy
Oldest
Newest Most Voted
Inline Feedbacks
View all comments
Entuzjasta
Entuzjasta
17 dni temu

Cześć. Fajny artykuł 😉 Od dłuższego czasu przymierzam się do poznania, nazwijmy to, paradygmatu reaktywnego. Interesuje mnie to w szczególności w kwestii tworzenia restowego api w oparciu o Springa, więc tam może akurat RxJavy nie będzie, ale właśnie Webflux i te sprawy 🙂
Tak czy siak, nie mogę za bardzo znaleźć ciekawych, rzetelnych źródeł z przykładami takich aplikacji, ale chciałbym też poczytać ogólnie o reactive i jego użyciu. Masz może jakieś ciekawe źródła do polecenia – albo jeszcze lepiej – książkę?
Pozdrawiam 🙂