Skip to content

Commit c54ce97

Browse files
committed
Catch errors from writing to netty context
1 parent 77ae525 commit c54ce97

File tree

1 file changed

+18
-18
lines changed

1 file changed

+18
-18
lines changed

src/main/java/com/github/pgasync/impl/netty/NettyPgProtocolStream.java

+18-18
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,12 @@
2626
import io.netty.handler.ssl.SslContext;
2727
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
2828
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
29+
import io.netty.util.concurrent.*;
30+
import io.netty.util.concurrent.Future;
2931

3032
import java.net.SocketAddress;
31-
import java.util.ArrayList;
32-
import java.util.List;
33-
import java.util.Map;
34-
import java.util.UUID;
35-
import java.util.concurrent.ConcurrentHashMap;
36-
import java.util.concurrent.ConcurrentMap;
37-
import java.util.Queue;
38-
import java.util.concurrent.ArrayBlockingQueue;
39-
import java.util.concurrent.LinkedBlockingDeque;
33+
import java.util.*;
34+
import java.util.concurrent.*;
4035
import java.util.function.Consumer;
4136

4237
import static java.util.Collections.singletonList;
@@ -70,11 +65,7 @@ public void connect(StartupMessage startup, Consumer<List<Message>> replyTo) {
7065
.channel(NioSocketChannel.class)
7166
.handler(newProtocolInitializer(newStartupHandler(startup, replyTo)))
7267
.connect(address)
73-
.addListener(future -> {
74-
if (!future.isSuccess()) {
75-
replyTo.accept(singletonList(new ChannelError(future.cause())));
76-
}
77-
});
68+
.addListener(errorListener(replyTo));
7869
}
7970

8071
@Override
@@ -83,7 +74,7 @@ public void send(Message message, Consumer<List<Message>> replyTo) {
8374
throw new IllegalStateException("Channel is closed");
8475
}
8576
addNewReplyHandler(replyTo);
86-
ctx.writeAndFlush(message);
77+
ctx.writeAndFlush(message).addListener(errorListener(replyTo));
8778
}
8879

8980
private void addNewReplyHandler(Consumer<List<Message>> replyTo) {
@@ -98,7 +89,8 @@ public void send(List<Message> messages, Consumer<List<Message>> replyTo) {
9889
throw new IllegalStateException("Channel is closed");
9990
}
10091
addNewReplyHandler(replyTo);
101-
messages.forEach(ctx::write);
92+
GenericFutureListener<Future<Object>> errorListener = errorListener(replyTo);
93+
messages.forEach(msg -> ctx.write(msg).addListener(errorListener));
10294
ctx.flush();
10395
}
10496

@@ -151,6 +143,14 @@ void publishNotification(NotificationResponse notification) {
151143
}
152144
}
153145

146+
<T> GenericFutureListener<io.netty.util.concurrent.Future<T>> errorListener(Consumer<List<Message>> replyTo) {
147+
return future -> {
148+
if(!future.isSuccess()) {
149+
replyTo.accept(singletonList(new ChannelError(future.cause())));
150+
}
151+
};
152+
}
153+
154154
ChannelInboundHandlerAdapter newStartupHandler(StartupMessage startup, Consumer<List<Message>> replyTo) {
155155
return new ChannelInboundHandlerAdapter() {
156156
@Override
@@ -162,15 +162,15 @@ public void userEventTriggered(ChannelHandlerContext context, Object evt) throws
162162
@Override
163163
public void channelActive(ChannelHandlerContext context) {
164164
if(useSsl) {
165-
context.writeAndFlush(SSLHandshake.INSTANCE);
165+
context.writeAndFlush(SSLHandshake.INSTANCE).addListener(errorListener(replyTo));
166166
} else {
167167
startup(context);
168168
}
169169
}
170170
void startup(ChannelHandlerContext context) {
171171
ctx = context;
172172
addNewReplyHandler(replyTo);
173-
context.writeAndFlush(startup);
173+
context.writeAndFlush(startup).addListener(errorListener(replyTo));
174174
context.pipeline().remove(this);
175175
}
176176
};

0 commit comments

Comments
 (0)