26
26
import io .netty .handler .ssl .SslContext ;
27
27
import io .netty .handler .ssl .SslHandshakeCompletionEvent ;
28
28
import io .netty .handler .ssl .util .InsecureTrustManagerFactory ;
29
+ import io .netty .util .concurrent .*;
30
+ import io .netty .util .concurrent .Future ;
29
31
30
32
import java .net .SocketAddress ;
31
- import java .util .ArrayList ;
32
- import java .util .List ;
33
- import java .util .Map ;
34
- import java .util .UUID ;
35
- import java .util .concurrent .ConcurrentHashMap ;
36
- import java .util .concurrent .ConcurrentMap ;
37
- import java .util .Queue ;
38
- import java .util .concurrent .ArrayBlockingQueue ;
39
- import java .util .concurrent .LinkedBlockingDeque ;
33
+ import java .util .*;
34
+ import java .util .concurrent .*;
40
35
import java .util .function .Consumer ;
41
36
42
37
import static java .util .Collections .singletonList ;
@@ -70,11 +65,7 @@ public void connect(StartupMessage startup, Consumer<List<Message>> replyTo) {
70
65
.channel (NioSocketChannel .class )
71
66
.handler (newProtocolInitializer (newStartupHandler (startup , replyTo )))
72
67
.connect (address )
73
- .addListener (future -> {
74
- if (!future .isSuccess ()) {
75
- replyTo .accept (singletonList (new ChannelError (future .cause ())));
76
- }
77
- });
68
+ .addListener (errorListener (replyTo ));
78
69
}
79
70
80
71
@ Override
@@ -83,7 +74,7 @@ public void send(Message message, Consumer<List<Message>> replyTo) {
83
74
throw new IllegalStateException ("Channel is closed" );
84
75
}
85
76
addNewReplyHandler (replyTo );
86
- ctx .writeAndFlush (message );
77
+ ctx .writeAndFlush (message ). addListener ( errorListener ( replyTo )) ;
87
78
}
88
79
89
80
private void addNewReplyHandler (Consumer <List <Message >> replyTo ) {
@@ -98,7 +89,8 @@ public void send(List<Message> messages, Consumer<List<Message>> replyTo) {
98
89
throw new IllegalStateException ("Channel is closed" );
99
90
}
100
91
addNewReplyHandler (replyTo );
101
- messages .forEach (ctx ::write );
92
+ GenericFutureListener <Future <Object >> errorListener = errorListener (replyTo );
93
+ messages .forEach (msg -> ctx .write (msg ).addListener (errorListener ));
102
94
ctx .flush ();
103
95
}
104
96
@@ -151,6 +143,14 @@ void publishNotification(NotificationResponse notification) {
151
143
}
152
144
}
153
145
146
+ <T > GenericFutureListener <io .netty .util .concurrent .Future <T >> errorListener (Consumer <List <Message >> replyTo ) {
147
+ return future -> {
148
+ if (!future .isSuccess ()) {
149
+ replyTo .accept (singletonList (new ChannelError (future .cause ())));
150
+ }
151
+ };
152
+ }
153
+
154
154
ChannelInboundHandlerAdapter newStartupHandler (StartupMessage startup , Consumer <List <Message >> replyTo ) {
155
155
return new ChannelInboundHandlerAdapter () {
156
156
@ Override
@@ -162,15 +162,15 @@ public void userEventTriggered(ChannelHandlerContext context, Object evt) throws
162
162
@ Override
163
163
public void channelActive (ChannelHandlerContext context ) {
164
164
if (useSsl ) {
165
- context .writeAndFlush (SSLHandshake .INSTANCE );
165
+ context .writeAndFlush (SSLHandshake .INSTANCE ). addListener ( errorListener ( replyTo )) ;
166
166
} else {
167
167
startup (context );
168
168
}
169
169
}
170
170
void startup (ChannelHandlerContext context ) {
171
171
ctx = context ;
172
172
addNewReplyHandler (replyTo );
173
- context .writeAndFlush (startup );
173
+ context .writeAndFlush (startup ). addListener ( errorListener ( replyTo )) ;
174
174
context .pipeline ().remove (this );
175
175
}
176
176
};
0 commit comments