Skip to content

Commit f303e73

Browse files
committed
Add missing operators.
1 parent dfd782d commit f303e73

File tree

2 files changed

+285
-21
lines changed

2 files changed

+285
-21
lines changed

src/main/java/com/github/pgasync/impl/reactive/Observable.java

Lines changed: 224 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,22 @@
33
import org.reactivestreams.Publisher;
44
import org.reactivestreams.Subscriber;
55
import org.reactivestreams.Subscription;
6+
import rx.functions.Func1;
67

8+
import java.util.AbstractMap;
9+
import java.util.AbstractMap.SimpleImmutableEntry;
10+
import java.util.LinkedList;
11+
import java.util.List;
12+
import java.util.Map;
13+
import java.util.Map.Entry;
14+
import java.util.concurrent.ArrayBlockingQueue;
15+
import java.util.concurrent.BlockingQueue;
16+
import java.util.concurrent.CountDownLatch;
17+
import java.util.concurrent.LinkedBlockingQueue;
718
import java.util.function.Consumer;
819
import java.util.function.Function;
20+
import java.util.logging.Level;
21+
import java.util.logging.Logger;
922

1023
public class Observable<T> implements Publisher<T> {
1124

@@ -22,6 +35,16 @@ public static <T> Observable<T> just(T item) {
2235
});
2336
}
2437

38+
public static <T> Observable<T> just(T item, T... items) {
39+
return new Observable<>(subscriber -> {
40+
subscriber.onNext(item);
41+
for(T t : items) {
42+
subscriber.onNext(t);
43+
}
44+
subscriber.onComplete();
45+
});
46+
}
47+
2548
public static <T> Observable<T> empty() {
2649
return new Observable<>(Subscriber::onComplete);
2750
}
@@ -44,42 +67,194 @@ public void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> on
4467
}
4568

4669
public void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Runnable onComplete) {
47-
publisher.subscribe(new Subscriber<T>() {
70+
publisher.subscribe(new Sub<T>() {
4871
@Override
49-
public void onSubscribe(Subscription subscription) {
72+
public void doOnSubscribe(Subscription subscription) {
5073
subscription.request(1);
5174
}
75+
5276
@Override
53-
public void onNext(T t) {
54-
if(onNext != null) {
77+
public void doOnNext(T t) {
78+
if (onNext != null) {
5579
onNext.accept(t);
5680
}
5781
}
82+
5883
@Override
59-
public void onError(Throwable throwable) {
60-
if(onError != null) {
84+
public void doOnError(Throwable throwable) {
85+
if (onError != null) {
6186
onError.accept(throwable);
6287
}
6388
}
89+
6490
@Override
65-
public void onComplete() {
66-
if(onComplete != null) {
91+
public void doOnComplete() {
92+
if (onComplete != null) {
6793
onComplete.run();
6894
}
6995
}
7096
});
7197
}
7298

99+
public <X> Observable<X> map(Function<T,X> f) {
100+
return new Observable<>(subscriber -> publisher.subscribe(new Forward<T,X>(subscriber) {
101+
@Override
102+
public void doOnNext(T t) {
103+
subscriber.onNext(f.apply(t));
104+
}
105+
}));
106+
}
107+
73108
public <X> Observable<X> flatMap(Function<T,Observable<X>> f) {
74109
return new Observable<>(subscriber -> publisher.subscribe(new Forward<T,X>(subscriber) {
75110
@Override
76-
public void onNext(T t) {
111+
public void doOnNext(T t) {
77112
f.apply(t).subscribe(new Forward<>(subscriber));
78113
}
79114
}));
80115
}
81116

82-
public static class Forward<F,R> implements Subscriber<F> {
117+
public <X> Observable<X> lift(Operator<X,T> operator) {
118+
return new Observable<>(subscriber -> publisher.subscribe(operator.apply(subscriber)));
119+
}
120+
121+
public Observable<T> single(Function<T,Boolean> f) {
122+
return new Observable<>(subscriber -> publisher.subscribe(new Forward<T,T>(subscriber) {
123+
@Override
124+
public void doOnNext(T t) {
125+
if(f.apply(t)) {
126+
super.doOnNext(t);
127+
}
128+
}
129+
}));
130+
}
131+
132+
public Observable<T> onErrorResumeNext(Function<Throwable, Observable<T>> f) {
133+
return new Observable<>(subscriber -> publisher.subscribe(new Forward<T,T>(subscriber) {
134+
@Override
135+
public void doOnError(Throwable throwable) {
136+
f.apply(throwable).subscribe(new Forward<>(subscriber));
137+
}
138+
}));
139+
}
140+
141+
public Observable<T> doOnTerminate(Runnable f) {
142+
return new Observable<>(subscriber -> publisher.subscribe(new Forward<T,T>(subscriber) {
143+
@Override
144+
public void doOnError(Throwable throwable) {
145+
f.run();
146+
subscriber.onError(throwable);
147+
}
148+
@Override
149+
public void doOnComplete() {
150+
f.run();
151+
subscriber.onComplete();
152+
}
153+
}));
154+
}
155+
156+
public Observable<T> doOnNext(Consumer<T> f) {
157+
return new Observable<>(subscriber -> publisher.subscribe(new Forward<T,T>(subscriber) {
158+
@Override
159+
public void doOnNext(T t) {
160+
f.accept(t);
161+
super.doOnNext(t);
162+
}
163+
}));
164+
}
165+
166+
public Observable<T> doOnError(Consumer<Throwable> f) {
167+
return new Observable<>(subscriber -> publisher.subscribe(new Forward<T,T>(subscriber) {
168+
@Override
169+
public void doOnError(Throwable throwable) {
170+
f.accept(throwable);
171+
subscriber.onError(throwable);
172+
}
173+
}));
174+
}
175+
176+
public Observable<T> doOnSubscribe(Runnable f) {
177+
return new Observable<>(subscriber -> publisher.subscribe(new Forward<T,T>(subscriber) {
178+
@Override
179+
public void doOnSubscribe(Subscription subscription) {
180+
f.run();
181+
subscriber.onSubscribe(subscription);
182+
}
183+
}));
184+
}
185+
186+
public Observable<T> doOnUnsubscribe(Runnable f) {
187+
throw new UnsupportedOperationException("Not implemented");
188+
}
189+
190+
public Observable<List<T>> toList() {
191+
return new Observable<>(subscriber -> {
192+
193+
List<T> list = new LinkedList<>();
194+
195+
publisher.subscribe(new Forward<T,List<T>>(subscriber) {
196+
@Override
197+
public void doOnNext(T t) {
198+
list.add(t);
199+
}
200+
@Override
201+
public void doOnComplete() {
202+
subscriber.onNext(list);
203+
subscriber.onComplete();
204+
}
205+
});
206+
});
207+
}
208+
209+
public BlockingObservable<T> toBlocking() {
210+
return new BlockingObservable<>(publisher);
211+
}
212+
213+
public interface Operator<R, T> extends Function<Subscriber<? super R>, Subscriber<? super T>> {
214+
215+
}
216+
217+
public abstract static class Sub<T> implements Subscriber<T> {
218+
219+
@Override
220+
public final void onSubscribe(Subscription s) {
221+
try {
222+
doOnSubscribe(s);
223+
} catch (Throwable t) {
224+
onError(t);
225+
}
226+
}
227+
@Override
228+
public final void onNext(T t) {
229+
try {
230+
doOnNext(t);
231+
} catch (Throwable throwable) {
232+
onError(throwable);
233+
}
234+
}
235+
@Override
236+
public final void onError(Throwable throwable) {
237+
try {
238+
doOnError(throwable);
239+
} catch (Throwable err) {
240+
Logger.getLogger(getClass().getName()).log(Level.SEVERE, "onError threw exception", err);
241+
}
242+
}
243+
@Override
244+
public final void onComplete() {
245+
try {
246+
doOnComplete();
247+
} catch (Throwable throwable) {
248+
onError(throwable);
249+
}
250+
}
251+
protected abstract void doOnSubscribe(Subscription s);
252+
protected abstract void doOnNext(T t);
253+
protected abstract void doOnError(Throwable throwable);
254+
protected abstract void doOnComplete();
255+
}
256+
257+
public static class Forward<F,R> extends Sub<F> {
83258

84259
protected final Subscriber<? super R> to;
85260

@@ -88,32 +263,65 @@ public Forward(Subscriber<? super R> to) {
88263
}
89264

90265
@Override
91-
public void onSubscribe(Subscription subscription) {
266+
public void doOnSubscribe(Subscription subscription) {
92267
to.onSubscribe(subscription);
93268
}
94269

95270
@Override
96271
@SuppressWarnings("unchecked")
97-
public void onNext(F t) {
272+
public void doOnNext(F t) {
98273
to.onNext((R) t);
99274
}
100275

101276
@Override
102-
public void onError(Throwable throwable) {
277+
public void doOnError(Throwable throwable) {
103278
to.onError(throwable);
104279
}
105280

106281
@Override
107-
public void onComplete() {
282+
public void doOnComplete() {
108283
to.onComplete();
109284
}
110285
}
111286

112287
public static class BlockingObservable<T> {
113288

114-
public T single() {
115-
return null;
289+
private final Publisher<T> source;
290+
private final BlockingQueue<Entry<T,Throwable>> result = new LinkedBlockingQueue<>();
291+
292+
public BlockingObservable(Publisher<T> source) {
293+
this.source = source;
116294
}
117295

296+
public T single() {
297+
298+
source.subscribe(new Subscriber<T>() {
299+
@Override
300+
public void onNext(T t) {
301+
result.add(new SimpleImmutableEntry<>(t, null));
302+
}
303+
@Override
304+
public void onError(Throwable t) {
305+
result.add(new SimpleImmutableEntry<>(null, t));
306+
}
307+
@Override
308+
public void onSubscribe(Subscription s) { }
309+
@Override
310+
public void onComplete() { }
311+
});
312+
313+
try {
314+
Entry<T, Throwable> entry = result.take();
315+
if(entry.getValue() != null) {
316+
throw entry.getValue() instanceof RuntimeException
317+
? (RuntimeException) entry.getValue()
318+
: new RuntimeException(entry.getValue());
319+
}
320+
return entry.getKey();
321+
322+
} catch (InterruptedException e) {
323+
throw new RuntimeException(e);
324+
}
325+
}
118326
}
119327
}
Lines changed: 61 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,72 @@
11
package com.github.pgasync.impl.reactive;
22

33
import org.junit.Test;
4+
import org.reactivestreams.Subscriber;
5+
import org.reactivestreams.Subscription;
6+
7+
import static java.util.Arrays.asList;
8+
import static org.junit.Assert.assertEquals;
49

510
public class ObservableTest {
611

712
@Test
813
public void shouldFlatMap() {
9-
Observable.create(subscriber -> {
10-
subscriber.onNext("foo");
11-
subscriber.onNext("bar");
12-
subscriber.onComplete();
13-
}).subscribe(System.out::println);
14+
assertEquals(asList(1, 2, 2, 4, 3, 6),
15+
Observable.just(1, 2, 3)
16+
.flatMap(n -> Observable.just(n, n * 2))
17+
.toList()
18+
.toBlocking()
19+
.single());
20+
}
21+
22+
@Test
23+
public void shouldMap() {
24+
assertEquals(asList(2, 3, 4),
25+
Observable.just(1, 2, 3)
26+
.map(n -> n + 1)
27+
.toList()
28+
.toBlocking()
29+
.single());
30+
}
31+
32+
@Test
33+
public void shouldLift() {
34+
assertEquals(asList(2, 4, 6),
35+
Observable.just(1, 2, 3)
36+
.lift(subscriber -> new Subscriber<Integer>() {
37+
@Override
38+
public void onSubscribe(Subscription s) {
39+
subscriber.onSubscribe(s);
40+
}
41+
@Override
42+
public void onNext(Integer integer) {
43+
subscriber.onNext(2 * integer);
44+
}
45+
@Override
46+
public void onError(Throwable t) {
47+
subscriber.onError(t);
48+
}
49+
@Override
50+
public void onComplete() {
51+
subscriber.onComplete();
52+
}
53+
})
54+
.toList()
55+
.toBlocking()
56+
.single());
57+
}
58+
59+
@Test
60+
public void shouldOnErrorResumeNext() {
61+
assertEquals(asList(1, 10, 20),
62+
Observable.create(subscriber -> {
63+
subscriber.onNext(1);
64+
subscriber.onError(new Exception());
65+
})
66+
.onErrorResumeNext(throwable -> Observable.just(10, 20))
67+
.toList()
68+
.toBlocking()
69+
.single());
1470
}
1571

1672
}

0 commit comments

Comments
 (0)