38
38
import java .util .concurrent .ConcurrentHashMap ;
39
39
import java .util .concurrent .ConcurrentMap ;
40
40
import java .util .concurrent .LinkedBlockingDeque ;
41
+ import java .util .concurrent .atomic .AtomicReference ;
41
42
import java .util .function .Consumer ;
42
43
43
44
/**
@@ -71,7 +72,7 @@ public NettyPgProtocolStream(EventLoopGroup group, SocketAddress address, boolea
71
72
72
73
@ Override
73
74
public Observable <Message > connect (StartupMessage startup ) {
74
- return protocolObservable (subscriber -> {
75
+ return Observable . create (subscriber -> {
75
76
76
77
pushSubscriber (subscriber );
77
78
new Bootstrap ()
@@ -80,12 +81,13 @@ public Observable<Message> connect(StartupMessage startup) {
80
81
.handler (newProtocolInitializer (newStartupHandler (startup )))
81
82
.connect (address )
82
83
.addListener (onError );
83
- });
84
+
85
+ }).lift (throwErrorResponses ());
84
86
}
85
87
86
88
@ Override
87
89
public Observable <Message > send (Message ... messages ) {
88
- return protocolObservable (subscriber -> {
90
+ return Observable . create (subscriber -> {
89
91
90
92
if (!isConnected ()) {
91
93
subscriber .onError (new IllegalStateException ("Channel is closed" ));
@@ -94,7 +96,8 @@ public Observable<Message> send(Message... messages) {
94
96
95
97
pushSubscriber (subscriber );
96
98
write (messages );
97
- });
99
+
100
+ }).lift (throwErrorResponses ());
98
101
}
99
102
100
103
@ Override
@@ -128,7 +131,7 @@ public void close() {
128
131
129
132
private void pushSubscriber (Subscriber <? super Message > subscriber ) {
130
133
if (!subscribers .offer (subscriber )) {
131
- throw new IllegalStateException ("Pipelining not enabled" );
134
+ throw new IllegalStateException ("Pipelining not enabled " + subscribers . peek () );
132
135
}
133
136
}
134
137
@@ -146,32 +149,39 @@ private void publishNotification(NotificationResponse notification) {
146
149
}
147
150
}
148
151
149
- private static <T > Observable <T > protocolObservable (Observable .OnSubscribe <T > onSubscribe ) {
150
- return Observable .create (onSubscribe )
151
- .lift (subscriber -> new Subscriber <T >() {
152
- @ Override
153
- public void onCompleted () {
154
- subscriber .onCompleted ();
155
- }
156
- @ Override
157
- public void onError (Throwable e ) {
158
- subscriber .onError (e );
159
- }
160
- @ Override
161
- public void onNext (T message ) {
162
- if (message instanceof ErrorResponse ) {
163
- ErrorResponse error = (ErrorResponse ) message ;
164
- subscriber .onError (new SqlException (error .getLevel ().name (), error .getCode (), error .getMessage ()));
165
- subscriber .unsubscribe ();
166
- return ;
167
- }
168
- subscriber .onNext (message );
169
- }
170
- });
152
+ private static Observable .Operator <Message ,? super Object > throwErrorResponses () {
153
+ return subscriber -> new Subscriber <Object >() {
154
+
155
+ SqlException sqlException ;
156
+
157
+ @ Override
158
+ public void onCompleted () {
159
+ if (sqlException != null ) {
160
+ subscriber .onError (sqlException );
161
+ return ;
162
+ }
163
+ subscriber .onCompleted ();
164
+ }
165
+
166
+ @ Override
167
+ public void onError (Throwable e ) {
168
+ subscriber .onError (e );
169
+ }
170
+
171
+ @ Override
172
+ public void onNext (Object message ) {
173
+ if (message instanceof ErrorResponse ) {
174
+ ErrorResponse error = (ErrorResponse ) message ;
175
+ sqlException = new SqlException (error .getLevel ().name (), error .getCode (), error .getMessage ());
176
+ return ;
177
+ }
178
+ subscriber .onNext ((Message ) message );
179
+ }
180
+ };
171
181
}
172
182
173
183
private static boolean isCompleteMessage (Object msg ) {
174
- return msg instanceof ReadyForQuery
184
+ return msg == ReadyForQuery . INSTANCE
175
185
|| (msg instanceof Authentication && !((Authentication ) msg ).isAuthenticationOk ());
176
186
}
177
187
@@ -245,10 +255,8 @@ public void channelRead(ChannelHandlerContext context, Object msg) throws Except
245
255
246
256
if (isCompleteMessage (msg )) {
247
257
Subscriber <? super Message > subscriber = subscribers .remove ();
248
- if (!subscriber .isUnsubscribed ()) {
249
- subscriber .onNext ((Message ) msg );
250
- subscriber .onCompleted ();
251
- }
258
+ subscriber .onNext ((Message ) msg );
259
+ subscriber .onCompleted ();
252
260
return ;
253
261
}
254
262
0 commit comments