Skip to content

Commit a7f735b

Browse files
wilkinsonarstoyanchev
authored andcommitted
Make the broker relay heartbeat intervals configurable
Prior to this commit, the intervals at which the broker relay's system session would send heartbeats to the STOMP broker and expect to receive heartbeats from the STOMP broker were hard-coded at 10 seconds. This commit makes the intervals configurable, with 10 seconds being the default value.
1 parent ba11af7 commit a7f735b

File tree

3 files changed

+93
-16
lines changed

3 files changed

+93
-16
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistration.java

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration {
3737

3838
private String applicationPasscode = "guest";
3939

40+
private Long systemHeartbeatSendInterval;
41+
42+
private Long systemHeartbeatReceiveInterval;
43+
4044
private boolean autoStartup = true;
4145

4246

@@ -63,7 +67,7 @@ public StompBrokerRelayRegistration setRelayPort(int relayPort) {
6367
}
6468

6569
/**
66-
* Set the login for a "system" TCP connection used to send messages to the STOMP
70+
* Set the login for the "system" relay session used to send messages to the STOMP
6771
* broker without having a client session (e.g. REST/HTTP request handling method).
6872
*/
6973
public StompBrokerRelayRegistration setApplicationLogin(String login) {
@@ -73,7 +77,7 @@ public StompBrokerRelayRegistration setApplicationLogin(String login) {
7377
}
7478

7579
/**
76-
* Set the passcode for a "system" TCP connection used to send messages to the STOMP
80+
* Set the passcode for the "system" relay session used to send messages to the STOMP
7781
* broker without having a client session (e.g. REST/HTTP request handling method).
7882
*/
7983
public StompBrokerRelayRegistration setApplicationPasscode(String passcode) {
@@ -82,6 +86,31 @@ public StompBrokerRelayRegistration setApplicationPasscode(String passcode) {
8286
return this;
8387
}
8488

89+
/**
90+
* Set the interval, in milliseconds, at which the "system" relay session will,
91+
* in the absence of any other data being sent, send a heartbeat to the STOMP broker.
92+
* A value of zero will prevent heartbeats from being sent to the broker.
93+
* <p>
94+
* The default value is 10000.
95+
*/
96+
public StompBrokerRelayRegistration setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval) {
97+
this.systemHeartbeatSendInterval = systemHeartbeatSendInterval;
98+
return this;
99+
}
100+
101+
/**
102+
* Set the maximum interval, in milliseconds, at which the "system" relay session
103+
* expects, in the absence of any other data, to receive a heartbeat from the STOMP
104+
* broker. A value of zero will configure the relay session to expect not to receive
105+
* heartbeats from the broker.
106+
* <p>
107+
* The default value is 10000.
108+
*/
109+
public StompBrokerRelayRegistration setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval) {
110+
this.systemHeartbeatReceiveInterval = heartbeatReceiveInterval;
111+
return this;
112+
}
113+
85114
/**
86115
* Configure whether the {@link StompBrokerRelayMessageHandler} should start
87116
* automatically when the Spring ApplicationContext is refreshed.
@@ -101,6 +130,12 @@ protected StompBrokerRelayMessageHandler getMessageHandler() {
101130
handler.setRelayPort(this.relayPort);
102131
handler.setSystemLogin(this.applicationLogin);
103132
handler.setSystemPasscode(this.applicationPasscode);
133+
if (this.systemHeartbeatSendInterval != null) {
134+
handler.setSystemHeartbeatSendInterval(this.systemHeartbeatSendInterval);
135+
}
136+
if (this.systemHeartbeatReceiveInterval != null) {
137+
handler.setSystemHeartbeatReceiveInterval(this.systemHeartbeatReceiveInterval);
138+
}
104139
handler.setAutoStartup(this.autoStartup);
105140
return handler;
106141
}

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java

Lines changed: 52 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
6767

6868
private String systemPasscode = "guest";
6969

70+
private long systemHeartbeatSendInterval = 10000;
71+
72+
private long systemHeartbeatReceiveInterval = 10000;
73+
7074
private Environment environment;
7175

7276
private TcpClient<Message<byte[]>, Message<byte[]>> tcpClient;
@@ -116,7 +120,46 @@ public int getRelayPort() {
116120
}
117121

118122
/**
119-
* Set the login for a "system" TCP connection used to send messages to the STOMP
123+
* Set the interval, in milliseconds, at which the "system" relay session will,
124+
* in the absence of any other data being sent, send a heartbeat to the STOMP broker.
125+
* A value of zero will prevent heartbeats from being sent to the broker.
126+
* <p>
127+
* The default value is 10000.
128+
*/
129+
public void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval) {
130+
this.systemHeartbeatSendInterval = systemHeartbeatSendInterval;
131+
}
132+
133+
/**
134+
* @return The interval, in milliseconds, at which the "system" relay session will
135+
* send heartbeats to the STOMP broker.
136+
*/
137+
public long getSystemHeartbeatSendInterval() {
138+
return this.systemHeartbeatSendInterval;
139+
}
140+
141+
/**
142+
* Set the maximum interval, in milliseconds, at which the "system" relay session
143+
* expects, in the absence of any other data, to receive a heartbeat from the STOMP
144+
* broker. A value of zero will configure the relay session to expect not to receive
145+
* heartbeats from the broker.
146+
* <p>
147+
* The default value is 10000.
148+
*/
149+
public void setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval) {
150+
this.systemHeartbeatReceiveInterval = heartbeatReceiveInterval;
151+
}
152+
153+
/**
154+
* @return The interval, in milliseconds, at which the "system" relay session expects
155+
* to receive heartbeats from the STOMP broker.
156+
*/
157+
public long getSystemHeartbeatReceiveInterval() {
158+
return this.systemHeartbeatReceiveInterval;
159+
}
160+
161+
/**
162+
* Set the login for the "system" relay session used to send messages to the STOMP
120163
* broker without having a client session (e.g. REST/HTTP request handling method).
121164
*/
122165
public void setSystemLogin(String systemLogin) {
@@ -125,22 +168,22 @@ public void setSystemLogin(String systemLogin) {
125168
}
126169

127170
/**
128-
* @return the login for a shared, "system" connection to the STOMP message broker.
171+
* @return the login used by the "system" relay session to connect to the STOMP broker
129172
*/
130173
public String getSystemLogin() {
131174
return this.systemLogin;
132175
}
133176

134177
/**
135-
* Set the passcode for a "system" TCP connection used to send messages to the STOMP
178+
* Set the passcode for the "system" relay session used to send messages to the STOMP
136179
* broker without having a client session (e.g. REST/HTTP request handling method).
137180
*/
138181
public void setSystemPasscode(String systemPasscode) {
139182
this.systemPasscode = systemPasscode;
140183
}
141184

142185
/**
143-
* @return the passcode for a shared, "system" connection to the STOMP message broker.
186+
* @return the passcode used by the "system" relay session to connect to the STOMP broker
144187
*/
145188
public String getSystemPasscode() {
146189
return this.systemPasscode;
@@ -458,10 +501,6 @@ private class SystemStompRelaySession extends StompRelaySession {
458501

459502
private static final long HEARTBEAT_RECEIVE_MULTIPLIER = 3;
460503

461-
private static final long HEARTBEAT_SEND_INTERVAL = 10000;
462-
463-
private static final long HEARTBEAT_RECEIVE_INTERVAL = 10000;
464-
465504
public static final String ID = "stompRelaySystemSessionId";
466505

467506
private final byte[] heartbeatPayload = new byte[] {'\n'};
@@ -476,7 +515,7 @@ public void connect() {
476515
headers.setAcceptVersion("1.1,1.2");
477516
headers.setLogin(systemLogin);
478517
headers.setPasscode(systemPasscode);
479-
headers.setHeartbeat(HEARTBEAT_SEND_INTERVAL, HEARTBEAT_RECEIVE_INTERVAL);
518+
headers.setHeartbeat(systemHeartbeatSendInterval, systemHeartbeatReceiveInterval);
480519
Message<?> connectMessage = MessageBuilder.withPayloadAndHeaders(new byte[0], headers).build();
481520
super.connect(connectMessage);
482521
}
@@ -500,8 +539,8 @@ protected void connectionClosed() {
500539
protected void connected(StompHeaderAccessor headers, final StompConnection stompConnection) {
501540

502541
long brokerReceiveInterval = headers.getHeartbeat()[1];
503-
if ((HEARTBEAT_SEND_INTERVAL > 0) && (brokerReceiveInterval > 0)) {
504-
long interval = Math.max(HEARTBEAT_SEND_INTERVAL, brokerReceiveInterval);
542+
if ((systemHeartbeatSendInterval > 0) && (brokerReceiveInterval > 0)) {
543+
long interval = Math.max(systemHeartbeatSendInterval, brokerReceiveInterval);
505544
stompConnection.connection.on().writeIdle(interval, new Runnable() {
506545

507546
@Override
@@ -523,8 +562,8 @@ public void accept(Boolean result) {
523562
}
524563

525564
long brokerSendInterval = headers.getHeartbeat()[0];
526-
if (HEARTBEAT_RECEIVE_INTERVAL > 0 && brokerSendInterval > 0) {
527-
final long interval = Math.max(HEARTBEAT_RECEIVE_INTERVAL, brokerSendInterval)
565+
if (systemHeartbeatReceiveInterval > 0 && brokerSendInterval > 0) {
566+
final long interval = Math.max(systemHeartbeatReceiveInterval, brokerSendInterval)
528567
* HEARTBEAT_RECEIVE_MULTIPLIER;
529568
stompConnection.connection.on().readIdle(interval, new Runnable() {
530569

spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,12 @@ private void createAndStartBroker() throws Exception {
9090
}
9191

9292
private void createAndStartRelay() throws InterruptedException {
93-
this.relay = new StompBrokerRelayMessageHandler(this.responseChannel, Arrays.asList("/queue/", "/topic/"));
93+
this.relay = new StompBrokerRelayMessageHandler(
94+
this.responseChannel, Arrays.asList("/queue/", "/topic/"));
9495
this.relay.setRelayPort(port);
9596
this.relay.setApplicationEventPublisher(this.eventPublisher);
97+
this.relay.setSystemHeartbeatReceiveInterval(0);
98+
this.relay.setSystemHeartbeatSendInterval(0);
9699

97100
this.eventPublisher.expect(true);
98101
this.relay.start();

0 commit comments

Comments
 (0)