Skip to content

Commit 8e63087

Browse files
committed
[add] add fork/join examples
1 parent 3f85759 commit 8e63087

File tree

4 files changed

+262
-0
lines changed

4 files changed

+262
-0
lines changed
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package com.brianway.learning.java8.streamapi.parallel;
2+
3+
import java.util.concurrent.ForkJoinTask;
4+
import java.util.concurrent.RecursiveTask;
5+
import java.util.stream.LongStream;
6+
7+
/**
8+
* 使用分支/合并框架执行并行求和
9+
*/
10+
public class ForkJoinSumCalculator extends RecursiveTask<Long> {
11+
12+
public static final long THRESHOLD = 10_000;
13+
14+
private final long[] numbers;
15+
private final int start;
16+
private final int end;
17+
18+
public ForkJoinSumCalculator(long[] numbers) {
19+
this(numbers, 0, numbers.length);
20+
}
21+
22+
private ForkJoinSumCalculator(long[] numbers, int start, int end) {
23+
this.numbers = numbers;
24+
this.start = start;
25+
this.end = end;
26+
}
27+
28+
@Override
29+
protected Long compute() {
30+
int length = end - start;
31+
if (length <= THRESHOLD) {
32+
return computeSequentially();
33+
}
34+
ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2);
35+
leftTask.fork();
36+
ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end);
37+
Long rightResult = rightTask.compute();
38+
Long leftResult = leftTask.join();
39+
return leftResult + rightResult;
40+
}
41+
42+
private long computeSequentially() {
43+
long sum = 0;
44+
for (int i = start; i < end; i++) {
45+
sum += numbers[i];
46+
}
47+
return sum;
48+
}
49+
50+
public static long forkJoinSum(long n) {
51+
long[] numbers = LongStream.rangeClosed(1, n).toArray();
52+
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
53+
return ParallelStreamsHarness.FORK_JOIN_POOL.invoke(task);
54+
}
55+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package com.brianway.learning.java8.streamapi.parallel;
2+
3+
import java.util.stream.LongStream;
4+
import java.util.stream.Stream;
5+
6+
/**
7+
* 并行流计算 1~n 的和
8+
* 分别使用指令式,串行迭代流,并行迭代流,基本类型流,有副作用的流
9+
*/
10+
public class ParallelStreams {
11+
12+
public static long iterativeSum(long n) {
13+
long result = 0;
14+
for (long i = 0; i <= n; i++) {
15+
result += i;
16+
}
17+
return result;
18+
}
19+
20+
public static long sequentialSum(long n) {
21+
return Stream.iterate(1L, i -> i + 1).limit(n).reduce(Long::sum).get();
22+
}
23+
24+
public static long parallelSum(long n) {
25+
return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(Long::sum).get();
26+
}
27+
28+
public static long rangedSum(long n) {
29+
return LongStream.rangeClosed(1, n).reduce(Long::sum).getAsLong();
30+
}
31+
32+
public static long parallelRangedSum(long n) {
33+
return LongStream.rangeClosed(1, n).parallel().reduce(Long::sum).getAsLong();
34+
}
35+
36+
public static long sideEffectSum(long n) {
37+
Accumulator accumulator = new Accumulator();
38+
LongStream.rangeClosed(1, n).forEach(accumulator::add);
39+
return accumulator.total;
40+
}
41+
42+
public static long sideEffectParallelSum(long n) {
43+
Accumulator accumulator = new Accumulator();
44+
LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
45+
return accumulator.total;
46+
}
47+
48+
public static class Accumulator {
49+
private long total = 0;
50+
51+
public void add(long value) {
52+
total += value;
53+
}
54+
}
55+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package com.brianway.learning.java8.streamapi.parallel;
2+
3+
import java.util.concurrent.ForkJoinPool;
4+
import java.util.function.Function;
5+
6+
public class ParallelStreamsHarness {
7+
8+
public static final ForkJoinPool FORK_JOIN_POOL = new ForkJoinPool();
9+
10+
public static void main(String[] args) {
11+
System.out.println("Iterative Sum done in: " + measurePerf(ParallelStreams::iterativeSum, 10_000_000L) + " msecs");
12+
System.out.println("Sequential Sum done in: " + measurePerf(ParallelStreams::sequentialSum, 10_000_000L) + " msecs");
13+
System.out.println("Parallel forkJoinSum done in: " + measurePerf(ParallelStreams::parallelSum, 10_000_000L) + " msecs");
14+
System.out.println("Range forkJoinSum done in: " + measurePerf(ParallelStreams::rangedSum, 10_000_000L) + " msecs");
15+
System.out.println("Parallel range forkJoinSum done in: " + measurePerf(ParallelStreams::parallelRangedSum, 10_000_000L) + " msecs");
16+
System.out.println("ForkJoin sum done in: " + measurePerf(ForkJoinSumCalculator::forkJoinSum, 10_000_000L) + " msecs");
17+
System.out.println("SideEffect sum done in: " + measurePerf(ParallelStreams::sideEffectSum, 10_000_000L) + " msecs");
18+
System.out.println("SideEffect prallel sum done in: " + measurePerf(ParallelStreams::sideEffectParallelSum, 10_000_000L) + " msecs");
19+
}
20+
21+
public static <T, R> long measurePerf(Function<T, R> f, T input) {
22+
long fastest = Long.MAX_VALUE;
23+
for (int i = 0; i < 10; i++) {
24+
long start = System.nanoTime();
25+
R result = f.apply(input);
26+
long duration = (System.nanoTime() - start) / 1_000_000;
27+
System.out.println("Result: " + result);
28+
if (duration < fastest) fastest = duration;
29+
}
30+
return fastest;
31+
}
32+
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package com.brianway.learning.java8.streamapi.parallel;
2+
3+
import java.util.Spliterator;
4+
import java.util.function.Consumer;
5+
import java.util.stream.Stream;
6+
import java.util.stream.StreamSupport;
7+
8+
/**
9+
* Spliterator: splitable iterator
10+
*/
11+
public class WordCount {
12+
13+
public static final String SENTENCE =
14+
" Nel mezzo del cammin di nostra vita " +
15+
"mi ritrovai in una selva oscura" +
16+
" che la dritta via era smarrita ";
17+
18+
public static void main(String[] args) {
19+
System.out.println("Found " + countWordsIteratively(SENTENCE) + " words");
20+
System.out.println("Found " + countWords(SENTENCE) + " words");
21+
}
22+
23+
public static int countWordsIteratively(String s) {
24+
int counter = 0;
25+
boolean lastSpace = true;
26+
for (char c : s.toCharArray()) {
27+
if (Character.isWhitespace(c)) {
28+
lastSpace = true;
29+
} else {
30+
if (lastSpace) counter++;
31+
lastSpace = false;
32+
}
33+
}
34+
return counter;
35+
}
36+
37+
public static int countWords(String s) {
38+
// Stream<Character> stream = IntStream.range(0, s.length())
39+
// .mapToObj(SENTENCE::charAt).parallel();
40+
Spliterator<Character> spliterator = new WordCounterSpliterator(s);
41+
Stream<Character> stream = StreamSupport.stream(spliterator, true);
42+
43+
return countWords(stream);
44+
}
45+
46+
private static int countWords(Stream<Character> stream) {
47+
WordCounter wordCounter = stream.reduce(new WordCounter(0, true),
48+
WordCounter::accumulate,
49+
WordCounter::combine);
50+
return wordCounter.getCounter();
51+
}
52+
53+
private static class WordCounter {
54+
private final int counter;
55+
private final boolean lastSpace;
56+
57+
public WordCounter(int counter, boolean lastSpace) {
58+
this.counter = counter;
59+
this.lastSpace = lastSpace;
60+
}
61+
62+
public WordCounter accumulate(Character c) {
63+
if (Character.isWhitespace(c)) {
64+
return lastSpace ? this : new WordCounter(counter, true);
65+
} else {
66+
return lastSpace ? new WordCounter(counter + 1, false) : this;
67+
}
68+
}
69+
70+
public WordCounter combine(WordCounter wordCounter) {
71+
return new WordCounter(counter + wordCounter.counter, wordCounter.lastSpace);
72+
}
73+
74+
public int getCounter() {
75+
return counter;
76+
}
77+
}
78+
79+
private static class WordCounterSpliterator implements Spliterator<Character> {
80+
81+
private final String string;
82+
private int currentChar = 0;
83+
84+
private WordCounterSpliterator(String string) {
85+
this.string = string;
86+
}
87+
88+
@Override
89+
public boolean tryAdvance(Consumer<? super Character> action) {
90+
action.accept(string.charAt(currentChar++));
91+
return currentChar < string.length();
92+
}
93+
94+
@Override
95+
public Spliterator<Character> trySplit() {
96+
int currentSize = string.length() - currentChar;
97+
if (currentSize < 10) {
98+
return null;
99+
}
100+
for (int splitPos = currentSize / 2 + currentChar; splitPos < string.length(); splitPos++) {
101+
if (Character.isWhitespace(string.charAt(splitPos))) {
102+
Spliterator<Character> spliterator = new WordCounterSpliterator(string.substring(currentChar, splitPos));
103+
currentChar = splitPos;
104+
return spliterator;
105+
}
106+
}
107+
return null;
108+
}
109+
110+
@Override
111+
public long estimateSize() {
112+
return string.length() - currentChar;
113+
}
114+
115+
@Override
116+
public int characteristics() {
117+
return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE;
118+
}
119+
}
120+
}

0 commit comments

Comments
 (0)