Skip to content

Commit e62b104

Browse files
committed
Resource cleanup on shutdown in ReactorNettyTcpClient
1 parent 698c885 commit e62b104

File tree

1 file changed

+59
-6
lines changed

1 file changed

+59
-6
lines changed

spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java

Lines changed: 59 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.messaging.tcp.reactor;
1818

19+
import java.lang.reflect.Method;
1920
import java.util.Collection;
2021
import java.util.List;
2122
import java.util.function.BiFunction;
@@ -41,7 +42,10 @@
4142
import reactor.ipc.netty.NettyInbound;
4243
import reactor.ipc.netty.NettyOutbound;
4344
import reactor.ipc.netty.options.ClientOptions;
45+
import reactor.ipc.netty.resources.LoopResources;
46+
import reactor.ipc.netty.resources.PoolResources;
4447
import reactor.ipc.netty.tcp.TcpClient;
48+
import reactor.ipc.netty.tcp.TcpResources;
4549
import reactor.util.concurrent.QueueSupplier;
4650

4751
import org.springframework.messaging.Message;
@@ -50,6 +54,7 @@
5054
import org.springframework.messaging.tcp.TcpConnectionHandler;
5155
import org.springframework.messaging.tcp.TcpOperations;
5256
import org.springframework.util.Assert;
57+
import org.springframework.util.ReflectionUtils;
5358
import org.springframework.util.concurrent.ListenableFuture;
5459
import org.springframework.util.concurrent.SettableListenableFuture;
5560

@@ -68,6 +73,10 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
6873

6974
private final ChannelGroup channelGroup;
7075

76+
private final LoopResources loopResources;
77+
78+
private final PoolResources poolResources;
79+
7180
private final Scheduler scheduler = Schedulers.newParallel("ReactorNettyTcpClient");
7281

7382
private volatile boolean stopping = false;
@@ -84,11 +93,21 @@ public ReactorNettyTcpClient(String host, int port, ReactorNettyCodec<P> codec)
8493
* Alternate constructor with a {@link ClientOptions} consumer providing
8594
* additional control beyond a host and a port.
8695
*/
87-
public ReactorNettyTcpClient(Consumer<ClientOptions> consumer, ReactorNettyCodec<P> codec) {
88-
Assert.notNull(consumer, "Consumer<ClientOptions> is required");
96+
public ReactorNettyTcpClient(Consumer<ClientOptions> optionsConsumer, ReactorNettyCodec<P> codec) {
97+
Assert.notNull(optionsConsumer, "Consumer<ClientOptions> is required");
8998
Assert.notNull(codec, "ReactorNettyCodec is required");
99+
90100
this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
91-
this.tcpClient = TcpClient.create(consumer.andThen(opts -> opts.channelGroup(this.channelGroup)));
101+
this.loopResources = LoopResources.create("reactor-netty-tcp-client");
102+
this.poolResources = PoolResources.fixed("reactor-netty-tcp-pool");
103+
104+
Consumer<ClientOptions> builtInConsumer = opts -> opts
105+
.channelGroup(this.channelGroup)
106+
.loopResources(this.loopResources)
107+
.poolResources(this.poolResources)
108+
.preferNative(false);
109+
110+
this.tcpClient = TcpClient.create(optionsConsumer.andThen(builtInConsumer));
92111
this.codec = codec;
93112
}
94113

@@ -152,7 +171,8 @@ private <T> Consumer<T> updateConnectMono(MonoProcessor<Void> connectMono) {
152171
}
153172

154173
private <T> Function<Flux<T>, Publisher<?>> reconnectFunction(ReconnectStrategy reconnectStrategy) {
155-
return flux -> flux.scan(1, (count, e) -> count++)
174+
return flux -> flux
175+
.scan(1, (count, element) -> count++)
156176
.flatMap(attempt -> Mono.delayMillis(reconnectStrategy.getTimeToNextAttempt(attempt)));
157177
}
158178

@@ -163,12 +183,45 @@ public ListenableFuture<Void> shutdown() {
163183
future.set(null);
164184
return future;
165185
}
186+
166187
this.stopping = true;
167-
ChannelGroupFuture future = this.channelGroup.close();
168-
Mono<Void> completion = FutureMono.from(future).doAfterTerminate((x, e) -> scheduler.dispose());
188+
189+
ChannelGroupFuture close = this.channelGroup.close();
190+
Mono<Void> completion = FutureMono.from(close)
191+
.doAfterTerminate((x, e) -> {
192+
193+
// TODO: https://github.com/reactor/reactor-netty/issues/24
194+
shutdownGlobalResources();
195+
196+
this.loopResources.dispose();
197+
this.poolResources.dispose();
198+
199+
// TODO: https://github.com/reactor/reactor-netty/issues/25
200+
try {
201+
Thread.sleep(2000);
202+
}
203+
catch (InterruptedException ex) {
204+
ex.printStackTrace();
205+
}
206+
207+
// Scheduler after loop resources...
208+
this.scheduler.dispose();
209+
});
210+
169211
return new MonoToListenableFutureAdapter<>(completion);
170212
}
171213

214+
private static void shutdownGlobalResources() {
215+
try {
216+
Method method = TcpResources.class.getDeclaredMethod("_dispose");
217+
ReflectionUtils.makeAccessible(method);
218+
ReflectionUtils.invokeMethod(method, TcpResources.get());
219+
}
220+
catch (NoSuchMethodException ex) {
221+
ex.printStackTrace();
222+
}
223+
}
224+
172225

173226
private class ReactorNettyHandler implements BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> {
174227

0 commit comments

Comments
 (0)