18
18
19
19
import java .io .IOException ;
20
20
import java .net .URI ;
21
- import java .security .NoSuchAlgorithmException ;
22
21
import java .util .Arrays ;
23
22
import java .util .Collections ;
24
23
import java .util .List ;
25
24
import java .util .Map ;
26
- import java .util .function .Function ;
27
- import javax .net .ssl .SSLContext ;
25
+ import java .util .function .Consumer ;
28
26
29
- import io .undertow .connector .ByteBufferPool ;
30
- import io .undertow .protocols .ssl .UndertowXnioSsl ;
31
27
import io .undertow .server .DefaultByteBufferPool ;
32
28
import io .undertow .websockets .client .WebSocketClient .ConnectionBuilder ;
33
29
import io .undertow .websockets .client .WebSocketClientNegotiation ;
34
30
import io .undertow .websockets .core .WebSocketChannel ;
35
31
import org .xnio .IoFuture ;
36
- import org .xnio .OptionMap ;
37
- import org .xnio .Options ;
38
- import org .xnio .Xnio ;
39
32
import org .xnio .XnioWorker ;
40
- import org .xnio .ssl .XnioSsl ;
41
33
import reactor .core .publisher .Mono ;
42
34
import reactor .core .publisher .MonoProcessor ;
43
35
44
36
import org .springframework .core .io .buffer .DataBufferFactory ;
45
37
import org .springframework .core .io .buffer .DefaultDataBufferFactory ;
46
38
import org .springframework .http .HttpHeaders ;
39
+ import org .springframework .util .Assert ;
47
40
import org .springframework .web .reactive .socket .HandshakeInfo ;
48
41
import org .springframework .web .reactive .socket .WebSocketHandler ;
49
42
import org .springframework .web .reactive .socket .adapter .UndertowWebSocketHandlerAdapter ;
58
51
*/
59
52
public class UndertowWebSocketClient extends WebSocketClientSupport implements WebSocketClient {
60
53
61
- private static final int DEFAULT_BUFFER_SIZE = 8192 ;
62
-
63
- private static XnioWorker worker ;
64
-
65
- static {
66
- try {
67
- worker = Xnio .getInstance ().createWorker (OptionMap .builder ()
68
- .set (Options .WORKER_IO_THREADS , 2 )
69
- .set (Options .CONNECTION_HIGH_WATER , 1000000 )
70
- .set (Options .CONNECTION_LOW_WATER , 1000000 )
71
- .set (Options .WORKER_TASK_CORE_THREADS , 30 )
72
- .set (Options .WORKER_TASK_MAX_THREADS , 30 )
73
- .set (Options .TCP_NODELAY , true )
74
- .set (Options .CORK , true )
75
- .getMap ());
76
- }
77
- catch (IOException ex ) {
78
- throw new RuntimeException (ex );
79
- }
80
- }
54
+ private static final int DEFAULT_POOL_BUFFER_SIZE = 8192 ;
55
+
56
+
57
+ private final XnioWorker worker ;
81
58
59
+ private final Consumer <ConnectionBuilder > builderConsumer ;
82
60
83
- private final Function < URI , ConnectionBuilder > builder ;
61
+ private int poolBufferSize = DEFAULT_POOL_BUFFER_SIZE ;
84
62
85
63
private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory ();
86
64
87
65
88
66
/**
89
- * Default constructor that uses
90
- * {@link io.undertow.websockets.client.WebSocketClient#connectionBuilder(XnioWorker, ByteBufferPool, URI) }
91
- * to create WebSocket connections.
67
+ * Constructor with the {@link XnioWorker} to pass to
68
+ * {@link io.undertow.websockets.client.WebSocketClient#connectionBuilder}
69
+ * @param worker the Xnio worker
92
70
*/
93
- public UndertowWebSocketClient () {
94
- this (UndertowWebSocketClient :: createDefaultConnectionBuilder );
71
+ public UndertowWebSocketClient (XnioWorker worker ) {
72
+ this (worker , builder -> {} );
95
73
}
96
74
97
75
/**
98
- * Constructor that accepts a {@link Function} to prepare a
99
- * {@link ConnectionBuilder} for WebSocket connections.
100
- * @param builder a connection builder that can be used to create a web socket connection.
76
+ * Alternate constructor providing additional control over the
77
+ * {@link ConnectionBuilder} for each WebSocket connection.
78
+ * @param worker the Xnio worker to use to create {@code ConnectionBuilder}'s
79
+ * @param builderConsumer a consumer to configure {@code ConnectionBuilder}'s
101
80
*/
102
- public UndertowWebSocketClient (Function <URI , ConnectionBuilder > builder ) {
103
- this .builder = builder ;
81
+ public UndertowWebSocketClient (XnioWorker worker , Consumer <ConnectionBuilder > builderConsumer ) {
82
+ Assert .notNull (worker , "XnioWorker is required" );
83
+ this .worker = worker ;
84
+ this .builderConsumer = builderConsumer ;
104
85
}
105
86
106
- private static ConnectionBuilder createDefaultConnectionBuilder (URI url ) {
107
87
108
- ConnectionBuilder builder = io .undertow .websockets .client .WebSocketClient .connectionBuilder (
109
- worker , new DefaultByteBufferPool (false , DEFAULT_BUFFER_SIZE ), url );
88
+ /**
89
+ * Return the configured {@link XnioWorker}.
90
+ */
91
+ public XnioWorker getXnioWorker () {
92
+ return this .worker ;
93
+ }
110
94
111
- boolean secure = "wss" .equals (url .getScheme ());
112
- if (secure ) {
113
- try {
114
- XnioSsl ssl = new UndertowXnioSsl (Xnio .getInstance (), OptionMap .EMPTY , SSLContext .getDefault ());
115
- builder .setSsl (ssl );
116
- }
117
- catch (NoSuchAlgorithmException ex ) {
118
- throw new RuntimeException ("Failed to create Undertow ConnectionBuilder for " + url , ex );
119
- }
120
- }
95
+ /**
96
+ * Return the configured {@code Consumer<ConnectionBuilder}.
97
+ */
98
+ public Consumer <ConnectionBuilder > getConnectionBuilderConsumer () {
99
+ return this .builderConsumer ;
100
+ }
121
101
122
- return builder ;
102
+ /**
103
+ * Configure the size of the {@link io.undertow.connector.ByteBufferPool
104
+ * ByteBufferPool} to pass to
105
+ * {@link io.undertow.websockets.client.WebSocketClient#connectionBuilder}.
106
+ * <p>By default the buffer size is set to 8192.
107
+ */
108
+ public void setPoolBufferSize (int poolBufferSize ) {
109
+ this .poolBufferSize = poolBufferSize ;
110
+ }
111
+
112
+ /**
113
+ * Return the size for Undertow's WebSocketClient {@code ByteBufferPool}.
114
+ */
115
+ public int getPoolBufferSize () {
116
+ return this .poolBufferSize ;
123
117
}
124
118
125
119
@@ -137,13 +131,13 @@ private Mono<Void> executeInternal(URI url, HttpHeaders headers, WebSocketHandle
137
131
MonoProcessor <Void > completion = MonoProcessor .create ();
138
132
return Mono .fromCallable (
139
133
() -> {
134
+ ConnectionBuilder builder = createConnectionBuilder (url );
140
135
String [] protocols = beforeHandshake (url , headers , handler );
141
- DefaultNegotiation negotiation = new DefaultNegotiation (protocols , headers );
136
+ DefaultNegotiation negotiation = new DefaultNegotiation (protocols , headers , builder );
137
+ builder .setClientNegotiation (negotiation );
142
138
143
- return this .builder .apply (url )
144
- .setClientNegotiation (negotiation )
145
- .connect ()
146
- .addNotifier (new IoFuture .HandlingNotifier <WebSocketChannel , Object >() {
139
+ return builder .connect ().addNotifier (
140
+ new IoFuture .HandlingNotifier <WebSocketChannel , Object >() {
147
141
148
142
@ Override
149
143
public void handleDone (WebSocketChannel channel , Object attachment ) {
@@ -159,6 +153,23 @@ public void handleFailed(IOException ex, Object attachment) {
159
153
.then (completion );
160
154
}
161
155
156
+ /**
157
+ * Create a {@link ConnectionBuilder} for the given URI.
158
+ * <p>The default implementation creates a builder with the configured
159
+ * {@link #getXnioWorker() XnioWorker} and {@link #getPoolBufferSize()} and
160
+ * then passes it to the {@link #getConnectionBuilderConsumer() consumer}
161
+ * provided at construction time.
162
+ */
163
+ protected ConnectionBuilder createConnectionBuilder (URI url ) {
164
+
165
+ ConnectionBuilder builder = io .undertow .websockets .client .WebSocketClient
166
+ .connectionBuilder (getXnioWorker (),
167
+ new DefaultByteBufferPool (false , getPoolBufferSize ()), url );
168
+
169
+ this .builderConsumer .accept (builder );
170
+ return builder ;
171
+ }
172
+
162
173
private void handleChannel (URI url , WebSocketHandler handler , MonoProcessor <Void > completion ,
163
174
DefaultNegotiation negotiation , WebSocketChannel channel ) {
164
175
@@ -177,27 +188,37 @@ private static final class DefaultNegotiation extends WebSocketClientNegotiation
177
188
178
189
private final HttpHeaders requestHeaders ;
179
190
180
- private HttpHeaders responseHeaders = new HttpHeaders ();
191
+ private final HttpHeaders responseHeaders = new HttpHeaders ();
192
+
193
+ private final WebSocketClientNegotiation delegate ;
181
194
182
195
183
- public DefaultNegotiation (String [] subProtocols , HttpHeaders requestHeaders ) {
184
- super (Arrays .asList (subProtocols ), Collections .emptyList ());
196
+ public DefaultNegotiation (String [] protocols , HttpHeaders requestHeaders ,
197
+ ConnectionBuilder connectionBuilder ) {
198
+
199
+ super (Arrays .asList (protocols ), Collections .emptyList ());
185
200
this .requestHeaders = requestHeaders ;
201
+ this .delegate = connectionBuilder .getClientNegotiation ();
186
202
}
187
203
188
-
189
204
public HttpHeaders getResponseHeaders () {
190
205
return this .responseHeaders ;
191
206
}
192
207
193
208
@ Override
194
209
public void beforeRequest (Map <String , List <String >> headers ) {
195
210
this .requestHeaders .forEach (headers ::put );
211
+ if (this .delegate != null ) {
212
+ this .delegate .beforeRequest (headers );
213
+ }
196
214
}
197
215
198
216
@ Override
199
217
public void afterRequest (Map <String , List <String >> headers ) {
200
- headers .forEach ((k , v ) -> this .responseHeaders .put (k , v ));
218
+ headers .forEach (this .responseHeaders ::put );
219
+ if (this .delegate != null ) {
220
+ this .delegate .afterRequest (headers );
221
+ }
201
222
}
202
223
}
203
224
0 commit comments