Skip to content

Commit b2f31a3

Browse files
wilkinsonarstoyanchev
authored andcommitted
Improve handling of send failures
Prior to this commit, a failure to send a heartbeat was ignored and a failure to forward a message to the broker would result in an error frame being sent but nothing more. Following this commit, a failure to send a heartbeat to the broker is treated as a TCP client failure. Furthermore, if the system relay session fails to forward a message to the broker an exception is thrown. Typically, the system relay session will be forwarding messages on behalf of local application code, rather than a remote WebSocket client. Throwing an exception allows the application code to be notified of the problem directly, rather than via a broker availability event.
1 parent 9b2c041 commit b2f31a3

File tree

2 files changed

+64
-39
lines changed

2 files changed

+64
-39
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java

Lines changed: 57 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import org.springframework.messaging.Message;
2626
import org.springframework.messaging.MessageChannel;
27+
import org.springframework.messaging.MessageDeliveryException;
2728
import org.springframework.messaging.MessageHandler;
2829
import org.springframework.messaging.simp.SimpMessageType;
2930
import org.springframework.messaging.simp.handler.AbstractBrokerMessageHandler;
@@ -205,34 +206,29 @@ protected void handleMessageInternal(Message<?> message) {
205206
return;
206207
}
207208

208-
try {
209-
if (SimpMessageType.CONNECT.equals(messageType)) {
210-
message = MessageBuilder.withPayloadAndHeaders(message.getPayload(), headers).build();
211-
StompRelaySession session = new StompRelaySession(sessionId);
212-
this.relaySessions.put(sessionId, session);
213-
session.connect(message);
214-
}
215-
else if (SimpMessageType.DISCONNECT.equals(messageType)) {
216-
StompRelaySession session = this.relaySessions.remove(sessionId);
217-
if (session == null) {
218-
if (logger.isTraceEnabled()) {
219-
logger.trace("Session already removed, sessionId=" + sessionId);
220-
}
221-
return;
222-
}
223-
session.forward(message);
224-
}
225-
else {
226-
StompRelaySession session = this.relaySessions.get(sessionId);
227-
if (session == null) {
228-
logger.warn("Session id=" + sessionId + " not found. Ignoring message: " + message);
229-
return;
209+
if (SimpMessageType.CONNECT.equals(messageType)) {
210+
message = MessageBuilder.withPayloadAndHeaders(message.getPayload(), headers).build();
211+
StompRelaySession session = new StompRelaySession(sessionId);
212+
this.relaySessions.put(sessionId, session);
213+
session.connect(message);
214+
}
215+
else if (SimpMessageType.DISCONNECT.equals(messageType)) {
216+
StompRelaySession session = this.relaySessions.remove(sessionId);
217+
if (session == null) {
218+
if (logger.isTraceEnabled()) {
219+
logger.trace("Session already removed, sessionId=" + sessionId);
230220
}
231-
session.forward(message);
221+
return;
232222
}
223+
session.forward(message);
233224
}
234-
catch (Throwable t) {
235-
logger.error("Failed to handle message " + message, t);
225+
else {
226+
StompRelaySession session = this.relaySessions.get(sessionId);
227+
if (session == null) {
228+
logger.warn("Session id=" + sessionId + " not found. Ignoring message: " + message);
229+
return;
230+
}
231+
session.forward(message);
236232
}
237233
}
238234

@@ -323,7 +319,7 @@ protected void connected(StompHeaderAccessor headers, StompConnection stompConne
323319
publishBrokerAvailableEvent();
324320
}
325321

326-
private void handleTcpClientFailure(String message, Throwable ex) {
322+
protected void handleTcpClientFailure(String message, Throwable ex) {
327323
if (logger.isErrorEnabled()) {
328324
logger.error(message + ", sessionId=" + this.sessionId, ex);
329325
}
@@ -348,13 +344,21 @@ protected void sendMessageToClient(Message<?> message) {
348344
messageChannel.send(message);
349345
}
350346

351-
public void forward(Message<?> message) {
347+
private void forward(Message<?> message) {
352348
TcpConnection<Message<byte[]>, Message<byte[]>> tcpConnection = this.stompConnection.getReadyConnection();
353349
if (tcpConnection == null) {
354-
logger.warn("Connection to STOMP broker is not active, discarding message: " + message);
355-
return;
350+
logger.warn("Connection to STOMP broker is not active");
351+
handleForwardFailure(message);
352+
}
353+
else if (!forwardInternal(tcpConnection, message)) {
354+
handleForwardFailure(message);
355+
}
356+
}
357+
358+
protected void handleForwardFailure(Message<?> message) {
359+
if (logger.isWarnEnabled()) {
360+
logger.warn("Failed to forward message to the broker. message=" + message);
356361
}
357-
forwardInternal(tcpConnection, message);
358362
}
359363

360364
private boolean forwardInternal(
@@ -491,32 +495,40 @@ protected void connectionClosed() {
491495

492496
@Override
493497
protected void connected(StompHeaderAccessor headers, final StompConnection stompConnection) {
494-
long brokerReceiveInterval = headers.getHeartbeat()[1];
495498

496-
if (HEARTBEAT_SEND_INTERVAL > 0 && brokerReceiveInterval > 0) {
499+
long brokerReceiveInterval = headers.getHeartbeat()[1];
500+
if ((HEARTBEAT_SEND_INTERVAL > 0) && (brokerReceiveInterval > 0)) {
497501
long interval = Math.max(HEARTBEAT_SEND_INTERVAL, brokerReceiveInterval);
498502
stompConnection.connection.on().writeIdle(interval, new Runnable() {
499503

500504
@Override
501505
public void run() {
502-
TcpConnection<Message<byte[]>, Message<byte[]>> connection = stompConnection.connection;
503-
if (connection != null) {
504-
connection.send(MessageBuilder.withPayload(heartbeatPayload).build());
506+
TcpConnection<Message<byte[]>, Message<byte[]>> tcpConn = stompConnection.connection;
507+
if (tcpConn != null) {
508+
tcpConn.send(MessageBuilder.withPayload(heartbeatPayload).build(),
509+
new Consumer<Boolean>() {
510+
@Override
511+
public void accept(Boolean t) {
512+
handleTcpClientFailure("Failed to send heartbeat to the broker", null);
513+
}
514+
});
505515
}
506516
}
507-
508517
});
509518
}
510519

511520
long brokerSendInterval = headers.getHeartbeat()[0];
512521
if (HEARTBEAT_RECEIVE_INTERVAL > 0 && brokerSendInterval > 0) {
513-
final long interval =
514-
Math.max(HEARTBEAT_RECEIVE_INTERVAL, brokerSendInterval) * HEARTBEAT_RECEIVE_MULTIPLIER;
522+
final long interval = Math.max(HEARTBEAT_RECEIVE_INTERVAL, brokerSendInterval)
523+
* HEARTBEAT_RECEIVE_MULTIPLIER;
515524
stompConnection.connection.on().readIdle(interval, new Runnable() {
525+
516526
@Override
517527
public void run() {
518528
String message = "Broker hearbeat missed: connection idle for more than " + interval + "ms";
519-
logger.warn(message);
529+
if (logger.isWarnEnabled()) {
530+
logger.warn(message);
531+
}
520532
disconnected(message);
521533
}
522534
});
@@ -537,6 +549,12 @@ protected void sendMessageToClient(Message<?> message) {
537549
// Ignore
538550
}
539551
}
552+
553+
@Override
554+
protected void handleForwardFailure(Message<?> message) {
555+
super.handleForwardFailure(message);
556+
throw new MessageDeliveryException(message);
557+
}
540558
}
541559

542560
}

spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.springframework.context.ApplicationEvent;
3434
import org.springframework.context.ApplicationEventPublisher;
3535
import org.springframework.messaging.Message;
36+
import org.springframework.messaging.MessageDeliveryException;
3637
import org.springframework.messaging.MessageHandler;
3738
import org.springframework.messaging.MessagingException;
3839
import org.springframework.messaging.simp.BrokerAvailabilityEvent;
@@ -143,6 +144,12 @@ public void brokerUnvailableErrorFrameOnConnect() throws Exception {
143144
this.responseHandler.awaitAndAssert();
144145
}
145146

147+
@Test(expected=MessageDeliveryException.class)
148+
public void messageDeliverExceptionIfSystemSessionForwardFails() throws Exception {
149+
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.MESSAGE);
150+
this.relay.handleMessage(MessageBuilder.withPayloadAndHeaders("test", headers).build());
151+
}
152+
146153
@Test
147154
public void brokerBecomingUnvailableTriggersErrorFrame() throws Exception {
148155

0 commit comments

Comments
 (0)