Skip to content

Commit fb4e34f

Browse files
committed
Add partial WebSocketMessage support
1 parent 814d24e commit fb4e34f

11 files changed

+126
-48
lines changed

spring-websocket/src/main/java/org/springframework/web/socket/BinaryMessage.java

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515
*/
1616
package org.springframework.web.socket;
1717

18-
import java.io.ByteArrayInputStream;
19-
import java.io.InputStream;
2018
import java.nio.ByteBuffer;
2119

2220

@@ -34,20 +32,36 @@ public final class BinaryMessage extends WebSocketMessage<ByteBuffer> {
3432
/**
3533
* Create a new {@link BinaryMessage} instance.
3634
* @param payload a non-null payload
37-
* @param isLast if the message is the last of a series of partial messages
3835
*/
3936
public BinaryMessage(ByteBuffer payload) {
40-
super(payload);
41-
this.bytes = null;
37+
this(payload, true);
4238
}
4339

4440
/**
4541
* Create a new {@link BinaryMessage} instance.
4642
* @param payload a non-null payload
4743
* @param isLast if the message is the last of a series of partial messages
4844
*/
45+
public BinaryMessage(ByteBuffer payload, boolean isLast) {
46+
super(payload, isLast);
47+
this.bytes = null;
48+
}
49+
50+
/**
51+
* Create a new {@link BinaryMessage} instance.
52+
* @param payload a non-null payload
53+
*/
4954
public BinaryMessage(byte[] payload) {
50-
this(payload, 0, (payload == null ? 0 : payload.length));
55+
this(payload, 0, (payload == null ? 0 : payload.length), true);
56+
}
57+
58+
/**
59+
* Create a new {@link BinaryMessage} instance.
60+
* @param payload a non-null payload
61+
* @param isLast if the message is the last of a series of partial messages
62+
*/
63+
public BinaryMessage(byte[] payload, boolean isLast) {
64+
this(payload, 0, (payload == null ? 0 : payload.length), isLast);
5165
}
5266

5367
/**
@@ -58,8 +72,8 @@ public BinaryMessage(byte[] payload) {
5872
* @param len the length of the array considered for the payload
5973
* @param isLast if the message is the last of a series of partial messages
6074
*/
61-
public BinaryMessage(byte[] payload, int offset, int len) {
62-
super(payload != null ? ByteBuffer.wrap(payload, offset, len) : null);
75+
public BinaryMessage(byte[] payload, int offset, int len, boolean isLast) {
76+
super(payload != null ? ByteBuffer.wrap(payload, offset, len) : null, isLast);
6377
if(offset == 0 && len == payload.length) {
6478
this.bytes = payload;
6579
}
@@ -82,18 +96,9 @@ private byte[] getRemainingBytes(ByteBuffer payload) {
8296
return result;
8397
}
8498

85-
/**
86-
* Returns access to the message payload as an {@link InputStream}.
87-
*/
88-
public InputStream getInputStream() {
89-
byte[] array = getByteArray();
90-
return (array != null) ? new ByteArrayInputStream(array) : null;
91-
}
92-
9399
@Override
94-
public String toString() {
95-
int size = (getPayload() != null) ? getPayload().remaining() : 0;
96-
return "WebSocket binary message size=" + size;
100+
protected int getPayloadSize() {
101+
return (getPayload() != null) ? getPayload().remaining() : 0;
97102
}
98103

99104
}

spring-websocket/src/main/java/org/springframework/web/socket/TextMessage.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,23 @@
2727
*/
2828
public final class TextMessage extends WebSocketMessage<String> {
2929

30+
3031
/**
3132
* Create a new {@link TextMessage} instance.
3233
* @param payload the payload
34+
* @param isLast whether this the last part of a message received or transmitted in parts
3335
*/
3436
public TextMessage(CharSequence payload) {
35-
super(payload.toString());
37+
super(payload.toString(), true);
38+
}
39+
40+
/**
41+
* Create a new {@link TextMessage} instance.
42+
* @param payload the payload
43+
* @param isLast whether this the last part of a message received or transmitted in parts
44+
*/
45+
public TextMessage(CharSequence payload, boolean isLast) {
46+
super(payload.toString(), isLast);
3647
}
3748

3849
/**
@@ -42,4 +53,9 @@ public Reader getReader() {
4253
return new StringReader(getPayload());
4354
}
4455

56+
@Override
57+
protected int getPayloadSize() {
58+
return getPayload().length();
59+
}
60+
4561
}

spring-websocket/src/main/java/org/springframework/web/socket/WebSocketHandler.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,4 +73,9 @@ public interface WebSocketHandler {
7373
*/
7474
void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception;
7575

76+
/**
77+
* Whether the WebSocketHandler handles messages in parts.
78+
*/
79+
boolean supportsPartialMessages();
80+
7681
}

spring-websocket/src/main/java/org/springframework/web/socket/WebSocketMessage.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,17 @@ public abstract class WebSocketMessage<T> {
3131

3232
private final T payload;
3333

34+
private final boolean last;
35+
3436

3537
/**
3638
* Create a new {@link WebSocketMessage} instance with the given payload.
3739
* @param payload a non-null payload
3840
*/
39-
WebSocketMessage(T payload) {
41+
WebSocketMessage(T payload, boolean isLast) {
4042
Assert.notNull(payload, "Payload must not be null");
4143
this.payload = payload;
44+
this.last = isLast;
4245
}
4346

4447
/**
@@ -48,9 +51,13 @@ public T getPayload() {
4851
return this.payload;
4952
}
5053

51-
@Override
52-
public String toString() {
53-
return getClass().getSimpleName() + " [payload=" + this.payload + "]";
54+
/**
55+
* Whether this is the last part of a message, when partial message support on a
56+
* {@link WebSocketHandler} is enabled. If partial message support is not enabled the
57+
* returned value is always {@code true}.
58+
*/
59+
public boolean isLast() {
60+
return this.last;
5461
}
5562

5663
@Override
@@ -70,4 +77,11 @@ public boolean equals(Object other) {
7077
return ObjectUtils.nullSafeEquals(this.payload, otherMessage.payload);
7178
}
7279

80+
@Override
81+
public String toString() {
82+
return getClass().getSimpleName() + " [payload length=" + getPayloadSize() + ", last=" + isLast() + "]";
83+
}
84+
85+
protected abstract int getPayloadSize();
86+
7387
}

spring-websocket/src/main/java/org/springframework/web/socket/adapter/JettyWebSocketListenerAdapter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public void onWebSocketText(String payload) {
7474

7575
@Override
7676
public void onWebSocketBinary(byte[] payload, int offset, int len) {
77-
BinaryMessage message = new BinaryMessage(payload, offset, len);
77+
BinaryMessage message = new BinaryMessage(payload, offset, len, true);
7878
try {
7979
this.webSocketHandler.handleMessage(this.wsSession, message);
8080
}

spring-websocket/src/main/java/org/springframework/web/socket/adapter/StandardEndpointAdapter.java

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -61,30 +61,46 @@ public void onOpen(final javax.websocket.Session session, EndpointConfig config)
6161

6262
this.wsSession.initSession(session);
6363

64+
if (this.handler.supportsPartialMessages()) {
65+
session.addMessageHandler(new MessageHandler.Partial<String>() {
66+
@Override
67+
public void onMessage(String message, boolean isLast) {
68+
handleTextMessage(session, message, isLast);
69+
}
70+
});
71+
session.addMessageHandler(new MessageHandler.Partial<ByteBuffer>() {
72+
@Override
73+
public void onMessage(ByteBuffer message, boolean isLast) {
74+
handleBinaryMessage(session, message, isLast);
75+
}
76+
});
77+
}
78+
else {
79+
session.addMessageHandler(new MessageHandler.Whole<String>() {
80+
@Override
81+
public void onMessage(String message) {
82+
handleTextMessage(session, message, true);
83+
}
84+
});
85+
session.addMessageHandler(new MessageHandler.Whole<ByteBuffer>() {
86+
@Override
87+
public void onMessage(ByteBuffer message) {
88+
handleBinaryMessage(session, message, true);
89+
}
90+
});
91+
}
92+
6493
try {
6594
this.handler.afterConnectionEstablished(this.wsSession);
6695
}
6796
catch (Throwable t) {
6897
ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, t, logger);
6998
return;
7099
}
71-
72-
session.addMessageHandler(new MessageHandler.Whole<String>() {
73-
@Override
74-
public void onMessage(String message) {
75-
handleTextMessage(session, message);
76-
}
77-
});
78-
session.addMessageHandler(new MessageHandler.Whole<ByteBuffer>() {
79-
@Override
80-
public void onMessage(ByteBuffer message) {
81-
handleBinaryMessage(session, message);
82-
}
83-
});
84100
}
85101

86-
private void handleTextMessage(javax.websocket.Session session, String payload) {
87-
TextMessage textMessage = new TextMessage(payload);
102+
private void handleTextMessage(javax.websocket.Session session, String payload, boolean isLast) {
103+
TextMessage textMessage = new TextMessage(payload, isLast);
88104
try {
89105
this.handler.handleMessage(this.wsSession, textMessage);
90106
}
@@ -93,8 +109,8 @@ private void handleTextMessage(javax.websocket.Session session, String payload)
93109
}
94110
}
95111

96-
private void handleBinaryMessage(javax.websocket.Session session, ByteBuffer payload) {
97-
BinaryMessage binaryMessage = new BinaryMessage(payload);
112+
private void handleBinaryMessage(javax.websocket.Session session, ByteBuffer payload, boolean isLast) {
113+
BinaryMessage binaryMessage = new BinaryMessage(payload, isLast);
98114
try {
99115
this.handler.handleMessage(this.wsSession, binaryMessage);
100116
}

spring-websocket/src/main/java/org/springframework/web/socket/adapter/StandardWebSocketSessionAdapter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,12 +110,12 @@ public boolean isOpen() {
110110

111111
@Override
112112
protected void sendTextMessage(TextMessage message) throws IOException {
113-
this.session.getBasicRemote().sendText(message.getPayload());
113+
this.session.getBasicRemote().sendText(message.getPayload(), message.isLast());
114114
}
115115

116116
@Override
117117
protected void sendBinaryMessage(BinaryMessage message) throws IOException {
118-
this.session.getBasicRemote().sendBinary(message.getPayload());
118+
this.session.getBasicRemote().sendBinary(message.getPayload(), message.isLast());
119119
}
120120

121121
@Override

spring-websocket/src/main/java/org/springframework/web/socket/adapter/WebSocketHandlerAdapter.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,14 @@ public void afterConnectionEstablished(WebSocketSession session) throws Exceptio
3838
}
3939

4040
@Override
41-
public final void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
41+
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
4242
if (message instanceof TextMessage) {
4343
handleTextMessage(session, (TextMessage) message);
4444
}
4545
else if (message instanceof BinaryMessage) {
4646
handleBinaryMessage(session, (BinaryMessage) message);
4747
}
4848
else {
49-
// should not happen
5049
throw new IllegalStateException("Unexpected WebSocket message type: " + message);
5150
}
5251
}
@@ -65,4 +64,9 @@ public void handleTransportError(WebSocketSession session, Throwable exception)
6564
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
6665
}
6766

67+
@Override
68+
public boolean supportsPartialMessages() {
69+
return false;
70+
}
71+
6872
}

spring-websocket/src/main/java/org/springframework/web/socket/support/LoggingWebSocketHandlerDecorator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ public void afterConnectionEstablished(WebSocketSession session) throws Exceptio
5050

5151
@Override
5252
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
53-
if (logger.isTraceEnabled()) {
54-
logger.trace("Received " + message + ", " + session);
53+
if (logger.isDebugEnabled()) {
54+
logger.debug("Received " + message + ", " + session);
5555
}
5656
super.handleMessage(session, message);
5757
}

spring-websocket/src/main/java/org/springframework/web/socket/support/PerConnectionWebSocketHandler.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,16 @@ public class PerConnectionWebSocketHandler implements WebSocketHandler, BeanFact
5757
private final Map<WebSocketSession, WebSocketHandler> handlers =
5858
new ConcurrentHashMap<WebSocketSession, WebSocketHandler>();
5959

60+
private final boolean supportsPartialMessages;
61+
6062

6163
public PerConnectionWebSocketHandler(Class<? extends WebSocketHandler> handlerType) {
64+
this(handlerType, false);
65+
}
66+
67+
public PerConnectionWebSocketHandler(Class<? extends WebSocketHandler> handlerType, boolean supportsPartialMessages) {
6268
this.provider = new BeanCreatingHandlerProvider<WebSocketHandler>(handlerType);
69+
this.supportsPartialMessages = supportsPartialMessages;
6370
}
6471

6572
@Override
@@ -112,6 +119,11 @@ private void destroy(WebSocketSession session) {
112119
}
113120
}
114121

122+
@Override
123+
public boolean supportsPartialMessages() {
124+
return this.supportsPartialMessages;
125+
}
126+
115127
@Override
116128
public String toString() {
117129
return "PerConnectionWebSocketHandlerProxy [handlerType=" + this.provider.getHandlerType() + "]";

spring-websocket/src/main/java/org/springframework/web/socket/support/WebSocketHandlerDecorator.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,12 @@ public void afterConnectionClosed(WebSocketSession session, CloseStatus closeSta
6262
this.delegate.afterConnectionClosed(session, closeStatus);
6363
}
6464

65+
@Override
66+
public boolean supportsPartialMessages() {
67+
return this.delegate.supportsPartialMessages();
68+
}
69+
70+
6571
@Override
6672
public String toString() {
6773
return getClass().getSimpleName() + " [delegate=" + this.delegate + "]";

0 commit comments

Comments
 (0)