24
24
25
25
import org .springframework .messaging .Message ;
26
26
import org .springframework .messaging .MessageChannel ;
27
+ import org .springframework .messaging .MessageDeliveryException ;
27
28
import org .springframework .messaging .MessageHandler ;
28
29
import org .springframework .messaging .simp .SimpMessageType ;
29
30
import org .springframework .messaging .simp .handler .AbstractBrokerMessageHandler ;
@@ -205,34 +206,29 @@ protected void handleMessageInternal(Message<?> message) {
205
206
return ;
206
207
}
207
208
208
- try {
209
- if (SimpMessageType .CONNECT .equals (messageType )) {
210
- message = MessageBuilder .withPayloadAndHeaders (message .getPayload (), headers ).build ();
211
- StompRelaySession session = new StompRelaySession (sessionId );
212
- this .relaySessions .put (sessionId , session );
213
- session .connect (message );
214
- }
215
- else if (SimpMessageType .DISCONNECT .equals (messageType )) {
216
- StompRelaySession session = this .relaySessions .remove (sessionId );
217
- if (session == null ) {
218
- if (logger .isTraceEnabled ()) {
219
- logger .trace ("Session already removed, sessionId=" + sessionId );
220
- }
221
- return ;
222
- }
223
- session .forward (message );
224
- }
225
- else {
226
- StompRelaySession session = this .relaySessions .get (sessionId );
227
- if (session == null ) {
228
- logger .warn ("Session id=" + sessionId + " not found. Ignoring message: " + message );
229
- return ;
209
+ if (SimpMessageType .CONNECT .equals (messageType )) {
210
+ message = MessageBuilder .withPayloadAndHeaders (message .getPayload (), headers ).build ();
211
+ StompRelaySession session = new StompRelaySession (sessionId );
212
+ this .relaySessions .put (sessionId , session );
213
+ session .connect (message );
214
+ }
215
+ else if (SimpMessageType .DISCONNECT .equals (messageType )) {
216
+ StompRelaySession session = this .relaySessions .remove (sessionId );
217
+ if (session == null ) {
218
+ if (logger .isTraceEnabled ()) {
219
+ logger .trace ("Session already removed, sessionId=" + sessionId );
230
220
}
231
- session . forward ( message ) ;
221
+ return ;
232
222
}
223
+ session .forward (message );
233
224
}
234
- catch (Throwable t ) {
235
- logger .error ("Failed to handle message " + message , t );
225
+ else {
226
+ StompRelaySession session = this .relaySessions .get (sessionId );
227
+ if (session == null ) {
228
+ logger .warn ("Session id=" + sessionId + " not found. Ignoring message: " + message );
229
+ return ;
230
+ }
231
+ session .forward (message );
236
232
}
237
233
}
238
234
@@ -323,7 +319,7 @@ protected void connected(StompHeaderAccessor headers, StompConnection stompConne
323
319
publishBrokerAvailableEvent ();
324
320
}
325
321
326
- private void handleTcpClientFailure (String message , Throwable ex ) {
322
+ protected void handleTcpClientFailure (String message , Throwable ex ) {
327
323
if (logger .isErrorEnabled ()) {
328
324
logger .error (message + ", sessionId=" + this .sessionId , ex );
329
325
}
@@ -348,13 +344,21 @@ protected void sendMessageToClient(Message<?> message) {
348
344
messageChannel .send (message );
349
345
}
350
346
351
- public void forward (Message <?> message ) {
347
+ private void forward (Message <?> message ) {
352
348
TcpConnection <Message <byte []>, Message <byte []>> tcpConnection = this .stompConnection .getReadyConnection ();
353
349
if (tcpConnection == null ) {
354
- logger .warn ("Connection to STOMP broker is not active, discarding message: " + message );
355
- return ;
350
+ logger .warn ("Connection to STOMP broker is not active" );
351
+ handleForwardFailure (message );
352
+ }
353
+ else if (!forwardInternal (tcpConnection , message )) {
354
+ handleForwardFailure (message );
355
+ }
356
+ }
357
+
358
+ protected void handleForwardFailure (Message <?> message ) {
359
+ if (logger .isWarnEnabled ()) {
360
+ logger .warn ("Failed to forward message to the broker. message=" + message );
356
361
}
357
- forwardInternal (tcpConnection , message );
358
362
}
359
363
360
364
private boolean forwardInternal (
@@ -491,32 +495,40 @@ protected void connectionClosed() {
491
495
492
496
@ Override
493
497
protected void connected (StompHeaderAccessor headers , final StompConnection stompConnection ) {
494
- long brokerReceiveInterval = headers .getHeartbeat ()[1 ];
495
498
496
- if (HEARTBEAT_SEND_INTERVAL > 0 && brokerReceiveInterval > 0 ) {
499
+ long brokerReceiveInterval = headers .getHeartbeat ()[1 ];
500
+ if ((HEARTBEAT_SEND_INTERVAL > 0 ) && (brokerReceiveInterval > 0 )) {
497
501
long interval = Math .max (HEARTBEAT_SEND_INTERVAL , brokerReceiveInterval );
498
502
stompConnection .connection .on ().writeIdle (interval , new Runnable () {
499
503
500
504
@ Override
501
505
public void run () {
502
- TcpConnection <Message <byte []>, Message <byte []>> connection = stompConnection .connection ;
503
- if (connection != null ) {
504
- connection .send (MessageBuilder .withPayload (heartbeatPayload ).build ());
506
+ TcpConnection <Message <byte []>, Message <byte []>> tcpConn = stompConnection .connection ;
507
+ if (tcpConn != null ) {
508
+ tcpConn .send (MessageBuilder .withPayload (heartbeatPayload ).build (),
509
+ new Consumer <Boolean >() {
510
+ @ Override
511
+ public void accept (Boolean t ) {
512
+ handleTcpClientFailure ("Failed to send heartbeat to the broker" , null );
513
+ }
514
+ });
505
515
}
506
516
}
507
-
508
517
});
509
518
}
510
519
511
520
long brokerSendInterval = headers .getHeartbeat ()[0 ];
512
521
if (HEARTBEAT_RECEIVE_INTERVAL > 0 && brokerSendInterval > 0 ) {
513
- final long interval =
514
- Math . max ( HEARTBEAT_RECEIVE_INTERVAL , brokerSendInterval ) * HEARTBEAT_RECEIVE_MULTIPLIER ;
522
+ final long interval = Math . max ( HEARTBEAT_RECEIVE_INTERVAL , brokerSendInterval )
523
+ * HEARTBEAT_RECEIVE_MULTIPLIER ;
515
524
stompConnection .connection .on ().readIdle (interval , new Runnable () {
525
+
516
526
@ Override
517
527
public void run () {
518
528
String message = "Broker hearbeat missed: connection idle for more than " + interval + "ms" ;
519
- logger .warn (message );
529
+ if (logger .isWarnEnabled ()) {
530
+ logger .warn (message );
531
+ }
520
532
disconnected (message );
521
533
}
522
534
});
@@ -537,6 +549,12 @@ protected void sendMessageToClient(Message<?> message) {
537
549
// Ignore
538
550
}
539
551
}
552
+
553
+ @ Override
554
+ protected void handleForwardFailure (Message <?> message ) {
555
+ super .handleForwardFailure (message );
556
+ throw new MessageDeliveryException (message );
557
+ }
540
558
}
541
559
542
560
}
0 commit comments