Skip to content

Commit 6ddacdc

Browse files
committed
Fix issue in simple broker with peer-to-peer messages
Issue: SPR-10930
1 parent 48caeef commit 6ddacdc

File tree

7 files changed

+54
-13
lines changed

7 files changed

+54
-13
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,8 +192,7 @@ protected void handleMessageInternal(Message<?> message) {
192192
if (SimpMessageType.MESSAGE.equals(messageType)) {
193193
sessionId = (sessionId == null) ? SystemStompRelaySession.ID : sessionId;
194194
headers.setSessionId(sessionId);
195-
command = (command == null) ? StompCommand.SEND : command;
196-
headers.setCommandIfNotSet(command);
195+
headers.updateStompCommandAsClientMessage();
197196
message = MessageBuilder.withPayloadAndHeaders(message.getPayload(), headers).build();
198197
}
199198

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompHeaderAccessor.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.springframework.http.MediaType;
2727
import org.springframework.messaging.Message;
2828
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
29+
import org.springframework.messaging.simp.SimpMessageType;
2930
import org.springframework.util.Assert;
3031
import org.springframework.util.CollectionUtils;
3132
import org.springframework.util.StringUtils;
@@ -218,9 +219,29 @@ public Map<String, List<String>> toStompHeaderMap() {
218219
return toNativeHeaderMap();
219220
}
220221

221-
public void setCommandIfNotSet(StompCommand command) {
222+
public void updateStompCommandAsClientMessage() {
223+
224+
Assert.state(SimpMessageType.MESSAGE.equals(getMessageType()),
225+
"Unexpected message type " + getMessage());
226+
222227
if (getCommand() == null) {
223-
setHeader(COMMAND_HEADER, command);
228+
setHeader(COMMAND_HEADER, StompCommand.SEND);
229+
}
230+
else if (!getCommand().equals(StompCommand.SEND)) {
231+
throw new IllegalStateException("Unexpected STOMP command " + getCommand());
232+
}
233+
}
234+
235+
public void updateStompCommandAsServerMessage() {
236+
237+
Assert.state(SimpMessageType.MESSAGE.equals(getMessageType()),
238+
"Unexpected message type " + getMessage());
239+
240+
if ((getCommand() == null) || getCommand().equals(StompCommand.SEND)) {
241+
setHeader(COMMAND_HEADER, StompCommand.MESSAGE);
242+
}
243+
else if (!getCommand().equals(StompCommand.MESSAGE)) {
244+
throw new IllegalStateException("Unexpected STOMP command " + getCommand());
224245
}
225246
}
226247

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompProtocolHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -150,16 +150,16 @@ protected void sendErrorMessage(WebSocketSession session, Throwable error) {
150150
public void handleMessageToClient(WebSocketSession session, Message<?> message) {
151151

152152
StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
153-
if (headers.getCommand() == null && SimpMessageType.MESSAGE == headers.getMessageType()) {
154-
headers.setCommandIfNotSet(StompCommand.MESSAGE);
155-
}
156153

157154
if (headers.getMessageType() == SimpMessageType.CONNECT_ACK) {
158155
StompHeaderAccessor connectedHeaders = StompHeaderAccessor.create(StompCommand.CONNECTED);
159156
connectedHeaders.setVersion(getVersion(headers));
160157
connectedHeaders.setHeartbeat(0, 0); // no heart-beat support with simple broker
161158
headers = connectedHeaders;
162159
}
160+
else if (SimpMessageType.MESSAGE.equals(headers.getMessageType())) {
161+
headers.updateStompCommandAsServerMessage();
162+
}
163163

164164
if (headers.getCommand() == StompCommand.CONNECTED) {
165165
augmentConnectedHeaders(headers, session);

spring-messaging/src/test/java/org/springframework/messaging/simp/handler/AnnotationMethodIntegrationTests.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ protected Class<?>[] getAnnotatedConfigClasses() {
8080

8181

8282
@Test
83-
public void simpleController() throws Exception {
83+
public void sendMessageToController() throws Exception {
8484

8585
TextMessage message = create(StompCommand.SEND).headers("destination:/app/simple").build();
8686
WebSocketSession session = doHandshake(new TestClientWebSocketHandler(0, message), "/ws").get();
@@ -95,10 +95,10 @@ public void simpleController() throws Exception {
9595
}
9696

9797
@Test
98-
public void incrementController() throws Exception {
98+
public void sendMessageToControllerAndReceiveReplyViaTopic() throws Exception {
9999

100100
TextMessage message1 = create(StompCommand.SUBSCRIBE).headers(
101-
"id:subs1", "destination:/topic/increment").body("5").build();
101+
"id:subs1", "destination:/topic/increment").build();
102102

103103
TextMessage message2 = create(StompCommand.SEND).headers(
104104
"destination:/app/topic/increment").body("5").build();
@@ -114,6 +114,28 @@ public void incrementController() throws Exception {
114114
}
115115
}
116116

117+
// SPR-10930
118+
119+
@Test
120+
public void sendMessageToBrokerAndReceiveReplyViaTopic() throws Exception {
121+
122+
TextMessage message1 = create(StompCommand.SUBSCRIBE).headers("id:subs1", "destination:/topic/foo").build();
123+
TextMessage message2 = create(StompCommand.SEND).headers("destination:/topic/foo").body("5").build();
124+
125+
TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(1, message1, message2);
126+
WebSocketSession session = doHandshake(clientHandler, "/ws").get();
127+
128+
try {
129+
assertTrue(clientHandler.latch.await(2, TimeUnit.SECONDS));
130+
131+
String payload = clientHandler.actual.get(0).getPayload();
132+
assertTrue("Expected STOMP Command=MESSAGE, got " + payload, payload.startsWith("MESSAGE\n"));
133+
}
134+
finally {
135+
session.close();
136+
}
137+
}
138+
117139

118140
@IntegrationTestController
119141
static class SimpleController {

spring-messaging/src/test/java/org/springframework/messaging/simp/handler/SimpleBrokerMessageHandlerTests.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.springframework.messaging.support.MessageBuilder;
3232

3333
import static org.junit.Assert.*;
34-
3534
import static org.mockito.Mockito.*;
3635

3736

spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ public void brokerUnvailableErrorFrameOnConnect() throws Exception {
146146

147147
@Test(expected=MessageDeliveryException.class)
148148
public void messageDeliverExceptionIfSystemSessionForwardFails() throws Exception {
149-
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.MESSAGE);
149+
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
150150
this.relay.handleMessage(MessageBuilder.withPayloadAndHeaders("test", headers).build());
151151
}
152152

spring-messaging/src/test/resources/log4j.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
</appender>
1313

1414
<logger name="org.springframework.messaging">
15-
<level value="trace" />
15+
<level value="debug" />
1616
</logger>
1717

1818
<logger name="org.apache.activemq">

0 commit comments

Comments
 (0)