Skip to content

Commit 2a3e013

Browse files
committed
Polish ReactorNettyTcpClient
1 parent e38c020 commit 2a3e013

File tree

2 files changed

+75
-109
lines changed

2 files changed

+75
-109
lines changed

spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java

Lines changed: 66 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@
1616

1717
package org.springframework.messaging.tcp.reactor;
1818

19-
import java.util.Collection;
2019
import java.util.function.BiFunction;
2120
import java.util.function.Consumer;
2221
import java.util.function.Function;
2322

2423
import io.netty.channel.group.ChannelGroup;
24+
import io.netty.channel.group.ChannelGroupFuture;
2525
import io.netty.channel.group.DefaultChannelGroup;
2626
import io.netty.util.concurrent.ImmediateEventExecutor;
2727
import org.reactivestreams.Publisher;
@@ -39,20 +39,16 @@
3939
import reactor.ipc.netty.tcp.TcpClient;
4040
import reactor.util.concurrent.QueueSupplier;
4141

42-
import org.springframework.messaging.Message;
4342
import org.springframework.messaging.tcp.ReconnectStrategy;
4443
import org.springframework.messaging.tcp.TcpConnection;
4544
import org.springframework.messaging.tcp.TcpConnectionHandler;
4645
import org.springframework.messaging.tcp.TcpOperations;
4746
import org.springframework.util.Assert;
4847
import org.springframework.util.concurrent.ListenableFuture;
48+
import org.springframework.util.concurrent.SettableListenableFuture;
4949

5050
/**
51-
* An implementation of {@link org.springframework.messaging.tcp.TcpOperations}
52-
* based on the TCP client support of the Reactor project.
53-
*
54-
* <p>This implementation wraps N (Reactor) clients for N {@link #connect} calls,
55-
* i.e. a separate (Reactor) client instance for each connection.
51+
* Reactor Netty based implementation of {@link TcpOperations}.
5652
*
5753
* @author Rossen Stoyanchev
5854
* @author Stephane Maldini
@@ -64,55 +60,42 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
6460

6561
private final ReactorNettyCodec<P> codec;
6662

67-
private final Scheduler scheduler = Schedulers.newParallel("ReactorNettyTcpClient");
63+
private final ChannelGroup channelGroup;
6864

69-
private final ChannelGroup group;
65+
private final Scheduler scheduler = Schedulers.newParallel("ReactorNettyTcpClient");
7066

71-
private volatile boolean stopping;
67+
private volatile boolean stopping = false;
7268

7369

7470
/**
75-
* A constructor that creates a {@link TcpClient TcpClient} factory relying on
76-
* Reactor Netty TCP threads. The number of Netty threads can be tweaked with
77-
* the {@code reactor.tcp.ioThreadCount} System property. The network I/O
78-
* threads will be shared amongst the active clients.
79-
* <p>Also see the constructor accepting a {@link Consumer} of
80-
* {@link ClientOptions} for additional options.
81-
* @param host the host to connect to
82-
* @param port the port to connect to
83-
* @param codec for encoding and decoding messages
71+
* Basic constructor with a host and a port.
8472
*/
8573
public ReactorNettyTcpClient(String host, int port, ReactorNettyCodec<P> codec) {
8674
this(opts -> opts.connect(host, port), codec);
8775
}
8876

8977
/**
90-
* A constructor with a configurator {@link Consumer} that will receive default
91-
* {@link ClientOptions} from {@link TcpClient}. This might be used to add SSL
92-
* or specific network parameters to the generated client configuration.
93-
* @param tcpOptions callback for configuring shared {@link ClientOptions}
94-
* @param codec for encoding and decoding messages
78+
* Alternate constructor with a {@link ClientOptions} consumer providing
79+
* additional control beyond a host and a port.
9580
*/
96-
public ReactorNettyTcpClient(Consumer<? super ClientOptions> tcpOptions, ReactorNettyCodec<P> codec) {
97-
Assert.notNull(codec, "'codec' is required");
98-
this.group = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
99-
this.tcpClient = TcpClient.create(opts -> tcpOptions.accept(opts.channelGroup(group)));
81+
public ReactorNettyTcpClient(Consumer<ClientOptions> consumer, ReactorNettyCodec<P> codec) {
82+
Assert.notNull(consumer, "Consumer<ClientOptions> is required");
83+
Assert.notNull(codec, "ReactorNettyCodec is required");
84+
this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
85+
this.tcpClient = TcpClient.create(consumer.andThen(opts -> opts.channelGroup(this.channelGroup)));
10086
this.codec = codec;
10187
}
10288

10389

10490
@Override
10591
public ListenableFuture<Void> connect(final TcpConnectionHandler<P> handler) {
106-
Assert.notNull(handler, "'handler' is required");
107-
92+
Assert.notNull(handler, "TcpConnectionHandler is required");
10893
if (this.stopping) {
109-
IllegalStateException ex = new IllegalStateException("Shutting down.");
110-
handler.afterConnectFailure(ex);
111-
return new MonoToListenableFutureAdapter<>(Mono.<Void>error(ex));
94+
return handleShuttingDownConnectFailure(handler);
11295
}
11396

11497
Mono<Void> connectMono = this.tcpClient
115-
.newHandler(new MessageHandler<>(handler, this.codec, this.scheduler))
98+
.newHandler(new ReactorNettyHandler(handler))
11699
.doOnError(handler::afterConnectFailure)
117100
.then();
118101

@@ -121,99 +104,89 @@ public ListenableFuture<Void> connect(final TcpConnectionHandler<P> handler) {
121104

122105
@Override
123106
public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
124-
Assert.notNull(handler, "'handler' is required");
125-
Assert.notNull(strategy, "'reconnectStrategy' is required");
126-
107+
Assert.notNull(handler, "TcpConnectionHandler is required");
108+
Assert.notNull(strategy, "ReconnectStrategy is required");
127109
if (this.stopping) {
128-
IllegalStateException ex = new IllegalStateException("Shutting down.");
129-
handler.afterConnectFailure(ex);
130-
return new MonoToListenableFutureAdapter<>(Mono.<Void>error(ex));
110+
return handleShuttingDownConnectFailure(handler);
131111
}
132112

133113
MonoProcessor<Void> connectMono = MonoProcessor.create();
134-
135-
this.tcpClient.newHandler(new MessageHandler<>(handler, this.codec, this.scheduler))
136-
.doOnNext(item -> {
137-
if (!connectMono.isTerminated()) {
138-
connectMono.onComplete();
139-
}
140-
})
141-
.doOnError(ex -> {
142-
if (!connectMono.isTerminated()) {
143-
connectMono.onError(ex);
144-
}
145-
})
114+
this.tcpClient
115+
.newHandler(new ReactorNettyHandler(handler))
116+
.doOnNext(connectFailureConsumer(connectMono))
117+
.doOnError(connectFailureConsumer(connectMono))
146118
.then(NettyContext::onClose)
147-
.retryWhen(new Reconnector<>(strategy))
148-
.repeatWhen(new Reconnector<>(strategy))
119+
.retryWhen(reconnectFunction(strategy))
120+
.repeatWhen(reconnectFunction(strategy))
149121
.subscribe();
150122

151123
return new MonoToListenableFutureAdapter<>(connectMono);
152124
}
153125

126+
private ListenableFuture<Void> handleShuttingDownConnectFailure(TcpConnectionHandler<P> handler) {
127+
IllegalStateException ex = new IllegalStateException("Shutting down.");
128+
handler.afterConnectFailure(ex);
129+
return new MonoToListenableFutureAdapter<>(Mono.error(ex));
130+
}
131+
132+
private <T> Consumer<T> connectFailureConsumer(MonoProcessor<Void> connectMono) {
133+
return o -> {
134+
if (!connectMono.isTerminated()) {
135+
if (o instanceof Throwable) {
136+
connectMono.onError((Throwable) o);
137+
}
138+
else {
139+
connectMono.onComplete();
140+
}
141+
}
142+
};
143+
}
144+
145+
private <T> Function<Flux<T>, Publisher<?>> reconnectFunction(ReconnectStrategy reconnectStrategy) {
146+
return flux -> flux.scan(1, (count, e) -> count++)
147+
.flatMap(attempt -> Mono.delayMillis(reconnectStrategy.getTimeToNextAttempt(attempt)));
148+
}
149+
154150
@Override
155151
public ListenableFuture<Void> shutdown() {
156152
if (this.stopping) {
157-
return new MonoToListenableFutureAdapter<>(Mono.empty());
153+
SettableListenableFuture<Void> future = new SettableListenableFuture<>();
154+
future.set(null);
155+
return future;
158156
}
159-
160157
this.stopping = true;
161-
162-
Mono<Void> completion = FutureMono.from(this.group.close())
163-
.doAfterTerminate((x, e) -> this.scheduler.dispose());
164-
158+
ChannelGroupFuture future = this.channelGroup.close();
159+
Mono<Void> completion = FutureMono.from(future).doAfterTerminate((x, e) -> scheduler.dispose());
165160
return new MonoToListenableFutureAdapter<>(completion);
166161
}
167162

168163

169-
private static final class MessageHandler<P> implements BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> {
164+
private class ReactorNettyHandler implements BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> {
170165

171166
private final TcpConnectionHandler<P> connectionHandler;
172167

173-
private final ReactorNettyCodec<P> codec;
174168

175-
private final Scheduler scheduler;
176-
177-
MessageHandler(TcpConnectionHandler<P> handler, ReactorNettyCodec<P> codec, Scheduler scheduler) {
169+
ReactorNettyHandler(TcpConnectionHandler<P> handler) {
178170
this.connectionHandler = handler;
179-
this.codec = codec;
180-
this.scheduler = scheduler;
181171
}
182172

183173
@Override
184-
public Publisher<Void> apply(NettyInbound in, NettyOutbound out) {
185-
Flux<Collection<Message<P>>> inbound = in.receive().map(this.codec.getDecoder());
186-
DirectProcessor<Void> closeProcessor = DirectProcessor.create();
187-
188-
TcpConnection<P> tcpConnection =
189-
new ReactorNettyTcpConnection<>(in, out, this.codec.getEncoder(), closeProcessor);
174+
public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
190175

191-
this.scheduler.schedule(() -> connectionHandler.afterConnected(tcpConnection));
192-
inbound = inbound.publishOn(this.scheduler, QueueSupplier.SMALL_BUFFER_SIZE);
176+
DirectProcessor<Void> completion = DirectProcessor.create();
177+
TcpConnection<P> connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completion);
178+
scheduler.schedule(() -> connectionHandler.afterConnected(connection));
193179

194-
inbound.flatMapIterable(Function.identity())
180+
inbound.receive()
181+
.map(codec.getDecoder())
182+
.publishOn(scheduler, QueueSupplier.SMALL_BUFFER_SIZE)
183+
.flatMapIterable(Function.identity())
195184
.subscribe(
196185
connectionHandler::handleMessage,
197186
connectionHandler::handleFailure,
198187
connectionHandler::afterConnectionClosed);
199188

200-
return closeProcessor;
201-
}
202-
}
203-
204-
205-
private static final class Reconnector<T> implements Function<Flux<T>, Publisher<?>> {
206-
207-
private final ReconnectStrategy strategy;
208-
209-
Reconnector(ReconnectStrategy strategy) {
210-
this.strategy = strategy;
211-
}
212-
213-
@Override
214-
public Publisher<?> apply(Flux<T> flux) {
215-
return flux.scan(1, (p, e) -> p++)
216-
.flatMap(attempt -> Mono.delayMillis(strategy.getTimeToNextAttempt(attempt)));
189+
return completion;
217190
}
218191
}
219192

spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616

1717
package org.springframework.messaging.tcp.reactor;
1818

19-
import java.util.function.BiConsumer;
20-
2119
import io.netty.buffer.ByteBuf;
2220
import reactor.core.publisher.DirectProcessor;
2321
import reactor.core.publisher.Mono;
@@ -29,10 +27,7 @@
2927
import org.springframework.util.concurrent.ListenableFuture;
3028

3129
/**
32-
* An implementation of {@link org.springframework.messaging.tcp.TcpConnection
33-
* TcpConnection} based on the TCP client support of the Reactor project.
34-
*
35-
* @param <P> the payload type of messages read or written to the TCP stream.
30+
* Reactor Netty based implementation of {@link TcpConnection}.
3631
*
3732
* @author Rossen Stoyanchev
3833
* @since 5.0
@@ -43,29 +38,27 @@ public class ReactorNettyTcpConnection<P> implements TcpConnection<P> {
4338

4439
private final NettyOutbound outbound;
4540

46-
private final DirectProcessor<Void> closeProcessor;
41+
private final ReactorNettyCodec<P> codec;
4742

48-
private final BiConsumer<? super ByteBuf, ? super Message<P>> encoder;
43+
private final DirectProcessor<Void> closeProcessor;
4944

5045

5146
public ReactorNettyTcpConnection(NettyInbound inbound, NettyOutbound outbound,
52-
BiConsumer<? super ByteBuf, ? super Message<P>> encoder,
53-
DirectProcessor<Void> closeProcessor) {
47+
ReactorNettyCodec<P> codec, DirectProcessor<Void> closeProcessor) {
5448

5549
this.inbound = inbound;
5650
this.outbound = outbound;
57-
this.encoder = encoder;
51+
this.codec = codec;
5852
this.closeProcessor = closeProcessor;
5953
}
6054

6155

6256
@Override
6357
public ListenableFuture<Void> send(Message<P> message) {
64-
ByteBuf byteBuf = this.outbound.alloc()
65-
.buffer();
66-
this.encoder.accept(byteBuf, message);
67-
return new MonoToListenableFutureAdapter<>(this.outbound.send(Mono.just(byteBuf))
68-
.then());
58+
ByteBuf byteBuf = this.outbound.alloc().buffer();
59+
this.codec.getEncoder().accept(byteBuf, message);
60+
Mono<Void> sendCompletion = this.outbound.send(Mono.just(byteBuf)).then();
61+
return new MonoToListenableFutureAdapter<>(sendCompletion);
6962
}
7063

7164
@Override

0 commit comments

Comments
 (0)