16
16
17
17
package org .springframework .messaging .tcp .reactor ;
18
18
19
- import java .util .Collection ;
20
19
import java .util .function .BiFunction ;
21
20
import java .util .function .Consumer ;
22
21
import java .util .function .Function ;
23
22
24
23
import io .netty .channel .group .ChannelGroup ;
24
+ import io .netty .channel .group .ChannelGroupFuture ;
25
25
import io .netty .channel .group .DefaultChannelGroup ;
26
26
import io .netty .util .concurrent .ImmediateEventExecutor ;
27
27
import org .reactivestreams .Publisher ;
39
39
import reactor .ipc .netty .tcp .TcpClient ;
40
40
import reactor .util .concurrent .QueueSupplier ;
41
41
42
- import org .springframework .messaging .Message ;
43
42
import org .springframework .messaging .tcp .ReconnectStrategy ;
44
43
import org .springframework .messaging .tcp .TcpConnection ;
45
44
import org .springframework .messaging .tcp .TcpConnectionHandler ;
46
45
import org .springframework .messaging .tcp .TcpOperations ;
47
46
import org .springframework .util .Assert ;
48
47
import org .springframework .util .concurrent .ListenableFuture ;
48
+ import org .springframework .util .concurrent .SettableListenableFuture ;
49
49
50
50
/**
51
- * An implementation of {@link org.springframework.messaging.tcp.TcpOperations}
52
- * based on the TCP client support of the Reactor project.
53
- *
54
- * <p>This implementation wraps N (Reactor) clients for N {@link #connect} calls,
55
- * i.e. a separate (Reactor) client instance for each connection.
51
+ * Reactor Netty based implementation of {@link TcpOperations}.
56
52
*
57
53
* @author Rossen Stoyanchev
58
54
* @author Stephane Maldini
@@ -64,55 +60,42 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
64
60
65
61
private final ReactorNettyCodec <P > codec ;
66
62
67
- private final Scheduler scheduler = Schedulers . newParallel ( "ReactorNettyTcpClient" ) ;
63
+ private final ChannelGroup channelGroup ;
68
64
69
- private final ChannelGroup group ;
65
+ private final Scheduler scheduler = Schedulers . newParallel ( "ReactorNettyTcpClient" ) ;
70
66
71
- private volatile boolean stopping ;
67
+ private volatile boolean stopping = false ;
72
68
73
69
74
70
/**
75
- * A constructor that creates a {@link TcpClient TcpClient} factory relying on
76
- * Reactor Netty TCP threads. The number of Netty threads can be tweaked with
77
- * the {@code reactor.tcp.ioThreadCount} System property. The network I/O
78
- * threads will be shared amongst the active clients.
79
- * <p>Also see the constructor accepting a {@link Consumer} of
80
- * {@link ClientOptions} for additional options.
81
- * @param host the host to connect to
82
- * @param port the port to connect to
83
- * @param codec for encoding and decoding messages
71
+ * Basic constructor with a host and a port.
84
72
*/
85
73
public ReactorNettyTcpClient (String host , int port , ReactorNettyCodec <P > codec ) {
86
74
this (opts -> opts .connect (host , port ), codec );
87
75
}
88
76
89
77
/**
90
- * A constructor with a configurator {@link Consumer} that will receive default
91
- * {@link ClientOptions} from {@link TcpClient}. This might be used to add SSL
92
- * or specific network parameters to the generated client configuration.
93
- * @param tcpOptions callback for configuring shared {@link ClientOptions}
94
- * @param codec for encoding and decoding messages
78
+ * Alternate constructor with a {@link ClientOptions} consumer providing
79
+ * additional control beyond a host and a port.
95
80
*/
96
- public ReactorNettyTcpClient (Consumer <? super ClientOptions > tcpOptions , ReactorNettyCodec <P > codec ) {
97
- Assert .notNull (codec , "'codec' is required" );
98
- this .group = new DefaultChannelGroup (ImmediateEventExecutor .INSTANCE );
99
- this .tcpClient = TcpClient .create (opts -> tcpOptions .accept (opts .channelGroup (group )));
81
+ public ReactorNettyTcpClient (Consumer <ClientOptions > consumer , ReactorNettyCodec <P > codec ) {
82
+ Assert .notNull (consumer , "Consumer<ClientOptions> is required" );
83
+ Assert .notNull (codec , "ReactorNettyCodec is required" );
84
+ this .channelGroup = new DefaultChannelGroup (ImmediateEventExecutor .INSTANCE );
85
+ this .tcpClient = TcpClient .create (consumer .andThen (opts -> opts .channelGroup (this .channelGroup )));
100
86
this .codec = codec ;
101
87
}
102
88
103
89
104
90
@ Override
105
91
public ListenableFuture <Void > connect (final TcpConnectionHandler <P > handler ) {
106
- Assert .notNull (handler , "'handler' is required" );
107
-
92
+ Assert .notNull (handler , "TcpConnectionHandler is required" );
108
93
if (this .stopping ) {
109
- IllegalStateException ex = new IllegalStateException ("Shutting down." );
110
- handler .afterConnectFailure (ex );
111
- return new MonoToListenableFutureAdapter <>(Mono .<Void >error (ex ));
94
+ return handleShuttingDownConnectFailure (handler );
112
95
}
113
96
114
97
Mono <Void > connectMono = this .tcpClient
115
- .newHandler (new MessageHandler <> (handler , this . codec , this . scheduler ))
98
+ .newHandler (new ReactorNettyHandler (handler ))
116
99
.doOnError (handler ::afterConnectFailure )
117
100
.then ();
118
101
@@ -121,99 +104,89 @@ public ListenableFuture<Void> connect(final TcpConnectionHandler<P> handler) {
121
104
122
105
@ Override
123
106
public ListenableFuture <Void > connect (TcpConnectionHandler <P > handler , ReconnectStrategy strategy ) {
124
- Assert .notNull (handler , "'handler' is required" );
125
- Assert .notNull (strategy , "'reconnectStrategy' is required" );
126
-
107
+ Assert .notNull (handler , "TcpConnectionHandler is required" );
108
+ Assert .notNull (strategy , "ReconnectStrategy is required" );
127
109
if (this .stopping ) {
128
- IllegalStateException ex = new IllegalStateException ("Shutting down." );
129
- handler .afterConnectFailure (ex );
130
- return new MonoToListenableFutureAdapter <>(Mono .<Void >error (ex ));
110
+ return handleShuttingDownConnectFailure (handler );
131
111
}
132
112
133
113
MonoProcessor <Void > connectMono = MonoProcessor .create ();
134
-
135
- this .tcpClient .newHandler (new MessageHandler <>(handler , this .codec , this .scheduler ))
136
- .doOnNext (item -> {
137
- if (!connectMono .isTerminated ()) {
138
- connectMono .onComplete ();
139
- }
140
- })
141
- .doOnError (ex -> {
142
- if (!connectMono .isTerminated ()) {
143
- connectMono .onError (ex );
144
- }
145
- })
114
+ this .tcpClient
115
+ .newHandler (new ReactorNettyHandler (handler ))
116
+ .doOnNext (connectFailureConsumer (connectMono ))
117
+ .doOnError (connectFailureConsumer (connectMono ))
146
118
.then (NettyContext ::onClose )
147
- .retryWhen (new Reconnector <> (strategy ))
148
- .repeatWhen (new Reconnector <> (strategy ))
119
+ .retryWhen (reconnectFunction (strategy ))
120
+ .repeatWhen (reconnectFunction (strategy ))
149
121
.subscribe ();
150
122
151
123
return new MonoToListenableFutureAdapter <>(connectMono );
152
124
}
153
125
126
+ private ListenableFuture <Void > handleShuttingDownConnectFailure (TcpConnectionHandler <P > handler ) {
127
+ IllegalStateException ex = new IllegalStateException ("Shutting down." );
128
+ handler .afterConnectFailure (ex );
129
+ return new MonoToListenableFutureAdapter <>(Mono .error (ex ));
130
+ }
131
+
132
+ private <T > Consumer <T > connectFailureConsumer (MonoProcessor <Void > connectMono ) {
133
+ return o -> {
134
+ if (!connectMono .isTerminated ()) {
135
+ if (o instanceof Throwable ) {
136
+ connectMono .onError ((Throwable ) o );
137
+ }
138
+ else {
139
+ connectMono .onComplete ();
140
+ }
141
+ }
142
+ };
143
+ }
144
+
145
+ private <T > Function <Flux <T >, Publisher <?>> reconnectFunction (ReconnectStrategy reconnectStrategy ) {
146
+ return flux -> flux .scan (1 , (count , e ) -> count ++)
147
+ .flatMap (attempt -> Mono .delayMillis (reconnectStrategy .getTimeToNextAttempt (attempt )));
148
+ }
149
+
154
150
@ Override
155
151
public ListenableFuture <Void > shutdown () {
156
152
if (this .stopping ) {
157
- return new MonoToListenableFutureAdapter <>(Mono .empty ());
153
+ SettableListenableFuture <Void > future = new SettableListenableFuture <>();
154
+ future .set (null );
155
+ return future ;
158
156
}
159
-
160
157
this .stopping = true ;
161
-
162
- Mono <Void > completion = FutureMono .from (this .group .close ())
163
- .doAfterTerminate ((x , e ) -> this .scheduler .dispose ());
164
-
158
+ ChannelGroupFuture future = this .channelGroup .close ();
159
+ Mono <Void > completion = FutureMono .from (future ).doAfterTerminate ((x , e ) -> scheduler .dispose ());
165
160
return new MonoToListenableFutureAdapter <>(completion );
166
161
}
167
162
168
163
169
- private static final class MessageHandler < P > implements BiFunction <NettyInbound , NettyOutbound , Publisher <Void >> {
164
+ private class ReactorNettyHandler implements BiFunction <NettyInbound , NettyOutbound , Publisher <Void >> {
170
165
171
166
private final TcpConnectionHandler <P > connectionHandler ;
172
167
173
- private final ReactorNettyCodec <P > codec ;
174
168
175
- private final Scheduler scheduler ;
176
-
177
- MessageHandler (TcpConnectionHandler <P > handler , ReactorNettyCodec <P > codec , Scheduler scheduler ) {
169
+ ReactorNettyHandler (TcpConnectionHandler <P > handler ) {
178
170
this .connectionHandler = handler ;
179
- this .codec = codec ;
180
- this .scheduler = scheduler ;
181
171
}
182
172
183
173
@ Override
184
- public Publisher <Void > apply (NettyInbound in , NettyOutbound out ) {
185
- Flux <Collection <Message <P >>> inbound = in .receive ().map (this .codec .getDecoder ());
186
- DirectProcessor <Void > closeProcessor = DirectProcessor .create ();
187
-
188
- TcpConnection <P > tcpConnection =
189
- new ReactorNettyTcpConnection <>(in , out , this .codec .getEncoder (), closeProcessor );
174
+ public Publisher <Void > apply (NettyInbound inbound , NettyOutbound outbound ) {
190
175
191
- this .scheduler .schedule (() -> connectionHandler .afterConnected (tcpConnection ));
192
- inbound = inbound .publishOn (this .scheduler , QueueSupplier .SMALL_BUFFER_SIZE );
176
+ DirectProcessor <Void > completion = DirectProcessor .create ();
177
+ TcpConnection <P > connection = new ReactorNettyTcpConnection <>(inbound , outbound , codec , completion );
178
+ scheduler .schedule (() -> connectionHandler .afterConnected (connection ));
193
179
194
- inbound .flatMapIterable (Function .identity ())
180
+ inbound .receive ()
181
+ .map (codec .getDecoder ())
182
+ .publishOn (scheduler , QueueSupplier .SMALL_BUFFER_SIZE )
183
+ .flatMapIterable (Function .identity ())
195
184
.subscribe (
196
185
connectionHandler ::handleMessage ,
197
186
connectionHandler ::handleFailure ,
198
187
connectionHandler ::afterConnectionClosed );
199
188
200
- return closeProcessor ;
201
- }
202
- }
203
-
204
-
205
- private static final class Reconnector <T > implements Function <Flux <T >, Publisher <?>> {
206
-
207
- private final ReconnectStrategy strategy ;
208
-
209
- Reconnector (ReconnectStrategy strategy ) {
210
- this .strategy = strategy ;
211
- }
212
-
213
- @ Override
214
- public Publisher <?> apply (Flux <T > flux ) {
215
- return flux .scan (1 , (p , e ) -> p ++)
216
- .flatMap (attempt -> Mono .delayMillis (strategy .getTimeToNextAttempt (attempt )));
189
+ return completion ;
217
190
}
218
191
}
219
192
0 commit comments