Skip to content

Commit ea274eb

Browse files
committed
Fix decoding issue in Reactor TcpClient
When decoding STOMP messages unread portions of a given input ByteBuf must be kept until more input is received and the next complete STOMP frame can be parsed. In Reactor Net 2.x this was handled for us through the "remainder" field in NettyChannelHandlerBridge. The Reactor Netty 0.6 upgrade however applied only a simple map operator on the input ByteBuf after which the buffer is relased. This commit replaces the use of a simple map operator for decoding and installs a ByteToMessageDecoder in the Netty channel pipeline which has a built-in ability to preserve and merge unread input into subsequent input buffers.
1 parent fdf88c9 commit ea274eb

File tree

5 files changed

+120
-35
lines changed

5 files changed

+120
-35
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompReactorNettyCodec.java

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,28 +15,46 @@
1515
*/
1616
package org.springframework.messaging.simp.stomp;
1717

18-
import org.springframework.messaging.tcp.reactor.ReactorNettyCodec;
18+
import java.nio.ByteBuffer;
19+
import java.util.List;
20+
21+
import org.springframework.messaging.Message;
22+
import org.springframework.messaging.tcp.reactor.AbstractNioBufferReactorNettyCodec;
1923

2024
/**
21-
* {@code ReactorNettyCodec} that delegates to {@link StompDecoder} and
22-
* {@link StompEncoder}.
25+
* Simple delegation to StompDecoder and StompEncoder.
2326
*
2427
* @author Rossen Stoyanchev
2528
* @since 5.0
2629
*/
27-
class StompReactorNettyCodec extends ReactorNettyCodec<byte[]> {
30+
class StompReactorNettyCodec extends AbstractNioBufferReactorNettyCodec<byte[]> {
31+
32+
private final StompDecoder decoder;
33+
34+
private final StompEncoder encoder;
35+
2836

2937
public StompReactorNettyCodec() {
30-
this(new StompDecoder(), new StompEncoder());
38+
this(new StompDecoder());
3139
}
3240

3341
public StompReactorNettyCodec(StompDecoder decoder) {
3442
this(decoder, new StompEncoder());
3543
}
3644

3745
public StompReactorNettyCodec(StompDecoder decoder, StompEncoder encoder) {
38-
super(byteBuf -> decoder.decode(byteBuf.nioBuffer()),
39-
(byteBuf, message) -> byteBuf.writeBytes(encoder.encode(message)));
46+
this.decoder = decoder;
47+
this.encoder = encoder;
48+
}
49+
50+
51+
@Override
52+
protected List<Message<byte[]>> decodeInternal(ByteBuffer nioBuffer) {
53+
return this.decoder.decode(nioBuffer);
54+
}
55+
56+
protected ByteBuffer encodeInternal(Message<byte[]> message) {
57+
return ByteBuffer.wrap(this.encoder.encode(message));
4058
}
4159

4260
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright 2002-2016 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.messaging.tcp.reactor;
17+
18+
import java.nio.ByteBuffer;
19+
import java.util.Collection;
20+
import java.util.List;
21+
22+
import io.netty.buffer.ByteBuf;
23+
24+
import org.springframework.messaging.Message;
25+
26+
/**
27+
* Convenient base class for {@link ReactorNettyCodec} implementations that need
28+
* to work with NIO {@link ByteBuffer}s.
29+
*
30+
* @author Rossen Stoyanchev
31+
* @since 5.0
32+
*/
33+
public abstract class AbstractNioBufferReactorNettyCodec<P> implements ReactorNettyCodec<P> {
34+
35+
@Override
36+
public Collection<Message<P>> decode(ByteBuf inputBuffer) {
37+
ByteBuffer nioBuffer = inputBuffer.nioBuffer();
38+
int start = nioBuffer.position();
39+
List<Message<P>> messages = decodeInternal(nioBuffer);
40+
inputBuffer.skipBytes(nioBuffer.position() - start);
41+
return messages;
42+
}
43+
44+
protected abstract List<Message<P>> decodeInternal(ByteBuffer nioBuffer);
45+
46+
@Override
47+
public void encode(Message<P> message, ByteBuf outputBuffer) {
48+
outputBuffer.writeBytes(encodeInternal(message));
49+
}
50+
51+
protected abstract ByteBuffer encodeInternal(Message<P> message);
52+
53+
}

spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyCodec.java

Lines changed: 15 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import io.netty.buffer.ByteBuf;
2323

2424
import org.springframework.messaging.Message;
25-
import org.springframework.util.Assert;
2625

2726
/**
2827
* Simple holder for a decoding {@link Function} and an encoding
@@ -31,28 +30,20 @@
3130
* @author Rossen Stoyanchev
3231
* @since 5.0
3332
*/
34-
public class ReactorNettyCodec<P> {
35-
36-
private final Function<? super ByteBuf, ? extends Collection<Message<P>>> decoder;
37-
38-
private final BiConsumer<? super ByteBuf, ? super Message<P>> encoder;
39-
40-
41-
public ReactorNettyCodec(Function<? super ByteBuf, ? extends Collection<Message<P>>> decoder,
42-
BiConsumer<? super ByteBuf, ? super Message<P>> encoder) {
43-
44-
Assert.notNull(decoder, "'decoder' is required");
45-
Assert.notNull(encoder, "'encoder' is required");
46-
this.decoder = decoder;
47-
this.encoder = encoder;
48-
}
49-
50-
public Function<? super ByteBuf, ? extends Collection<Message<P>>> getDecoder() {
51-
return this.decoder;
52-
}
53-
54-
public BiConsumer<? super ByteBuf, ? super Message<P>> getEncoder() {
55-
return this.encoder;
56-
}
33+
public interface ReactorNettyCodec<P> {
34+
35+
/**
36+
* Decode the input {@link ByteBuf} into one or more {@link Message}s.
37+
* @param inputBuffer the input buffer to decode from
38+
* @return 0 or more decoded messages
39+
*/
40+
Collection<Message<P>> decode(ByteBuf inputBuffer);
41+
42+
/**
43+
* Encode the given {@link Message} to the output {@link ByteBuf}.
44+
* @param message the message the encode
45+
* @param outputBuffer the buffer to write to
46+
*/
47+
void encode(Message<P> message, ByteBuf outputBuffer);
5748

5849
}

spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,18 @@
1616

1717
package org.springframework.messaging.tcp.reactor;
1818

19+
import java.util.Collection;
20+
import java.util.List;
1921
import java.util.function.BiFunction;
2022
import java.util.function.Consumer;
2123
import java.util.function.Function;
2224

25+
import io.netty.buffer.ByteBuf;
26+
import io.netty.channel.ChannelHandlerContext;
2327
import io.netty.channel.group.ChannelGroup;
2428
import io.netty.channel.group.ChannelGroupFuture;
2529
import io.netty.channel.group.DefaultChannelGroup;
30+
import io.netty.handler.codec.ByteToMessageDecoder;
2631
import io.netty.util.concurrent.ImmediateEventExecutor;
2732
import org.reactivestreams.Publisher;
2833
import reactor.core.publisher.DirectProcessor;
@@ -39,6 +44,7 @@
3944
import reactor.ipc.netty.tcp.TcpClient;
4045
import reactor.util.concurrent.QueueSupplier;
4146

47+
import org.springframework.messaging.Message;
4248
import org.springframework.messaging.tcp.ReconnectStrategy;
4349
import org.springframework.messaging.tcp.TcpConnection;
4450
import org.springframework.messaging.tcp.TcpConnectionHandler;
@@ -170,17 +176,19 @@ private class ReactorNettyHandler implements BiFunction<NettyInbound, NettyOutbo
170176
this.connectionHandler = handler;
171177
}
172178

179+
@SuppressWarnings("unchecked")
173180
@Override
174181
public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
175182

176183
DirectProcessor<Void> completion = DirectProcessor.create();
177184
TcpConnection<P> connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completion);
178185
scheduler.schedule(() -> connectionHandler.afterConnected(connection));
179186

180-
inbound.receive()
181-
.map(codec.getDecoder())
187+
inbound.context().addDecoder(new StompMessageDecoder<>(codec));
188+
189+
inbound.receiveObject()
190+
.cast(Message.class)
182191
.publishOn(scheduler, QueueSupplier.SMALL_BUFFER_SIZE)
183-
.flatMapIterable(Function.identity())
184192
.subscribe(
185193
connectionHandler::handleMessage,
186194
connectionHandler::handleFailure,
@@ -190,4 +198,19 @@ public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
190198
}
191199
}
192200

201+
private static class StompMessageDecoder<P> extends ByteToMessageDecoder {
202+
203+
private final ReactorNettyCodec<P> codec;
204+
205+
public StompMessageDecoder(ReactorNettyCodec<P> codec) {
206+
this.codec = codec;
207+
}
208+
209+
@Override
210+
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
211+
Collection<Message<P>> messages = codec.decode(in);
212+
out.addAll(messages);
213+
}
214+
}
215+
193216
}

spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public ReactorNettyTcpConnection(NettyInbound inbound, NettyOutbound outbound,
5858
@Override
5959
public ListenableFuture<Void> send(Message<P> message) {
6060
ByteBuf byteBuf = this.outbound.alloc().buffer();
61-
this.codec.getEncoder().accept(byteBuf, message);
61+
this.codec.encode(message, byteBuf);
6262
Mono<Void> sendCompletion = this.outbound.send(Mono.just(byteBuf)).then();
6363
return new MonoToListenableFutureAdapter<>(sendCompletion);
6464
}

0 commit comments

Comments
 (0)