Ghid pentru Stream.reduce ()

1. Prezentare generală

API-ul Stream oferă un repertoriu bogat de funcții intermediare, de reducere și terminale, care acceptă, de asemenea, paralelizarea.

Mai precis, operațiunile fluxului de reducere ne permit să producem un singur rezultat dintr-o secvență de elemente , aplicând în mod repetat o operație de combinare elementelor din secvență.

In acest tutorial, ne vom uita la general scopul Stream.reduce () operarea și vedea în unele cazuri de utilizare de beton.

2. Conceptele cheie: identitate, acumulator și combinator

Înainte să ne uităm mai profund la utilizarea operației Stream.reduce () , să împărțim elementele participante la operație în blocuri separate. În acest fel, vom înțelege mai ușor rolul pe care îl joacă fiecare:

  • Identitate - un element care reprezintă valoarea inițială a operației de reducere și rezultatul implicit dacă fluxul este gol
  • Acumulator - o funcție care ia doi parametri: un rezultat parțial al operației de reducere și următorul element al fluxului
  • Combinator - o funcție utilizată pentru a combina rezultatul parțial al operației de reducere atunci când reducerea este paralelizată sau când există o nepotrivire între tipurile de argumente ale acumulatorului și tipurile de implementare a acumulatorului

3. Utilizarea Stream.reduce ()

Pentru a înțelege mai bine funcționalitatea elementelor de identitate, acumulator și combinator, să analizăm câteva exemple de bază:

List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); int result = numbers .stream() .reduce(0, (subtotal, element) -> subtotal + element); assertThat(result).isEqualTo(21);

În acest caz, Integer valoarea 0 este identitatea. Stochează valoarea inițială a operației de reducere și, de asemenea, rezultatul implicit atunci când fluxul de valori întregi este gol.

La fel, expresia lambda :

subtotal, element -> subtotal + element

este acumulatorul , deoarece ia suma parțială a valorilor întregi și următorul element din flux.

Pentru a face codul și mai concis, putem folosi o referință de metodă, în loc de o expresie lambda:

int result = numbers.stream().reduce(0, Integer::sum); assertThat(result).isEqualTo(21);

Desigur, putem folosi o operație de reducere () pe fluxuri care dețin alte tipuri de elemente.

De exemplu, putem folosi reduce () pe o matrice de elemente String și le putem uni într-un singur rezultat:

List letters = Arrays.asList("a", "b", "c", "d", "e"); String result = letters .stream() .reduce("", (partialString, element) -> partialString + element); assertThat(result).isEqualTo("abcde");

În mod similar, putem trece la versiunea care utilizează o metodă de referință:

String result = letters.stream().reduce("", String::concat); assertThat(result).isEqualTo("abcde");

Să folosim operația reduce () pentru unirea elementelor cu majuscule ale matricei de litere :

String result = letters .stream() .reduce( "", (partialString, element) -> partialString.toUpperCase() + element.toUpperCase()); assertThat(result).isEqualTo("ABCDE");

În plus, putem folosi reduce () într-un flux paralelizat (mai multe despre acest lucru mai târziu):

List ages = Arrays.asList(25, 30, 45, 28, 32); int computedAges = ages.parallelStream().reduce(0, a, b -> a + b, Integer::sum);

Când un flux se execută în paralel, runtime-ul Java împarte fluxul în mai multe sub fluxuri. În astfel de cazuri, trebuie să folosim o funcție pentru a combina rezultatele fluxurilor secundare într-una singură . Acesta este rolul combinatorului - în fragmentul de mai sus, este referința metodei Integer :: sum .

Destul de amuzant, acest cod nu va compila:

List users = Arrays.asList(new User("John", 30), new User("Julie", 35)); int computedAges = users.stream().reduce(0, (partialAgeResult, user) -> partialAgeResult + user.getAge()); 

În acest caz, avem un flux de obiecte Utilizator , iar tipurile de argumente ale acumulatorului sunt Număr întreg și Utilizator. Cu toate acestea, implementarea acumulatorului este o sumă de numere întregi, astfel încât compilatorul nu poate deduce tipul parametrului utilizator .

Putem rezolva această problemă folosind un combinator:

int result = users.stream() .reduce(0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum); assertThat(result).isEqualTo(65);

Mai simplu spus, dacă folosim fluxuri secvențiale și tipurile de argumente ale acumulatorului și tipurile de implementare se potrivesc, nu este nevoie să folosim un combinator .

4. Reducerea în paralel

După cum am învățat anterior, putem folosi reduce () pe fluxuri paralelizate.

Când folosim fluxuri paralelizate, ar trebui să ne asigurăm că reduce () sau orice alte operațiuni agregate executate pe fluxuri sunt:

  • asociativ : rezultatul nu este afectat de ordinea operanzilor
  • fără interferențe : operația nu afectează sursa de date
  • apatrid și determinist : operațiunea nu are stare și produce aceeași ieșire pentru o intrare dată

Ar trebui să îndeplinim toate aceste condiții pentru a preveni rezultate imprevizibile.

Așa cum era de așteptat, operațiunile efectuate pe fluxuri paralelizate, inclusiv reduce (), sunt executate în paralel, profitând astfel de arhitecturi hardware multi-core.

Din motive evidente, fluxurile paralelizate sunt mult mai performante decât omologii secvențiali . Chiar și așa, pot fi exagerate dacă operațiunile aplicate fluxului nu sunt scumpe sau numărul de elemente din flux este mic.

Desigur, fluxurile paralelizate sunt calea corectă atunci când trebuie să lucrăm cu fluxuri mari și să efectuăm operațiuni agregate costisitoare.

Să creăm un test de referință simplu JMH (Java Microbenchmark Harness) și să comparăm timpul de execuție respectiv atunci când se utilizează operația reduce () pe un flux secvențial și paralel:

@State(Scope.Thread) private final List userList = createUsers(); @Benchmark public Integer executeReduceOnParallelizedStream() { return this.userList .parallelStream() .reduce( 0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum); } @Benchmark public Integer executeReduceOnSequentialStream() { return this.userList .stream() .reduce( 0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum); } 

In the above JMH benchmark, we compare execution average times. We simply create a List containing a large number of User objects. Next, we call reduce() on a sequential and a parallelized stream and check that the latter performs faster than the former (in seconds-per-operation).

These are our benchmark results:

Benchmark Mode Cnt Score Error Units JMHStreamReduceBenchMark.executeReduceOnParallelizedStream avgt 5 0,007 ± 0,001 s/op JMHStreamReduceBenchMark.executeReduceOnSequentialStream avgt 5 0,010 ± 0,001 s/op

5. Throwing and Handling Exceptions While Reducing

In the above examples, the reduce() operation doesn't throw any exceptions. But it might, of course.

For instance, say that we need to divide all the elements of a stream by a supplied factor and then sum them:

List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); int divider = 2; int result = numbers.stream().reduce(0, a / divider + b / divider); 

This will work, as long as the divider variable is not zero. But if it is zero, reduce() will throw an ArithmeticException exception: divide by zero.

We can easily catch the exception and do something useful with it, such as logging it, recovering from it and so forth, depending on the use case, by using a try/catch block:

public static int divideListElements(List values, int divider) { return values.stream() .reduce(0, (a, b) -> { try { return a / divider + b / divider; } catch (ArithmeticException e) { LOGGER.log(Level.INFO, "Arithmetic Exception: Division by Zero"); } return 0; }); }

While this approach will work, we polluted the lambda expression with the try/catch block. We no longer have the clean one-liner that we had before.

To fix this issue, we can use the extract function refactoring technique, and extract the try/catch block into a separate method:

private static int divide(int value, int factor) { int result = 0; try { result = value / factor; } catch (ArithmeticException e) { LOGGER.log(Level.INFO, "Arithmetic Exception: Division by Zero"); } return result } 

Now, the implementation of the divideListElements() method is again clean and streamlined:

public static int divideListElements(List values, int divider) { return values.stream().reduce(0, (a, b) -> divide(a, divider) + divide(b, divider)); } 

Assuming that divideListElements() is a utility method implemented by an abstract NumberUtils class, we can create a unit test to check the behavior of the divideListElements() method:

List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); assertThat(NumberUtils.divideListElements(numbers, 1)).isEqualTo(21); 

Let's also test the divideListElements() method, when the supplied List of Integer values contains a 0:

List numbers = Arrays.asList(0, 1, 2, 3, 4, 5, 6); assertThat(NumberUtils.divideListElements(numbers, 1)).isEqualTo(21); 

Finally, let's test the method implementation when the divider is 0, too:

List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); assertThat(NumberUtils.divideListElements(numbers, 0)).isEqualTo(0);

6. Complex Custom Objects

We can also use Stream.reduce() with custom objects that contain non-primitive fields. To do so, we need to provide a relevant identity, accumulator, and combiner for the data type.

Suppose our User is part of a review website. Each of our Users can possess one Rating, which is averaged over many Reviews.

First, let's start with our Review object. Each Review should contain a simple comment and score:

public class Review { private int points; private String review; // constructor, getters and setters }

Next, we need to define our Rating, which will hold our reviews alongside a points field. As we add more reviews, this field will increase or decrease accordingly:

public class Rating { double points; List reviews = new ArrayList(); public void add(Review review) { reviews.add(review); computeRating(); } private double computeRating() { double totalPoints = reviews.stream().map(Review::getPoints).reduce(0, Integer::sum); this.points = totalPoints / reviews.size(); return this.points; } public static Rating average(Rating r1, Rating r2) { Rating combined = new Rating(); combined.reviews = new ArrayList(r1.reviews); combined.reviews.addAll(r2.reviews); combined.computeRating(); return combined; } }

We have also added an average function to compute an average based on the two input Ratings. This will work nicely for our combiner and accumulator components.

Next, let's define a list of Users, each with their own sets of reviews.

User john = new User("John", 30); john.getRating().add(new Review(5, "")); john.getRating().add(new Review(3, "not bad")); User julie = new User("Julie", 35); john.getRating().add(new Review(4, "great!")); john.getRating().add(new Review(2, "terrible experience")); john.getRating().add(new Review(4, "")); List users = Arrays.asList(john, julie); 

Acum, că John și Julie sunt contabilizați, să folosim Stream.reduce () pentru a calcula o evaluare medie pentru ambii utilizatori. Ca identitate , să returnăm o nouă evaluare dacă lista noastră de intrări este goală :

Rating averageRating = users.stream() .reduce(new Rating(), (rating, user) -> Rating.average(rating, user.getRating()), Rating::average);

Dacă facem calculele, ar trebui să constatăm că scorul mediu este de 3,6:

assertThat(averageRating.getPoints()).isEqualTo(3.6);

7. Concluzie

În acest tutorial, am învățat cum să folosim operația Stream.reduce () . În plus, am învățat cum să realizăm reduceri pe fluxuri secvențiale și paralelizate și cum să gestionăm excepțiile în timp ce reducem .

Ca de obicei, toate exemplele de cod afișate în acest tutorial sunt disponibile pe GitHub.