Data Lake na AWS

Data Lake na AWS-ie. Budowa, problemy i rozwiązania

Czy macie swoje repozytoria kodu? Co z danymi? Jak je zbieracie? Tak, wiemy, to wcale nie takie łatwe. Gdy dane pochodzą z wielu źródeł, powstały w różnych formatach i przychodzą z różną częstotliwością, wtedy trzeba specjalistycznych narzędzi do specjalnych zadań. Narzędzi jest wiele, jednak mamy do opowiedzenia wam o naszym produkcyjnym rozwiązaniu na AWS. Cel był jeden: sprowadzić dane do formy, która pozwoli później targetować reklamy na użytkownikach oglądających filmy na platformie klienta.

Noemi KowalewskaNoemi Kowalewska. Software Developer w Clearcode. Doświadczona programistka, pracująca od paru lat przy projektach związanych z AWS gdzie przetwarzane są ogromne ilości danych. Wielokrotna mentorka warsztatów z Pythona i lutowania. Poza tym uwielbia w wolnych chwilach chodzić po górach, zwiedzać industrialne obiekty i podziemia.

Tomasz KlemensTomasz Klemens. Software Developer w Clearcode. Karierę akademicką z zakresu chemii zmienił na programowanie, dzięki temu w końcu znajduje czas na warzenie piwa. Od ponad 2 lat zajmuje się Pythonem i backendem, chociaż hobbystycznie uczy się też frontendu (React, Typescript). Przez ostatnie kilka miesięcy sporo pracował przy projektach związanych z AWS-em, Big Data i przetwarzaniem danych za pomocą rozwiązań opartych o Apache Spark.

Czym jest Data Lake?

Podstawowe informacje

Na każdy kroku towarzyszą nam dane. Gdy włączamy telefon, laptop czy kupujemy pietruszkę, gdzieś jest zapisywana informacja o tych czynnościach. Trzy razy na sekundę wytwarzamy ilość danych porównywalną z zawartością wszystkich zbiorów biblioteki Kongresu USA – tak twierdzi Nate Silver, znany statystyk, redaktor naczelny FiveThirtyEight. Jednak przy dużej ilości danych pojawiają się problemy, co z nimi robić? Nie bez powodu dane nazywa się ropą XXI wieku, ponieważ można z nich wyciągać ogromną wartość. Najpierw jednak trzeba je uporządkować.

Wyobraźcie sobie korporację, która ma wiele różnych działów i w każdej inny system, przez co ogrom różnych danych w przeróżnych formatach:

  • pliki płaskie (CSV, logs, XML, JSON),
  • nieustrukturyzowane dane (e-maile, dokumenty, PDF-y), 
  • dane binarne (obrazki, dźwięk, video),
  • ustrukturyzowane – bazy danych.

Muszą dzielić się danymi między sobą i jest to straszne skomplikowane. Każdy ma swój sposób dzielenia się nimi, niektóre są już przetworzone, inne surowe. Totalny bałagan. Tutaj właśnie frustracje przechowywania i dzielenia się danymi ratuje Data Lake. Pozwala on na:

  • operacje na danych – odpytywanie, przetwarzanie, 
  • bezpieczeństwo – kontrolę dostępu uprawnionym osobom,
  • analitykę – bez przenoszenia możliwość analizy analitycznej,
  • nieustannych ruch danych – duży przyrost plików to nie problem,
  • łatwe zrozumienie zawartości przez katalogowanie i indeksowanie.

Duża elastyczność w tym rozwiązaniu jest spowodowana tym, że w odróżnieniu od hurtowni danych, gdzie stosuje się podejście Schema-on-Write (już przy zapisie decydujemy o strukturze), tutaj mamy Schema-on-Read (dopiero przy odczycie modelujemy schemat). Od lat 80 powszechnie uważa się  hurtownie danych za podstawowy sposób na wyciąganie wiedzy z dużych zbiorów. Od początku miały także jeden duży minus, jakim są koszty. W przedstawionym rozwiązaniu możemy magazynować tanio bardzo duże ilości danych, dopiero gdy pojawi się potrzeba dostosowania ich do oczekiwanych struktur.

Jeziora różnych dostawców

Azure:

Dane przechowywane są na Azure Blob Storage, z hierarchiczną przestrzenią nazw. Sama analiza i przetwarzanie danych odbywają się za pomocą narzędzi opartych o HDFS (Hadoop Distributed File System – rozproszony system plików Hadoop), takich jak: 

  • Azure Data Factory,
  • Azure HDInsight,
  • Azure Databricks,
  • Azure Synapse Analytics,
  • Azure Data Explorer,
  • Power BI.

Sam Azure Synapse oferuje też możliwość konstrukcji relacyjnych baz danych w formatach kolumnowych. Data Lake na Azure jest silnie związany z Azure Blob Storage, dlatego koszty Data Lake to koszty wykorzystywania usługi Blob Storage. Wykorzystywanie dodatkowych narzędzi do analizy i przetwarzania danych jest rozliczane osobno.

Qubole:

Kładzie bardzo duży nacisk na format OpenSource i brak uzależniania się od dostawcy (vendor lock-in). Dzięki temu oferowane przez Qubole rozwiązanie jest niezależne od chmury i możliwe do przeniesienia do dowolnego dostawcy usług w chmurze. Zintegrowane UI dostosowane jest pod różne role w projekcie (data science, data engineer). Rozwiązanie to dostępne jest na takich platformach jak AWS, Google Cloud, Microsoft Azure i Oracle Cloud. Posiada również sporo integracji z różnymi zewnętrznymi serwisami. Dane przechowywane są w takich formatach jak np. Parquet lub ORC w wybranej przestrzeni magazynowej (w chmurze lub on-premises). 

Do przetwarzania danych mogą zostać wykorzystane takie technologie, jak Apache Spark, z kolei metadane związane z przechowywanymi danymi mogą być przechowywane w katalogu danych, takim jak Apache Hive. Zarządzanie uprawnieniami do danych oraz ich bezpieczeństwem można osiągnąć za pomocą takich technologii jak Apache Ranger, oraz Apache Sentry. W przypadku tego rozwiązania płaci się za wykorzystanie jednostek obliczeniowych Qubole na godzinę, które są skalowane w zależności od wielkości instancji do przetwarzania danych. Za same koszty obliczeniowe wspomnianych instancji, a także networking i magazynowanie danych, trzeba zapłacić osobno wybranemu dostawcy usług w chmurze.

Intelligent Data Lake (Informatica):

Data Lake jest oparty na takich technologiach jak Apache Hive, Apache HDFS i Apache Hadoop. Posiada możliwość połączenia z różnymi innymi rozwiązaniami do przechowywania danych proponowanymi przez takich dostawców takich jak Microsoft Azure, AWS, Google Cloud Platform, czy Snowflake. Spory nacisk na łatwy w obsłudze interfejs i mało pisania kodu. Informacje o kosztach można uzyskać po kontakcie z działem marketingu.

Infor:

W tym ujęciu Data Lake Infor zapewnia własny katalog metadanych i wewnętrzne zarządzanie tym katalogiem, łącznie z tworzeniem metagrafu obrazującego zależności pomiędzy poszczególnymi źródłami lub zbiorami danych. Solidny monitoring i integracja z innymi usługami oferowanymi przez Infor ma zapewnić bezproblemowe działanie Data Lake. Przykładowa implementacja przedstawiona przez Infor jest oparta o chmurę AWS, jednak wykorzystuje jedynie instancje EC2, budując na nich całe rozwiązanie Infor. Informacje o kosztach można uzyskać po kontakcie z działem marketingu.

Delta Lake (Databricks):

Tutaj mamy twórcę Apache Sparka (platformę do obliczeń rozproszonych), który poza tym, że stworzył swoje rozwiązanie, to jeszcze połączył je z hurtownią danych nazywając Lakehouse. Chwalą się, że ich Delta Lake jest zgodny z zasadami ACID i pozwala na wersjonowanie danych, zmienianie, optymalizacje małych plików oraz walidację danych. Możliwość odpytywania tabel Delta Lake przez silnik Presto do rozproszonych zapytań SQL jest stale rozwijana i sprawia pewne problemy tylko w przypadku tabel z partycjonowanymi danymi. Koszt tego rozwiązania jest zależny od platformy chmurowej, z której się korzysta (dostępne są trzy: AWS, Azure, Google). Płaci się za wykorzystany DBU (Databricks Unit), czyli jednostkę reprezentującą zużycie mocy przetwarzania na platformie Databricks i oczywiście za zasoby chmurowe.

AWS (LakeFormation):

Propozycja AWS-a na Data Lake to połączenie kilku dostępnych na AWS-ie serwisów i zarządzanie pozwoleniami do danych za pomocą usługi AWS LakeFormation. Do przechowywania danych służy usługa S3. AWS Glue jest polecanym rozwiązaniem do katalogowania i przetwarzania danych (za pomocą technologii Apache Hive i Apache Spark). Jako interfejs dla danych używana jest usługa Amazon Athena (rozproszone zapytania SQL-owe oparte o Presto). Integracja z usługą EMR do przetwarzania danych znajduje się obecnie w wersji beta, jednak można jej już używać. Samo wykorzystanie LakeFormation jest bezpłatne, opłaty są natomiast związane z używaniem pozostałych usług AWS. 

AWS Lake Formation i inne, czyli przepis AWS na Data Lake

Dawniej (do sierpnia 2019) w zarządzaniu dostępem do tabel w katalogu metadanych Glue wykorzystywane były osobne pozwolenia IAM do tabel, baz danych i katalogów, nadawane rolom i użytkownikom. LakeFormation miał za zadanie scentralizować zarządzanie tymi dostępami, istnieje jednak opcja wyłączenia LakeFormation na poziomie konta AWS i powrotu do starego sposobu zarządzania dostępami.

Poniżej przykład potrzebnych uprawnień nadawanych użytkownikowi, który ma mieć dostęp do tabeli.

W starym podejściu rola lub użytkownik musi mieć odpowiednie uprawnienia do tabel oraz dostęp do źródła danych, które opisują tabele (np. bucket S3). W przypadku nowego podejścia źródło danych jest rejestrowane jako zasób Data Lake, podczas którego do danego źródła dołączana jest wewnętrzna rola serwisu LakeFormation, która ma do niego odpowiednie uprawnienia. Do tabel czy też baz danych nadawane są pozwolenia dla poszczególnych ról lub użytkowników w sposób bardzo podobny co przyznawanie uprawnień w SQL-owych bazach danych (np. SELECT, INSERT, DROP TABLE). 

W przypadku jakiegokolwiek wykorzystania takiej tabeli np. w trakcie przetwarzania danych, jeżeli dany użytkownik lub rola ma odpowiednie uprawnienia do tabeli, dostęp do źródła danych (np. S3) jest realizowany poprzez wewnętrzną rolę LakeFormation. Dzięki temu osoba pracująca na danych tabelach nie ma bezpośredniego dostępu do danych na S3, a może jedynie wykorzystywać tabele Glue jako interfejs do pracy z danymi.

Ponadto poszczególnym użytkownikom czy rolom można nadawać różne uprawnienia, a także uprawnienia do nadawania uprawnień innym użytkownikom/rolom. Zarządzanie dostępami staje się w tym wypadku wygodne i scentralizowane. Istnieje również możliwość nadawania uprawnień tylko do poszczególnych kolumn tabeli, bardzo ważna w przypadku pracy z wrażliwymi danymi. AWS posiada również kilka predefiniowanych szablonów do pobierania danych z baz relacyjnych 

LakeFormation jako względnie nowy produkt ma niestety swoje minusy. Integracja z usługą EMR znajduje się ciągle w wersji beta, jeśli więc, zamiast Glue wolisz wykorzystać czystego Sparka i zredukować koszty za pomocą wykorzystywania tańszych spotów na EC2, to musisz liczyć się potrzebą zrobienia pewnych obejść w używanej infrastrukturze (głównie jest to potrzeba nadawania bezpośrednich dostępów do bucketów S3). Jeśli EMR ma być wykorzystywany przez zespoły data science lub do analityki danych, EMR pozostaje dobrą opcją, ponieważ integracja z LakeFormation, jeśli użytkownicy korzystają z uwierzytelnienia za pomocą protokołu SAML, działa bez zarzutu.

Innym mankamentem tego rozwiązania jest brak automatyzacji w zarządzaniu samymi tabelami. W przeciwieństwie do rozwiązań DataBricks AWS-owe tabele Glue nie mają zaimplementowanych transakcji ACID oraz automatycznego scalania małych plików w większe, zwiększające wydajność przy odpytywaniu tabel. AWS jednak podchodzi rozwojowo do oferowanego przez siebie Data Lake i w opcji preview dostępne są już tabele typu “governed table”, które wprowadzają wspomniane transakcje ACID i automatyczne scalanie mniejszych plików.

Jaki był problem, który mieliśmy rozwiązać.

Naszym klientem była firma oferująca video-streaming na rynek azjatycki. Gdy zaczynaliśmy projekt, to posiadał ‘jedynie’ około 150 milionów aktywnych użytkowników, a aktualnie jest ich 300 milionów. Każdy z nich swoimi wyborami generuje mnóstwo informacji o upodobaniach. Przez upodobania mamy na myśli ulubione typy filmów, aktorów. To trochę mało, by targetować komuś reklamę dlatego, warto też analizować dane jak typ telefonu, płeć, wiek. Można to wykorzystać do stworzenia profilu użytkownika i pokazywać mu reklamy, którymi potencjalnie się zainteresuje. 

Klient poprosił nas o zbudowanie infrastruktury AdTech, która pozwoli mu stać się ‘Walled garden’, czyli bezpiecznego serwisu, który ma monopol na dane klientów i może zarobić na tym. Sam zaproponował, że w tym systemie potrzebny jest Data Lake. Tutaj omówimy tylko mały kawałek tego systemu, który miał za zadanie zbierać dane z różnych źródeł, co stanowiło bazę do tworzenia profilów użytkownik i ich klasyfikacji.

Wyzwania naszego problemu:

Problematyczne źródła danych

Nasz Data Lake opierał się na sześciu różnych źródłach. Część z nich była z różnych serwisów klienta, ale także pojawiły się dane pochodzące od zewnętrznych firm. To jest bardzo częsta praktyka, by móc rozszerzyć możliwości klasyfikacji. Połączyć takie dane można na przykład dzięki ciasteczkom lub analizie podobnych cech. Sposób, w jaki zbieraliśmy dane można podzielić wd. tego jak je dostawaliśmy:

  • bezpośrednie pobieranie z baz klienta,
  • wypychanie ich do naszych bucketów na s3 przez klienta
  • pobieranie ich przez nas z bucketów s3 klienta

Akurat przy tym projekcie nie było plików przetwarzanych w czasie rzeczywistym. Częstotliwość pobierania danych była nam z góry narzucona przez klienta lub zewnętrzne źródła. Od momentu otrzymania danych do przesłania ich do serwera serwującego reklamy mogło minąć w najgorszej sytuacji 3 dni, a czas ten obejmował pobranie plików, przetworzenie ich, stworzenie profili, grup audiencji i wysłanie do ad serverów. Podzieliliśmy przetrzymywane pliki na etapy:

  • brązowy — surowy, dane bezpośrednio ze źródła bez zmian
  • srebrny — czyszczenie, spłaszczanie zagnieżdżonych struktur, transformacje,

Połączenia z DynamoDB i MongoDB okazały się słabo obsługiwane przez AWS, dlatego postanowiliśmy, że łatwiej będzie gdy będzie nam wysyłał dane do naszych bucketów. Pliki pojawiały się u nas regularnie i dzięki temu mieliśmy u nas surowe dane. Mieliśmy pewność, że są nasze i nikt ich nie zmieni. Całkiem inna sytuacja była przy plikach, które mieliśmy sami pobierać z bucketu klienta. 

Z początku wpadliśmy na bardzo zły pomysł, jakim było traktowanie ich jako “nasz brązowy etap”. Pliki surowe służyły jako źródło do kolejnego etapu więc bezpośrednio na nich uruchamialiśmy skrypty ETL-owe. Nie przewidzieliśmy wtedy, że na tych plikach jest stosowany backfilling, czyli uzupełnianie starych plików nowymi danymi. Istniejące pliki, które dotyczyły wydarzeń sprzed 7 dni wstecz, były aktualizowane. Polegało to na usuwaniu starych plików i zastępowaniu ich nowymi, z tą samą nazwą. 

Nasz przepływ przetwarzania odbywał się co parę godzin, więc w ogóle nie korzystaliśmy z zaktualizowanych danych sprzed paru dni. Użycie tych zmian byłoby problematyczne, ponieważ musielibyśmy wiedzieć, co dokładnie się zmieniło w plikach, które już były przetworzone i przesłane do innych serwisów. Jednak poza utratą danych mieliśmy problem z błędami w trakcie transformacji, pyspark rzucał wyjątek, gdy okazywało się, że próbuje zrobić transformacje na plikach, które już nie istnieją. Rozwiązaliśmy to przez kopiowanie tych plików do naszego konta. Nie był to jedyny problem z tymi plikami. 

Były one strasznie zaśmiecone, dodanie metadanych, które pozwalały traktować je jako tabelę rozpoznawało około 2k kolumn. Pojawiały się kolumny typu: session, session_1, session_1_2, session_string, session_string_1. W każdej z tych kolumn mogło się znaleźć potrzebne id sesji dla profilu. Co wymagało dogłębnego przeanalizowaniu tych wszystkich kolumn czy na pewno tam nie ma potrzebnych dla profilu informacji.

Dobra separacja logiczna poszczególnych części systemu

W rozwijaniu naszego rozwiązania Data Lake stosowaliśmy AWS CDK – bibliotekę polecaną przez AWS, umożliwiającą definiowanie infrastruktury za pomocą kodu (obecnie wspierane języki to JavaScript, TypeScript, Python, Java i C#), budowanie szablonów Cloudformation na podstawie wspomnianego kodu oraz ich deployment. Początkowy wszystkie zasoby AWS-owe definiowaliśmy w jednym stacku Cloudformation, jednak w miarę jak projekt się powiększał, pojawiła się potrzeba logicznego podziału kodu, który poprawi jego czytelność oraz ułatwi development. Zdecydowaliśmy się na rozdzielenie stacku Cloudformation na kilka mniejszych, z dokładnie sprecyzowanymi funkcjami:

  • stack z rolami i użytkownikami – wspólny dla wszystkich deploymentów, w którym tworzeni są użytkownicy oraz role IAM, używane przez zewnętrzne seriwsy lub zewnętrznych użytkowników;
  • stack główny – w nim tworzone są wewnętrzne role używane przez serwisy takie jak Glue, LakeFormation, EMR lub EC2. Ponadto tworzone są także buckety S3, VPC, notyfikacje za pomocą SNS, grupy robocze Amazon Athena oraz główna baza danych w katalogu Glue;
  • stacki związane z poszczególnymi źródłami danych – tutaj tworzone są tabele Glue dotyczące poszczególnych źródeł danych, jak również całe przepływy danych (workflow) Glue, zawierające odpowiednie Triggery, Joby i Crawlery – wszystko potrzebne do przetwarzania danych i udostępnienia ich zewnętrznym serwisom;
  • stack związany z monitoringiem – tu tworzony jest dashboard w usłudze Cloudwatch, na którym prezentowane są wybrane metryki ważne dla Data Lake. Ponadto w tym stacku definiowane są też Event Rules Cloudwatcha, dzięki którym jesteśmy notyfikowani jeżeli jakiś Job lub Crawler zakończy swoje działanie z błędami;
  • stack związany ze zmiennymi MWAA – w pewnym momencie projektu zaszła potrzeba używania usługi MWAA (Managed Workflows for Apache Airflow), niedostępnej w tym samym regionie, w którym deployowaliśmy Data Lake. Ten stack ma za zadanie tworzyć parametry w usłudze Systems Manager, za pomocą których mogliśmy przekazywać zmienne dotyczące zasobów zdeployowanych w Data Lake do innego regionu, w którym działała usługa MWAA;
  • stack związany z nieużywanymi funkcjonalnościami – stack potrzebny do deployowania pewnych zmian do istniejących już stacków, więcej szczegółów na jego temat będzie opisanych poniżej.

Tworzenie wielu stacków w jednej aplikacji AWS CDK oraz przekazywanie zasobów utworzonych w jednym stacku do innego stacku jest względnie proste, niestety nie jest pozbawione problemów. Stacki są zależne od siebie, dlatego ich deployment musi zachować odpowiednią kolejność w związku z występującymi zależnościami. Zasoby są przekazywane pomiędzy stackami dzięki automatycznemu tworzeniu eksportów oraz importów Cloudformation przez AWS CDK. Jeżeli stacki są deployowane po raz pierwszy, wtedy nie sprawia to żadnego problemu. Jeśli jednak stacki już istnieją i deployment sprawia, że są uaktualniane, może się zdarzyć, że AWS CDK usiłuje usunąć eksport, który jest importowany w innym stacku. Powoduje to niestety niepowodzenie całego deploymentu.

Jak zaprezentowano na powyższym rysunku, stack B jest zależny od stacku A, więc w trakcie deploymentu najpierw zostanie zaktualizowany stack A, następnie stack B. Jeśli stack B nie potrzebuje już zasobu ze stacku A (np. bucketu S3), AWS CDK usunie z szablonu Cloudformation stacku A eksport bucketu, a z szablonu stacku B import bucketu. Jednak podczas deploymentu pierwszy będzie aktualizowany stack A i usunięcie z niego eksportu skończy się niepowodzeniem, ponieważ niezaktualizowany jeszcze stack B wciąż go potrzebuje. W tym wypadku deployment nawet nie dojdzie do etapu, na którym zaktualizuje stack B! Żeby rozwiązać ten problem, można po prostu usunąć stack B, wtedy stack A zaktualizuje się poprawnie, a następnie stack B zostanie stworzony od nowa. Na produkcji jednak może się zdarzyć, że nie możemy sobie pozwolić na usunięcie stacku, gdzie mamy już jakieś dane.

W celu rozwiązania tej zagwozdki wprowadziliśmy do tej układanki stack C (jest to stack związany z nieużywanymi funkcjonalnościami w naszym kodzie). Stack C nie posiada żadnych ważnych funkcjonalności i można w każdej chwili bezpiecznie go usunąć, a potem ponownie utworzyć od nowa. Dzięki temu jeśli stack B nie potrzebuje już zasobu ze stacku A, w pierwszym kroku dokonujemy zmiany, która usuwa import zasobu ze stacku B, natomiast dodaje go do stacku C. Deployment tej zmiany przebiegnie bez problemu, ponieważ eksport nie zostanie usunięty ze stacku A, natomiast import zniknie ze stacku B. 

W drugim kroku dokonujemy zmiany, która usuwa import zasobu ze stacku C — to pociągnie za sobą również usunięcie eksportu zasobu ze stacku A, ponieważ nigdzie nie jest on importowany. Jednak w tym wypadku przed deploymentem tej zmiany możemy bezpiecznie usunąć stack C, przecież taka jest jego rola. Wtedy eksport zostanie bezpiecznie usunięty ze stacku A, import ze stacku B został usunięty już w poprzednim kroku, a pomocniczy stack C zostanie ponownie utworzony bez importu. 

Sytuacje, w których problemy z przekazywaniem zasobu blokowały deployment były u nas dość rzadkie, zdarzyły się tylko kilka razy w ciągu całego roku pracy nad Data Lake, jednak warto być na nie przygotowanym.

Blueprinty tylko z konsoli AWS

Przy naszym podejściu, gdzie chcieliśmy wszystko mieć stawiane za pomocą kodu pojawił się problem z Blueprintami, które oferuje AWS do ściągania danych z zewnętrznych baz danych. Usługa bardzo ułatwia pracę, ponieważ nie wymaga zabaw ze sterownikami do najbardziej popularnych silników. Jednak aws cdk nie oferowało, żadnych konstruktów do tego serwisu. Przy rozpoczęciu implementacji tej funkcjonalności mieliśmy już postawione parę przepływów danych dla Glue. Postanowiliśmy wykorzystać je i rozszerzyć o możliwość użycia skryptów PySpark-a, które udostępniał AWS publicznie. Przez wrzucenie ich jako kod źródłowy job-a i stworzenia własnej abstrakcji byliśmy w stanie tworzyć “własne” Blueprinty z kodu.

Praca z AWS Glue — znajomość Pysparka mocno polecana

AWS bardzo mocno reklamuje usługę Glue jako proste rozwiązanie do tworzenia skryptów ETL, używając abstrakcji nad DataFrame Sparka — DynamicFrame. DynamicFrame z Glue jest tworem, który nie wymaga uprzednio zdefiniowanej schemy, tylko tworzy ją automatycznie na podstawie wczytywanych danych. Jest to bez wątpienia podejście ułatwiające pracę z mało ustrukturyzowanymi danymi, jednak mnóstwo rzeczy dzieje się pod spodem. Po przepisaniu skryptu napisanego pod Glue na Pysparka może się okazać, że trzeba ręcznie dodawać rzeczy, które DynamicFrame robił automatycznie nawet bez naszej wiedzy. Dlaczego jednak wspominamy tutaj o Pysparku, jeśli można skrypt całkowicie oprzeć o metody udostępniane przez DynamicFrame? Otóż okazuje się, że korzystanie tylko z DynamicFrame może być bardzo niewydajne przy transformacjach większych ilości danych. W pracy z Glue zdarzało nam się przepisywać transformacje z tych natywnych dla Glue na czystego SparkSQL-a, co zdecydowanie przyspieszało czas przeprowadzenia transformacji oraz redukowało zużycie pamięci egzekutorów Sparka.

Nie trzeba od razu porzucać możliwości oferowanych przez DynamicFrame Glue, jednak warto pamiętać o tym, że warto schodzić również do poziomu sparkowych DataFrame jeśli myśli się o optymalizacji swoich ETL-i. Z DynamicFrame można w każdej chwili przeskoczyć na DataFrame i z powrotem do DynamicFrame, w międzyczasie używając metod dostępnych dla DataFrame, żeby cache-ować pośredni wynik używany później w kilku miejscach skryptu lub równomiernie rozdzielić dane między partycjami przerabianymi przez egzekutory Sparka. Korzystamy wtedy z dobrodziejstw obu światów i można dzięki temu bardzo usprawnić swoje ETL-e oraz zredukować ich koszty, co jest naprawdę ważne w kontekście braku tanich spotów dla Jobów Glue. 

Nie można jednak zapomnieć o jednym sporym mankamencie użytkowania Glue — zapisywanie danych do partycjonowanych tabel Glue było po prostu problematyczne. Dane poprawnie zapisywały się na S3, jednak partycje nie były dodawane do Hive metastore tabeli Glue, co efektywnie sprawiało, że dane nie były widoczne w tabeli. Dodawanie partycji podczas zapisu danych działało w przypadku stosowania tabel z klasyfikacją “glueparquet”, jednak zdecydowanie zwiększało ilość zasobów zużywanych przez Job Glue. W jednym przypadku ze względu na duże problemy z wydajnością jednego z Jobów zdecydowaliśmy się (zgodnie z radą supportu AWS) po prostu dodać do naszego przepływu danych Crawler dodający nowe partycje do tabeli Glue. Mam nadzieję, że ten dość uciążliwy problem zostanie w pewnym momencie przez AWS rozwiązany.

Można też przyjąć inną strategię i stosować Glue wyłącznie jako miejsce do opisu metadanych, czyli używać katalogu danych Glue jako metastore. Podczas pracy z Data Lake jeden z naszych przepływów danych musieliśmy przenieść na usługę EMR, dzięki czemu mogliśmy używać tanich instancji typu spot. Połączenie EMR-a z katalogiem Glue było względnie bezproblemowe, jednak warto tutaj wspomnieć o kilku ważnych szczegółach, które mogą sprawić trochę problemów przy łączeniu Glue z EMR-em. Pierwszym z nich jest potrzeba nadania roli IAM wykorzystywanej przez Sparka uprawnień database_creator w AWS LakeFormation. 

Spark tworzy dla siebie bazę tymczasową global_temp, a bez odpowiednich uprawnień LakeFormation sprawi to, że job Sparkowy bardzo szybko przestanie działać, rzucając błędami związanymi z uprawnieniami. Drugi problem to pobieranie listy baz danych z katalogu Glue. Nawet jeśli chcemy tylko ustawić domyślną bazę danych na taką, z której będziemy korzystać, Spark próbuje najpierw ściągnąć listę dostępnych mu baz danych i bazy “default”, niezależnie czy ma do niej dostęp, czy nie. Jeśli nie ma dostępu, wtedy znowu job rzuca błędem. Ostatnim problemem, z którym się spotkaliśmy była potrzeba dodania lokalizacji S3 do bazy danych Glue. Jest to opcjonalny parametr, jednak okazuje się, że jeśli jest on pusty, zapisywanie do tabel Glue kończy się niepowodzeniem. Teoretycznie jako lokalizacja S3 powinien być użyty parametr ustawiony w tabeli Glue. Jeśli baza danych posiada lokalizację S3, rzeczywiście jest ona ignorowana i do zapisu wykorzystuje się lokalizację S3 tabeli. Jeśli jednak lokalizacja S3 bazy jest pusta, dostajemy błędy.

Problemy z Atheną — partycjonowanie i spora liczba małych plików

Dzięki Athenie mamy dostęp do plików z s3 i możemy o nie odpytywać za pomocą SQL, tak jakby były bazami relacyjnymi. Przy korzystaniu z Atheny płaci się za ilość danych przeskanowanych, ale można to zoptymalizować za pomocą partycji. Z poziomu użytkownika wygląda to jak dodatkowe kolumny do odpytywania, ale pod spodem jest to podział plików za pomocą folderów. Bardzo przydatnym partycjonowaniem może być za pomocą dat — czyli w UI widzimy kolumny year, month, day. Pod spodem mamy strukturę katalogów:

  • year=2020
    • month=04
      • day=01
    • month=05
  • year=2021

Dzięki takiemu podejściu Athena skanuje mniej plików, by otrzymać wynik, co jest tańsze i szybsze. Jednym z problemów, które napotkaliśmy w trakcie użytkowania tego narzędzia, były małe pliki, które tworzyły nasze skrypty. Podczas transformacji PySpark rozdziela transformacje na równoległe procesy. Przy okazji tworząc osobne pliki, które nie powinny być za małe > 128 MB, ponieważ każdy plik to dodatkowy czas na:

  • otwarcie go,
  • listowanie,
  • pobieranie metadanych,
  • wczytywanie nagłówków i słowników kompresji.

Pliki też nie mogą być zbyt duże, bo wtedy nie można ich szybko odczytywać równolegle.

Podejście GitOps przy developmencie Data Lake

Każdy programista w teorii powinien pushować swoje zmiany tak często jak to możliwe. Jeśli coś mu się stanie, to mamy pewność, że inny pracownik może przejąć jego pracę. Jednak przyjmowanie i sprawdzanie czyjejś pracy nie jest takie proste. Skąd mamy wiedzieć, czy jego kod działa? Czy można go bez problemu wypuścić na produkcję? Czy przechodzą testy? Oczywiście można to zrobić manualnie, ale nowoczesne systemy mają CI/CD. Czyli automatycznie puszczane skrypty, po pushu do repozytorium które:

  • uruchamiają testy,
  • stawiają aplikację.

Jeżeli któryś z kroków się nie powiedzie, to wiemy, że trzeba zacząć od naprawy tego.

Przy naszym systemie mogliśmy stawiać infrastrukturę z kodu, więc nic nie stało na przeszkodzie, by stawiać go z każdego brancha. Każdy programista mógł stworzyć swojego Data Lake i testować swoje zmiany na nim. Wielu programistów niestety nie czyściło po sobie postawionych stacków co powodowało, że dobijaliśmy do limitów zasobów na koncie AWS. Rozwiązaliśmy to przez skrypty kasujące, które usuwały wszystkie zasoby developerów.

Testy

W naszym systemie mieliśmy 3 rodzaje testów:

  • sprawdzające szablony Cloudformation,
  • kontrakty do tabel, z których korzystały inne zespoły,
  • weryfikacyjne danych na końcu każdego przepływu danych.

Zwykle przy unit testach sprawdzamy wynik danej funkcji, czy się zgadza z oczekiwanym rezultatem. Tutaj nie było możliwości w większości zrobić takich testów, bo szablon generował się za jednym zamachem dla całego kodu. Jednak to nie przeszkodziło nam by pisać testy. Za pomocą weryfikowania odpowiednich elementów w szablonie byliśmy w stanie testować:

  • listę uprawnień przydzielonych do ról,
  • parametry przekazywane do jobów,
  • konfigurację serwisów,

Drugi tym testów, czyli kontrakty, których jedynym zadaniem było pilnowanie odpowiednich schemat tabel, które były udostępniane dla innych zespołów. Przed stworzeniem tabel, udostępniliśmy te testy w formie JSON-ów zespołowi, który głównie korzystał z nich. Robiliśmy to, by się upewnić, że jest tam wszystko, czego potrzebują w formacie, który ułatwi im pracę. Ostatnim typem testów były weryfikacje przetworzonych danych/tabel. Opierały się na sprawdzaniu wcześniej stworzonych wymagań co do konkretnych danych, które w teorii powinny być spełnione przez nasze skrypty PySpark-owe. Było tam sprawdzane m.in.:

  • unikalność wierszy,
  • czy kolumny zawierają tylko oczekiwane wartości (np. male, female, undefined),
  • oczekiwany format dat,
  • brak null-i w danej kolumnie.

Dla kogo to jest dobre rozwiązanie?

Jeżeli potrzebujesz ściągać dane z różnych źródeł w różnych formatach i do tego z różną częstotliwością to Data Lake jest rozwiązaniem. Zwiększająca się liczba plików, która jest liczona w TB, może przysporzyć problemy, gdy chcemy przetrzymywać dane na własnych serwerach. Rozwiązania chmurowe odciążają nas z tego problemu i zapewniają bezpieczeństwo, jakim jest trzymanie plików w paru miejscach. Jest już parę firm, które proponują swoje rozwiązania i integrację z chmurami. 

Wybór jest bardzo szeroki, jednak  nie każdy pozwala na stworzenie całej infrastruktury z kodu. Wyklikana architektura z pewnością będzie działać, jednak wgryzienie się w jej wszystkie konfiguracje może być niemożliwe. Data Lake to nie jest rozwiązanie na wszystkie bolączki związane z big data. Ta koncepcja jest dla konkretnych problemów i nieprzemyślane używanie jej może rodzić wiele problemów.


Zdjęcie główne artykułu pochodzi z unsplash.com.

Zapraszamy do dyskusji

Patronujemy

 
 
More Stories
Elixir language - język funkcyjny w 2020 iEx
Elixir – funkcyjny język José Valima w 2020