
Streams


07/07/2023

ADVANCED PROGRAMMING

CONTENTS

• Streams
• Mappers
• Reducers
• Collectors

https://www.gartner.com/en/documents/3906723/stream-processing-the-new-data-processing-paradigm


STREAM<T>

• A sequence of elements supporting (sequential and parallel) aggregate operations.

• Provides a compact notation for pipelined processing of a series of values

• Conceptually similar to:
• InputStream and OutputStream in java.io
• Queue in java.util

see: https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/stream/Stream.html

STREAMS


STREAM<T>

[Figure: a stream pipeline: values source → .stream() → intermediate operations → terminal operation]

grades (List<Double>): 7.3, 4.5, 9.1, 8.0, 7.3, 1.2, 4.5, 9.1, 8.0, 1.2
  .stream()               → Stream<Double>: 7.3, 4.5, 9.1, 8.0, 7.3, 1.2, 4.5, 9.1, 8.0, 1.2
  .filter(x -> x >= 5.5)  → Stream<Double>: 7.3, 9.1, 8.0, 7.3, 9.1, 8.0
  .count()                → long: 6

long numPassed = grades.stream()
        .filter( x -> x >= 5 )
        .count();
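The program below is a self-contained version of the pipeline above. It is a minimal sketch: the class name PassedGrades and the helper countPassed are illustrative, and the grade values come from the diagram. Note that count() returns a long, not an int.

```java
import java.util.List;

class PassedGrades {
    // Counts the grades at or above the threshold: List -> Stream -> filter -> count.
    static long countPassed(List<Double> grades, double threshold) {
        return grades.stream()                   // source: Stream<Double>
                .filter(x -> x >= threshold)     // intermediate: keep passing grades
                .count();                        // terminal: starts the work, returns long
    }

    public static void main(String[] args) {
        List<Double> grades = List.of(7.3, 4.5, 9.1, 8.0, 7.3, 1.2, 4.5, 9.1, 8.0, 1.2);
        long numPassed = countPassed(grades, 5.5);
        System.out.println(numPassed); // 6
    }
}
```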

STREAMS ARE LAZY

• Computation on the source data is only performed when the terminal operation is initiated.

• Source elements are consumed only as needed.


• Where possible, one element passes through the complete pipeline before the next one is pulled from the source.
• This can improve compute performance, thanks to better CPU-cache locality.

• The memory footprint of most built-in stream operations is O(1) (stateful operations such as sorted() and distinct() are exceptions).
• This ensures that large data sources can be processed with a low memory footprint.
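This small sketch lets you observe the laziness directly. peek() is intended mainly for debugging, and here it records when each element leaves the source. The class name and log labels are illustrative. On a sequential stream, nothing is pulled until the terminal forEach() runs, and each element travels through the whole pipeline before the next one is pulled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class LazyPipeline {
    // Records, in order, when each stage of the pipeline actually runs.
    static List<String> trace() {
        List<String> log = new ArrayList<>();
        Stream<Integer> pipeline = Stream.of(1, 2, 3)
                .peek(x -> log.add("pulled " + x))  // runs when an element leaves the source
                .filter(x -> x % 2 == 1);           // drops even numbers
        // Building the pipeline above did no work: nothing has been pulled yet.
        log.add("pipeline built");
        pipeline.forEach(x -> log.add("consumed " + x)); // terminal operation starts the work
        return log;
    }

    public static void main(String[] args) {
        // pipeline built, pulled 1, consumed 1, pulled 2, pulled 3, consumed 3
        trace().forEach(System.out::println);
    }
}
```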


METHOD-CHAINING IN STREAMS

• Stream operations can be ‘method-chained’ (a.k.a. ‘dot-chaining’):

long numPassed = grades.stream()
        .filter( x -> x >= 5 )
        .count();

• This contributes to
• flexibility of applying combinations of operations in any order
• brevity of code
• readability of code

See also: https://en.wikipedia.org/wiki/Method_chaining
https://martinfowler.com/bliki/FluentInterface.html
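The sketch below compares the chained form with an equivalent unchained version, in which every intermediate Stream is stored in its own variable. The class and method names are illustrative. Both versions compute the same result; chaining simply removes the intermediate variables.

```java
import java.util.List;
import java.util.stream.Stream;

class ChainingDemo {
    // Without chaining: each intermediate Stream is stored in its own variable.
    static long countPassedStepwise(List<Double> grades) {
        Stream<Double> all = grades.stream();
        Stream<Double> passed = all.filter(x -> x >= 5);
        return passed.count();
    }

    // With chaining: each call operates on the Stream returned by the previous call.
    static long countPassedChained(List<Double> grades) {
        return grades.stream()
                .filter(x -> x >= 5)
                .count();
    }

    public static void main(String[] args) {
        List<Double> grades = List.of(7.3, 4.5, 9.1, 8.0, 1.2);
        System.out.println(countPassedStepwise(grades) + " " + countPassedChained(grades)); // 3 3
    }
}
```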

EVERY COLLECTION CAN BE STREAMED

• The Collection interface provides default implementations of stream() and parallelStream()

• parallelStream() tries to use multiple threads on multiple processor cores to speed up execution
• This requires all code in the pipeline (the lambdas and any shared state they touch) to be ‘thread-safe’
• This is a topic for courses on ‘Parallel Computing’
(do not use parallel streams for now…)
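Because stream() is a default method of Collection, one helper can accept any collection type. This is a minimal sketch with an illustrative helper, sumOf. A Map is not itself a Collection, but its views, such as values(), are.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;

class AnyCollectionStream {
    // Accepts any Collection: stream() is a default method of the Collection interface.
    static int sumOf(Collection<Integer> numbers) {
        return numbers.stream()
                .mapToInt(Integer::intValue)  // Stream<Integer> -> IntStream
                .sum();
    }

    public static void main(String[] args) {
        System.out.println(sumOf(List.of(1, 2, 3)));                // a List: 6
        System.out.println(sumOf(Set.of(4, 5)));                    // a Set: 9
        System.out.println(sumOf(new ArrayDeque<>(List.of(6, 7)))); // a Deque (a Queue): 13
        // A Map is not a Collection itself, but its values() view is.
        System.out.println(sumOf(Map.of("a", 8, "b", 9).values())); // 17
    }
}
```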


STREAM OPERATIONS:
MAPPERS AND REDUCERS

STREAM OPERATIONS:
INTERMEDIATE MAPPERS AND FINAL REDUCERS
• mapper (intermediate): selects and transforms values
  • delivers another Stream
  .map(Function<T,R>)
  .mapToInt(), .mapToDouble()
  .filter(Predicate<T>)
  .distinct()
  .limit(maxSize)
  .flatMap(Function<T,Stream<R>>)
• reducer (final): aggregates/combines values
  • delivers a data value
  .anyMatch(Predicate<T>), .allMatch(...), .noneMatch(…)
  .sum() (on IntStream, LongStream and DoubleStream)
  .count()
  .reduce(BinaryOperator<T>)
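The sketch below applies several of the listed mappers and reducers to an illustrative list of words; the list is made up for the example. Notice that sum() requires mapToInt() first, because sum() exists on IntStream but not on Stream<String>.

```java
import java.util.List;

class MapperReducerDemo {
    static final List<String> WORDS = List.of("stream", "map", "reduce", "map", "filter");

    // distinct() (mapper), then count() (reducer): "map" is counted once.
    static long distinctWords() {
        return WORDS.stream().distinct().count();
    }

    // limit() keeps the first 3 words; mapToInt() turns them into an IntStream for sum().
    static int lengthOfFirstThree() {
        return WORDS.stream().limit(3).mapToInt(String::length).sum();
    }

    // filter() and map() are mappers; anyMatch() is a short-circuiting reducer.
    static boolean anyShortWordStartsWithM() {
        return WORDS.stream()
                .filter(w -> w.length() <= 3)
                .map(String::toUpperCase)
                .anyMatch(w -> w.startsWith("M"));
    }

    // flatMap() replaces each word by a Stream of its characters.
    static long totalLetters() {
        return WORDS.stream()
                .flatMap(w -> w.chars().mapToObj(c -> (char) c))
                .count();
    }

    public static void main(String[] args) {
        System.out.println(distinctWords());           // 4
        System.out.println(lengthOfFirstThree());      // 15
        System.out.println(anyShortWordStartsWithM()); // true
        System.out.println(totalLetters());            // 24
    }
}
```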


EXAMPLE

• Can you check whether we have books without an author?

[Code figure annotations: start: Stream<Book>; reducer: takes a Predicate<Book>]
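The slide's code is not preserved here, so what follows is a minimal sketch of one way to answer the question. It assumes a hypothetical Book class with getTitle(), getAuthor() and getNumPages(), where a missing author is stored as null. The original slide may have used a different reducer, such as noneMatch().

```java
import java.util.List;

// Hypothetical Book class (assumption): the slides do not show its definition.
class Book {
    private final String title;
    private final String author;  // assumed to be null when the author is unknown
    private final int numPages;

    Book(String title, String author, int numPages) {
        this.title = title;
        this.author = author;
        this.numPages = numPages;
    }

    String getTitle() { return title; }
    String getAuthor() { return author; }
    int getNumPages() { return numPages; }
}

class BookChecks {
    static boolean hasBookWithoutAuthor(List<Book> books) {
        return books.stream()                           // start: Stream<Book>
                .anyMatch(b -> b.getAuthor() == null);  // reducer taking a Predicate<Book>
    }

    public static void main(String[] args) {
        List<Book> books = List.of(new Book("Dune", "Herbert", 412), new Book("Untitled", null, 90));
        System.out.println(hasBookWithoutAuthor(books)); // true
    }
}
```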


EXAMPLE

• Calculate the sum of pages of all books:

[Code figure annotations: mapper: gets numPages from each processed book (instance method for ToIntFunction<Book>); reducer: adds all numbers of pages]
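Here is a minimal sketch of the mapper/reducer pair described above. It reuses the same hypothetical Book class, which is an assumption and not taken from the slides. Book::getNumPages is an instance method reference that serves as the ToIntFunction<Book>.

```java
import java.util.List;

// Hypothetical Book class (assumption): the slides do not show its definition.
class Book {
    private final String title;
    private final String author;
    private final int numPages;

    Book(String title, String author, int numPages) {
        this.title = title;
        this.author = author;
        this.numPages = numPages;
    }

    String getTitle() { return title; }
    String getAuthor() { return author; }
    int getNumPages() { return numPages; }
}

class PageTotals {
    static int totalPages(List<Book> books) {
        return books.stream()
                .mapToInt(Book::getNumPages)  // mapper: ToIntFunction<Book> via method reference
                .sum();                       // reducer: adds all numbers of pages (0 if empty)
    }

    public static void main(String[] args) {
        List<Book> books = List.of(new Book("A", "X", 100), new Book("B", "Y", 250), new Book("C", "Z", 50));
        System.out.println(totalPages(books)); // 400
    }
}
```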


EXAMPLE

• Calculate the maximum number of pages in any of the books:

[Code figure annotations: gets numPages from each processed book; empty streams have no max; default for a missing Optional]
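The sketch below finds the maximum page count, again assuming the hypothetical Book class. On an IntStream, max() returns an OptionalInt rather than an Optional<Integer>. It offers orElse() in the same way, and the default of 0 for an empty collection is an illustrative choice.

```java
import java.util.List;

// Hypothetical Book class (assumption): the slides do not show its definition.
class Book {
    private final String title;
    private final String author;
    private final int numPages;

    Book(String title, String author, int numPages) {
        this.title = title;
        this.author = author;
        this.numPages = numPages;
    }

    String getTitle() { return title; }
    String getAuthor() { return author; }
    int getNumPages() { return numPages; }
}

class PageMax {
    static int maxPages(List<Book> books) {
        return books.stream()
                .mapToInt(Book::getNumPages)  // get numPages from each processed book
                .max()                        // OptionalInt: empty streams have no max
                .orElse(0);                   // default for a missing value (assumed 0)
    }

    public static void main(String[] args) {
        List<Book> books = List.of(new Book("A", "X", 100), new Book("B", "Y", 250), new Book("C", "Z", 50));
        System.out.println(maxPages(books));     // 250
        System.out.println(maxPages(List.of())); // 0
    }
}
```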


see also: https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/Optional.html


OPTIONAL<T>

• Some reducer methods deliver Optional<T> values.
• If a Stream is empty, or has become empty because of a filter, the Optional<T> result will also be empty.

Optional<T>

method                 result if value is available   result if no value is available
get()                  existing value of type T       NoSuchElementException
orElse(defaultValue)   existing value of type T       defaultValue of type T
isEmpty()              false                          true
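The sketch below exercises the three methods in the table. It uses Stream.max(), a reducer that returns an Optional<T>, after a filter that may leave the stream empty. The word lists and class name are illustrative. Optional.isEmpty() requires Java 11 or later.

```java
import java.util.Comparator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;

class OptionalDemo {
    // Longest word of at most 4 letters; the filter may leave the stream empty.
    static Optional<String> longestShortWord(List<String> words) {
        return words.stream()
                .filter(w -> w.length() <= 4)
                .max(Comparator.comparingInt(String::length));
    }

    // Returns true if get() on the given Optional throws NoSuchElementException.
    static boolean getThrows(Optional<String> value) {
        try {
            value.get();
            return false;
        } catch (NoSuchElementException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Optional<String> found = longestShortWord(List.of("map", "sum", "limit", "list"));
        Optional<String> none = longestShortWord(List.of("stream", "filter"));

        System.out.println(found.get());           // list
        System.out.println(none.orElse("none"));   // none
        System.out.println(none.isEmpty());        // true
        System.out.println(getThrows(none));       // true
    }
}
```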


GENERAL PURPOSE .REDUCE METHOD


• Implements incremental data aggregation, e.g.:
  sum = 4 + 7 + 1 + 2
      = 11 + 1 + 2
      = 12 + 2
      = 14
• Requires an associative BinaryOperator<T>

reducer → result type: notes
• reduce(BinaryOperator<T> accumulator) → Optional<T>: reduction starts empty
• reduce(T identity, BinaryOperator<T> accumulator) → T: uses identity as the start value (e.g. identity=0 for sum reduction)
• reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) → U: integrates mapping and accumulation from type T to type U
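The sketch below shows each of the three overloads, using the numbers from the worked sum. The class and method names, and the word list in the third case, are illustrative. In the third form the combiner merges partial results; in practice this matters for parallel streams.

```java
import java.util.List;
import java.util.Optional;

class ReduceDemo {
    // reduce(accumulator): starts empty, so the result is an Optional<Integer>.
    static Optional<Integer> sumOptional(List<Integer> numbers) {
        return numbers.stream().reduce((a, b) -> a + b);  // ((4 + 7) + 1) + 2
    }

    // reduce(identity, accumulator): 0 is the identity for addition,
    // so an empty list simply yields 0.
    static int sumWithIdentity(List<Integer> numbers) {
        return numbers.stream().reduce(0, Integer::sum);
    }

    // reduce(identity, accumulator, combiner): folds Strings into an Integer.
    static int totalLength(List<String> words) {
        return words.stream().reduce(0,
                (total, word) -> total + word.length(),  // BiFunction<Integer, String, Integer>
                Integer::sum);                           // BinaryOperator<Integer> combiner
    }

    public static void main(String[] args) {
        List<Integer> numbers = List.of(4, 7, 1, 2);
        System.out.println(sumOptional(numbers).get());           // 14
        System.out.println(sumOptional(List.of()).isPresent());   // false
        System.out.println(sumWithIdentity(numbers));             // 14
        System.out.println(totalLength(List.of("map", "filter"))); // 9
    }
}
```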


EXAMPLE

• Find the book with the longest title:

[Code figure annotations: a BinaryOperator<Book> which each time retains the book with the longest title; get the final surviving book from Optional<Book>]
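This is a minimal sketch of the longest-title reduction, assuming the same hypothetical Book class. The comparison inside the lambda is our own choice, not the slide's code. On equal title lengths it keeps the first book, and that tie rule is an assumption.

```java
import java.util.List;

// Hypothetical Book class (assumption): the slides do not show its definition.
class Book {
    private final String title;
    private final String author;
    private final int numPages;

    Book(String title, String author, int numPages) {
        this.title = title;
        this.author = author;
        this.numPages = numPages;
    }

    String getTitle() { return title; }
    String getAuthor() { return author; }
    int getNumPages() { return numPages; }
}

class LongestTitle {
    static Book longestTitle(List<Book> books) {
        return books.stream()
                // BinaryOperator<Book>: retains whichever book has the longer title
                // (on a tie the first one is kept; this tie rule is an assumption)
                .reduce((b1, b2) -> b1.getTitle().length() >= b2.getTitle().length() ? b1 : b2)
                .get();  // Optional<Book> -> Book; throws NoSuchElementException if books is empty
    }

    public static void main(String[] args) {
        List<Book> books = List.of(new Book("Book-abc", "A", 10), new Book("Book-defgh", "B", 20),
                new Book("Book-i", "C", 30), new Book("Book-jk", "D", 40), new Book("Book-lmnopq", "E", 50));
        System.out.println(longestTitle(books).getTitle()); // Book-lmnopq
    }
}
```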


STREAM<BOOK> REDUCER

• Find the book with the longest title

[Figure: the books Book-abc, Book-defgh, Book-i, Book-jk and Book-lmnopq are compared pairwise until one book survives]

books (Set<Book>)
  .stream()                            → Stream<Book>
  .reduce((t1,t2) -> ... ? t1 : t2)    → Optional<Book>
  .get()                               → Book (Book-lmnopq)
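The diagram's pairwise survival can be traced with a short sketch. To keep it small, plain title strings stand in for Book objects. A List replaces the Set because a Set such as HashSet does not fix the order in which elements are streamed. Logging inside the accumulator is for illustration only, since accumulators should normally be free of side effects.

```java
import java.util.ArrayList;
import java.util.List;

class ReduceTrace {
    // Records each pairwise comparison and which title survives it.
    static List<String> survivors(List<String> titles) {
        List<String> log = new ArrayList<>();
        titles.stream()
              .reduce((t1, t2) -> {
                  String winner = t1.length() >= t2.length() ? t1 : t2;
                  log.add(t1 + " vs " + t2 + " -> " + winner);  // illustration only
                  return winner;
              });
        return log;
    }

    public static void main(String[] args) {
        List<String> titles = List.of("Book-abc", "Book-defgh", "Book-i", "Book-jk", "Book-lmnopq");
        survivors(titles).forEach(System.out::println);
        // Book-abc vs Book-defgh -> Book-defgh
        // Book-defgh vs Book-i -> Book-defgh
        // Book-defgh vs Book-jk -> Book-defgh
        // Book-defgh vs Book-lmnopq -> Book-lmnopq
    }
}
```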
