Skip to content

Commit 384e851

Browse files
committed
Polish reactive WebSocketClient implementations
1 parent c18ebde commit 384e851

File tree

5 files changed

+170
-107
lines changed

5 files changed

+170
-107
lines changed

spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java

Lines changed: 53 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import java.net.URI;
2020

21-
import org.eclipse.jetty.websocket.api.Session;
2221
import org.eclipse.jetty.websocket.api.UpgradeRequest;
2322
import org.eclipse.jetty.websocket.api.UpgradeResponse;
2423
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
@@ -36,8 +35,14 @@
3635
import org.springframework.web.reactive.socket.adapter.JettyWebSocketSession;
3736

3837
/**
39-
* Jetty based implementation of {@link WebSocketClient}.
40-
*
38+
* A {@link WebSocketClient} implementation for use with Jetty
39+
* {@link org.eclipse.jetty.websocket.client.WebSocketClient}.
40+
*
41+
* <p><strong>Note: </strong> the Jetty {@code WebSocketClient} requires
42+
* lifecycle management and must be started and stopped. This is automatically
43+
* managed when this class is declared as a Spring bean and created with the
44+
* default constructor. See constructor notes for more details.
45+
*
4146
* @author Violeta Georgieva
4247
* @author Rossen Stoyanchev
4348
* @since 5.0
@@ -46,34 +51,60 @@ public class JettyWebSocketClient extends WebSocketClientSupport implements WebS
4651

4752
private final org.eclipse.jetty.websocket.client.WebSocketClient jettyClient;
4853

49-
private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
54+
private final boolean externallyManaged;
55+
56+
private boolean running = false;
5057

5158
private final Object lifecycleMonitor = new Object();
5259

60+
private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
61+
5362

5463
/**
55-
* Default constructor that creates an instance of
56-
* {@link org.eclipse.jetty.websocket.client.WebSocketClient}.
64+
* Default constructor that creates and manages an instance of a Jetty
65+
* {@link org.eclipse.jetty.websocket.client.WebSocketClient WebSocketClient}.
66+
* The instance can be obtained with {@link #getJettyClient()} for further
67+
* configuration.
68+
*
69+
* <p><strong>Note: </strong> When this constructor is used {@link Lifecycle}
70+
* methods of this class are delegated to the Jetty {@code WebSocketClient}.
5771
*/
5872
public JettyWebSocketClient() {
59-
this(new org.eclipse.jetty.websocket.client.WebSocketClient());
73+
this.jettyClient = new org.eclipse.jetty.websocket.client.WebSocketClient();
74+
this.externallyManaged = false;
6075
}
6176

6277
/**
63-
* Constructor that accepts an existing
64-
* {@link org.eclipse.jetty.websocket.client.WebSocketClient} instance.
65-
* @param jettyClient a web socket client
78+
* Constructor that accepts an existing instance of a Jetty
79+
* {@link org.eclipse.jetty.websocket.client.WebSocketClient WebSocketClient}.
80+
*
81+
* <p><strong>Note: </strong> Use of this constructor implies the Jetty
82+
* {@code WebSocketClient} is externally managed and hence {@link Lifecycle}
83+
* methods of this class are not delegated to it.
6684
*/
6785
public JettyWebSocketClient(org.eclipse.jetty.websocket.client.WebSocketClient jettyClient) {
6886
this.jettyClient = jettyClient;
87+
this.externallyManaged = true;
88+
}
89+
90+
91+
/**
92+
* Return the underlying Jetty {@code WebSocketClient}.
93+
*/
94+
public org.eclipse.jetty.websocket.client.WebSocketClient getJettyClient() {
95+
return this.jettyClient;
6996
}
7097

7198

7299
@Override
73100
public void start() {
101+
if (this.externallyManaged) {
102+
return;
103+
}
74104
synchronized (this.lifecycleMonitor) {
75105
if (!isRunning()) {
76106
try {
107+
this.running = true;
77108
this.jettyClient.start();
78109
}
79110
catch (Exception ex) {
@@ -85,9 +116,13 @@ public void start() {
85116

86117
@Override
87118
public void stop() {
119+
if (this.externallyManaged) {
120+
return;
121+
}
88122
synchronized (this.lifecycleMonitor) {
89123
if (isRunning()) {
90124
try {
125+
this.running = false;
91126
this.jettyClient.stop();
92127
}
93128
catch (Exception ex) {
@@ -100,7 +135,7 @@ public void stop() {
100135
@Override
101136
public boolean isRunning() {
102137
synchronized (this.lifecycleMonitor) {
103-
return this.jettyClient.isStarted();
138+
return this.running;
104139
}
105140
}
106141

@@ -131,15 +166,13 @@ private Mono<Void> executeInternal(URI url, HttpHeaders headers, WebSocketHandle
131166

132167
private Object createJettyHandler(URI url, WebSocketHandler handler, MonoProcessor<Void> completion) {
133168
return new JettyWebSocketHandlerAdapter(handler,
134-
session -> createJettySession(url, completion, session));
135-
}
136-
137-
private JettyWebSocketSession createJettySession(URI url, MonoProcessor<Void> completion, Session session) {
138-
UpgradeResponse response = session.getUpgradeResponse();
139-
HttpHeaders responseHeaders = new HttpHeaders();
140-
response.getHeaders().forEach(responseHeaders::put);
141-
HandshakeInfo info = afterHandshake(url, responseHeaders);
142-
return new JettyWebSocketSession(session, info, this.bufferFactory, completion);
169+
session -> {
170+
UpgradeResponse response = session.getUpgradeResponse();
171+
HttpHeaders responseHeaders = new HttpHeaders();
172+
response.getHeaders().forEach(responseHeaders::put);
173+
HandshakeInfo info = afterHandshake(url, responseHeaders);
174+
return new JettyWebSocketSession(session, info, this.bufferFactory, completion);
175+
});
143176
}
144177

145178

spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketSession;
3434

3535
/**
36-
* A Reactor Netty based implementation of {@link WebSocketClient}.
36+
* {@link WebSocketClient} implementation for use with Reactor Netty.
3737
*
3838
* @author Rossen Stoyanchev
3939
* @since 5.0
@@ -43,15 +43,30 @@ public class ReactorNettyWebSocketClient extends WebSocketClientSupport implemen
4343
private final HttpClient httpClient;
4444

4545

46+
/**
47+
* Default constructor.
48+
*/
4649
public ReactorNettyWebSocketClient() {
47-
this.httpClient = HttpClient.create();
50+
this(options -> {});
4851
}
4952

53+
/**
54+
* Constructor that accepts an {@link HttpClientOptions} consumer to supply
55+
* to {@link HttpClient#create(Consumer)}.
56+
*/
5057
public ReactorNettyWebSocketClient(Consumer<? super HttpClientOptions> clientOptions) {
5158
this.httpClient = HttpClient.create(clientOptions);
5259
}
5360

5461

62+
/**
63+
* Return the configured {@link HttpClient}.
64+
*/
65+
public HttpClient getHttpClient() {
66+
return this.httpClient;
67+
}
68+
69+
5570
@Override
5671
public Mono<Void> execute(URI url, WebSocketHandler handler) {
5772
return execute(url, new HttpHeaders(), handler);
@@ -63,32 +78,25 @@ public Mono<Void> execute(URI url, HttpHeaders headers, WebSocketHandler handler
6378
String[] protocols = beforeHandshake(url, headers, handler);
6479
// TODO: https://github.com/reactor/reactor-netty/issues/20
6580

66-
return this.httpClient
67-
.get(url.toString(), request -> {
68-
addRequestHeaders(request, headers);
69-
return request.sendWebsocket();
70-
})
81+
return getHttpClient()
82+
.get(url.toString(), request -> addHeaders(request, headers).sendWebsocket())
7183
.then(response -> {
72-
HttpHeaders responseHeaders = getResponseHeaders(response);
73-
HandshakeInfo info = afterHandshake(url, responseHeaders);
74-
84+
HandshakeInfo info = afterHandshake(url, toHttpHeaders(response));
7585
ByteBufAllocator allocator = response.channel().alloc();
7686
NettyDataBufferFactory factory = new NettyDataBufferFactory(allocator);
77-
7887
return response.receiveWebsocket((in, out) -> {
7988
WebSocketSession session = new ReactorNettyWebSocketSession(in, out, info, factory);
8089
return handler.handle(session);
8190
});
8291
});
8392
}
8493

85-
private void addRequestHeaders(HttpClientRequest request, HttpHeaders headers) {
86-
headers.keySet().stream()
87-
.forEach(key -> headers.get(key).stream()
88-
.forEach(value -> request.addHeader(key, value)));
94+
private HttpClientRequest addHeaders(HttpClientRequest request, HttpHeaders headers) {
95+
headers.keySet().stream().forEach(key -> request.requestHeaders().set(key, headers.get(key)));
96+
return request;
8997
}
9098

91-
private HttpHeaders getResponseHeaders(HttpClientResponse response) {
99+
private HttpHeaders toHttpHeaders(HttpClientResponse response) {
92100
HttpHeaders headers = new HttpHeaders();
93101
response.responseHeaders().forEach(entry -> {
94102
String name = entry.getKey();

0 commit comments

Comments
 (0)