Data Lake na AWS

Data Lake na AWS-ie. Budowa, problemy i rozwiązania

Czy macie swoje repozytoria kodu? Co z danymi? Jak je zbieracie? Tak, wiemy, to wcale nie takie łatwe. Gdy dane pochodzą z wielu źródeł, powstały w różnych formatach i przychodzą z różną częstotliwością, wtedy trzeba specjalistycznych narzędzi do specjalnych zadań. Narzędzi jest wiele, jednak mamy do opowiedzenia wam o naszym produkcyjnym rozwiązaniu na AWS. Cel był jeden: sprowadzić dane do formy, która pozwoli później targetować reklamy na użytkownikach oglądających filmy na platformie klienta.

Jaki był problem, który mieliśmy rozwiązać.

Naszym klientem była firma oferująca video-streaming na rynek azjatycki. Gdy zaczynaliśmy projekt, to posiadał ‘jedynie’ około 150 milionów aktywnych użytkowników, a aktualnie jest ich 300 milionów. Każdy z nich swoimi wyborami generuje mnóstwo informacji o upodobaniach. Przez upodobania mamy na myśli ulubione typy filmów, aktorów. To trochę mało, by targetować komuś reklamę dlatego, warto też analizować dane jak typ telefonu, płeć, wiek. Można to wykorzystać do stworzenia profilu użytkownika i pokazywać mu reklamy, którymi potencjalnie się zainteresuje. 

Klient poprosił nas o zbudowanie infrastruktury AdTech, która pozwoli mu stać się ‘Walled garden’, czyli bezpiecznego serwisu, który ma monopol na dane klientów i może zarobić na tym. Sam zaproponował, że w tym systemie potrzebny jest Data Lake. Tutaj omówimy tylko mały kawałek tego systemu, który miał za zadanie zbierać dane z różnych źródeł, co stanowiło bazę do tworzenia profilów użytkownik i ich klasyfikacji.

Wyzwania naszego problemu:

Problematyczne źródła danych

Nasz Data Lake opierał się na sześciu różnych źródłach. Część z nich była z różnych serwisów klienta, ale także pojawiły się dane pochodzące od zewnętrznych firm. To jest bardzo częsta praktyka, by móc rozszerzyć możliwości klasyfikacji. Połączyć takie dane można na przykład dzięki ciasteczkom lub analizie podobnych cech. Sposób, w jaki zbieraliśmy dane można podzielić wd. tego jak je dostawaliśmy:

  • bezpośrednie pobieranie z baz klienta,
  • wypychanie ich do naszych bucketów na s3 przez klienta
  • pobieranie ich przez nas z bucketów s3 klienta

Akurat przy tym projekcie nie było plików przetwarzanych w czasie rzeczywistym. Częstotliwość pobierania danych była nam z góry narzucona przez klienta lub zewnętrzne źródła. Od momentu otrzymania danych do przesłania ich do serwera serwującego reklamy mogło minąć w najgorszej sytuacji 3 dni, a czas ten obejmował pobranie plików, przetworzenie ich, stworzenie profili, grup audiencji i wysłanie do ad serverów. Podzieliliśmy przetrzymywane pliki na etapy:

  • brązowy — surowy, dane bezpośrednio ze źródła bez zmian
  • srebrny — czyszczenie, spłaszczanie zagnieżdżonych struktur, transformacje,

Połączenia z DynamoDB i MongoDB okazały się słabo obsługiwane przez AWS, dlatego postanowiliśmy, że łatwiej będzie gdy będzie nam wysyłał dane do naszych bucketów. Pliki pojawiały się u nas regularnie i dzięki temu mieliśmy u nas surowe dane. Mieliśmy pewność, że są nasze i nikt ich nie zmieni. Całkiem inna sytuacja była przy plikach, które mieliśmy sami pobierać z bucketu klienta. 

Z początku wpadliśmy na bardzo zły pomysł, jakim było traktowanie ich jako “nasz brązowy etap”. Pliki surowe służyły jako źródło do kolejnego etapu więc bezpośrednio na nich uruchamialiśmy skrypty ETL-owe. Nie przewidzieliśmy wtedy, że na tych plikach jest stosowany backfilling, czyli uzupełnianie starych plików nowymi danymi. Istniejące pliki, które dotyczyły wydarzeń sprzed 7 dni wstecz, były aktualizowane. Polegało to na usuwaniu starych plików i zastępowaniu ich nowymi, z tą samą nazwą. 

Nasz przepływ przetwarzania odbywał się co parę godzin, więc w ogóle nie korzystaliśmy z zaktualizowanych danych sprzed paru dni. Użycie tych zmian byłoby problematyczne, ponieważ musielibyśmy wiedzieć, co dokładnie się zmieniło w plikach, które już były przetworzone i przesłane do innych serwisów. Jednak poza utratą danych mieliśmy problem z błędami w trakcie transformacji, pyspark rzucał wyjątek, gdy okazywało się, że próbuje zrobić transformacje na plikach, które już nie istnieją. Rozwiązaliśmy to przez kopiowanie tych plików do naszego konta. Nie był to jedyny problem z tymi plikami. 

Były one strasznie zaśmiecone, dodanie metadanych, które pozwalały traktować je jako tabelę rozpoznawało około 2k kolumn. Pojawiały się kolumny typu: session, session_1, session_1_2, session_string, session_string_1. W każdej z tych kolumn mogło się znaleźć potrzebne id sesji dla profilu. Co wymagało dogłębnego przeanalizowaniu tych wszystkich kolumn czy na pewno tam nie ma potrzebnych dla profilu informacji.

Dobra separacja logiczna poszczególnych części systemu

W rozwijaniu naszego rozwiązania Data Lake stosowaliśmy AWS CDK – bibliotekę polecaną przez AWS, umożliwiającą definiowanie infrastruktury za pomocą kodu (obecnie wspierane języki to JavaScript, TypeScript, Python, Java i C#), budowanie szablonów Cloudformation na podstawie wspomnianego kodu oraz ich deployment. Początkowy wszystkie zasoby AWS-owe definiowaliśmy w jednym stacku Cloudformation, jednak w miarę jak projekt się powiększał, pojawiła się potrzeba logicznego podziału kodu, który poprawi jego czytelność oraz ułatwi development. Zdecydowaliśmy się na rozdzielenie stacku Cloudformation na kilka mniejszych, z dokładnie sprecyzowanymi funkcjami:

  • stack z rolami i użytkownikami – wspólny dla wszystkich deploymentów, w którym tworzeni są użytkownicy oraz role IAM, używane przez zewnętrzne seriwsy lub zewnętrznych użytkowników;
  • stack główny – w nim tworzone są wewnętrzne role używane przez serwisy takie jak Glue, LakeFormation, EMR lub EC2. Ponadto tworzone są także buckety S3, VPC, notyfikacje za pomocą SNS, grupy robocze Amazon Athena oraz główna baza danych w katalogu Glue;
  • stacki związane z poszczególnymi źródłami danych – tutaj tworzone są tabele Glue dotyczące poszczególnych źródeł danych, jak również całe przepływy danych (workflow) Glue, zawierające odpowiednie Triggery, Joby i Crawlery – wszystko potrzebne do przetwarzania danych i udostępnienia ich zewnętrznym serwisom;
  • stack związany z monitoringiem – tu tworzony jest dashboard w usłudze Cloudwatch, na którym prezentowane są wybrane metryki ważne dla Data Lake. Ponadto w tym stacku definiowane są też Event Rules Cloudwatcha, dzięki którym jesteśmy notyfikowani jeżeli jakiś Job lub Crawler zakończy swoje działanie z błędami;
  • stack związany ze zmiennymi MWAA – w pewnym momencie projektu zaszła potrzeba używania usługi MWAA (Managed Workflows for Apache Airflow), niedostępnej w tym samym regionie, w którym deployowaliśmy Data Lake. Ten stack ma za zadanie tworzyć parametry w usłudze Systems Manager, za pomocą których mogliśmy przekazywać zmienne dotyczące zasobów zdeployowanych w Data Lake do innego regionu, w którym działała usługa MWAA;
  • stack związany z nieużywanymi funkcjonalnościami – stack potrzebny do deployowania pewnych zmian do istniejących już stacków, więcej szczegółów na jego temat będzie opisanych poniżej.

Tworzenie wielu stacków w jednej aplikacji AWS CDK oraz przekazywanie zasobów utworzonych w jednym stacku do innego stacku jest względnie proste, niestety nie jest pozbawione problemów. Stacki są zależne od siebie, dlatego ich deployment musi zachować odpowiednią kolejność w związku z występującymi zależnościami. Zasoby są przekazywane pomiędzy stackami dzięki automatycznemu tworzeniu eksportów oraz importów Cloudformation przez AWS CDK. Jeżeli stacki są deployowane po raz pierwszy, wtedy nie sprawia to żadnego problemu. Jeśli jednak stacki już istnieją i deployment sprawia, że są uaktualniane, może się zdarzyć, że AWS CDK usiłuje usunąć eksport, który jest importowany w innym stacku. Powoduje to niestety niepowodzenie całego deploymentu.

Jak zaprezentowano na powyższym rysunku, stack B jest zależny od stacku A, więc w trakcie deploymentu najpierw zostanie zaktualizowany stack A, następnie stack B. Jeśli stack B nie potrzebuje już zasobu ze stacku A (np. bucketu S3), AWS CDK usunie z szablonu Cloudformation stacku A eksport bucketu, a z szablonu stacku B import bucketu. Jednak podczas deploymentu pierwszy będzie aktualizowany stack A i usunięcie z niego eksportu skończy się niepowodzeniem, ponieważ niezaktualizowany jeszcze stack B wciąż go potrzebuje. W tym wypadku deployment nawet nie dojdzie do etapu, na którym zaktualizuje stack B! Żeby rozwiązać ten problem, można po prostu usunąć stack B, wtedy stack A zaktualizuje się poprawnie, a następnie stack B zostanie stworzony od nowa. Na produkcji jednak może się zdarzyć, że nie możemy sobie pozwolić na usunięcie stacku, gdzie mamy już jakieś dane.

W celu rozwiązania tej zagwozdki wprowadziliśmy do tej układanki stack C (jest to stack związany z nieużywanymi funkcjonalnościami w naszym kodzie). Stack C nie posiada żadnych ważnych funkcjonalności i można w każdej chwili bezpiecznie go usunąć, a potem ponownie utworzyć od nowa. Dzięki temu jeśli stack B nie potrzebuje już zasobu ze stacku A, w pierwszym kroku dokonujemy zmiany, która usuwa import zasobu ze stacku B, natomiast dodaje go do stacku C. Deployment tej zmiany przebiegnie bez problemu, ponieważ eksport nie zostanie usunięty ze stacku A, natomiast import zniknie ze stacku B. 

W drugim kroku dokonujemy zmiany, która usuwa import zasobu ze stacku C — to pociągnie za sobą również usunięcie eksportu zasobu ze stacku A, ponieważ nigdzie nie jest on importowany. Jednak w tym wypadku przed deploymentem tej zmiany możemy bezpiecznie usunąć stack C, przecież taka jest jego rola. Wtedy eksport zostanie bezpiecznie usunięty ze stacku A, import ze stacku B został usunięty już w poprzednim kroku, a pomocniczy stack C zostanie ponownie utworzony bez importu. 

Sytuacje, w których problemy z przekazywaniem zasobu blokowały deployment były u nas dość rzadkie, zdarzyły się tylko kilka razy w ciągu całego roku pracy nad Data Lake, jednak warto być na nie przygotowanym.

Blueprinty tylko z konsoli AWS

Przy naszym podejściu, gdzie chcieliśmy wszystko mieć stawiane za pomocą kodu pojawił się problem z Blueprintami, które oferuje AWS do ściągania danych z zewnętrznych baz danych. Usługa bardzo ułatwia pracę, ponieważ nie wymaga zabaw ze sterownikami do najbardziej popularnych silników. Jednak aws cdk nie oferowało, żadnych konstruktów do tego serwisu. Przy rozpoczęciu implementacji tej funkcjonalności mieliśmy już postawione parę przepływów danych dla Glue. Postanowiliśmy wykorzystać je i rozszerzyć o możliwość użycia skryptów PySpark-a, które udostępniał AWS publicznie. Przez wrzucenie ich jako kod źródłowy job-a i stworzenia własnej abstrakcji byliśmy w stanie tworzyć “własne” Blueprinty z kodu.

Praca z AWS Glue — znajomość Pysparka mocno polecana

AWS bardzo mocno reklamuje usługę Glue jako proste rozwiązanie do tworzenia skryptów ETL, używając abstrakcji nad DataFrame Sparka — DynamicFrame. DynamicFrame z Glue jest tworem, który nie wymaga uprzednio zdefiniowanej schemy, tylko tworzy ją automatycznie na podstawie wczytywanych danych. Jest to bez wątpienia podejście ułatwiające pracę z mało ustrukturyzowanymi danymi, jednak mnóstwo rzeczy dzieje się pod spodem. Po przepisaniu skryptu napisanego pod Glue na Pysparka może się okazać, że trzeba ręcznie dodawać rzeczy, które DynamicFrame robił automatycznie nawet bez naszej wiedzy. Dlaczego jednak wspominamy tutaj o Pysparku, jeśli można skrypt całkowicie oprzeć o metody udostępniane przez DynamicFrame? Otóż okazuje się, że korzystanie tylko z DynamicFrame może być bardzo niewydajne przy transformacjach większych ilości danych. W pracy z Glue zdarzało nam się przepisywać transformacje z tych natywnych dla Glue na czystego SparkSQL-a, co zdecydowanie przyspieszało czas przeprowadzenia transformacji oraz redukowało zużycie pamięci egzekutorów Sparka.

Nie trzeba od razu porzucać możliwości oferowanych przez DynamicFrame Glue, jednak warto pamiętać o tym, że warto schodzić również do poziomu sparkowych DataFrame jeśli myśli się o optymalizacji swoich ETL-i. Z DynamicFrame można w każdej chwili przeskoczyć na DataFrame i z powrotem do DynamicFrame, w międzyczasie używając metod dostępnych dla DataFrame, żeby cache-ować pośredni wynik używany później w kilku miejscach skryptu lub równomiernie rozdzielić dane między partycjami przerabianymi przez egzekutory Sparka. Korzystamy wtedy z dobrodziejstw obu światów i można dzięki temu bardzo usprawnić swoje ETL-e oraz zredukować ich koszty, co jest naprawdę ważne w kontekście braku tanich spotów dla Jobów Glue. 

Nie można jednak zapomnieć o jednym sporym mankamencie użytkowania Glue — zapisywanie danych do partycjonowanych tabel Glue było po prostu problematyczne. Dane poprawnie zapisywały się na S3, jednak partycje nie były dodawane do Hive metastore tabeli Glue, co efektywnie sprawiało, że dane nie były widoczne w tabeli. Dodawanie partycji podczas zapisu danych działało w przypadku stosowania tabel z klasyfikacją “glueparquet”, jednak zdecydowanie zwiększało ilość zasobów zużywanych przez Job Glue. W jednym przypadku ze względu na duże problemy z wydajnością jednego z Jobów zdecydowaliśmy się (zgodnie z radą supportu AWS) po prostu dodać do naszego przepływu danych Crawler dodający nowe partycje do tabeli Glue. Mam nadzieję, że ten dość uciążliwy problem zostanie w pewnym momencie przez AWS rozwiązany.

Można też przyjąć inną strategię i stosować Glue wyłącznie jako miejsce do opisu metadanych, czyli używać katalogu danych Glue jako metastore. Podczas pracy z Data Lake jeden z naszych przepływów danych musieliśmy przenieść na usługę EMR, dzięki czemu mogliśmy używać tanich instancji typu spot. Połączenie EMR-a z katalogiem Glue było względnie bezproblemowe, jednak warto tutaj wspomnieć o kilku ważnych szczegółach, które mogą sprawić trochę problemów przy łączeniu Glue z EMR-em. Pierwszym z nich jest potrzeba nadania roli IAM wykorzystywanej przez Sparka uprawnień database_creator w AWS LakeFormation. 

Spark tworzy dla siebie bazę tymczasową global_temp, a bez odpowiednich uprawnień LakeFormation sprawi to, że job Sparkowy bardzo szybko przestanie działać, rzucając błędami związanymi z uprawnieniami. Drugi problem to pobieranie listy baz danych z katalogu Glue. Nawet jeśli chcemy tylko ustawić domyślną bazę danych na taką, z której będziemy korzystać, Spark próbuje najpierw ściągnąć listę dostępnych mu baz danych i bazy “default”, niezależnie czy ma do niej dostęp, czy nie. Jeśli nie ma dostępu, wtedy znowu job rzuca błędem. Ostatnim problemem, z którym się spotkaliśmy była potrzeba dodania lokalizacji S3 do bazy danych Glue. Jest to opcjonalny parametr, jednak okazuje się, że jeśli jest on pusty, zapisywanie do tabel Glue kończy się niepowodzeniem. Teoretycznie jako lokalizacja S3 powinien być użyty parametr ustawiony w tabeli Glue. Jeśli baza danych posiada lokalizację S3, rzeczywiście jest ona ignorowana i do zapisu wykorzystuje się lokalizację S3 tabeli. Jeśli jednak lokalizacja S3 bazy jest pusta, dostajemy błędy.

Problemy z Atheną — partycjonowanie i spora liczba małych plików

Dzięki Athenie mamy dostęp do plików z s3 i możemy o nie odpytywać za pomocą SQL, tak jakby były bazami relacyjnymi. Przy korzystaniu z Atheny płaci się za ilość danych przeskanowanych, ale można to zoptymalizować za pomocą partycji. Z poziomu użytkownika wygląda to jak dodatkowe kolumny do odpytywania, ale pod spodem jest to podział plików za pomocą folderów. Bardzo przydatnym partycjonowaniem może być za pomocą dat — czyli w UI widzimy kolumny year, month, day. Pod spodem mamy strukturę katalogów:

  • year=2020
    • month=04
      • day=01
    • month=05
  • year=2021

Dzięki takiemu podejściu Athena skanuje mniej plików, by otrzymać wynik, co jest tańsze i szybsze. Jednym z problemów, które napotkaliśmy w trakcie użytkowania tego narzędzia, były małe pliki, które tworzyły nasze skrypty. Podczas transformacji PySpark rozdziela transformacje na równoległe procesy. Przy okazji tworząc osobne pliki, które nie powinny być za małe > 128 MB, ponieważ każdy plik to dodatkowy czas na:

  • otwarcie go,
  • listowanie,
  • pobieranie metadanych,
  • wczytywanie nagłówków i słowników kompresji.

Pliki też nie mogą być zbyt duże, bo wtedy nie można ich szybko odczytywać równolegle.

Podejście GitOps przy developmencie Data Lake

Każdy programista w teorii powinien pushować swoje zmiany tak często jak to możliwe. Jeśli coś mu się stanie, to mamy pewność, że inny pracownik może przejąć jego pracę. Jednak przyjmowanie i sprawdzanie czyjejś pracy nie jest takie proste. Skąd mamy wiedzieć, czy jego kod działa? Czy można go bez problemu wypuścić na produkcję? Czy przechodzą testy? Oczywiście można to zrobić manualnie, ale nowoczesne systemy mają CI/CD. Czyli automatycznie puszczane skrypty, po pushu do repozytorium które:

  • uruchamiają testy,
  • stawiają aplikację.

Jeżeli któryś z kroków się nie powiedzie, to wiemy, że trzeba zacząć od naprawy tego.

Przy naszym systemie mogliśmy stawiać infrastrukturę z kodu, więc nic nie stało na przeszkodzie, by stawiać go z każdego brancha. Każdy programista mógł stworzyć swojego Data Lake i testować swoje zmiany na nim. Wielu programistów niestety nie czyściło po sobie postawionych stacków co powodowało, że dobijaliśmy do limitów zasobów na koncie AWS. Rozwiązaliśmy to przez skrypty kasujące, które usuwały wszystkie zasoby developerów.

Testy

W naszym systemie mieliśmy 3 rodzaje testów:

  • sprawdzające szablony Cloudformation,
  • kontrakty do tabel, z których korzystały inne zespoły,
  • weryfikacyjne danych na końcu każdego przepływu danych.

Zwykle przy unit testach sprawdzamy wynik danej funkcji, czy się zgadza z oczekiwanym rezultatem. Tutaj nie było możliwości w większości zrobić takich testów, bo szablon generował się za jednym zamachem dla całego kodu. Jednak to nie przeszkodziło nam by pisać testy. Za pomocą weryfikowania odpowiednich elementów w szablonie byliśmy w stanie testować:

  • listę uprawnień przydzielonych do ról,
  • parametry przekazywane do jobów,
  • konfigurację serwisów,

Drugi tym testów, czyli kontrakty, których jedynym zadaniem było pilnowanie odpowiednich schemat tabel, które były udostępniane dla innych zespołów. Przed stworzeniem tabel, udostępniliśmy te testy w formie JSON-ów zespołowi, który głównie korzystał z nich. Robiliśmy to, by się upewnić, że jest tam wszystko, czego potrzebują w formacie, który ułatwi im pracę. Ostatnim typem testów były weryfikacje przetworzonych danych/tabel. Opierały się na sprawdzaniu wcześniej stworzonych wymagań co do konkretnych danych, które w teorii powinny być spełnione przez nasze skrypty PySpark-owe. Było tam sprawdzane m.in.:

  • unikalność wierszy,
  • czy kolumny zawierają tylko oczekiwane wartości (np. male, female, undefined),
  • oczekiwany format dat,
  • brak null-i w danej kolumnie.

Dla kogo to jest dobre rozwiązanie?

Jeżeli potrzebujesz ściągać dane z różnych źródeł w różnych formatach i do tego z różną częstotliwością to Data Lake jest rozwiązaniem. Zwiększająca się liczba plików, która jest liczona w TB, może przysporzyć problemy, gdy chcemy przetrzymywać dane na własnych serwerach. Rozwiązania chmurowe odciążają nas z tego problemu i zapewniają bezpieczeństwo, jakim jest trzymanie plików w paru miejscach. Jest już parę firm, które proponują swoje rozwiązania i integrację z chmurami. 

Wybór jest bardzo szeroki, jednak  nie każdy pozwala na stworzenie całej infrastruktury z kodu. Wyklikana architektura z pewnością będzie działać, jednak wgryzienie się w jej wszystkie konfiguracje może być niemożliwe. Data Lake to nie jest rozwiązanie na wszystkie bolączki związane z big data. Ta koncepcja jest dla konkretnych problemów i nieprzemyślane używanie jej może rodzić wiele problemów.


Zdjęcie główne artykułu pochodzi z unsplash.com.

Zapraszamy do dyskusji

Patronujemy

 
 
More Stories
Mozilla wspiera Apple w planach ochrony przed śledzeniem użytkowników
Mozilla wspiera Apple w planach ochrony przed śledzeniem użytkowników. Facebook przeciwny