Zastosowanie Stream API z Java 8. Przykłady

Collect

Z collect() zetknęliśmy się w jednym z poprzednich przykładów. Bardzo często, gdy skończymy „przetwarzanie” strumienia chcielibyśmy przyjrzeć elementom strumienia zwróconego w postaci innej struktury danych np. List, Set lub Map. Z pomocą przychodzi nam niezwykle użyteczna metoda collect(), która wykonuje szereg operacji („przepakowanie” elementów strumienia do innej struktury danych i wykonanie dodatkowych operacji na elementach np. konkatenacji). 

Metoda collect() korzysta z Collector, na który to z kolei składają się cztery „składniki” w postaci supplier, accumulator, combiner i finisher (tu znów angielska nomenklatura). Brzmi to może mało zachęcająco, ale Java 8 dostarcza wielu wbudowanych kolektorów przez klasę Collectors, by ułatwić nam życie.

Na poniższym listingu chyba najbardziej powszechny przypadek użycia – dostajemy listę z „przefiltrowanymi” pracownikami pod kątem zarobków.

Jak widzimy nie jest trudno stworzyć listę (List) pracowników z elementów strumienia (przy okazji: uzyskane listy i sety są niemutowalne). Jak było to pokazane w jednym z wcześniejszych listingów, jeśli zamiast listy potrzebowalibyśmy kolekcji typu Set wystarczy, że użyjemy kolektora Collectors.toSet()

Naturalnie nic nie stoi na przeszkodzie by dokonać transformacji elementów strumienia na mapę (Map). W tym wypadku należy określić jaki atrybut elementu strumienia ma być kluczem oraz czego chcemy użyć jako wartości. Musimy pamiętać, by klucze posiadały unikalne wartości w przeciwnym razie zostanie rzucony wyjątek IllegalStateException. Na poniższym listingu jako naturalnego kandydata na wartość klucza użyto id pracownika.   

Dzięki takiej transformacji możemy mieć wgląd na każdego zatrudnionego po jego id. Warto w tym miejscu wspomnieć, że jeśli chcemy jako wartości dla klucza użyć bieżącego elementu kolekcji to jako drugiej funkcji należy użyć Function.identity(). Dodatkowo ze względu na czytelność wyniku na konsoli rozmiar strumienia i tym samym mapy został ograniczony do dwóch elementów (operacja limit). 

Collectors umożliwia nam też grupowanie elementów strumienia po wartości wybranego atrybutu. Poniższy listing grupuje nam pracowników po dywizji/departamencie, w którym są zatrudnieni. Naturalnie grupowania takiego moglibyśmy także dokonać ze względu na zarobki albo wymiar etatu.

Inną, mniej oczywistą możliwością zastosowania grupowania jest np. podzielenie kolekcji wejściowej (List lub Set) na szereg kolekcji o mniejszym rozmiarze składających z tych samych elementów co kolekcja wejściowa. Na poniższym przykładowym listingu użyto kolekcji składającej się z obiektów typu Integer dla prostoty przykładu.

Jak widzimy wejściowa kolekcja integers została podzielona na mniejsze kolekcje, których rozmiar nie przekraczał wartości wskazywanej przez parametr chunkSize. W moim przypadku tego typu zabieg np. pozwolił na „obejście” limitu Oracle’a związanego z ilością elementów w klauzuli IN (ORA-01795: maximum number of expressions in a list is 1000).   

Nie jest to jedyna z możliwości Collectors, możemy się pokusić na przykład o złożone statystyki dotyczącą zarobków poszczególnych pracowników – kolektor może nam zwrócić wbudowany obiekt z takimi statystykami jak na poniższym listingu.

Oczywiście kolektor też może dostarczyć np. średniej zarobków, jeśli nie potrzebujemy statystyki w postaci wbudowanego obiektu. Wystarczy że użyjemy kolektora jak poniżej.

Kolejną funkcją kolektora, o której warto wspomnieć jest joining(). Na poniższym listingu zobaczmy jak Collectors.joining() działa.

Jak widzimy Collectors.joining() łączy elementy strumienia w jeden łańcuch znakowy używając podanego przez nas delimitera oraz jako opcjonalnych parametrów prefix („Employess with id’s: ”) oraz suffix („work full-time”). W powyższym przykładzie wykorzystaliśmy Collectors.joining() do wydrukowania identyfikatorów pracowników zatrudnionych na pełen etat.  

FlatMap

Jak wiemy strumień może składać się z obiektów o bardziej złożonej strukturze danych, przykładowo może to być strumień zdefiniowany jako Stream<List<Integer>>, gdzie tego typu strumień stworzyliśmy dzieląc kolekcję wejściowej (List lub Set) na szereg kolekcji o mniejszym rozmiarze z wykorzystaniem operacji grupowania (Collectors.groupingBy()). 

Czasem jednak zachodzi potrzeba uproszczenia takiej struktury i przekształcenia takiego strumienia w np. Steram<Integer>. Innymi słowy zamiast strumienia w postaci […[2, 4, 6], [8, 10, 12], [14, 16]…] chcemy uzyskać strumień w postaci  […2, 4, 6, 8, 10, 12, 14, 16…]. W tym wypadku flatMap() okazuje się przydatną operacją by taką strukturę uprościć i tym samym ułatwić dalsze operacje na elementach strumienia. Poniższy listing pokazuje przykładowe zastosowanie flatMap() , by uzyskać pożądany efekt.  

Reduce

Operacje redukcji łączą wszystkie elementy strumienia w jeden wynikowy element, czyli zwracają „odpowiedź” ze strumienia danych. Redukcje są operacjami typu terminal – „redukują” strumień do wartości nie będącej strumieniem. Przykładami operacji redukcji są np. findFirst(), min(), max() – wszystkie zwracają jako wynik Optional<T> jako iż ten typ Optional jest znacznie lepszym sposobem wskazania iż zwrócona wartość jest pusta np. gdy dany strumień był pusty. W ogólności operacje redukcji możemy przedstawić jako:

T reduce(T identity, BinaryOperator<T> accumulator)

ZOBACZ TEŻ:  Java czy C++? Wady i zalety obu języków programowania

gdzie identity jest wartością początkową redukcji lub wartością domyślną, gdy w strumieniu nie ma żadnych elementów, accumulator zaś jest funkcją (BiFunction), która posiada dwa formalne parametry. Pierwszy to cząstkowy wynik redukcji oraz kolejny element strumienia. Tyle definicji. Na poniższym listingu zsumujemy zarobki wszystkich zatrudnionych na pełen etat używając reduce() na naszym strumieniu.

W powyższym przykładzie można było użyć specjalizowanego strumienia DoubleStream i wykonać operację DoubleStream.sum(), by osiągnąć dokładnie ten sam efekt. 

W kolejnym przykładzie na poniższym listingu wykorzystamy operację redukcji z użyciem akumulatora (BiFunction), gdzie obydwa operandy są tego samego typu. Porównujemy zarobki osób zatrudnionych na pełny etat i zwracamy dane pracownika z najwyższą pensją.

Dla zainteresowanych w Java 9 dodatkowo dla Stream API pojawiają się takeWhile, dropWhile oraz iterate, gdzie możemy użyć trzech argumentów. Więcej o omawianych rozszerzeniach w Steam API przeczytasz tutaj.

Parallel Streams

W wypadku strumieni, które zawierają dużą ilość elementów do przetworzenia, operacje na strumieniu w celu zwiększenia wydajności mogą być zostać „zrównoleglone” – innymi słowy mogą być jednocześnie wykonywane przez kilka wątków. By taki efekt osiągnąć potrzebujemy przede wszystkim tzw. parallel stream’u. Strumień taki możemy uzyskać z jakiejkolwiek kolekcji używając metody Collection.parallelStream() w kontradykcji do sekwencyjnego strumienia danych utworzonego poprzez Collection.stream()

Założeniem oczywiście jest, że operacje wykonywane równolegle na strumieniu dadzą taki sam rezultat jakby były wykonywane sekwencyjnie. Odpowiedzialnością programisty jest by każda funkcja przekazana do operacji na strumieniu mogła być „bezpiecznie” wykonana z punktu widzenia wielowątkowości (spójność danych – race condition).

Jeśli wykonamy kod z poniższego listingu, który na pierwszy rzut oka wygląda na poprawny, to będziemy mieli do czynienia z race condition. Po każdym wykonaniu kodu nasza lista pensji, każdorazowo drukowana w ramach operacji forEach() będzie wyglądać inaczej po każdym uruchomieniu metody. W ramach ćwiczenia możemy zastąpić parallelStream() metodą stream() i zobaczyć jaki będzie miało to efekt, gdy będziemy poniższy kod uruchamiali wiele razy.

W momencie gdyby kolekcja salaries nie była zsynchronizowana, to efektem działania byłby jedynie zrzucony wyjątek w postaci  ConcurrentModificationException (jak na poniższym listingu) spowodowany modyfikacją kolekcji, w momencie gdy kolejny wątek próbuje trawersować tę kolekcję używając jej iteratora – System.out.println(salaries)

Powyższe przykłady pokazują iż w wypadku stosowania strumieni równoległych (przetwarzanych wielowątkowo) pojawiają się dodatkowe aspekty, na które musimy zwrócić uwagę – pierwsze co się rzuca w oczy to operacje wykonywane równolegle (w oddzielnych wątkach) nie modyfikują współdzielonych zasobów. 

Przy okazji omawiania strumieni równoległych warto przyjrzeć się jak działa takie równoległe przetwarzanie. Strumienie te wykorzystują ForkJoinPool, w Java 8 jest to domyślna pula wątków, którą wykorzystuję się do realizacji zadań, które można podzielić. Być może bardziej namacalnie zobaczyć jak zrównoleglone przetwarzanie działa w kontekście wielowątkowości/strumieni równoległych spójrzmy na poniższy listing.

Patrząc na powyższy listing, a w zasadzie na to co pojawiło na konsoli widzimy, które wątki zostały użyte do wykonania operacji na strumieniu. Jak widzimy zostały wykorzystane 4 wątki z puli ForkJoinPool. To co zostało wydrukowane na konsole może po każdorazowym uruchomieniu się różnić z racji tego iż przydzielanie wątków z puli jest niedeterministyczne (o problemach z paralles stream przeczytasz tutaj).

Podsumowanie

Mam nadzieję iż ten tutorial trochę przybliżył tematykę strumieni w Java 8. Starałem się pokazać szereg różnych operacji wspieranych przez strumienie wykorzystując też przykład wzięty z jednego z projektów. W przykładach starałem się używać wyrażeń lambda oraz tzw. pipelines (szeregi operacji), by kod był możliwie najbardziej zwarty. 

Wspomnieliśmy także o charakterystycznych właściwościach strumieni jak laziness (albo jak kto woli lazy evaluation), a także o strumieniach równoległych. W kontekście wspomnianej charakterystyki powiedzieliśmy sobie czym są operacje typu pośredniego i końcowego (intermediate operations oraz terminal operations)

W zamieszczonych przykładach starałem się użyć najczęściej wykonywanych operacji na strumieniach oraz tych bardziej zaawansowanych. Mam nadzieję iż tutorial ten okaże się użytecznym i pomocnym oraz, że dobór i prezentacja materiału i przykładów nie okazały się dla Was drogą przez mękę, a sprawiły trochę frajdy. I tak już na zakończenie pozostaje mi życzyć miłego kodowania. 


Zdjęcie główne artykułu pochodzi z unsplash.com.

Zapraszamy do dyskusji

Patronujemy

 
 
More Stories
Zalety i wady korzystania z TensorFlow w środowisku produkcyjnym