|
46 | 46 | public class NettyPgProtocolStream implements PgProtocolStream {
|
47 | 47 |
|
48 | 48 | final EventLoopGroup group;
|
| 49 | + final EventLoop eventLoop; |
49 | 50 | final SocketAddress address;
|
50 | 51 | final boolean useSsl;
|
| 52 | + final boolean pipeline; |
51 | 53 |
|
52 | 54 | final GenericFutureListener<Future<? super Object>> onError;
|
53 | 55 | final Queue<Subscriber<? super Message>> subscribers;
|
54 | 56 | final ConcurrentMap<String,Map<String,Subscriber<? super String>>> listeners = new ConcurrentHashMap<>();
|
55 | 57 |
|
56 | 58 | ChannelHandlerContext ctx;
|
57 | 59 |
|
58 |
| - public NettyPgProtocolStream(EventLoopGroup group, SocketAddress address, boolean useSsl) { |
| 60 | + public NettyPgProtocolStream(EventLoopGroup group, SocketAddress address, boolean useSsl, boolean pipeline) { |
59 | 61 | this.group = group;
|
| 62 | + this.eventLoop = group.next(); |
60 | 63 | this.address = address;
|
61 | 64 | this.useSsl = useSsl; // TODO: refactor into SSLConfig with trust parameters
|
| 65 | + this.pipeline = pipeline; |
62 | 66 | this.subscribers = new LinkedBlockingDeque<>(); // TODO: limit pipeline queue depth
|
63 | 67 | this.onError = future -> {
|
64 | 68 | if(!future.isSuccess()) {
|
@@ -101,6 +105,14 @@ public Observable<Message> send(Message... messages) {
|
101 | 105 | return;
|
102 | 106 | }
|
103 | 107 |
|
| 108 | + if(pipeline && !eventLoop.inEventLoop()) { |
| 109 | + eventLoop.submit(() -> { |
| 110 | + pushSubscriber(subscriber); |
| 111 | + write(messages); |
| 112 | + }); |
| 113 | + return; |
| 114 | + } |
| 115 | + |
104 | 116 | pushSubscriber(subscriber);
|
105 | 117 | write(messages);
|
106 | 118 |
|
@@ -136,14 +148,16 @@ public Observable<String> listen(String channel) {
|
136 | 148 |
|
137 | 149 | @Override
|
138 | 150 | public Observable<Void> close() {
|
139 |
| - return Observable.create(subscriber -> ctx.close().addListener(f -> { |
140 |
| - if(!f.isSuccess()) { |
141 |
| - subscriber.onError(f.cause()); |
| 151 | + return Observable.create(subscriber -> |
| 152 | + ctx.writeAndFlush(Terminate.INSTANCE).addListener(written -> |
| 153 | + ctx.close().addListener(closed -> { |
| 154 | + if (!closed.isSuccess()) { |
| 155 | + subscriber.onError(closed.cause()); |
142 | 156 | return;
|
143 | 157 | }
|
144 | 158 | subscriber.onNext(null);
|
145 | 159 | subscriber.onCompleted();
|
146 |
| - })); |
| 160 | + }))); |
147 | 161 | }
|
148 | 162 |
|
149 | 163 | private void pushSubscriber(Subscriber<? super Message> subscriber) {
|
|
0 commit comments