Froscon 2017 Final Reactive Programming in Java

The document discusses reactive programming in Java. It provides an overview of reactive streams and frameworks like RxJava 2 and Spring Reactor 3. It also includes an example of loading photo metadata and files reactively from different sources using techniques like filtering, mapping, merging, and joining streams. Reactive programming can provide performance benefits over traditional approaches by processing data asynchronously and in a non-blocking manner.

Reactive Programming in Java

by Vadym Kazulkin and Rodion Alukhanov, ip.labs GmbH


ip.labs GmbH

Reactive Programming in Java by Vadym Kazulkin and Rodion Alukhanov, ip.labs GmbH
Agenda

• Reactive Programming in general

• Reactive Streams and JDK 9 Flow API

• RxJava 2

• Spring Reactor 3

• Demo of reactive application with Spring 5, Spring Boot 2, Netty, MongoDB and Thymeleaf technology stack

Reactive Streams

Subscriber/Publisher

Subscription

Publisher Implementations
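
One Publisher implementation that ships with the JDK itself (since Java 9) is java.util.concurrent.SubmissionPublisher, which works with the Flow interfaces that mirror Reactive Streams. The following is a minimal sketch of the Publisher / Subscriber / Subscription interplay using only the JDK. The class name FlowBasics, the helper publishAndCollect and the 5 second timeout are illustrative assumptions, not part of the talk.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class FlowBasics {

    /** Collects every item it receives and records when the stream ends. */
    static class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // ask for everything (no back-pressure)
        }

        @Override
        public void onNext(String item) {
            received.add(item);
        }

        @Override
        public void onError(Throwable error) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes the given items and returns what the subscriber saw (hypothetical helper). */
    static List<String> publishAndCollect(List<String> items) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // close() signals onComplete after the submitted items
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("subscriber did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(List.of("Print 9x13", "Photobook A4", "Calendar A4")));
    }
}
```

Nothing flows until the subscriber calls request() on its Subscription; here it simply asks for an unbounded amount.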

RxJava 2

Classic vs. Reactive (Cold Publisher)
Classic:

public List<Photo> getPhotos() {
    List<Photo> result = …
    while (...) {
        result.add(...);
    }
    return result;
}

for (Photo photo : getPhotos()) {
    System.out.println(photo.toString());
}

Reactive:

Observable<Photo> publisher = Observable.create(subscriber -> {
    while (...) {
        Photo photo = ...
        subscriber.onNext(photo);
    }
    subscriber.onComplete();
});

publisher.subscribe(item -> {
    System.out.println(item.toString());
});

RxJava 2 Basics

Observable<Customer> customers = ...

Observable<Customer> adults = customers.filter(c -> c.age > 18);

Observable<Address> addresses = adults.map(c -> c.getDefaultAddress());

Observable<Order> orders = customers.flatMap(c ->
    Observable.fromArray(c.getOrders())
);

Observable<Picture> uploadedPhotos = customers.flatMap(c ->
    Observable.fromArray(dao.loadPhotosByCustomer(c.getId()))
);

Example. Print Photo Book

Fetch photo IDs from Book                                    0 ms
Load metadata from database (ID, dimensions, source etc.)    10 x 100 ms
Load images from server HDD if available                     90 x 50 ms
Load images from source (for example Cloud Service)          10 x 500 ms
Validate image                                               100 x 50 ms
+
User authentication & authorisation                          max 500 ms
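
As a rough worked calculation, the step counts above can be added up for a strictly sequential run and compared with the longest dependency chain. This is only a back-of-the-envelope model: it assumes unlimited parallelism, zero scheduling overhead, and that a cloud download has to wait for its metadata while a disk load does not. The class and method names are hypothetical.

```java
class PhotoBookTimings {
    // Step counts and per-step durations (ms) as listed on the slide.
    static final long METADATA_BATCHES = 10, METADATA_MS = 100;
    static final long DISK_LOADS = 90, DISK_MS = 50;
    static final long CLOUD_LOADS = 10, CLOUD_MS = 500;
    static final long VALIDATIONS = 100, VALIDATE_MS = 50;
    static final long AUTH_MS = 500;

    /** Total time if every step runs one after another on a single thread. */
    static long sequentialMillis() {
        return METADATA_BATCHES * METADATA_MS
             + DISK_LOADS * DISK_MS
             + CLOUD_LOADS * CLOUD_MS
             + VALIDATIONS * VALIDATE_MS
             + AUTH_MS;
    }

    /** Longest dependency chain (assumption: unlimited parallelism, no overhead). */
    static long criticalPathMillis() {
        long cloudPicture = METADATA_MS + CLOUD_MS + VALIDATE_MS;        // metadata -> cloud -> validate
        long diskPicture = Math.max(METADATA_MS, DISK_MS) + VALIDATE_MS; // disk load overlaps metadata
        return Math.max(AUTH_MS, Math.max(cloudPicture, diskPicture));
    }

    public static void main(String[] args) {
        System.out.println("sequential:    " + sequentialMillis() + " ms");    // 16000 ms
        System.out.println("critical path: " + criticalPathMillis() + " ms");  // 650 ms
    }
}
```

Real runs will be slower than both idealized figures, since thread pools are bounded and every step carries some overhead.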

Loading Meta Data. Buffer.

Observable<Integer> ids = Observable.fromIterable(book.getPhotoIDs());

Observable<List<Integer>> idBatches = ids.buffer(10);

// The lambda parameter must not reuse the name of the local variable ids.
Observable<MetaData> metadata =
    idBatches.flatMap(batch -> metadataDao.loadingMetadata(batch));
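
To make the effect of buffer(10) concrete, here is a plain-Java sketch of the same grouping on an ordinary list: 100 photo IDs become 10 batches of 10, which matches the "10 x 100 ms" metadata figure from the example. This is a synchronous stand-in written for illustration, not RxJava's implementation, and the names Batching and buffer are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

class Batching {
    /** Splits items into consecutive lists of at most `size` elements, keeping order. */
    static <T> List<List<T>> buffer(List<T> items, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += size) {
            int end = Math.min(start + size, items.size());
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> ids = new ArrayList<>();
        for (int id = 1; id <= 100; id++) {
            ids.add(id);
        }
        List<List<Integer>> batches = buffer(ids, 10);
        System.out.println(batches.size() + " batches, first = " + batches.get(0));
    }
}
```

The last batch may be shorter than the others when the number of items is not a multiple of the batch size.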

Loading Picture from HDD. FlatMap with at most a single result

Observable<PictureFile> fromDisk = ids.flatMap(id -> loadingFromDisk(id));

public Observable<PictureFile> loadingFromDisk(Integer id) {
    Observable<PictureFile> result = Observable.create(s -> {
        try {
            s.onNext(pictureDao.loadFromHdd(id));
        } catch (PictureNotFound e) {
            // A missing file is not an error: the stream simply completes empty.
            System.out.println("Not found on disk " + id);
        }
        s.onComplete();
    });
    return result;
}
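
The "at most a single result" idea can be tried out without RxJava by swapping in java.util.stream and Optional: each ID maps to zero or one file, and flatMap silently drops the misses. This is a sketch under that substitution; the DISK map is a hypothetical stand-in for the server HDD, filled with the disk-file IDs 1, 2 and 4 used in the diagrams of this example.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

class AtMostOne {
    // Hypothetical stand-in for the files present on the server HDD.
    static final Map<Integer, String> DISK =
            Map.of(1, "disk-file 1", 2, "disk-file 2", 4, "disk-file 4");

    /** Zero or one result per ID; a miss is an empty Optional, not an exception. */
    static Optional<String> loadFromDisk(int id) {
        return Optional.ofNullable(DISK.get(id));
    }

    /** flatMap drops the empty results and keeps the rest in input order. */
    static List<String> loadAll(List<Integer> ids) {
        return ids.stream()
                  .flatMap(id -> loadFromDisk(id).stream())
                  .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [disk-file 1, disk-file 2, disk-file 4]
        System.out.println(loadAll(List.of(1, 2, 3, 4, 5)));
    }
}
```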

Loading from Cloud. Filter

Observable<PictureFile> fromCloud =
    metadata.filter(m -> m.cloud)
            .flatMap(m -> loadingFromCloud(m.id));

metadata 1    disk-file 1    cloud-file 3
metadata 2    disk-file 2    cloud-file 5
metadata 3    disk-file 4
metadata 4
metadata 5

Merging Cloud and HDD

Observable<PictureFile> allPictures = fromDisk.mergeWith(fromCloud);

metadata 1 disk-file 1
metadata 2 cloud-file 3
metadata 3 disk-file 2
metadata 4 disk-file 4
metadata 5 cloud-file 5

Joining Metadata with Picture Files

Observable<Pair<MetaData, PictureFile>> pairs =
    metadata.join(allPictures, (m) -> lastMeta, (f) -> lastFile, Pair::of);

metadata 1 + disk-file 1
metadata 2 + disk-file 1
metadata 3 + disk-file 1
metadata 2 + disk-file 2
metadata 3 + cloud-file 3

pairs = pairs.filter(p -> p.fst.id == p.snd.id);
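
The join-then-filter step can be pictured with plain collections: when the join windows stay open, every metadata item is paired with every picture file, and the filter then keeps only the pairs whose IDs match. The sketch below ignores timing and emission order and only illustrates the pair counts; it is not how RxJava implements join, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class JoinThenFilter {
    /** Every (metadataId, fileId) combination, as a join with open windows would pair them. */
    static List<int[]> crossJoin(List<Integer> metadataIds, List<Integer> fileIds) {
        List<int[]> pairs = new ArrayList<>();
        for (int m : metadataIds) {
            for (int f : fileIds) {
                pairs.add(new int[] {m, f});
            }
        }
        return pairs;
    }

    /** Keeps only the pairs whose two IDs are equal. */
    static List<int[]> matching(List<int[]> pairs) {
        List<int[]> result = new ArrayList<>();
        for (int[] pair : pairs) {
            if (pair[0] == pair[1]) {
                result.add(pair);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> metadataIds = List.of(1, 2, 3, 4, 5);
        List<Integer> fileIds = List.of(1, 3, 2, 4, 5); // arrival order of the merged stream
        List<int[]> all = crossJoin(metadataIds, fileIds);
        System.out.println(all.size() + " pairs before filtering, "
                + matching(all).size() + " after");
    }
}
```

The intermediate result grows with the product of both stream sizes, which is worth keeping in mind for larger books.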

Joining Metadata with Picture Files

Observable<Picture> pictures =
    pairs.filter(p -> p.fst.id == p.snd.id)
         .map(p -> new Picture(p.snd, p.fst));

Validating Result

Observable<Picture> validated = pictures.flatMap(p -> validation(p));

Adding Authentication

Observable<Boolean> auth = auth();

Observable<Pair<Boolean, Picture>> result =
    Observable.combineLatest(auth, validated, Pair::of);

Back in the Real World

// blockingNext() is the public entry point to the internal BlockingObservableNext class
Iterable<Pair<Boolean, Picture>> b = result.blockingNext();

for (Pair<Boolean, Picture> item : b) {
    System.out.println("Finished " + item.snd.id);
}

Compare Results
Observable<Integer> ids = loadingIds();
Observable<MetaData> metadata = ids.buffer(10).flatMap(id -> loadingMetadata(id));

Observable<PictureFile> fromDisk = ids.flatMap(id -> loadingFromDisk(id));

Observable<PictureFile> fromCloud =
    metadata.filter(m -> m.cloud).flatMap(id -> loadingFromCloud(id.id));

Observable<PictureFile> allPictures = fromDisk.mergeWith(fromCloud);

ObservableSource<MetaData> lastMeta = (l) -> metadata.takeLast(1);
ObservableSource<PictureFile> lastFile = (l) -> allPictures.takeLast(1);

Observable<Pair<MetaData, PictureFile>> pairs =
    metadata.join(allPictures, (m) -> lastMeta, (f) -> lastFile, Pair::of);

Observable<Picture> pictures =
    pairs.filter(p -> p.fst.id == p.snd.id).map(p -> new Picture(p.snd, p.fst));

Observable<Picture> validated = pictures.flatMap(p -> validation(p));

Observable<Boolean> auth = auth();

Classic -> 20 sec
Reactor -> 1 sec


Spring Reactor 3

Spring Reactor Timeline

Spring Reactor 3 Big Picture

Reactive Stream Interfaces

Spring Reactor 3 Main Types

Flux implements Publisher
is capable of emitting 0 or more items

Spring Reactor 3 Main Types

Mono implements Publisher
can emit at most one item

Spring Reactor 3 Examples

Publisher creation

Flux<String> productTitles = Flux.just("Print 9x13", "Photobook A4", "Calendar A4");

Mono<String> productTitles = Mono.just("Print 9x13");

Flux<String> productTitles = Flux.fromIterable(Arrays.asList("Print 9x13",
    "Photobook A4", "Calendar A4"));
Flux<String> productTitles = Flux.fromArray(new String[]{"Print 9x13",
    "Photobook A4", "Calendar A4"});
Flux<String> productTitles = Flux.fromStream(Stream.of("Print 9x13",
    "Photobook A4", "Calendar A4"));

Mono.fromCallable(), Mono.fromRunnable(), Mono.fromFuture()
Flux.empty(), Mono.empty(), Flux.error(), Mono.error()

Spring Reactor 3 Examples

Event subscription

Flux<String> productTitles = Flux.just("Print 9x13", "Photobook A4", "Calendar A4");

productTitles.subscribe(System.out::println);

Output:

Print 9x13
Photobook A4
Calendar A4

Spring Reactor 3 Examples

Logging

Flux<String> productTitles = Flux.just("Print 9x13", "Photobook A4", "Calendar A4");


productTitles.log().subscribe(System.out::println);

Output:

INFO reactor.Flux.Array.2 - | onSubscribe()
INFO reactor.Flux.Array.2 - | request(unbounded)
INFO reactor.Flux.Array.2 - | onNext(Print 9x13)
Print 9x13
INFO reactor.Flux.Array.2 - | onNext(Photobook A4)
Photobook A4
INFO reactor.Flux.Array.2 - | onNext(Calendar A4)
Calendar A4
INFO reactor.Flux.Array.2 - | onComplete()
Spring Reactor 3 Examples

Event subscription with own Subscriber

Flux<String> productTitles = Flux.just("Print 9x13", "Photobook A4", "Calendar A4");


productTitles.subscribe(new Subscriber<String>() {

@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(String t) {
System.out.println(t);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});

Spring Reactor 3 Examples

Event subscription with custom Subscriber with back-pressure

Flux<String> productTitles = Flux.just("Print 9x13", "Photobook A4", "Calendar A4");

productTitles.subscribe(new Subscriber<String>() {

    private long count = 0;
    private Subscription subscription;

    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(2); // initial demand: two items
    }

    public void onNext(String t) {
        System.out.println(t);
        count++;
        if (count >= 2) {
            // batch consumed: ask for the next two items
            count = 0;
            subscription.request(2);
        }
    }
    ...
Spring Reactor 3 Examples

Event subscription with custom Subscriber with back-pressure

Flux<String> productTitles = Flux.just("Print 9x13", "Photobook A4", "Calendar A4");

productTitles.log().subscribe(new Subscriber<String>() { ... subscription.request(2); ... });

Output:
INFO reactor.Flux.Array.2 - | onSubscribe()
INFO reactor.Flux.Array.2 - | request(2)
INFO reactor.Flux.Array.2 - | onNext(Print 9x13)
Print 9x13
INFO reactor.Flux.Array.2 - | onNext(Photobook A4)
Photobook A4
INFO reactor.Flux.Array.2 - | request(2)
INFO reactor.Flux.Array.2 - | onNext(Calendar A4)
Calendar A4
INFO reactor.Flux.Array.2 - | onComplete()
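
The same request-two-at-a-time pattern can be reproduced with the JDK 9 Flow API alone, which makes it easy to experiment without Reactor on the classpath. The sketch below is an assumption-laden stand-in: it uses SubmissionPublisher instead of Flux, and it records the signals in a list rather than going through Reactor's log() operator. The names BatchedDemand, BatchingSubscriber and run are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class BatchedDemand {

    /** Requests items in fixed-size batches and records each signal it sends or receives. */
    static class BatchingSubscriber implements Flow.Subscriber<String> {
        final List<String> events = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private final int batchSize;
        private Flow.Subscription subscription;
        private int receivedInBatch = 0;

        BatchingSubscriber(int batchSize) {
            this.batchSize = batchSize;
        }

        private void requestBatch() {
            events.add("request(" + batchSize + ")");
            subscription.request(batchSize);
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            requestBatch(); // initial demand
        }

        @Override
        public void onNext(String item) {
            events.add("onNext(" + item + ")");
            if (++receivedInBatch >= batchSize) {
                receivedInBatch = 0;
                requestBatch(); // batch consumed, signal new demand
            }
        }

        @Override
        public void onError(Throwable error) {
            events.add("onError(" + error + ")");
            done.countDown();
        }

        @Override
        public void onComplete() {
            events.add("onComplete()");
            done.countDown();
        }
    }

    /** Publishes the items to a batching subscriber and returns the recorded signals. */
    static List<String> run(List<String> items, int batchSize) {
        BatchingSubscriber subscriber = new BatchingSubscriber(batchSize);
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        }
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("subscriber did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.events;
    }

    public static void main(String[] args) {
        run(List.of("Print 9x13", "Photobook A4", "Calendar A4"), 2).forEach(System.out::println);
    }
}
```

For three items and a batch size of 2, the recorded sequence follows the shape of the log above: a request, two onNext signals, a second request, the last onNext, then onComplete.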
Spring Reactor 3 Operations

Transforming (map, scan)
Combining (merge, startWith)
Filtering (last, skip)
Mathematical (count, average, max)
Boolean (every, some, includes)

Source: http://rxmarbles.com

Spring Reactor 3 Examples

Elements filtering

Flux<String> productTitles = Flux.just("Print 9x13", "Photobook A4", "Calendar A4");

Flux<String> productTitlesStartingWithP =
productTitles.filter(productTitle-> productTitle.startsWith("P"));

productTitlesStartingWithP.subscribe(System.out::println);

Output:

Print 9x13
Photobook A4

Spring Reactor 3 Examples

Elements counter

Flux<String> productTitles = Flux.just("Print 9x13", "Photobook A4", "Calendar A4");

Mono<Long> productTitlesCount = productTitles.count();

productTitlesCount.subscribe(System.out::println);

Output:

3

Spring Reactor 3 Examples

Check if all elements match a condition

Flux<String> productTitles = Flux.just("Print 9x13", "Photobook A4", "Calendar A4");

Mono<Boolean> allProductTitlesLengthBiggerThan5=
productTitles.all(productTitle-> productTitle.length() > 5);

allProductTitlesLengthBiggerThan5.subscribe(System.out::println);

Output:

true

Spring Reactor 3 Examples

Element mapping

Flux<String> productTitles = Flux.just("Print 9x13", "Photobook A4", "Calendar A4");

Flux<Integer> productTitlesLength =
productTitles.map(productTitle-> productTitle.length()) ;

productTitlesLength.subscribe(System.out::println);

Output:

10
12
11

Spring Reactor 3 Examples

Concatenation of 2 Publishers

Flux<String> productTitles = Flux.just("Print 9x13", "Photobook A4", "Calendar A4");

Mono<String> anotherProductTitle = Mono.just("Teddy");

Flux<String> concatProductTitles =
    productTitles.concatWith(anotherProductTitle);
concatProductTitles.subscribe(System.out::println);

Output:

Print 9x13
Photobook A4
Calendar A4
Teddy

Spring Reactor 3 Examples

Zipping of 2 Publishers

Flux<String> productTitles = Flux.just("Print 9x13", "Photobook A4", "Calendar A4");

Flux<Double> productPrices = Flux.fromIterable(Arrays.asList(0.09, 29.99, 15.99));

Flux<Tuple2<String, Double>> zippedFlux =
    Flux.zip(productTitles, productPrices);
zippedFlux.subscribe(System.out::println);

Output:

[Print 9x13,0.09]
[Photobook A4,29.99]
[Calendar A4,15.99]

Spring Reactor 3 Examples

Parallel processing

Flux<String> productTitles = Flux.just("Print 9x13", "Photobook A4", "Calendar A4");

Flux<Double> productPrices = Flux.fromIterable(Arrays.asList(0.09, 29.99, 15.99));

Flux.zip(productTitles, productPrices)
    .parallel() // returns ParallelFlux; uses all available CPUs, or call parallel(numberOfCPUs)
    .runOn(Schedulers.parallel())
    .sequential()
    .subscribe(System.out::println);

Output:
[Print 9x13,0.09]
[Photobook A4,29.99]
[Calendar A4,15.99]
Spring Reactor 3 Examples

Blocking Publisher

Blocking Flux:
Flux<String> productTitles = Flux.just("Print 9x13", "Photobook A4", "Calendar A4");

Iterable<String> blockingConcatProductTitles =
    productTitles.concatWith(anotherProductTitle).toIterable();
or
Stream<String> blockingConcatProductTitles =
    productTitles.concatWith(anotherProductTitle).toStream();

Blocking Mono:
String blockingProductTitles = Mono.just("Print 9x13").block();
or
CompletableFuture<String> blockingProductTitles = Mono.just("Print 9x13").toFuture();

LMAX Disruptor

RingBuffer with multiple producers and consumers

Source: https://github.com/LMAX-Exchange/disruptor/wiki/Introduction

Spring Reactor 3 Examples

Test the publishers

Flux<String> productTitles = Flux.just("Print 9x13", "Photobook A4", "Calendar A4");

Duration verificationDuration = StepVerifier.create(productTitles).
    expectNextMatches(productTitle -> productTitle.equals("Print 9x13")).
    expectNextMatches(productTitle -> productTitle.equals("Photobook A4")).
    expectNextMatches(productTitle -> productTitle.equals("Calendar A4")).
    expectComplete().
    verify();

Spring Reactor 3 Examples

Test the publishers

Flux<String> productTitles = Flux.just("Print 9x13", "Photobook A4", "Calendar A4");

StepVerifier.create(productTitles).
    expectNextMatches(productTitle -> productTitle.equals("Print 9x13")).
    expectNextMatches(productTitle -> productTitle.equals("Photobook A4")).
    expectNextMatches(productTitle -> productTitle.equals("Calendar A4")).
    expectComplete().
    verify();

Output (when the "Calendar A4" expectation is left out):
Exception in thread "main" java.lang.AssertionError: expectation "expectComplete" failed (expected: onComplete(); actual: onNext(Calendar A4));

Spring Reactor Compatibility to Java 9 Flow API

RxJava 2 vs Spring Reactor 3

Comparing reactive and streaming implementations

Source: http://akarnokd.blogspot.de/2016/12/the-reactive-scrabble-benchmarks.html
Source: https://twitter.com/akarnokd/status/808995627237601280
The future of Reactive Stream APIs

4th generation already
5th generation will make heavy use of advanced "operator fusion"

Source: http://akarnokd.blogspot.de/2016/03/operator-fusion-part-1.html

Spring 5 / Spring Boot 2 / Netty / MongoDB / Thymeleaf
Demo

Spring 5 / Spring Web Reactive

Spring 5 / Spring Web Reactive

Dependencies

Non-blocking Netty Web Client

MongoDB Reactive Database Template

Model

Repository

Thymeleaf Reactive View Resolver

Controller

View

Non-blocking JSON Parsing with Jackson

Testing

Benefits of Reactive Applications

• Efficient resource utilization (spending less money on servers and data centres)

• Processing higher loads with fewer threads

Source: https://spring.io/blog/2016/06/07/notes-on-reactive-programming-part-i-the-reactive-landscape

Use Cases for Reactive Applications

• External service calls

• Highly concurrent message consumers

Source: https://spring.io/blog/2016/06/07/notes-on-reactive-programming-part-i-the-reactive-landscape

Pitfalls of reactive programming

• For the wrong problem, this makes things worse

• Hard to debug (no control over executing thread)

• Mistakenly blocking a single request leads to increased latency for all requests -> blocking all requests brings a server to its knees

Source: https://spring.io/blog/2016/07/20/notes-on-reactive-programming-part-iii-a-simple-http-server-application
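
The blocking pitfall can be demonstrated with nothing but the JDK: if a single-threaded executor plays the role of an event loop, one task that blocks delays every task queued behind it. The sketch below is a simplified model of that situation, not a Netty or Reactor setup; the executor stand-in, the class name and the 5 second safety timeout are all assumptions made for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class BlockedEventLoop {

    /** Returns how long (ms) a trivial task waits when it is queued behind a blocking task. */
    static long latencyBehindBlockingTaskMillis(long blockMillis) {
        // Stand-in for a single event-loop thread serving all requests.
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        try {
            eventLoop.submit(() -> {
                try {
                    Thread.sleep(blockMillis); // the mistaken blocking call
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            long start = System.nanoTime();
            Future<?> fastRequest = eventLoop.submit(() -> { }); // does no work at all
            fastRequest.get(blockMillis + 5_000, TimeUnit.MILLISECONDS);
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            eventLoop.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("fast request took "
                + latencyBehindBlockingTaskMillis(200) + " ms behind a 200 ms blocking call");
    }
}
```

The second task needs no time of its own, yet its observed latency is roughly the duration of the blocking call in front of it.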

Questions?

Contact

Vadym Kazulkin :

Email : v.kazulkin@iplabs.de
Xing : https://www.xing.com/profile/Vadym_Kazulkin

Rodion Alukhanov :

Email : r.alukhanov@iplabs.de
Xing : https://www.xing.com/profile/Rodion_Alukhanov
Thank You!

www.iplabs.de