Skip to content

Commit e4d39bb

Browse files
committed
Refactor Undertow WebSocket client configuration model
This commit removes the statically created XnioWorker which is an "active" component and should not be created automatically and could lead to resource leaks. Instead XnioWorker is now required at construction aligning better with WebSocketClient#connectionBuilder which also does not have a "default" worker option. Since the XnioWorker is the main input for creating a ConnectionBuilder we now create the ConnectionBuider in a protected method and then allow a Consumer<ConnectionBuilder> to configure it further as opposed to the Function<URI, ConnectionBuilder> used previously. This commit also removes default SSL context initialization for RxNetty to better align with other client implementations. Issue: SPR-14527
1 parent 384e851 commit e4d39bb

File tree

3 files changed

+96
-86
lines changed

3 files changed

+96
-86
lines changed

spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/RxNettyWebSocketClient.java

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,11 @@
1616
package org.springframework.web.reactive.socket.client;
1717

1818
import java.net.URI;
19-
import java.security.NoSuchAlgorithmException;
2019
import java.util.ArrayList;
2120
import java.util.HashMap;
2221
import java.util.List;
2322
import java.util.Map;
2423
import java.util.function.Function;
25-
import javax.net.ssl.SSLContext;
26-
import javax.net.ssl.SSLEngine;
2724

2825
import io.netty.buffer.ByteBuf;
2926
import io.netty.buffer.ByteBufAllocator;
@@ -89,20 +86,8 @@ public RxNettyWebSocketClient(Function<URI, HttpClient<ByteBuf, ByteBuf>> httpCl
8986

9087
private static HttpClient<ByteBuf, ByteBuf> getDefaultHttpClientProvider(URI url) {
9188
boolean secure = "wss".equals(url.getScheme());
92-
int port = url.getPort() > 0 ? url.getPort() : secure ? 443 : 80;
93-
HttpClient<ByteBuf, ByteBuf> client = HttpClient.newClient(url.getHost(), port);
94-
if (secure) {
95-
try {
96-
SSLContext context = SSLContext.getDefault();
97-
SSLEngine engine = context.createSSLEngine(url.getHost(), port);
98-
engine.setUseClientMode(true);
99-
client.secure(engine);
100-
}
101-
catch (NoSuchAlgorithmException ex) {
102-
throw new IllegalStateException("Failed to create HttpClient for " + url, ex);
103-
}
104-
}
105-
return client;
89+
int port = (url.getPort() > 0 ? url.getPort() : secure ? 443 : 80);
90+
return HttpClient.newClient(url.getHost(), port);
10691
}
10792

10893

spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/UndertowWebSocketClient.java

Lines changed: 85 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -18,32 +18,25 @@
1818

1919
import java.io.IOException;
2020
import java.net.URI;
21-
import java.security.NoSuchAlgorithmException;
2221
import java.util.Arrays;
2322
import java.util.Collections;
2423
import java.util.List;
2524
import java.util.Map;
26-
import java.util.function.Function;
27-
import javax.net.ssl.SSLContext;
25+
import java.util.function.Consumer;
2826

29-
import io.undertow.connector.ByteBufferPool;
30-
import io.undertow.protocols.ssl.UndertowXnioSsl;
3127
import io.undertow.server.DefaultByteBufferPool;
3228
import io.undertow.websockets.client.WebSocketClient.ConnectionBuilder;
3329
import io.undertow.websockets.client.WebSocketClientNegotiation;
3430
import io.undertow.websockets.core.WebSocketChannel;
3531
import org.xnio.IoFuture;
36-
import org.xnio.OptionMap;
37-
import org.xnio.Options;
38-
import org.xnio.Xnio;
3932
import org.xnio.XnioWorker;
40-
import org.xnio.ssl.XnioSsl;
4133
import reactor.core.publisher.Mono;
4234
import reactor.core.publisher.MonoProcessor;
4335

4436
import org.springframework.core.io.buffer.DataBufferFactory;
4537
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
4638
import org.springframework.http.HttpHeaders;
39+
import org.springframework.util.Assert;
4740
import org.springframework.web.reactive.socket.HandshakeInfo;
4841
import org.springframework.web.reactive.socket.WebSocketHandler;
4942
import org.springframework.web.reactive.socket.adapter.UndertowWebSocketHandlerAdapter;
@@ -58,68 +51,69 @@
5851
*/
5952
public class UndertowWebSocketClient extends WebSocketClientSupport implements WebSocketClient {
6053

61-
private static final int DEFAULT_BUFFER_SIZE = 8192;
62-
63-
private static XnioWorker worker;
64-
65-
static {
66-
try {
67-
worker = Xnio.getInstance().createWorker(OptionMap.builder()
68-
.set(Options.WORKER_IO_THREADS, 2)
69-
.set(Options.CONNECTION_HIGH_WATER, 1000000)
70-
.set(Options.CONNECTION_LOW_WATER, 1000000)
71-
.set(Options.WORKER_TASK_CORE_THREADS, 30)
72-
.set(Options.WORKER_TASK_MAX_THREADS, 30)
73-
.set(Options.TCP_NODELAY, true)
74-
.set(Options.CORK, true)
75-
.getMap());
76-
}
77-
catch (IOException ex) {
78-
throw new RuntimeException(ex);
79-
}
80-
}
54+
private static final int DEFAULT_POOL_BUFFER_SIZE = 8192;
55+
56+
57+
private final XnioWorker worker;
8158

59+
private final Consumer<ConnectionBuilder> builderConsumer;
8260

83-
private final Function<URI, ConnectionBuilder> builder;
61+
private int poolBufferSize = DEFAULT_POOL_BUFFER_SIZE;
8462

8563
private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
8664

8765

8866
/**
89-
* Default constructor that uses
90-
* {@link io.undertow.websockets.client.WebSocketClient#connectionBuilder(XnioWorker, ByteBufferPool, URI)}
91-
* to create WebSocket connections.
67+
* Constructor with the {@link XnioWorker} to pass to
68+
* {@link io.undertow.websockets.client.WebSocketClient#connectionBuilder}
69+
* @param worker the Xnio worker
9270
*/
93-
public UndertowWebSocketClient() {
94-
this(UndertowWebSocketClient::createDefaultConnectionBuilder);
71+
public UndertowWebSocketClient(XnioWorker worker) {
72+
this(worker, builder -> {});
9573
}
9674

9775
/**
98-
* Constructor that accepts a {@link Function} to prepare a
99-
* {@link ConnectionBuilder} for WebSocket connections.
100-
* @param builder a connection builder that can be used to create a web socket connection.
76+
* Alternate constructor providing additional control over the
77+
* {@link ConnectionBuilder} for each WebSocket connection.
78+
* @param worker the Xnio worker to use to create {@code ConnectionBuilder}'s
79+
* @param builderConsumer a consumer to configure {@code ConnectionBuilder}'s
10180
*/
102-
public UndertowWebSocketClient(Function<URI, ConnectionBuilder> builder) {
103-
this.builder = builder;
81+
public UndertowWebSocketClient(XnioWorker worker, Consumer<ConnectionBuilder> builderConsumer) {
82+
Assert.notNull(worker, "XnioWorker is required");
83+
this.worker = worker;
84+
this.builderConsumer = builderConsumer;
10485
}
10586

106-
private static ConnectionBuilder createDefaultConnectionBuilder(URI url) {
10787

108-
ConnectionBuilder builder = io.undertow.websockets.client.WebSocketClient.connectionBuilder(
109-
worker, new DefaultByteBufferPool(false, DEFAULT_BUFFER_SIZE), url);
88+
/**
89+
* Return the configured {@link XnioWorker}.
90+
*/
91+
public XnioWorker getXnioWorker() {
92+
return this.worker;
93+
}
11094

111-
boolean secure = "wss".equals(url.getScheme());
112-
if (secure) {
113-
try {
114-
XnioSsl ssl = new UndertowXnioSsl(Xnio.getInstance(), OptionMap.EMPTY, SSLContext.getDefault());
115-
builder.setSsl(ssl);
116-
}
117-
catch (NoSuchAlgorithmException ex) {
118-
throw new RuntimeException("Failed to create Undertow ConnectionBuilder for " + url, ex);
119-
}
120-
}
95+
/**
96+
* Return the configured {@code Consumer<ConnectionBuilder}.
97+
*/
98+
public Consumer<ConnectionBuilder> getConnectionBuilderConsumer() {
99+
return this.builderConsumer;
100+
}
121101

122-
return builder;
102+
/**
103+
* Configure the size of the {@link io.undertow.connector.ByteBufferPool
104+
* ByteBufferPool} to pass to
105+
* {@link io.undertow.websockets.client.WebSocketClient#connectionBuilder}.
106+
* <p>By default the buffer size is set to 8192.
107+
*/
108+
public void setPoolBufferSize(int poolBufferSize) {
109+
this.poolBufferSize = poolBufferSize;
110+
}
111+
112+
/**
113+
* Return the size for Undertow's WebSocketClient {@code ByteBufferPool}.
114+
*/
115+
public int getPoolBufferSize() {
116+
return this.poolBufferSize;
123117
}
124118

125119

@@ -137,13 +131,13 @@ private Mono<Void> executeInternal(URI url, HttpHeaders headers, WebSocketHandle
137131
MonoProcessor<Void> completion = MonoProcessor.create();
138132
return Mono.fromCallable(
139133
() -> {
134+
ConnectionBuilder builder = createConnectionBuilder(url);
140135
String[] protocols = beforeHandshake(url, headers, handler);
141-
DefaultNegotiation negotiation = new DefaultNegotiation(protocols, headers);
136+
DefaultNegotiation negotiation = new DefaultNegotiation(protocols, headers, builder);
137+
builder.setClientNegotiation(negotiation);
142138

143-
return this.builder.apply(url)
144-
.setClientNegotiation(negotiation)
145-
.connect()
146-
.addNotifier(new IoFuture.HandlingNotifier<WebSocketChannel, Object>() {
139+
return builder.connect().addNotifier(
140+
new IoFuture.HandlingNotifier<WebSocketChannel, Object>() {
147141

148142
@Override
149143
public void handleDone(WebSocketChannel channel, Object attachment) {
@@ -159,6 +153,23 @@ public void handleFailed(IOException ex, Object attachment) {
159153
.then(completion);
160154
}
161155

156+
/**
157+
* Create a {@link ConnectionBuilder} for the given URI.
158+
* <p>The default implementation creates a builder with the configured
159+
* {@link #getXnioWorker() XnioWorker} and {@link #getPoolBufferSize()} and
160+
* then passes it to the {@link #getConnectionBuilderConsumer() consumer}
161+
* provided at construction time.
162+
*/
163+
protected ConnectionBuilder createConnectionBuilder(URI url) {
164+
165+
ConnectionBuilder builder = io.undertow.websockets.client.WebSocketClient
166+
.connectionBuilder(getXnioWorker(),
167+
new DefaultByteBufferPool(false, getPoolBufferSize()), url);
168+
169+
this.builderConsumer.accept(builder);
170+
return builder;
171+
}
172+
162173
private void handleChannel(URI url, WebSocketHandler handler, MonoProcessor<Void> completion,
163174
DefaultNegotiation negotiation, WebSocketChannel channel) {
164175

@@ -177,27 +188,37 @@ private static final class DefaultNegotiation extends WebSocketClientNegotiation
177188

178189
private final HttpHeaders requestHeaders;
179190

180-
private HttpHeaders responseHeaders = new HttpHeaders();
191+
private final HttpHeaders responseHeaders = new HttpHeaders();
192+
193+
private final WebSocketClientNegotiation delegate;
181194

182195

183-
public DefaultNegotiation(String[] subProtocols, HttpHeaders requestHeaders) {
184-
super(Arrays.asList(subProtocols), Collections.emptyList());
196+
public DefaultNegotiation(String[] protocols, HttpHeaders requestHeaders,
197+
ConnectionBuilder connectionBuilder) {
198+
199+
super(Arrays.asList(protocols), Collections.emptyList());
185200
this.requestHeaders = requestHeaders;
201+
this.delegate = connectionBuilder.getClientNegotiation();
186202
}
187203

188-
189204
public HttpHeaders getResponseHeaders() {
190205
return this.responseHeaders;
191206
}
192207

193208
@Override
194209
public void beforeRequest(Map<String, List<String>> headers) {
195210
this.requestHeaders.forEach(headers::put);
211+
if (this.delegate != null) {
212+
this.delegate.beforeRequest(headers);
213+
}
196214
}
197215

198216
@Override
199217
public void afterRequest(Map<String, List<String>> headers) {
200-
headers.forEach((k, v) -> this.responseHeaders.put(k, v));
218+
headers.forEach(this.responseHeaders::put);
219+
if (this.delegate != null) {
220+
this.delegate.afterRequest(headers);
221+
}
201222
}
202223
}
203224

spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/AbstractWebSocketIntegrationTests.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package org.springframework.web.reactive.socket;
1717

1818
import java.io.File;
19+
import java.io.IOException;
1920
import java.net.URI;
2021
import java.net.URISyntaxException;
2122

@@ -27,6 +28,8 @@
2728
import org.junit.runners.Parameterized;
2829
import org.junit.runners.Parameterized.Parameter;
2930
import org.junit.runners.Parameterized.Parameters;
31+
import org.xnio.OptionMap;
32+
import org.xnio.Xnio;
3033
import reactor.core.publisher.Flux;
3134
import reactor.util.function.Tuple3;
3235

@@ -72,6 +75,9 @@
7275
@SuppressWarnings({"unused", "WeakerAccess"})
7376
public abstract class AbstractWebSocketIntegrationTests {
7477

78+
private static final File TMP_DIR = new File(System.getProperty("java.io.tmpdir"));
79+
80+
7581
protected int port;
7682

7783
@Parameter(0)
@@ -85,19 +91,17 @@ public abstract class AbstractWebSocketIntegrationTests {
8591

8692

8793
@Parameters(name = "client[{0}] - server [{1}]")
88-
public static Object[][] arguments() {
89-
90-
File base = new File(System.getProperty("java.io.tmpdir"));
94+
public static Object[][] arguments() throws IOException {
9195

9296
Flux<? extends WebSocketClient> clients = Flux.concat(
9397
Flux.just(new StandardWebSocketClient()).repeat(5),
9498
Flux.just(new JettyWebSocketClient()).repeat(5),
9599
Flux.just(new ReactorNettyWebSocketClient()).repeat(5),
96100
Flux.just(new RxNettyWebSocketClient()).repeat(5),
97-
Flux.just(new UndertowWebSocketClient()).repeat(5));
101+
Flux.just(new UndertowWebSocketClient(Xnio.getInstance().createWorker(OptionMap.EMPTY))).repeat(5));
98102

99103
Flux<? extends HttpServer> servers = Flux.just(
100-
new TomcatHttpServer(base.getAbsolutePath(), WsContextListener.class),
104+
new TomcatHttpServer(TMP_DIR.getAbsolutePath(), WsContextListener.class),
101105
new JettyHttpServer(),
102106
new ReactorHttpServer(),
103107
new RxNettyHttpServer(),

0 commit comments

Comments
 (0)