Skip to content

Commit 698c885

Browse files
committed
Reconnect failures delegated to TcpConnectionHandler
When connecting with a ReconnectStrategy we can only report the outcome of the first connect to the ListenableFuture<Void> return value. Failures for all subsequent attempts to reconnect however must be channeled to TcpConnectHandler#afterConnectFailure which is used in the STOMP broker relay for example to publish BroadcastAvailability(true/false) events.
1 parent ea274eb commit 698c885

File tree

1 file changed

+7
-4
lines changed

1 file changed

+7
-4
lines changed

spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,12 +116,15 @@ public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, Reconnect
116116
return handleShuttingDownConnectFailure(handler);
117117
}
118118

119+
// Report first connect to the ListenableFuture
119120
MonoProcessor<Void> connectMono = MonoProcessor.create();
121+
120122
this.tcpClient
121123
.newHandler(new ReactorNettyHandler(handler))
122-
.doOnNext(connectFailureConsumer(connectMono))
123-
.doOnError(connectFailureConsumer(connectMono))
124-
.then(NettyContext::onClose)
124+
.doOnNext(updateConnectMono(connectMono))
125+
.doOnError(updateConnectMono(connectMono))
126+
.doOnError(handler::afterConnectFailure) // report all connect failures to the handler
127+
.then(NettyContext::onClose) // post-connect issues
125128
.retryWhen(reconnectFunction(strategy))
126129
.repeatWhen(reconnectFunction(strategy))
127130
.subscribe();
@@ -135,7 +138,7 @@ private ListenableFuture<Void> handleShuttingDownConnectFailure(TcpConnectionHan
135138
return new MonoToListenableFutureAdapter<>(Mono.error(ex));
136139
}
137140

138-
private <T> Consumer<T> connectFailureConsumer(MonoProcessor<Void> connectMono) {
141+
private <T> Consumer<T> updateConnectMono(MonoProcessor<Void> connectMono) {
139142
return o -> {
140143
if (!connectMono.isTerminated()) {
141144
if (o instanceof Throwable) {

0 commit comments

Comments
 (0)