Backend

Zastosowanie Stream API z Java 8. Przykłady

Java Stream API jest jedną z głównych funkcjonalności dodanych w Java 8. Nie należy mylić Java 8 Streams z I/O Streams – jeden z drugim nie mają za wiele wspólnego. Java 8 Stream raczej opakowuje źródło danych umożliwiając wykonanie operacji na wspomnianym źródle danych. Stream API dostarcza szeregu metod, gdzie przetwarzanie poszczególnych elementów (np. kolekcji czy tablicy) staje się łatwe i szybkie.

Piotr Goławski. Senior Java Developer w Bosch Polska. Jego pierwsze poważne zadania programistyczne pojawiły się w 2002 roku w ramach projektów dla sektora bankowego (projekt do zarządzania przetwarzaniem płatności krajowych i zagranicznych czy też projekty wdrażające usługi cash management dla największych klientów banku). Potem były to duże projekty dla firm z branży FMCG oraz rozwiązania e-commerce dla zagranicznych instytucji finansowych oparte na technologiach Java. Miał okazję również trenować początkujących programistów na szkoleniach z Java.


Należy zaznaczyć iż strumienie jako takie nie przechowują danych i w tym sensie nie są żadną strukturą, ponadto też nie modyfikują/zmieniają źródła, na którym operują. Java 8 Stream API w znaczący sposób wykorzystuje/wspiera funkcyjny styl programowania przy wykonywaniu operacji na poszczególnych elementach strumienia np. w wypadku transformacji elementów kolekcji.

Jak to działa

Strumień w kontekście Java API zawiera sekwencje elementów, i jak wyżej wspomniałem, umożliwia wykonanie różnych operacji na elementach strumienia. Poniżej utworzymy przykładowy strumień z kilkoma elementami i powiemy trochę więcej na temat tworzenia strumieni, ich charakterystyki oraz operacji na elementach strumienia. Pokażemy tym samym, co z taką sekwencją elementów w strumieniu możemy zrobić.

Stream<String> namesStream = Stream.of("John", "Marry", "George", "Paul", "Alice", "Ann");

        namesStream
                .filter(e -> e.startsWith("A"))
                .map(String::toUpperCase)
                .sorted()
                .forEach(System.out::println);

Do utworzenia strumienia została użyta statyczna metoda Stream.of. Metoda ta posiada parametr varargs toteż możemy stworzyć w ten sposób strumień składający się z dowolnej liczby elementów. Warto podkreślić, że w Java 8 dodano nową metodę stream() do interfejsu Collection, stąd też taki strumień można w następujący sposób wykonać:

List<String> namesList = Arrays.asList("John", "Marry", "George", "Paul", "Alice", "Ann");

        namesList
                .stream()
                .filter(e -> e.startsWith("A"))
                .map(String::toUpperCase)
                .sorted()
                .forEach(System.out::println);

Oczywiście w obu wypadkach rezultat otrzymany w wyniku działania kodu będzie ten sam:

ALICE
ANN

Teraz kilka słów na temat typów operacji na elementach strumienia. Generalnie możemy je podzielić na operacje pośrednie i końcowe (intermediate operations oraz terminal operations), gdzie w dalszej części będę używał angielskich odpowiedników nazw typów operacji. Operacje typu terminal są operacjami, które w wyniku działania nic nie zwracają (void), bądź też wynik działania takiej operacji nie jest już strumieniem.

Operacje typu intermediate w wyniku działania nadal zwracają strumień w postaci sekwencji elementów, stąd też takie operacje można ze sobą łączyć bez używania średnika w kodzie. W naszym przykładzie filter, map oraz sorted są operacjami typu intermediate natomiast forEach jest operacją typu terminal.

Dodatkową oraz ważną cechą operacji typu intermediate jest laziness (tu też będę się trzymał angielskiego terminu, bo tłumaczenie na lenistwo nie najlepiej brzmi w tym kontekście). Żeby to wyjaśnić spójrzmy na poniższy przykład, gdzie nie pojawia się operacja typu terminal.

List<String> namesList = Arrays.asList("John", "Marry", "George", "Paul", "Alice", "Ann");

        namesList
                .stream()
                .filter(e -> {
                    System.out.println("filter: " + e);
                    return true;
                });

W wyniku działania powyższego kodu nic nie zostanie wydrukowane na konsoli, ponieważ operacje typu intermediate zostaną wykonane tylko wtedy, gdy pojawi się operacja typu terminal forEach jak na poniższym przykładzie:

List<String> namesList = Arrays.asList("John", "Marry", "George", "Paul", "Alice", "Ann");

        namesList
                .stream()
                .filter(e -> {
                    System.out.println("filter: " + e);
                    return true;
                })
                .forEach(e -> System.out.println("forEach: " + e));

Tym razem na konsoli pojawi się wynik, którego się spodziewamy:

filter: John
forEach: John
filter: Marry
forEach: Marry
filter: George
forEach: George
filter: Paul
forEach: Paul
filter: Alice
forEach: Alice
filter: Ann
forEach: Ann

Dzięki temu podejściu streamy mogą optymalizować ilość wywołań poszczególnych metod (więcej na ten temat tutaj).

Strumienie „specjalizowane”

Tak dla przypomnienia map() może nam dostarczyć nowy strumień po wykonaniu określonej operacji dla każdego elementu oryginalnego strumienia. Nowy strumień może być strumieniem innego typu w porównaniu z oryginalnym, by krócej to ująć metoda map() przekonwertuje obiekty w strumieniu z jednego typu na inny typ.

Warto też wspomnieć o metodzie filter(), która dostarcza nam „przefiltrowany” strumień zawierający tylko elementy z oryginalnego strumienia, spełniające określony warunek, który został wyspecyfikowany przez Predicate.

Jak wynika z powyższego przykładu Stream (rozumiany jako obiekt) jest strumieniem (sekwencją) referencji do obiektów w naszym akurat wypadku typu String. W ogólności strumienie mogą być tworzone z różnego rodzaju źródeł danych, głównie z kolekcji – poprzez wspomnianą wyżej metodą stream() dodana do interfejsu Collection.

Poza strumieniami składającymi się z „regularnych” obiektów Java 8 dostarcza specjalizowanych strumieni działających na prymitywnych typach danych typu int, long czy też double. Są to IntStream, LongStream oraz DoubleStream. Te specjalizowane strumienie okazują się przydatne, gdy mamy do czynienia z dużą ilością danych numerycznych.

Jeśli spojrzymy na API Java 8 to okaże się, że zarówno IntStream, jak i LongStream oraz DoubleStream nie dziedziczą po interfejsie Stream, lecz po BaseStream, które jest też interfejsem nadrzędnym w hierarchii dziedziczenia w stosunku do Stream. Istotną konsekwencją tego faktu jest to iż nie wszystkie operacje/metody z API interfejsu Stream są wspierane przez implementacje dla IntStream, LongStream oraz DoubleStream.

Dla przykładu, gdy spojrzymy na min() oraz max() dla Stream, dowiemy się, że metody te jako parametru formalnego używają comparatora, zaś w wypadku strumieni specjalizowanych parametr tego typu się nie pojawia. Dodatkowo warto zauważyć iż wspomniane wyżej strumienie specjalizowane wspierają dodatkowo operacje agregujące typu terminal tj. sum() oraz average().

Przykładowo by utworzyć IntStream możemy użyć metody mapToInt() dla istniejącego strumienia.

List<String> strings = Arrays.asList("a1", "a2", "b3", "b4", "c5", "c6");

        strings
              .stream()
              .map(string -> string.substring(1))
              .mapToInt(Integer::parseInt)
              .average()
              .ifPresent(System.out::println); // 3.5

W powyższym przykładzie użyłem operacji agregującej average() typu terminal. Wynik jej działania to, jak łatwo wywnioskować po nazwie, średnia arytmetyczna wyliczona dla wszystkich elementów typu int w strumieniu.

Możemy też w tym celu użyć statycznej metody of() by stworzyć IntStream:

IntStream.of(1, 2, 3, 4, 5, 6)

lub też posłużyć statyczną metodą range():

IntStream.range(1, 6)
                .forEach(System.out::println);

        // 1
        // 2
        // 3
        // 4
        // 5

Jak widzimy pierwsza wartość jest inclusive, zaś ostatnia exclusive.

Ponowne użycie strumieni w Java 8

Strumienie w Java 8 nie mogą być ponownie użyte. Wywołanie jakiejkolwiek operacji typu terminal dla strumienia spowoduje wyjątek illegalStateExcpetion, jak na poniższym przykładzie.

        Stream<String> namesStream =
              Stream.of("John", "Marry", "George", "Paul", "Alice", "Ann");

        Predicate<String>  hasName = name -> name.equals("Alice");

        namesStream.anyMatch(hasName);  // ok
        namesStream.noneMatch(hasName); // exception

        // Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed
        // at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
        // at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
        // at bosch.com.article.StreamApiDemo.fourthDemo(StreamDemo.java:88)
        // at bosch.com.article.StreamApiDemo.main(StreamDemo.java:488)

W powyższym przykładzie użyto dwóch operacji typu terminal anyMatch() oraz noneMatch(). Wywołanie pierwszej z nich (anyMatch()) spowodowało „zamknięcie” strumienia, a wywołanie kolejnej na „zamkniętym” strumieniu skutkowało wygenerowaniem wyjątku.

By takie ograniczenie w pewien sposób obejść po każdej operacji typu terminal wykonanej na strumieniu należałoby utworzyć nowy strumień. W naszym wypadku możemy skorzystać z interfejsu Supplier, który został wprowadzony wraz z Java 8 (pakiet java.util.function). Interfejs ten reprezentuje funkcję, która nie używa żadnego argumentu, a wynikiem jej działania jest „wyprodukowanie” wartości typu T. Jedyną metodą zdefiniowaną dla tego interfejsu jest bezargumentowa metoda get() – interfejs ten wspiera programowanie funkcyjne.

Na poniższym przykładzie zastosowanie tego interfejsu dla naszych potrzeb:

Supplier<Stream<String>> namesStreamSupplier =
                () -> Stream.of("John", "Marry", "George", "Paul", "Alice", "Ann");

        Predicate<String>  hasName = name -> name.equals("Alice");

        namesStreamSupplier.get().anyMatch(hasName);  // ok
        namesStreamSupplier.get().noneMatch(hasName); // ok

Jak widzimy każdorazowe wywołanie metody get() dostarcza/produkuje nam „nowy” strumień i bez problemu na takim strumieniu możemy wykonać operację typu terminal bez obaw iż zostanie rzucony wyjątek.

Zaawansowane operacje na strumieniach

Jeśli spojrzymy na Stream API Javy 8 to zauważymy jak wiele operacji (metod) jest wspieranych przez ten interfejs. W powyższych przykładach zostały wykorzystane jedne z najważniejszych i chyba najczęściej stosowanych w postaci map() i filter().

W poniższych przykładach będę korzystał do celów prezentacji z następującej definicji listy zatrudnionych. Dla przejrzystości kodu pominięte zostały settery i gettery w poniższym listingu.

public class Employee {
    private int id;
    private double salary;
    private String division;
    private DayJob dayJob;

    public Employee(int id, double salary, String division, DayJob dayJob) {
        this.id = id;
        this.salary = salary;
        this.division = division;
        this.dayJob = dayJob;
    }
    
    public enum DayJob {
        FULL_TIME("Full-time job"),
        PART_TIME("Part-time job");

        String dayJobDescription;

        DayJob(String dayJobDescription) {
            this.dayJobDescription = dayJobDescription;
        }
    }

    @Override
    public String toString() {
        final StringBuilder stringBuilder = new StringBuilder("Employee {");
        stringBuilder.append(" nid: ").append(this.id);
        stringBuilder.append(", nsalary: ").append(this.salary);
        stringBuilder.append(", ndivision: ").append(this.division);
        stringBuilder.append(", ndayJob: ").append(this.dayJob);
        stringBuilder.append("}");

        return stringBuilder.toString();
    }
List<Employee> employees =
                Arrays.asList(
                        new Employee(1, 2000d, "Risk Department", Employee.DayJob.FULL_TIME),
                        new Employee(2,2500d, "Scoring Department", Employee.DayJob.FULL_TIME),
                        new Employee(3,2600d, "Scoring Department", Employee.DayJob.FULL_TIME),
                        new Employee(4,2700d, "Credit Department", Employee.DayJob.FULL_TIME),
                        new Employee(5,2700d, "Credit Department", Employee.DayJob.PART_TIME)
                );

Poniżej przykład konwersji strumienia danego typu na strumień innego typu – tutaj przekonwertujemy strumień typu Employee (obiektów typu Employee) na strumień typu String zawierający nazwy poszczególnych departamentów, w których są zatrudnieni pracownicy.

employees
     .stream()
     .map(employee -> employee.getDivision())
     .forEach(System.out::println);

        // Risk Department
        // Scoring Department
        // Scoring Department
        // Credit Department
        // Credit Department

Poniższy listing filtruje oryginalny strumień pod kątem pracowników, których pensja jest większa niż 2600.

employees
     .stream()
     .filter(employee -> employee.getSalary() > 2600d)
     .forEach(employee -> System.out.println(employee.toString()));

         // Employee {
         //      id: 4,
         //      salary: 2700.0,
         //      division: Credit Department,
         //      dayJob: FULL_TIME}
         // Employee {
         //     id: 5,
         //     salary: 2700.0,
         //     division: Credit Department,
         //     dayJob: PART_TIME}

Przejdźmy do nieco bardziej zaawansowanych operacji na strumieniach.

Collect

Z collect() zetknęliśmy się w jednym z poprzednich przykładów. Bardzo często, gdy skończymy „przetwarzanie” strumienia chcielibyśmy przyjrzeć elementom strumienia zwróconego w postaci innej struktury danych np. List, Set lub Map. Z pomocą przychodzi nam niezwykle użyteczna metoda collect(), która wykonuje szereg operacji („przepakowanie” elementów strumienia do innej struktury danych i wykonanie dodatkowych operacji na elementach np. konkatenacji).

Metoda collect() korzysta z Collector, na który to z kolei składają się cztery „składniki” w postaci supplier, accumulator, combiner i finisher (tu znów angielska nomenklatura). Brzmi to może mało zachęcająco, ale Java 8 dostarcza wielu wbudowanych kolektorów przez klasę Collectors, by ułatwić nam życie.

Na poniższym listingu chyba najbardziej powszechny przypadek użycia – dostajemy listę z „przefiltrowanymi” pracownikami pod kątem zarobków.

Predicate<Employee> salaryPredicate = employee -> employee.getSalary() > 2600d;

        List<Employee> filteredEmployeeList =
                employees
                      .stream()
                      .filter(salaryPredicate)
                      .collect(Collectors.toList());

Jak widzimy nie jest trudno stworzyć listę (List) pracowników z elementów strumienia (przy okazji: uzyskane listy i sety są niemutowalne). Jak było to pokazane w jednym z wcześniejszych listingów, jeśli zamiast listy potrzebowalibyśmy kolekcji typu Set wystarczy, że użyjemy kolektora Collectors.toSet().

Naturalnie nic nie stoi na przeszkodzie by dokonać transformacji elementów strumienia na mapę (Map). W tym wypadku należy określić jaki atrybut elementu strumienia ma być kluczem oraz czego chcemy użyć jako wartości. Musimy pamiętać, by klucze posiadały unikalne wartości w przeciwnym razie zostanie rzucony wyjątek IllegalStateException. Na poniższym listingu jako naturalnego kandydata na wartość klucza użyto id pracownika.

        Map<Integer, Employee> employeeMap =
                employees
                    .stream()
                    .limit(2)
                    .collect(Collectors.toMap(
                        Employee::getId,
                        Function.identity(),
                        (key1, key2) -> {throw new IllegalStateException(String.format("duplicate key value found %s", key1));}
                    ));

        System.out.println(employeeMap);

         // {1=Employee {
         //     id: 1,
         //     salary: 2000.0,
         //     division: Risk Department,
         //     dayJob: FULL_TIME},
         //  2=Employee {
         //     id: 2,
         //     salary: 2500.0,
         //     division: Scoring Department,
         //     dayJob: FULL_TIME}}

Dzięki takiej transformacji możemy mieć wgląd na każdego zatrudnionego po jego id. Warto w tym miejscu wspomnieć, że jeśli chcemy jako wartości dla klucza użyć bieżącego elementu kolekcji to jako drugiej funkcji należy użyć Function.identity(). Dodatkowo ze względu na czytelność wyniku na konsoli rozmiar strumienia i tym samym mapy został ograniczony do dwóch elementów (operacja limit).

Collectors umożliwia nam też grupowanie elementów strumienia po wartości wybranego atrybutu. Poniższy listing grupuje nam pracowników po dywizji/departamencie, w którym są zatrudnieni. Naturalnie grupowania takiego moglibyśmy także dokonać ze względu na zarobki albo wymiar etatu.

Map<String, List<Employee>> employeesGroupedByDivision = employees
                .stream()
                .collect(Collectors.groupingBy(employee -> employee.getDivision()));

        employeesGroupedByDivision
                .forEach((division, workers) -> System.out.println(String.format("Division: %s %s", division, workers)));

         // Division: Risk Department [Employee {
         //     id: 1,
         //     salary: 2000.0,
         //     division: Risk Department,
         //     dayJob: FULL_TIME}]
         // Division: Scoring Department [Employee {
         //     id: 2,
         //     salary: 2500.0,
         //     division: Scoring Department,
         //     dayJob: FULL_TIME}, Employee {
         //     id: 3,
         //     salary: 2600.0,
         //     division: Scoring Department,
         //     dayJob: FULL_TIME}]
         // Division: Credit Department [Employee {
         //     id: 4,
         //     salary: 2700.0,
         //     division: Credit Department,
         //     dayJob: FULL_TIME}, Employee {
         //     id: 5,
         //     salary: 2700.0,
         //     division: Credit Department,
         //     dayJob: PART_TIME}]

Inną, mniej oczywistą możliwością zastosowania grupowania jest np. podzielenie kolekcji wejściowej (List lub Set) na szereg kolekcji o mniejszym rozmiarze składających z tych samych elementów co kolekcja wejściowa. Na poniższym przykładowym listingu użyto kolekcji składającej się z obiektów typu Integer dla prostoty przykładu.

final int chunkSize = 3;
        final List<Integer> integers =
                Arrays.asList(2, 4, 6, 8, 10, 12, 14, 16);

        AtomicInteger counter = new AtomicInteger(0);

        Stream<List<Integer>> integerListStream =
                integers
                    .stream()
                    .collect(Collectors.groupingBy(integer -> counter.getAndIncrement()/chunkSize))
                    .entrySet()
                    .stream()
                    .map(Map.Entry::getValue);

        List<List<Integer>> chunkIntegersList =
                integerListStream
                .collect(Collectors.toList());

        System.out.println(chunkIntegersList);
        // [[2, 4, 6], [8, 10, 12], [14, 16]]

Jak widzimy wejściowa kolekcja integers została podzielona na mniejsze kolekcje, których rozmiar nie przekraczał wartości wskazywanej przez parametr chunkSize. W moim przypadku tego typu zabieg np. pozwolił na „obejście” limitu Oracle’a związanego z ilością elementów w klauzuli IN (ORA-01795: maximum number of expressions in a list is 1000).

Nie jest to jedyna z możliwości Collectors, możemy się pokusić na przykład o złożone statystyki dotyczącą zarobków poszczególnych pracowników – kolektor może nam zwrócić wbudowany obiekt z takimi statystykami jak na poniższym listingu.

DoubleSummaryStatistics salarySummarising =
                employees
                    .stream()
                    .collect(Collectors.summarizingDouble(employee -> employee.getSalary()));

        System.out.println(salarySummarising);
        //DoubleSummaryStatistics{count=5, sum=12500,000000, min=2000,000000, average=2500,000000, max=2700,000000}

Oczywiście kolektor też może dostarczyć np. średniej zarobków, jeśli nie potrzebujemy statystyki w postaci wbudowanego obiektu. Wystarczy że użyjemy kolektora jak poniżej.

Double averageSalary =
                employees
                        .stream()
                        .collect(Collectors.averagingDouble(employee -> employee.getSalary()));

        System.out.println("Average salary: " + averageSalary);
        // Average salary: 2500.0

Kolejną funkcją kolektora, o której warto wspomnieć jest joining(). Na poniższym listingu zobaczmy jak Collectors.joining() działa.

String fullTimeEmployees =
                employees
                      .stream()
                      .filter(employee -> employee.getDayJob().equals(Employee.DayJob.FULL_TIME))
                      .map(employee -> Integer.toString(employee.getId()))
                      .collect(Collectors.joining(" , ", "Employees with id's: ", " work full-time"));

        System.out.println(fullTimeEmployees);
        // Employees with id's: 1 , 2 , 3 , 4 work full-time

Jak widzimy Collectors.joining() łączy elementy strumienia w jeden łańcuch znakowy używając podanego przez nas delimitera oraz jako opcjonalnych parametrów prefix („Employess with id’s: ”) oraz suffix („work full-time”). W powyższym przykładzie wykorzystaliśmy Collectors.joining() do wydrukowania identyfikatorów pracowników zatrudnionych na pełen etat.

FlatMap

Jak wiemy strumień może składać się z obiektów o bardziej złożonej strukturze danych, przykładowo może to być strumień zdefiniowany jako Stream<List<Integer>>, gdzie tego typu strumień stworzyliśmy dzieląc kolekcję wejściowej (List lub Set) na szereg kolekcji o mniejszym rozmiarze z wykorzystaniem operacji grupowania (Collectors.groupingBy()).

Czasem jednak zachodzi potrzeba uproszczenia takiej struktury i przekształcenia takiego strumienia w np. Steram<Integer>. Innymi słowy zamiast strumienia w postaci […[2, 4, 6], [8, 10, 12], [14, 16]…] chcemy uzyskać strumień w postaci […2, 4, 6, 8, 10, 12, 14, 16…]. W tym wypadku flatMap() okazuje się przydatną operacją by taką strukturę uprościć i tym samym ułatwić dalsze operacje na elementach strumienia. Poniższy listing pokazuje przykładowe zastosowanie flatMap() , by uzyskać pożądany efekt.

final List<List<Integer>> slicedIntegers = Arrays.asList(
                Arrays.asList(2, 4, 6),
                Arrays.asList(8, 10, 12),
                Arrays.asList(14, 16)
        );

        final List<Integer> simpleIntegerList =
                slicedIntegers
                        .stream()
                        .flatMap(Collection::stream)
                        .collect(Collectors.toList());

        System.out.println("slicedIntegers: " + slicedIntegers);
        //slicedIntegers: [[2, 4, 6], [8, 10, 12], [14, 16]]

        System.out.println("simpleIntegerList: " + simpleIntegerList);
        //simpleIntegerList: [2, 4, 6, 8, 10, 12, 14, 16]

Reduce

Operacje redukcji łączą wszystkie elementy strumienia w jeden wynikowy element, czyli zwracają „odpowiedź” ze strumienia danych. Redukcje są operacjami typu terminal – „redukują” strumień do wartości nie będącej strumieniem. Przykładami operacji redukcji są np. findFirst(), min(), max() – wszystkie zwracają jako wynik Optional<T> jako iż ten typ Optional jest znacznie lepszym sposobem wskazania iż zwrócona wartość jest pusta np. gdy dany strumień był pusty. W ogólności operacje redukcji możemy przedstawić jako:

T reduce(T identity, BinaryOperator<T> accumulator)

gdzie identity jest wartością początkową redukcji lub wartością domyślną, gdy w strumieniu nie ma żadnych elementów, accumulator zaś jest funkcją (BiFunction), która posiada dwa formalne parametry. Pierwszy to cząstkowy wynik redukcji oraz kolejny element strumienia. Tyle definicji. Na poniższym listingu zsumujemy zarobki wszystkich zatrudnionych na pełen etat używając reduce() na naszym strumieniu.

Double salariesSumForFullTimeEmployees =
                employees
                      .stream()
                      .filter(employee -> employee.getDayJob().equals(Employee.DayJob.FULL_TIME))
                      .map(Employee::getSalary)
                      .reduce(0.0, Double::sum);

        System.out.println("Salaries sum for full-time workers: " + salariesSumForFullTimeEmployees);
        //Salaries sum for full-time workers: 9800.0

W powyższym przykładzie można było użyć specjalizowanego strumienia DoubleStream i wykonać operację DoubleStream.sum(), by osiągnąć dokładnie ten sam efekt.

W kolejnym przykładzie na poniższym listingu wykorzystamy operację redukcji z użyciem akumulatora (BiFunction), gdzie obydwa operandy są tego samego typu. Porównujemy zarobki osób zatrudnionych na pełny etat i zwracamy dane pracownika z najwyższą pensją.

employees
       .stream()
       .filter(employee -> employee.getDayJob().equals(Employee.DayJob.FULL_TIME))
       .reduce(((employee1, employee2) -> employee1.getSalary() > employee2.getSalary() ? employee1 : employee2))
       .ifPresent(System.out::println);

             // Employee {
             //     id: 4,
             //     salary: 2700.0,
             //     division: Credit Department,
             //     dayJob: FULL_TIME
             // }

Dla zainteresowanych w Java 9 dodatkowo dla Stream API pojawiają się takeWhile, dropWhile oraz iterate, gdzie możemy użyć trzech argumentów. Więcej o omawianych rozszerzeniach w Steam API przeczytasz tutaj.

Parallel Streams

W wypadku strumieni, które zawierają dużą ilość elementów do przetworzenia, operacje na strumieniu w celu zwiększenia wydajności mogą być zostać „zrównoleglone” – innymi słowy mogą być jednocześnie wykonywane przez kilka wątków. By taki efekt osiągnąć potrzebujemy przede wszystkim tzw. parallel stream’u. Strumień taki możemy uzyskać z jakiejkolwiek kolekcji używając metody Collection.parallelStream() w kontradykcji do sekwencyjnego strumienia danych utworzonego poprzez Collection.stream().

Założeniem oczywiście jest, że operacje wykonywane równolegle na strumieniu dadzą taki sam rezultat jakby były wykonywane sekwencyjnie. Odpowiedzialnością programisty jest by każda funkcja przekazana do operacji na strumieniu mogła być „bezpiecznie” wykonana z punktu widzenia wielowątkowości (spójność danych – race condition).

Jeśli wykonamy kod z poniższego listingu, który na pierwszy rzut oka wygląda na poprawny, to będziemy mieli do czynienia z race condition. Po każdym wykonaniu kodu nasza lista pensji, każdorazowo drukowana w ramach operacji forEach() będzie wyglądać inaczej po każdym uruchomieniu metody. W ramach ćwiczenia możemy zastąpić parallelStream() metodą stream() i zobaczyć jaki będzie miało to efekt, gdy będziemy poniższy kod uruchamiali wiele razy.

final List<Double> salaries = Collections.synchronizedList(new ArrayList<>());

        employees
                .parallelStream()
                .forEach(employee -> {
                    if(employee.getSalary() > 2000d) {
                        salaries.add(employee.getSalary();
                        System.out.println(salaries);
                    }
                });

        // First run:
        // [2600.0]
        // [2600.0, 2700.0, 2500.0, 2700.0]
        // [2600.0, 2700.0]
        // [2600.0, 2700.0, 2500.0]

        // Second run:
        // [2600.0]
        // [2600.0, 2700.0, 2700.0, 2500.0]
        // [2600.0, 2700.0, 2700.0]
        // [2600.0, 2700.0]

W momencie gdyby kolekcja salaries nie była zsynchronizowana, to efektem działania byłby jedynie zrzucony wyjątek w postaci ConcurrentModificationException (jak na poniższym listingu) spowodowany modyfikacją kolekcji, w momencie gdy kolejny wątek próbuje trawersować tę kolekcję używając jej iteratora – System.out.println(salaries).

final List<Double> salaries = new ArrayList<>();

        employees
                .parallelStream()
                .forEach(employee -> {
                    if(employee.getSalary() > 2000d) {
                        salaries.add(employee.getSalary());
                        System.out.println(salaries);
                    }
                });
Exception in thread "main" java.util.ConcurrentModificationException
	at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
	at java.util.ArrayList$Itr.next(ArrayList.java:851)
	at java.util.AbstractCollection.toString(AbstractCollection.java:461)
	at java.lang.String.valueOf(String.java:2994)
	at java.io.PrintStream.println(PrintStream.java:821)
	at bosch.com.article.StreamApiDemo.lambda$seventeenthDemo$25(StreamApiDemo.java:9)

Powyższe przykłady pokazują iż w wypadku stosowania strumieni równoległych (przetwarzanych wielowątkowo) pojawiają się dodatkowe aspekty, na które musimy zwrócić uwagę – pierwsze co się rzuca w oczy to operacje wykonywane równolegle (w oddzielnych wątkach) nie modyfikują współdzielonych zasobów.

Przy okazji omawiania strumieni równoległych warto przyjrzeć się jak działa takie równoległe przetwarzanie. Strumienie te wykorzystują ForkJoinPool, w Java 8 jest to domyślna pula wątków, którą wykorzystuję się do realizacji zadań, które można podzielić. Być może bardziej namacalnie zobaczyć jak zrównoleglone przetwarzanie działa w kontekście wielowątkowości/strumieni równoległych spójrzmy na poniższy listing.

employees
              .parallelStream()
              .filter(employee -> {
                  System.out.println(String.format("Filter person with id %s thread [%s]"
                          , employee.getId(), Thread.currentThread().getName()));
                  return true;
              })
              .map(employee -> {
                  System.out.println(String.format("Map person id %s thread [%s]"
                          , employee.getId(), Thread.currentThread().getName()));
                  return employee.getId();
              })
              .forEach(employeeId -> System.out.println(String.format("For each employee id %s thread [%s]"
                      , employeeId, Thread.currentThread().getName())));

         // Filter person with id 4 thread [ForkJoinPool.commonPool-worker-4]
         // Filter person with id 2 thread [ForkJoinPool.commonPool-worker-1]
         // Filter person with id 3 thread [main]
         // Filter person with id 5 thread [ForkJoinPool.commonPool-worker-2]
         // Map person id 2 thread [ForkJoinPool.commonPool-worker-1]
         // Map person id 3 thread [main]
         // Map person id 5 thread [ForkJoinPool.commonPool-worker-2]
         // For each employee id 2 thread [ForkJoinPool.commonPool-worker-1]
         // For each employee id 5 thread [ForkJoinPool.commonPool-worker-2]
         // For each employee id 3 thread [main]
         // Filter person with id 1 thread [ForkJoinPool.commonPool-worker-3]
         // Map person id 4 thread [ForkJoinPool.commonPool-worker-4]
         // Map person id 1 thread [ForkJoinPool.commonPool-worker-3]
         // For each employee id 4 thread [ForkJoinPool.commonPool-worker-4]
         // For each employee id 1 thread [ForkJoinPool.commonPool-worker-3]

Patrząc na powyższy listing, a w zasadzie na to co pojawiło na konsoli widzimy, które wątki zostały użyte do wykonania operacji na strumieniu. Jak widzimy zostały wykorzystane 4 wątki z puli ForkJoinPool. To co zostało wydrukowane na konsole może po każdorazowym uruchomieniu się różnić z racji tego iż przydzielanie wątków z puli jest niedeterministyczne (o problemach z paralles stream przeczytasz tutaj).

Podsumowanie

Mam nadzieję iż ten tutorial trochę przybliżył tematykę strumieni w Java 8. Starałem się pokazać szereg różnych operacji wspieranych przez strumienie wykorzystując też przykład wzięty z jednego z projektów. W przykładach starałem się używać wyrażeń lambda oraz tzw. pipelines (szeregi operacji), by kod był możliwie najbardziej zwarty.

Wspomnieliśmy także o charakterystycznych właściwościach strumieni jak laziness (albo jak kto woli lazy evaluation), a także o strumieniach równoległych. W kontekście wspomnianej charakterystyki powiedzieliśmy sobie czym są operacje typu pośredniego i końcowego (intermediate operations oraz terminal operations).

W zamieszczonych przykładach starałem się użyć najczęściej wykonywanych operacji na strumieniach oraz tych bardziej zaawansowanych. Mam nadzieję iż tutorial ten okaże się użytecznym i pomocnym oraz, że dobór i prezentacja materiału i przykładów nie okazały się dla Was drogą przez mękę, a sprawiły trochę frajdy. I tak już na zakończenie pozostaje mi życzyć miłego kodowania.


Zdjęcie główne artykułu pochodzi z unsplash.com.

Podobne artykuły

[wpdevart_facebook_comment curent_url="https://geek.justjoin.it/zastosowanie-stream-api-z-java-8-przyklady/" order_type="social" width="100%" count_of_comments="8" ]