dramatiq exatel

Dramatiq jako kolejka zadań. Jak usprawnić skalowanie systemu

W jednym z naszych projektów pojawiła się potrzeba jednoczesnego uruchamiania setek lub tysięcy zadań. Były to najczęściej idempotentne, niezależne od siebie funkcje uruchamiane w reakcji na zdarzenie w systemie, na których wynik możemy trochę poczekać. Przykładami takich zadań było generowanie i wysyłanie raportów czy ekstrakcja danych z pliku i zapis do bazy danych.

Większość kodu niewymagającego najwyższej wydajności napisaliśmy w Pythonie. W związku z tym szukaliśmy projektu w łatwy sposób pozwalającego na integrację właśnie z tym językiem. O tym w artykule.

Szymon Nogieć
Latest posts by Szymon Nogieć (see all)

Wymagania, które stawialiśmy kolejce zadań to:

  • wysoka niezawodność i wydajność,
  • prostota skalowania,
  • łatwość użycia,
  • integracja z Pythonem w wersji 3.5+.

Mile widziane były cechy, takie jak:

  • możliwość wykorzystania Redisa jako backend dla zadań i wyników,
  • priorytetyzacja zadań,
  • automatyczne ponawianie,
  • pipelining,
  • prostota implementacji.

Rozwiązania jakie braliśmy pod uwagę to Celery, rq, dramatiq, huey. Które wybraliśmy i dlaczego?

Celery odrzuciliśmy z powodu wielkości, skomplikowania i problemów z używaniem. Nie spełniał kilku z naszych wymagań (prostota implementacji, priorytetyzacja zadań). 

rq wykorzystujemy w TAMA i także sprawiała trochę problemów. Jest także dosyć prosta i nie spełnia kilku z cech nice-to-have (niezawodność, automatyczne ponawianie). Dodatkowo brakuje możliwości równoległego przetwarzania bez uruchamiania wielu instancji rq. Chcieliśmy także spróbować innego rozwiązania niż rq – potencjalnie lepszego.

Wybieraliśmy pomiędzy huey a dramatiq. Zdecydowaliśmy się na to drugie, ponieważ jako jedno z głównych wymagań twórcy postawili sobie “high reliability and performance”. Dodatkowo twórca dramatiq zapewnia niezawodność dostarczania zadań do workerów i auto przeładowywanie kodu. Całe porównanie między rozwiązaniami można znaleźć na tej stronie. Jako backend do kolejkowania zadań pozwala na użycie m.in. Redis czy RabbitMQ. Ze względu na dużą znajomość Redisa w zespole oraz fakt, iż już używaliśmy go w projekcie, to właśnie tej bazy użyliśmy jako backend komunikacyjny.

Użycie

Zaprezentowane przykłady uruchamiam z developerskich kontenerów i docker-compose zdefiniowanymi w następujący sposób.

docker-compose.yml

Dockerfile

Instancje aplikacji workera włączam przy użyciu aplikacji dramatiq. Uruchamia ona wiele równoległych procesów workera, które pobierają z backendu zakolejkowane zadania i je realizuje. Jako parametry przyjmowane są nazwy modułów, w których znajdują się definicje zadań do realizacji w ramach workera. Dramatiq pozwala także na parametryzację liczby używanych procesów/wątków (domyślnie procesów jest tyle, ile rdzeni procesora i 8 wątków na każdy proces).

Funkcję do wykonania w ramach kolejki definiujemy w dramatiq jako aktora. Jest to cienki wrapper dla callable’i zawierający metadane dotyczącego tego, w jaki sposób powinny zostać one asynchronicznie uruchamiane w ramach kolejki.

Podstawowy przykład wygląda w ten sposób:

tasks.py

Tak udekorowaną funkcję możemy wywoływać w normalny, synchroniczny sposób:

Taką funkcję możemy także wysłać na worker do wykonania asynchronicznego – to znaczy zlecenie realizacji zadania, gdy instancja workera będzie miała taką możliwość. Najpierw jednak należy ustawić konfigurację backendu dla kolejek jako Redis, gdyż domyślnie dramatiq wykorzystuje RabbitMQ:

tasks.py

Następnie możemy wysłać trzy zadania do realizacji. Wywołanie metody send() natychmiast kolejkuje je do wykonania, wykorzystując wcześniej skonfigurowany backend redisowy.

main.py

Następnie uruchamiamy całość z użyciem docker-compose:

Oczywiście do aktorów możemy przekazywać parametry. Jedynym warunkiem, który muszą spełnić jest możliwość serializacji do JSONa, ponieważ w ten sposób zostają przesłane i zapisane w backendzie. Serializator JSON można zmienić, na przykład na pickle

Innym sposobem definicji aktorów jest używanie struktury klas. Aby zdefiniować zadania dla kolejki w ten sposób, należy zdefiniować klasę dziedziczącą po GenericActor i implementując metodę perform(self).

tasks.py

main.py

Ustawienia dla aktorów

Każdemu z aktorów możemy ustawiać parametry ich wykonywania. W przypadku dekoratora jest to realizowane przez przekazywanie do niego argumentów, dla aktorów opartych o hierarchię klas jest to realizowane w postaci parametrów wewnętrznej klasy Meta. Poniżej spojrzymy na kilka z nich.

Powtórzenia

Dla każdego aktora możemy zdefiniować parametry określające w jaki sposób workery mają realizować jego powtarzanie w przypadku niepowodzenia (na przykład rzucenie wyjątkiem). Możemy zdefiniować np. maksymalną liczbę powtórzeń, warunek kiedy zadanie ma być uruchamiane ponownie, czas po jakim ma zostać powtórzone, wyjątki które nie rozpoczną powtórnego uruchomienia, etc.

Poniżej mamy przykład aktora, który zostanie uruchomiony ponownie maksymalnie dwa razy:

tasks.py

i uruchamiamy go raz. Po starcie widzimy:

W logach widzimy, że zadanie zostało uruchomione trzykrotnie. Pierwszy raz było to normalne wykonanie, a po wyrzuceniu wyjątku – dwie próby ponownego wykonania. Po przekroczeniu limitu nastąpiło porzucenie zadania.

Czas

Dramatiq pozwala także na konfigurację maksymalnego czasu dla zadania i maksymalny czas jaki dane zadanie może oczekiwać w kolejce. Poniżej mamy funkcję, której zadania mogą czekać w kolejce maksymalnie 1 sekundę, a jej maksymalny czas wykonania to 5 sekund:

tasks.py

W przypadku przekroczenia maksymalnego czasu wykonania (jak w powyższym przypadku) zostaje rzucony wyjątek TimeLimitExceeded, a zadanie zostanie zatrzymane.

Poszczególne zadania możemy także uruchamiać z opóźnieniem. Realizowane jest to poprzez wywołanie udekorowanej funkcji z metodą send_with_options zamiast zwykłego send. Przykładowo, time_limited_task zostanie wywołane z półsekundowym opóźnieniem:

main.py

Priorytetyzacja

Dramatiq pozwala na definiowanie priorytetów dla poszczególnych zadań. Podobnie jak z innymi opcjami, definiowane są jako parametr dekoratora. Może to być przydatne w przypadku dużego obciążenia workerów i potrzeby wykonywania pilnych zadań w pierwszej kolejności.

tasks.py

main.py

W wyniku zdefiniowania priorytetów, wcześniej kolejkowane zadania z niskim priorytetem zostają odłożone na później, na rzecz zadań wysoko priorytetowych:

Wyniki działań aktorów

Domyślnie dramatiq nie przechowuje wyników zwracanych przez zadania. Aby mieć taką możliwość należy zdefiniować backend dla wyników i dodać do naszego brokera. Użyjemy ponownie Redisa:

tasks.py

Następnie definiujemy zadanie, które musi w parametrach posiadać opcję store_results na true:

tasks.py

Następnie kolejkujemy zadanie:

main.py

otrzymując wynik:

Testy jednostkowe

Dramatiq posiada klasy stub do wykorzystania w testach jednostkowych. Są to między innymi StubBroker zastępujący zwykły broker i StubBackend do użycia zamiast normalnego backendu dla wyników funkcji. 

Moduł z zadaniami możemy dostosować do wykonywania testów jednostkowych, na przykład przy użyciu zmiennej środowiskowej:

tasks.py

Następnie dla wygody możemy zdefiniować z użyciem pytest fixture’y dla stub_brokera oraz stub_workera i umieścić je w conftests.py:

tests/conftest.py

I zacząć pisać testy:

tests/test_tasks.py

Uruchomić je możemy wewnątrz kontenera task_sender za pomocą skryptu:

run_unit_tests.sh

I uruchomienie:

Podsumowanie

Dramatiq dobrze sprawdził się w naszym zastosowaniu. Mocno uprościł skalowanie systemu – w przypadku dużego obciążenia workerów wystarczy uruchomić kilka kolejnych instancji wpiętych do tej samej bazy Redisa i to wszystko. W przypadku wymagania wysokiej dostępności i niezawodności istnieje możliwość konfiguracji także klastra Redisowego, aby wyeliminować potencjalne problemy z brakiem dostępu/awarią Redisowego backendu.

Dodatkowo, kod źródłowy jest nieskomplikowany, pozwala na wiele opcji konfiguracyjnych. Programista może na przykład dodawać własne middleware’y, zawierające informacje, które mają być współdzielone pomiędzy wątkami workera, takie jak np. uchwyty do baz danych czy konfiguracje.

Kod źródłowy wykorzystanych przykładów można znaleźć na GitHubie.

Zdjęcie główne artykułu pochodzi z unsplash.com.

Zapraszamy do dyskusji

Patronujemy

 
 
More Stories
Sytuacja testerów na rynku IT. Dwa rozwiązania na brakujących specjalistów