16
16
17
17
package com .linkedin .parseq ;
18
18
19
+ import java .util .concurrent .TimeUnit ;
19
20
import java .util .concurrent .atomic .AtomicInteger ;
20
21
21
22
import static org .testng .Assert .*;
@@ -42,7 +43,7 @@ public void testExecuteMultipleTimes() {
42
43
43
44
assertEquals (counter .get (), 1 );
44
45
}
45
-
46
+
46
47
/**
47
48
* In this case the "increaser" task is not being executed because
48
49
* the "bob" task has already been resolved and test1 task is
@@ -54,38 +55,104 @@ public void testLastTaskAlreadyResolved() {
54
55
55
56
final Task <String > bob = Task .value ("bob" , "bob" );
56
57
runAndWait ("TestTaskReuse.testLastTaskResolved-bob" , bob );
57
-
58
+
58
59
Task <String > task = Task .callable ("increaser" , () -> {
59
60
counter .incrementAndGet ();
60
61
return "hello" ;
61
62
});
62
-
63
+
63
64
Task <String > test1 = task .andThen (bob );
64
-
65
+
65
66
runAndWait ("TestTaskReuse.testLastTaskResolved" , test1 );
66
67
67
68
assertEquals (counter .get (), 0 );
68
69
}
69
-
70
+
70
71
@ Test
71
72
public void testLastTaskAlreadyResolvedShareable () {
72
73
final AtomicInteger counter = new AtomicInteger ();
73
74
74
75
final Task <String > bob = Task .value ("bob" , "bob" );
75
76
runAndWait ("TestTaskReuse.testLastTaskAlreadyResolvedShareable-bob" , bob );
76
-
77
+
77
78
Task <String > task = Task .callable ("increaser" , () -> {
78
79
counter .incrementAndGet ();
79
80
return "hello" ;
80
81
});
81
-
82
+
82
83
Task <String > test1 = task .andThen (bob .shareable ());
83
-
84
+
84
85
runAndWait ("TestTaskReuse.testLastTaskAlreadyResolvedShareable" , test1 );
85
86
86
87
assertEquals (counter .get (), 1 );
87
88
}
88
-
89
+
90
+ @ Test
91
+ public void testShareableWithPar () {
92
+ final AtomicInteger counter = new AtomicInteger ();
93
+
94
+ Task <String > task = Task .callable ("increaser" , () -> {
95
+ counter .incrementAndGet ();
96
+ return "hello" ;
97
+ });
98
+
99
+ Task <String > test =
100
+ Task .par (task .shareable ().map (x -> x + "1" ), task .shareable ().map (x -> x + "2" )).map ((a , b ) -> a + b );
101
+
102
+ runAndWait ("TestTaskReuse.testShareableWithPar" , test );
103
+
104
+ assertEquals (counter .get (), 1 );
105
+ assertEquals (test .get (), "hello1hello2" );
106
+ }
107
+
108
+ @ Test
109
+ public void testCancellationPar () {
110
+
111
+ Task <String > task = delayedValue ("hello" ,50 , TimeUnit .MILLISECONDS );
112
+
113
+ Task <String > test1 =
114
+ Task .par (task .map (x -> x + "1" ), Task .failure (new RuntimeException ("ups" ))).map ((a , b ) -> a + b );
115
+
116
+ try {
117
+ runAndWait ("TestTaskReuse.testCancellationPar-test1" , test1 );
118
+ fail ("should have failed!" );
119
+ } catch (Exception ex ) {
120
+ assertTrue (test1 .isFailed ());
121
+ }
122
+
123
+ Task <String > test2 =
124
+ Task .par (task .map ("1" , x -> x + "1" ), task .map ("2" , x -> x + "2" )).map ((a , b ) -> a + b );
125
+
126
+ try {
127
+ runAndWait ("TestTaskReuse.testCancellationPar-test2" , test2 );
128
+ fail ("should have failed!" );
129
+ } catch (Exception ex ) {
130
+ assertTrue (test2 .isFailed ());
131
+ }
132
+ }
133
+
134
+ @ Test
135
+ public void testShareableCancellationPar () {
136
+
137
+ Task <String > task = delayedValue ("hello" ,50 , TimeUnit .MILLISECONDS );
138
+
139
+ Task <String > test1 =
140
+ Task .par (task .shareable ().map (x -> x + "1" ), Task .failure (new RuntimeException ("ups" ))).map ((a , b ) -> a + b );
141
+
142
+ try {
143
+ runAndWait ("TestTaskReuse.testCancellationPar-test1" , test1 );
144
+ fail ("should have failed!" );
145
+ } catch (Exception ex ) {
146
+ assertTrue (test1 .isFailed ());
147
+ }
148
+
149
+ Task <String > test2 =
150
+ Task .par (task .shareable ().map ("1" , x -> x + "1" ), task .shareable ().map ("2" , x -> x + "2" )).map ((a , b ) -> a + b );
151
+
152
+ runAndWait ("TestTaskReuse.testCancellationPar-test2" , test2 );
153
+ assertEquals (test2 .get (), "hello1hello2" );
154
+ }
155
+
89
156
@ Test
90
157
public void testTaskReusedByTwoPlans () {
91
158
final AtomicInteger counter = new AtomicInteger ();
@@ -142,7 +209,7 @@ public void testTaskReusedByTwoPlansAndMergedWithFlatMap() {
142
209
return "hello" ;
143
210
});
144
211
145
- Task <String > plan1 = task .flatMap ("+eatch " , s -> Task .callable (() -> s + " on earth!" ));
212
+ Task <String > plan1 = task .flatMap ("+earch " , s -> Task .callable (() -> s + " on earth!" ));
146
213
Task <String > plan2 = task .flatMap ("+moon" , s -> Task .callable (() -> s + " on moon!" ));
147
214
148
215
runAndWait ("TestTaskReuse.testTaskReusedByTwoPlansAndMergedWithFlatMap-plan1" , plan1 );
@@ -160,4 +227,6 @@ public void testTaskReusedByTwoPlansAndMergedWithFlatMap() {
160
227
assertEquals (countTasks (plan2 .getTrace ()), 4 );
161
228
}
162
229
230
+
231
+
163
232
}
0 commit comments