16
16
17
17
package org .springframework .messaging .tcp .reactor ;
18
18
19
+ import java .lang .reflect .Method ;
19
20
import java .util .Collection ;
20
21
import java .util .List ;
21
22
import java .util .function .BiFunction ;
41
42
import reactor .ipc .netty .NettyInbound ;
42
43
import reactor .ipc .netty .NettyOutbound ;
43
44
import reactor .ipc .netty .options .ClientOptions ;
45
+ import reactor .ipc .netty .resources .LoopResources ;
46
+ import reactor .ipc .netty .resources .PoolResources ;
44
47
import reactor .ipc .netty .tcp .TcpClient ;
48
+ import reactor .ipc .netty .tcp .TcpResources ;
45
49
import reactor .util .concurrent .QueueSupplier ;
46
50
47
51
import org .springframework .messaging .Message ;
50
54
import org .springframework .messaging .tcp .TcpConnectionHandler ;
51
55
import org .springframework .messaging .tcp .TcpOperations ;
52
56
import org .springframework .util .Assert ;
57
+ import org .springframework .util .ReflectionUtils ;
53
58
import org .springframework .util .concurrent .ListenableFuture ;
54
59
import org .springframework .util .concurrent .SettableListenableFuture ;
55
60
@@ -68,6 +73,10 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
68
73
69
74
private final ChannelGroup channelGroup ;
70
75
76
+ private final LoopResources loopResources ;
77
+
78
+ private final PoolResources poolResources ;
79
+
71
80
private final Scheduler scheduler = Schedulers .newParallel ("ReactorNettyTcpClient" );
72
81
73
82
private volatile boolean stopping = false ;
@@ -84,11 +93,21 @@ public ReactorNettyTcpClient(String host, int port, ReactorNettyCodec<P> codec)
84
93
* Alternate constructor with a {@link ClientOptions} consumer providing
85
94
* additional control beyond a host and a port.
86
95
*/
87
- public ReactorNettyTcpClient (Consumer <ClientOptions > consumer , ReactorNettyCodec <P > codec ) {
88
- Assert .notNull (consumer , "Consumer<ClientOptions> is required" );
96
+ public ReactorNettyTcpClient (Consumer <ClientOptions > optionsConsumer , ReactorNettyCodec <P > codec ) {
97
+ Assert .notNull (optionsConsumer , "Consumer<ClientOptions> is required" );
89
98
Assert .notNull (codec , "ReactorNettyCodec is required" );
99
+
90
100
this .channelGroup = new DefaultChannelGroup (ImmediateEventExecutor .INSTANCE );
91
- this .tcpClient = TcpClient .create (consumer .andThen (opts -> opts .channelGroup (this .channelGroup )));
101
+ this .loopResources = LoopResources .create ("reactor-netty-tcp-client" );
102
+ this .poolResources = PoolResources .fixed ("reactor-netty-tcp-pool" );
103
+
104
+ Consumer <ClientOptions > builtInConsumer = opts -> opts
105
+ .channelGroup (this .channelGroup )
106
+ .loopResources (this .loopResources )
107
+ .poolResources (this .poolResources )
108
+ .preferNative (false );
109
+
110
+ this .tcpClient = TcpClient .create (optionsConsumer .andThen (builtInConsumer ));
92
111
this .codec = codec ;
93
112
}
94
113
@@ -152,7 +171,8 @@ private <T> Consumer<T> updateConnectMono(MonoProcessor<Void> connectMono) {
152
171
}
153
172
154
173
private <T > Function <Flux <T >, Publisher <?>> reconnectFunction (ReconnectStrategy reconnectStrategy ) {
155
- return flux -> flux .scan (1 , (count , e ) -> count ++)
174
+ return flux -> flux
175
+ .scan (1 , (count , element ) -> count ++)
156
176
.flatMap (attempt -> Mono .delayMillis (reconnectStrategy .getTimeToNextAttempt (attempt )));
157
177
}
158
178
@@ -163,12 +183,45 @@ public ListenableFuture<Void> shutdown() {
163
183
future .set (null );
164
184
return future ;
165
185
}
186
+
166
187
this .stopping = true ;
167
- ChannelGroupFuture future = this .channelGroup .close ();
168
- Mono <Void > completion = FutureMono .from (future ).doAfterTerminate ((x , e ) -> scheduler .dispose ());
188
+
189
+ ChannelGroupFuture close = this .channelGroup .close ();
190
+ Mono <Void > completion = FutureMono .from (close )
191
+ .doAfterTerminate ((x , e ) -> {
192
+
193
+ // TODO: https://github.com/reactor/reactor-netty/issues/24
194
+ shutdownGlobalResources ();
195
+
196
+ this .loopResources .dispose ();
197
+ this .poolResources .dispose ();
198
+
199
+ // TODO: https://github.com/reactor/reactor-netty/issues/25
200
+ try {
201
+ Thread .sleep (2000 );
202
+ }
203
+ catch (InterruptedException ex ) {
204
+ ex .printStackTrace ();
205
+ }
206
+
207
+ // Scheduler after loop resources...
208
+ this .scheduler .dispose ();
209
+ });
210
+
169
211
return new MonoToListenableFutureAdapter <>(completion );
170
212
}
171
213
214
+ private static void shutdownGlobalResources () {
215
+ try {
216
+ Method method = TcpResources .class .getDeclaredMethod ("_dispose" );
217
+ ReflectionUtils .makeAccessible (method );
218
+ ReflectionUtils .invokeMethod (method , TcpResources .get ());
219
+ }
220
+ catch (NoSuchMethodException ex ) {
221
+ ex .printStackTrace ();
222
+ }
223
+ }
224
+
172
225
173
226
private class ReactorNettyHandler implements BiFunction <NettyInbound , NettyOutbound , Publisher <Void >> {
174
227
0 commit comments