RabbitMQ w parze z Javą, czyli jak działa popularny broker wiadomości

W życiu każdego programisty pracującego w zespole, który wytwarza oprogramowanie oparte o systemy, wymieniające dane między sobą, przychodzi taki moment, że zwykła komunikacja oparta o RESTful API nie wystarcza. No bo jak wygląda odpytywanie endpointu co sekundę, czy status produktu się zmienił? Albo wysyłanie zapytania do serwera wiadomości co 10 sekund, czy przyszła nowa korespondencja. Trudno byłoby też wysyłać swoją lokalizację co 30 sekund korzystając w protokołu http. No nie… takie rozwiązania w ogóle się nie sprawdzają w środowiskach produkcyjnych, szczególnie gdy dołożymy do tego efekt skali w postaci jednocześnie działających użytkowników. Na rozwiązanie tego problemu, z pomocą przychodzi nam pojęcie komunikacji asynchronicznej.


Adam Włodarczyk. Właściciel firmy Hindbrain tworzącej oprogramowanie na urządzenia mobilne oraz systemy CRM w różnych technologiach. Jego ulubionymi językami są Java i Objective-C, chociaż próbuje też sił w językach takich jak Kotlin czy Swift. Zdarza się, że nawet czasem pokoduje w PHP’ie. Żeby dać ujście ekstrawertycznej części swojej osobowości, uczy i jest mentorem kolejnego pokolenia programistów w Coderslab. Po godzinach stara się dbać o kręgosłup pływając i ćwicząc na siłowni.


Czym ta komunikacja asynchroniczna jest? Koncepcja tego rodzaju komunikacji jest dość prosta do wyjaśnienia. Polega ona na wysyłaniu danych przez nadawcę wiadomości bez konieczności przesłania informacji zwrotnej, co oznacza, że po drugiej stronie nie musi być odbiorcy. Taki rodzaj komunikacji nie wymaga współistnienia rozmawiających ze sobą procesów (bytów). Komunikaty produkowane przez nadawcę, są wysyłane w eter i buforowane w jakimś miejscu (agregatorze), a stamtąd dopiero odbiorca, w odpowiednim dla niego momencie odczytuje dane, które na niego czekają.

Koncepcja ta zrodziła się już dość dawno, bo w latach 90 dwudziestego wieku, i możemy ją znaleźć w wielu miejscach, nawet w codziennym życiu. Najprostszym przykładem do zobrazowania tego rodzaju komunikacji jest wysyłanie wiadomości SMS. Osoba wysyłająca wiadomość SMS nie wie czy jej odbiorca tę wiadomość otrzymał, a tym bardziej czy ją odczytał (nie mówię tu oczywiście o wiadomościach typu iMessage czy wysyłanych przez WhatsApp’a). Natomiast odbiorca, po zalogowaniu się do sieci ma możliwość odebrania wszystkich zbuforowanych wiadomości w dogodnym dla niego momencie.

Analizując przykład z wiadomościami SMS, można wyciągnąć pewne wnioski. Do zaimplementowania jakiejś formy komunikacji asynchronicznej pomiędzy elementami naszego systemu, powinniśmy posiadać jakiś bufor. Tylko jaki? Czym on powinien się charakteryzować, jakie założenia spełniać?

Powinien mieć możliwość buforowania wiadomości wysyłanych przez jedną ze stron oraz możliwość odbierania tych danych przez inną. Powinien również wykorzystywać inny kanał przesyłu danych (protokół) niż http. Powinien mieć możliwość kategoryzowania danych wysyłanych przez nadawcę i możliwość odbierania tych danych przez odbiorcę na podstawie jakiegoś klucza.

Takim buforem spełniającym nasze potrzeby, może być jeden z bardziej popularnych brokerów wiadomości — RabbitMQ. Według danych, które można znaleźć na stronie projektu, wdrożono już ponad 35 tys. instancji tego open source’owego rozwiązania w startupach oraz dużych korporacjach.

W brokerze wiadomości mamy dwóch aktorów. Pierwszym z nich jest tzw. publisher, czyli ta strona komunikacji, która jest wytwórcą. Generuje dane i wysyła je do brokera. Drugim aktorem natomiast jest subscriber, czyli klient, który subskrybuje się do danego kanału komunikacji i nasłuchuje czy broker posiada jakieś nowe opublikowane dane.

Rysunek 1. Schemat działania brokera wiadomości

Dodatkowo, RabbitMQ daje możliwość tworzenia wielu kanałów (kolejek). Co bardzo ułatwia zarządzanie dystrybucją danych (po pierwsze pod kątem podziału danych względem kontekstu, po drugie podziału danych pod kątem subskrybentów, którzy powinni otrzymać dane). Żeby przybliżyć bardziej działanie tego brokera, jako poligon przyjąłem sobie, że będę tworzył system, w którym danymi będą wymieniały się:

1. Backend RESTful API napisany w Javie, wykorzystując Spring Boot.

2. Aplikację mobilną napisaną w Javie, wykorzystującą Android API.

Przyjąłem dwa przypadki, na podstawie których postaram się pokazać, jak powyższy broker wykorzystać, by zaczął z nami współpracować i żebyśmy mieli z niego pożytek. Pierwszym scenariuszem, w którym postanowiłem wykorzystać komunikację asynchroniczną, jest sytuacja kiedy to smartfon (a raczej jego właściciel wraz z nim) zmienia swoją pozycję geograficzną i backend musi to wiedzieć. Jasne moglibyśmy wykorzystać do tego zwykły endpoint http i metodą POST wysyłać dane, gdzie naszym payloadem byłyby: pozycja geograficzna oraz id użytkownika, który się porusza. Po pierwsze takie rozwiązanie obciąża protokół http, a po drugie możemy sobie zafundować atak DDoS przy rosnącej liczbie aktywnych użytkowników w naszym systemie. No mało to wydajne, a tym bardziej bezpieczne rozwiązanie.

Dodam jeszcze, że jeden tylko użytkownik, w zależności od tego, jak skonfigurujemy Service GPS po stronie aplikacji mobilnej, może wysyłać 100, a nawet więcej requestów
z aktualizacją pozycji w ciągu jednej minuty. Wystarczy to przemnożyć przez potencjalną liczbę jednocześnie aktywnych userów (przyjmijmy sobie ok. 1000 osób), oznacza to ok. 100 tys. requestów na minutę, z samą lokalizacją, czyli… baaardzo dużo. Osobiście nie chciałbym, żeby system „padł” z powodu DDoS’a typu „friendly fire”.

Ok, więc wiem już, że chciałbym wykorzystać broker wiadomości do wysyłania i aktualizacji pozycji użytkownika w systemie. Teraz jak to zrobić? Czas wykorzystać wiedzę teoretyczną i przerodzić ją w praktycznie działającą implementację.

Przykład wykorzystania

W celu skrócenia tego artykułu założyłem, że mamy już dostęp do działającej instancji brokera wiadomości na serwerze. RabbitMQ powinien mieć otwarte porty 15672 oraz 5672 na serwerze gdzie się znajduje, żebyśmy mogli bez większych przeszkód korzystać z niego na zewnątrz.

Rysunek 2. Konfiguracja portów RabbitMQ

Następnie serwer RabbitMQ powinien mieć utworzoną kolejkę (można ją stworzyć w Wizardzie w Web Menadżerze naszej instancji).

Rysunek 3. Lista kolejek

Dodatkowo nasza kolejka musi być spięta (posiadać bindowanie) do kanału wymiany (to właśnie tam, jest pierwsze miejsce styku wiadomości od nadawcy z brokerem wiadomości). Kanał wymiany decyduje później na podstawie klucza routingowego, do jakiej kolejki ma być skierowana wiadomość.

Rysunek 4. Przepływ wiadomości w brokerze

Tak skonfigurowany broker powinien pozwolić na swobodne przekazywanie wiadomości z pozycją użytkownika z aplikacji mobilnej do serwera. Zacznijmy od strony aplikacji mobilnej, czyli Android API. W tym scenariuszu to aplikacja mobilna będzie dla Rabbit’a publisherem, a backend subscriberem.

Do poprawnego działania komunikacji z RabbitMQ będziemy oczywiście potrzebowali dołączyć odpowiednią zależność, w swoich projektach androidowych do zarządzania zależnościami wykorzystuję gradle’a. Poniżej to czego potrzebuję.

Najpierw muszę stworzyć Service, który dostarczy mi dane dla brokera, czyli tworzę Location Update Service.

Dzięki tej klasie będę w stanie wysłać aktualną pozycję użytkownika korzystającego w aplikacji. Następnym krokiem jest stworzenie klasy, tworzącej połączenie z brokerem wiadomości, spreparowanie jej w odpowiedni sposób (tak by można było wysłać ją do brokera) oraz spełniającą sam proces wysyłki danych.

Zanim jednak stworzę klasę, potrzebuję jeszcze mieć konfigurację niezbędną do stworzenia połączenia. Przechowuję ją, w pliku z properties’ami (w celu łatwiejszego zarządzania ustawieniami połączenia).

Sama klasa wykorzystywana do komunikacji z Rabbit’em.

W tym momencie już nasz publisher (czyli aplikacja mobilna) wysyła do brokera dane, które się tam buforują i czekają na subscribera. Gdy tylko taki się pojawi, dane zostaną mu przekazane i bufor zostanie oczyszczony.  

Przyszedł czas na implementację subskrybenta w Springu. Muszę najpierw uzupełnić zestaw zależności odpowiedzialny za korzystanie z RabbitMQ.

W application.properties tworzę odpowiednie zmienne dla definiowania nazw kolejki:

Dodatkowo należy skonfigurować połączenie z naszym brokerem wiadomości. Można to zrobić
w application.properties:

Muszę stworzyć Listener, który będzie subskrybował się do odpowiedniej kolejki w brokerze wiadomości.

Dzięki temu listenerowi, który subskrybuje się na utworzoną wcześniej kolejkę, do której publikuje aplikacja mobilna, jestem w stanie w zasadzie w sposób ciągły aktualizować pozycję użytkownika w systemie. Wykorzystuję do tego odrębny protokół (Advanced Message Queuing Protocol), co pozwala w niezakłócony sposób przesyłać wiadomości za pomocą brokera oraz jednocześnie odciążyć protokół http, wykorzystywany do komunikacji RESTful API.

Drugim przypadkiem użycia będzie sytuacja, kiedy to serwer (Spring Boot) będzie nadawcą wiadomości, a aplikacja mobilna będzie subskrybowała się na konkretną kolejkę. Wyobraźmy sobie sytuację, kiedy klient kupuje towar i oczekujemy na zapłatę z zewnętrznego procesora płatniczego. Po uzyskaniu wiadomości z procesora o prawidłowo dokonanej płatności można rozpocząć wysyłkę danych do aplikacji mobilnej, celem zakończenia procesu zakupu.

W tym celu po stronie backendu potrzebuję stworzyć Beany dla kanału wymiany, klucza routingowego oraz połączenia z Rabbitem. Ale zanim Beany, muszę mieć jeszcze konfigurację dotyczącą nazwy kolejki oraz kanału komunikacji zapisaną w appliaction.properties (dla ułatwienia konfiguracji).

A Beany tworzę w klasie konfiguracyjnej Springa.

Dodatkowo w miejscu gdzie otrzymuję informacje o pozytywnie wykonanej płatności, mogę wysłać wiadomość do brokera za pomocą komponentu Message Sender.

Sam Message Sender jest dość banalny w swojej implementacji. Wygląda on następująco.

Powyższe snippety kodu pozwalają na stworzenie kolejki po stronie Rabbit’a (jeśli jeszcze nie istnieje w brokerze wiadomości), oraz na podłączenie się do niej przez publisher, którym w tym przypadku jest backend.

Teraz możemy zająć się odbiorem wiadomości po stronie aplikacji mobilnej. Do tego celu warto wykorzystać RxJavę, aby było łatwiej działać po stronie samego Androida. Najpierw tworzę implementację obserwatora Rabbita, który będzie subskrybował się na wcześniej utworzoną kolejkę i nasłuchiwał czy do brokera dotarły jakieś nowe wiadomości z drugiej strony (nadawcy).

Teraz wystarczy jedynie utworzyć DataSource z Observable’m Rabbita.

Na sam koniec nasz Observable wystarczy dodać do odpowiedniego Presentera (widoku).

Od tej pory aplikacja nasłuchuje zmian w brokerze na kolejce.

Podsumowanie

Uważam, że broker RabbitMQ jest bardzo elastycznym rozwiązaniem, które można zaimplementować w wielu miejscach i wielu konfiguracjach. Z łatwością można go używać z Node.JS’em, gdzie dostępnych jest wiele bibliotek połączeniowych czy też na platformie iOS (Objective-C lub Swfit). Dodatkowo broker ten udostępnia wiele możliwości konfiguracji samych kolejek. Więcej znajdziecie na stronie projektu.

Jest tam również mnogość tutoriali, które w prosty i przystępny sposób pokazują jak skonfigurować i zaimplementować publishera oraz subscribera (nie tylko w Javie). Zachęcam do zabawy.


Zdjęcie główne artykułu pochodzi z stocksnap.io.

Patronujemy

 
 
Polecamy
W Nowej Zelandii rozwiązania Microsoftu są popularne. Historia Tomka Kolasy