Skip to content

Commit f0c7eae

Browse files
committed
Issue #16: Data can be sent to a wrong subscriber when pipeline is enabled
1 parent 453e72a commit f0c7eae

File tree

2 files changed

+22
-6
lines changed

2 files changed

+22
-6
lines changed

src/main/java/com/github/pgasync/impl/netty/NettyPgConnectionPool.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,17 @@ public class NettyPgConnectionPool extends PgConnectionPool {
3939

4040
final EventLoopGroup group = new NioEventLoopGroup(1);
4141
final boolean useSsl;
42+
final boolean pipeline;
4243

4344
public NettyPgConnectionPool(PoolProperties properties) {
4445
super(properties);
4546
useSsl = properties.getUseSsl();
47+
pipeline = properties.getUsePipelining();
4648
}
4749

4850
@Override
4951
protected PgProtocolStream openStream(InetSocketAddress address) {
50-
return new NettyPgProtocolStream(group, address, useSsl);
52+
return new NettyPgProtocolStream(group, address, useSsl, pipeline);
5153
}
5254

5355
@Override

src/main/java/com/github/pgasync/impl/netty/NettyPgProtocolStream.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,19 +46,23 @@
4646
public class NettyPgProtocolStream implements PgProtocolStream {
4747

4848
final EventLoopGroup group;
49+
final EventLoop eventLoop;
4950
final SocketAddress address;
5051
final boolean useSsl;
52+
final boolean pipeline;
5153

5254
final GenericFutureListener<Future<? super Object>> onError;
5355
final Queue<Subscriber<? super Message>> subscribers;
5456
final ConcurrentMap<String,Map<String,Subscriber<? super String>>> listeners = new ConcurrentHashMap<>();
5557

5658
ChannelHandlerContext ctx;
5759

58-
public NettyPgProtocolStream(EventLoopGroup group, SocketAddress address, boolean useSsl) {
60+
public NettyPgProtocolStream(EventLoopGroup group, SocketAddress address, boolean useSsl, boolean pipeline) {
5961
this.group = group;
62+
this.eventLoop = group.next();
6063
this.address = address;
6164
this.useSsl = useSsl; // TODO: refactor into SSLConfig with trust parameters
65+
this.pipeline = pipeline;
6266
this.subscribers = new LinkedBlockingDeque<>(); // TODO: limit pipeline queue depth
6367
this.onError = future -> {
6468
if(!future.isSuccess()) {
@@ -101,6 +105,14 @@ public Observable<Message> send(Message... messages) {
101105
return;
102106
}
103107

108+
if(pipeline && !eventLoop.inEventLoop()) {
109+
eventLoop.submit(() -> {
110+
pushSubscriber(subscriber);
111+
write(messages);
112+
});
113+
return;
114+
}
115+
104116
pushSubscriber(subscriber);
105117
write(messages);
106118

@@ -136,14 +148,16 @@ public Observable<String> listen(String channel) {
136148

137149
@Override
138150
public Observable<Void> close() {
139-
return Observable.create(subscriber -> ctx.close().addListener(f -> {
140-
if(!f.isSuccess()) {
141-
subscriber.onError(f.cause());
151+
return Observable.create(subscriber ->
152+
ctx.writeAndFlush(Terminate.INSTANCE).addListener(written ->
153+
ctx.close().addListener(closed -> {
154+
if (!closed.isSuccess()) {
155+
subscriber.onError(closed.cause());
142156
return;
143157
}
144158
subscriber.onNext(null);
145159
subscriber.onCompleted();
146-
}));
160+
})));
147161
}
148162

149163
private void pushSubscriber(Subscriber<? super Message> subscriber) {

0 commit comments

Comments
 (0)