3
3
import org .reactivestreams .Publisher ;
4
4
import org .reactivestreams .Subscriber ;
5
5
import org .reactivestreams .Subscription ;
6
+ import rx .functions .Func1 ;
6
7
8
+ import java .util .AbstractMap ;
9
+ import java .util .AbstractMap .SimpleImmutableEntry ;
10
+ import java .util .LinkedList ;
11
+ import java .util .List ;
12
+ import java .util .Map ;
13
+ import java .util .Map .Entry ;
14
+ import java .util .concurrent .ArrayBlockingQueue ;
15
+ import java .util .concurrent .BlockingQueue ;
16
+ import java .util .concurrent .CountDownLatch ;
17
+ import java .util .concurrent .LinkedBlockingQueue ;
7
18
import java .util .function .Consumer ;
8
19
import java .util .function .Function ;
20
+ import java .util .logging .Level ;
21
+ import java .util .logging .Logger ;
9
22
10
23
public class Observable <T > implements Publisher <T > {
11
24
@@ -22,6 +35,16 @@ public static <T> Observable<T> just(T item) {
22
35
});
23
36
}
24
37
38
+ public static <T > Observable <T > just (T item , T ... items ) {
39
+ return new Observable <>(subscriber -> {
40
+ subscriber .onNext (item );
41
+ for (T t : items ) {
42
+ subscriber .onNext (t );
43
+ }
44
+ subscriber .onComplete ();
45
+ });
46
+ }
47
+
25
48
public static <T > Observable <T > empty () {
26
49
return new Observable <>(Subscriber ::onComplete );
27
50
}
@@ -44,42 +67,194 @@ public void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> on
44
67
}
45
68
46
69
public void subscribe (Consumer <? super T > onNext , Consumer <? super Throwable > onError , Runnable onComplete ) {
47
- publisher .subscribe (new Subscriber <T >() {
70
+ publisher .subscribe (new Sub <T >() {
48
71
@ Override
49
- public void onSubscribe (Subscription subscription ) {
72
+ public void doOnSubscribe (Subscription subscription ) {
50
73
subscription .request (1 );
51
74
}
75
+
52
76
@ Override
53
- public void onNext (T t ) {
54
- if (onNext != null ) {
77
+ public void doOnNext (T t ) {
78
+ if (onNext != null ) {
55
79
onNext .accept (t );
56
80
}
57
81
}
82
+
58
83
@ Override
59
- public void onError (Throwable throwable ) {
60
- if (onError != null ) {
84
+ public void doOnError (Throwable throwable ) {
85
+ if (onError != null ) {
61
86
onError .accept (throwable );
62
87
}
63
88
}
89
+
64
90
@ Override
65
- public void onComplete () {
66
- if (onComplete != null ) {
91
+ public void doOnComplete () {
92
+ if (onComplete != null ) {
67
93
onComplete .run ();
68
94
}
69
95
}
70
96
});
71
97
}
72
98
99
+ public <X > Observable <X > map (Function <T ,X > f ) {
100
+ return new Observable <>(subscriber -> publisher .subscribe (new Forward <T ,X >(subscriber ) {
101
+ @ Override
102
+ public void doOnNext (T t ) {
103
+ subscriber .onNext (f .apply (t ));
104
+ }
105
+ }));
106
+ }
107
+
73
108
public <X > Observable <X > flatMap (Function <T ,Observable <X >> f ) {
74
109
return new Observable <>(subscriber -> publisher .subscribe (new Forward <T ,X >(subscriber ) {
75
110
@ Override
76
- public void onNext (T t ) {
111
+ public void doOnNext (T t ) {
77
112
f .apply (t ).subscribe (new Forward <>(subscriber ));
78
113
}
79
114
}));
80
115
}
81
116
82
- public static class Forward <F ,R > implements Subscriber <F > {
117
+ public <X > Observable <X > lift (Operator <X ,T > operator ) {
118
+ return new Observable <>(subscriber -> publisher .subscribe (operator .apply (subscriber )));
119
+ }
120
+
121
+ public Observable <T > single (Function <T ,Boolean > f ) {
122
+ return new Observable <>(subscriber -> publisher .subscribe (new Forward <T ,T >(subscriber ) {
123
+ @ Override
124
+ public void doOnNext (T t ) {
125
+ if (f .apply (t )) {
126
+ super .doOnNext (t );
127
+ }
128
+ }
129
+ }));
130
+ }
131
+
132
+ public Observable <T > onErrorResumeNext (Function <Throwable , Observable <T >> f ) {
133
+ return new Observable <>(subscriber -> publisher .subscribe (new Forward <T ,T >(subscriber ) {
134
+ @ Override
135
+ public void doOnError (Throwable throwable ) {
136
+ f .apply (throwable ).subscribe (new Forward <>(subscriber ));
137
+ }
138
+ }));
139
+ }
140
+
141
+ public Observable <T > doOnTerminate (Runnable f ) {
142
+ return new Observable <>(subscriber -> publisher .subscribe (new Forward <T ,T >(subscriber ) {
143
+ @ Override
144
+ public void doOnError (Throwable throwable ) {
145
+ f .run ();
146
+ subscriber .onError (throwable );
147
+ }
148
+ @ Override
149
+ public void doOnComplete () {
150
+ f .run ();
151
+ subscriber .onComplete ();
152
+ }
153
+ }));
154
+ }
155
+
156
+ public Observable <T > doOnNext (Consumer <T > f ) {
157
+ return new Observable <>(subscriber -> publisher .subscribe (new Forward <T ,T >(subscriber ) {
158
+ @ Override
159
+ public void doOnNext (T t ) {
160
+ f .accept (t );
161
+ super .doOnNext (t );
162
+ }
163
+ }));
164
+ }
165
+
166
+ public Observable <T > doOnError (Consumer <Throwable > f ) {
167
+ return new Observable <>(subscriber -> publisher .subscribe (new Forward <T ,T >(subscriber ) {
168
+ @ Override
169
+ public void doOnError (Throwable throwable ) {
170
+ f .accept (throwable );
171
+ subscriber .onError (throwable );
172
+ }
173
+ }));
174
+ }
175
+
176
+ public Observable <T > doOnSubscribe (Runnable f ) {
177
+ return new Observable <>(subscriber -> publisher .subscribe (new Forward <T ,T >(subscriber ) {
178
+ @ Override
179
+ public void doOnSubscribe (Subscription subscription ) {
180
+ f .run ();
181
+ subscriber .onSubscribe (subscription );
182
+ }
183
+ }));
184
+ }
185
+
186
+ public Observable <T > doOnUnsubscribe (Runnable f ) {
187
+ throw new UnsupportedOperationException ("Not implemented" );
188
+ }
189
+
190
+ public Observable <List <T >> toList () {
191
+ return new Observable <>(subscriber -> {
192
+
193
+ List <T > list = new LinkedList <>();
194
+
195
+ publisher .subscribe (new Forward <T ,List <T >>(subscriber ) {
196
+ @ Override
197
+ public void doOnNext (T t ) {
198
+ list .add (t );
199
+ }
200
+ @ Override
201
+ public void doOnComplete () {
202
+ subscriber .onNext (list );
203
+ subscriber .onComplete ();
204
+ }
205
+ });
206
+ });
207
+ }
208
+
209
+ public BlockingObservable <T > toBlocking () {
210
+ return new BlockingObservable <>(publisher );
211
+ }
212
+
213
+ public interface Operator <R , T > extends Function <Subscriber <? super R >, Subscriber <? super T >> {
214
+
215
+ }
216
+
217
+ public abstract static class Sub <T > implements Subscriber <T > {
218
+
219
+ @ Override
220
+ public final void onSubscribe (Subscription s ) {
221
+ try {
222
+ doOnSubscribe (s );
223
+ } catch (Throwable t ) {
224
+ onError (t );
225
+ }
226
+ }
227
+ @ Override
228
+ public final void onNext (T t ) {
229
+ try {
230
+ doOnNext (t );
231
+ } catch (Throwable throwable ) {
232
+ onError (throwable );
233
+ }
234
+ }
235
+ @ Override
236
+ public final void onError (Throwable throwable ) {
237
+ try {
238
+ doOnError (throwable );
239
+ } catch (Throwable err ) {
240
+ Logger .getLogger (getClass ().getName ()).log (Level .SEVERE , "onError threw exception" , err );
241
+ }
242
+ }
243
+ @ Override
244
+ public final void onComplete () {
245
+ try {
246
+ doOnComplete ();
247
+ } catch (Throwable throwable ) {
248
+ onError (throwable );
249
+ }
250
+ }
251
+ protected abstract void doOnSubscribe (Subscription s );
252
+ protected abstract void doOnNext (T t );
253
+ protected abstract void doOnError (Throwable throwable );
254
+ protected abstract void doOnComplete ();
255
+ }
256
+
257
+ public static class Forward <F ,R > extends Sub <F > {
83
258
84
259
protected final Subscriber <? super R > to ;
85
260
@@ -88,32 +263,65 @@ public Forward(Subscriber<? super R> to) {
88
263
}
89
264
90
265
@ Override
91
- public void onSubscribe (Subscription subscription ) {
266
+ public void doOnSubscribe (Subscription subscription ) {
92
267
to .onSubscribe (subscription );
93
268
}
94
269
95
270
@ Override
96
271
@ SuppressWarnings ("unchecked" )
97
- public void onNext (F t ) {
272
+ public void doOnNext (F t ) {
98
273
to .onNext ((R ) t );
99
274
}
100
275
101
276
@ Override
102
- public void onError (Throwable throwable ) {
277
+ public void doOnError (Throwable throwable ) {
103
278
to .onError (throwable );
104
279
}
105
280
106
281
@ Override
107
- public void onComplete () {
282
+ public void doOnComplete () {
108
283
to .onComplete ();
109
284
}
110
285
}
111
286
112
287
public static class BlockingObservable <T > {
113
288
114
- public T single () {
115
- return null ;
289
+ private final Publisher <T > source ;
290
+ private final BlockingQueue <Entry <T ,Throwable >> result = new LinkedBlockingQueue <>();
291
+
292
+ public BlockingObservable (Publisher <T > source ) {
293
+ this .source = source ;
116
294
}
117
295
296
+ public T single () {
297
+
298
+ source .subscribe (new Subscriber <T >() {
299
+ @ Override
300
+ public void onNext (T t ) {
301
+ result .add (new SimpleImmutableEntry <>(t , null ));
302
+ }
303
+ @ Override
304
+ public void onError (Throwable t ) {
305
+ result .add (new SimpleImmutableEntry <>(null , t ));
306
+ }
307
+ @ Override
308
+ public void onSubscribe (Subscription s ) { }
309
+ @ Override
310
+ public void onComplete () { }
311
+ });
312
+
313
+ try {
314
+ Entry <T , Throwable > entry = result .take ();
315
+ if (entry .getValue () != null ) {
316
+ throw entry .getValue () instanceof RuntimeException
317
+ ? (RuntimeException ) entry .getValue ()
318
+ : new RuntimeException (entry .getValue ());
319
+ }
320
+ return entry .getKey ();
321
+
322
+ } catch (InterruptedException e ) {
323
+ throw new RuntimeException (e );
324
+ }
325
+ }
118
326
}
119
327
}
0 commit comments