Skip to content

Commit 74eb8ee

Browse files
author
jodzga
committed
Added tests for Task.shareable().
1 parent f317f2e commit 74eb8ee

File tree

2 files changed

+80
-11
lines changed

2 files changed

+80
-11
lines changed

src/com/linkedin/parseq/Exceptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
public class Exceptions {
88

9-
public static final Exception EARLY_FINISH_EXCEPTION = sanitize(new EarlyFinishException("Task cancelled because parent was already finished"));
9+
public static final Exception EARLY_FINISH_EXCEPTION = sanitize(new EarlyFinishException("Task execution cancelled because it's promise was already completed"));
1010
public static final Exception TIMEOUT_EXCEPTION = sanitize(new TimeoutException());
1111
public static final Exception NO_SUCH_ELEMENT_EXCEPTION = sanitize(new NoSuchElementException());
1212

test/com/linkedin/parseq/TestTaskReuse.java

Lines changed: 79 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.linkedin.parseq;
1818

19+
import java.util.concurrent.TimeUnit;
1920
import java.util.concurrent.atomic.AtomicInteger;
2021

2122
import static org.testng.Assert.*;
@@ -42,7 +43,7 @@ public void testExecuteMultipleTimes() {
4243

4344
assertEquals(counter.get(), 1);
4445
}
45-
46+
4647
/**
4748
* In this case the "increaser" task is not being executed because
4849
* the "bob" task has already been resolved and test1 task is
@@ -54,38 +55,104 @@ public void testLastTaskAlreadyResolved() {
5455

5556
final Task<String> bob = Task.value("bob", "bob");
5657
runAndWait("TestTaskReuse.testLastTaskResolved-bob", bob);
57-
58+
5859
Task<String> task = Task.callable("increaser", () -> {
5960
counter.incrementAndGet();
6061
return "hello";
6162
});
62-
63+
6364
Task<String> test1 = task.andThen(bob);
64-
65+
6566
runAndWait("TestTaskReuse.testLastTaskResolved", test1);
6667

6768
assertEquals(counter.get(), 0);
6869
}
69-
70+
7071
@Test
7172
public void testLastTaskAlreadyResolvedShareable() {
7273
final AtomicInteger counter = new AtomicInteger();
7374

7475
final Task<String> bob = Task.value("bob", "bob");
7576
runAndWait("TestTaskReuse.testLastTaskAlreadyResolvedShareable-bob", bob);
76-
77+
7778
Task<String> task = Task.callable("increaser", () -> {
7879
counter.incrementAndGet();
7980
return "hello";
8081
});
81-
82+
8283
Task<String> test1 = task.andThen(bob.shareable());
83-
84+
8485
runAndWait("TestTaskReuse.testLastTaskAlreadyResolvedShareable", test1);
8586

8687
assertEquals(counter.get(), 1);
8788
}
88-
89+
90+
@Test
91+
public void testShareableWithPar() {
92+
final AtomicInteger counter = new AtomicInteger();
93+
94+
Task<String> task = Task.callable("increaser", () -> {
95+
counter.incrementAndGet();
96+
return "hello";
97+
});
98+
99+
Task<String> test =
100+
Task.par(task.shareable().map(x -> x + "1"), task.shareable().map(x -> x + "2")).map((a, b) -> a + b);
101+
102+
runAndWait("TestTaskReuse.testShareableWithPar", test);
103+
104+
assertEquals(counter.get(), 1);
105+
assertEquals(test.get(), "hello1hello2");
106+
}
107+
108+
@Test
109+
public void testCancellationPar() {
110+
111+
Task<String> task = delayedValue("hello",50, TimeUnit.MILLISECONDS);
112+
113+
Task<String> test1 =
114+
Task.par(task.map(x -> x + "1"), Task.failure(new RuntimeException("ups"))).map((a, b) -> a + b);
115+
116+
try {
117+
runAndWait("TestTaskReuse.testCancellationPar-test1", test1);
118+
fail("should have failed!");
119+
} catch (Exception ex) {
120+
assertTrue(test1.isFailed());
121+
}
122+
123+
Task<String> test2 =
124+
Task.par(task.map("1", x -> x + "1"), task.map("2", x -> x + "2")).map((a, b) -> a + b);
125+
126+
try {
127+
runAndWait("TestTaskReuse.testCancellationPar-test2", test2);
128+
fail("should have failed!");
129+
} catch (Exception ex) {
130+
assertTrue(test2.isFailed());
131+
}
132+
}
133+
134+
@Test
135+
public void testShareableCancellationPar() {
136+
137+
Task<String> task = delayedValue("hello",50, TimeUnit.MILLISECONDS);
138+
139+
Task<String> test1 =
140+
Task.par(task.shareable().map(x -> x + "1"), Task.failure(new RuntimeException("ups"))).map((a, b) -> a + b);
141+
142+
try {
143+
runAndWait("TestTaskReuse.testCancellationPar-test1", test1);
144+
fail("should have failed!");
145+
} catch (Exception ex) {
146+
assertTrue(test1.isFailed());
147+
}
148+
149+
Task<String> test2 =
150+
Task.par(task.shareable().map("1", x -> x + "1"), task.shareable().map("2", x -> x + "2")).map((a, b) -> a + b);
151+
152+
runAndWait("TestTaskReuse.testCancellationPar-test2", test2);
153+
assertEquals(test2.get(), "hello1hello2");
154+
}
155+
89156
@Test
90157
public void testTaskReusedByTwoPlans() {
91158
final AtomicInteger counter = new AtomicInteger();
@@ -142,7 +209,7 @@ public void testTaskReusedByTwoPlansAndMergedWithFlatMap() {
142209
return "hello";
143210
});
144211

145-
Task<String> plan1 = task.flatMap("+eatch", s -> Task.callable(() -> s + " on earth!"));
212+
Task<String> plan1 = task.flatMap("+earch", s -> Task.callable(() -> s + " on earth!"));
146213
Task<String> plan2 = task.flatMap("+moon", s -> Task.callable(() -> s + " on moon!"));
147214

148215
runAndWait("TestTaskReuse.testTaskReusedByTwoPlansAndMergedWithFlatMap-plan1", plan1);
@@ -160,4 +227,6 @@ public void testTaskReusedByTwoPlansAndMergedWithFlatMap() {
160227
assertEquals(countTasks(plan2.getTrace()), 4);
161228
}
162229

230+
231+
163232
}

0 commit comments

Comments
 (0)