@@ -41,24 +41,25 @@ public class WebClientStreamableHttpTransport implements McpClientTransport {
41
41
private static final String MESSAGE_EVENT_TYPE = "message" ;
42
42
43
43
private final ObjectMapper objectMapper ;
44
+
44
45
private final WebClient webClient ;
46
+
45
47
private final String endpoint ;
48
+
46
49
private final boolean openConnectionOnStartup ;
50
+
47
51
private final boolean resumableStreams ;
48
52
49
- private AtomicReference <Function <Mono <McpSchema .JSONRPCMessage >,
50
- Mono <McpSchema .JSONRPCMessage >>> handler = new AtomicReference <>();
53
+ private AtomicReference <Function <Mono <McpSchema .JSONRPCMessage >, Mono <McpSchema .JSONRPCMessage >>> handler = new AtomicReference <>();
51
54
52
55
private final Disposable .Composite openConnections = Disposables .composite ();
56
+
53
57
private final AtomicBoolean initialized = new AtomicBoolean ();
58
+
54
59
private final AtomicReference <String > sessionId = new AtomicReference <>();
55
60
56
- public WebClientStreamableHttpTransport (
57
- ObjectMapper objectMapper ,
58
- WebClient .Builder webClientBuilder ,
59
- String endpoint ,
60
- boolean resumableStreams ,
61
- boolean openConnectionOnStartup ) {
61
+ public WebClientStreamableHttpTransport (ObjectMapper objectMapper , WebClient .Builder webClientBuilder ,
62
+ String endpoint , boolean resumableStreams , boolean openConnectionOnStartup ) {
62
63
this .objectMapper = objectMapper ;
63
64
this .webClient = webClientBuilder .build ();
64
65
this .endpoint = endpoint ;
@@ -81,57 +82,61 @@ public Mono<Void> closeGracefully() {
81
82
}
82
83
83
84
private void reconnect (McpStream stream , ContextView ctx ) {
84
- Disposable connection = this .startOrResumeSession (stream )
85
- .contextWrite (ctx )
86
- .subscribe ();
85
+ Disposable connection = this .startOrResumeSession (stream ).contextWrite (ctx ).subscribe ();
87
86
this .openConnections .add (connection );
88
87
}
89
88
90
89
private Mono <Void > startOrResumeSession (McpStream stream ) {
91
90
return Mono .create (sink -> {
92
91
// Here we attempt to initialize the client.
93
- // In case the server supports SSE, we will establish a long-running session here and
92
+ // In case the server supports SSE, we will establish a long-running session
93
+ // here and
94
94
// listen for messages.
95
95
// If it doesn't, nothing actually happens here, that's just the way it is...
96
96
97
97
Disposable connection = webClient .get ()
98
- .uri (this .endpoint )
99
- .accept (MediaType .TEXT_EVENT_STREAM )
100
- .headers (httpHeaders -> {
101
- if (sessionId .get () != null ) {
102
- httpHeaders .add ("mcp-session-id" , sessionId .get ());
103
- }
104
- if (stream != null && stream .lastId () != null ) {
105
- httpHeaders .add ("last-event-id" , stream .lastId ());
106
- }
107
- })
108
- .exchangeToFlux (response -> {
109
- // Per spec, we are not checking whether it's 2xx, but only if the Accept header is proper.
110
- if (response .headers ().contentType ().isPresent ()
111
- && response .headers ().contentType ().get ().isCompatibleWith (MediaType .TEXT_EVENT_STREAM )) {
112
-
113
- sink .success ();
114
-
115
- McpStream sessionStream = stream != null ? stream : new McpStream (this .resumableStreams );
116
-
117
- Flux <Tuple2 <Optional <String >, Iterable <McpSchema .JSONRPCMessage >>> idWithMessages =
118
- response .bodyToFlux (new ParameterizedTypeReference <ServerSentEvent <String >>() {
119
- }).map (this ::parse );
120
-
121
- return sessionStream .consumeSseStream (idWithMessages );
122
- } else if (response .statusCode ().isSameCodeAs (HttpStatus .METHOD_NOT_ALLOWED )) {
123
- sink .success ();
124
- logger .info ("The server does not support SSE streams, using request-response mode." );
125
- return Flux .empty ();
126
- } else {
127
- return response .<McpSchema .JSONRPCMessage >createError ().doOnError (e -> {
128
- sink .error (new RuntimeException ("Connection on client startup failed" , e ));
129
- }).flux ();
130
- }
131
- })
132
- // TODO: Consider retries - examine cause to decide whether a retry is needed.
133
- .contextWrite (sink .contextView ())
134
- .subscribe ();
98
+ .uri (this .endpoint )
99
+ .accept (MediaType .TEXT_EVENT_STREAM )
100
+ .headers (httpHeaders -> {
101
+ if (sessionId .get () != null ) {
102
+ httpHeaders .add ("mcp-session-id" , sessionId .get ());
103
+ }
104
+ if (stream != null && stream .lastId () != null ) {
105
+ httpHeaders .add ("last-event-id" , stream .lastId ());
106
+ }
107
+ })
108
+ .exchangeToFlux (response -> {
109
+ // Per spec, we are not checking whether it's 2xx, but only if the
110
+ // Accept header is proper.
111
+ if (response .headers ().contentType ().isPresent ()
112
+ && response .headers ().contentType ().get ().isCompatibleWith (MediaType .TEXT_EVENT_STREAM )) {
113
+
114
+ sink .success ();
115
+
116
+ McpStream sessionStream = stream != null ? stream : new McpStream (this .resumableStreams );
117
+
118
+ Flux <Tuple2 <Optional <String >, Iterable <McpSchema .JSONRPCMessage >>> idWithMessages = response
119
+ .bodyToFlux (new ParameterizedTypeReference <ServerSentEvent <String >>() {
120
+ })
121
+ .map (this ::parse );
122
+
123
+ return sessionStream .consumeSseStream (idWithMessages );
124
+ }
125
+ else if (response .statusCode ().isSameCodeAs (HttpStatus .METHOD_NOT_ALLOWED )) {
126
+ sink .success ();
127
+ logger .info ("The server does not support SSE streams, using request-response mode." );
128
+ return Flux .empty ();
129
+ }
130
+ else {
131
+ return response .<McpSchema .JSONRPCMessage >createError ().doOnError (e -> {
132
+ sink .error (new RuntimeException ("Connection on client startup failed" , e ));
133
+ }).flux ();
134
+ }
135
+ })
136
+ // TODO: Consider retries - examine cause to decide whether a retry is
137
+ // needed.
138
+ .contextWrite (sink .contextView ())
139
+ .subscribe ();
135
140
this .openConnections .add (connection );
136
141
});
137
142
}
@@ -141,92 +146,106 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
141
146
return Mono .create (sink -> {
142
147
System .out .println ("Sending message " + message );
143
148
// Here we attempt to initialize the client.
144
- // In case the server supports SSE, we will establish a long-running session here and
149
+ // In case the server supports SSE, we will establish a long-running session
150
+ // here and
145
151
// listen for messages.
146
152
// If it doesn't, nothing actually happens here, that's just the way it is...
147
153
Disposable connection = webClient .post ()
148
- .uri (this .endpoint )
149
- .accept (MediaType .TEXT_EVENT_STREAM , MediaType .APPLICATION_JSON )
150
- .headers (httpHeaders -> {
151
- if (sessionId .get () != null ) {
152
- httpHeaders .add ("mcp-session-id" , sessionId .get ());
153
- }
154
- })
155
- .bodyValue (message )
156
- .exchangeToFlux (response -> {
157
- // TODO: this goes into the request phase
158
- if (!initialized .compareAndExchange (false , true )) {
159
- if (!response .headers ().header ("mcp-session-id" ).isEmpty ()) {
160
- sessionId .set (response .headers ().asHttpHeaders ().getFirst ("mcp-session-id" ));
161
- // Once we have a session, we try to open an async stream for the server to send notifications and requests out-of-band.
162
- startOrResumeSession (null )
163
- .contextWrite (sink .contextView ())
164
- .subscribe ();
165
- }
166
- }
167
-
168
- // The spec mentions only ACCEPTED, but the existing SDKs can return 200 OK for notifications
169
- // if (!response.statusCode().isSameCodeAs(HttpStatus.ACCEPTED)) {
170
- if (!response .statusCode ().is2xxSuccessful ()) {
171
- if (response .statusCode ().isSameCodeAs (HttpStatus .NOT_FOUND )) {
172
- logger .info ("Session {} was not found on the MCP server" , sessionId .get ());
173
-
174
- McpSessionNotFoundException notFoundException = new McpSessionNotFoundException ("Session " + sessionId .get () + " not found" );
175
- // inform the caller of sendMessage
176
- sink .error (notFoundException );
177
- // inform the stream/connection subscriber
178
- return Flux .error (notFoundException );
179
- }
180
- return response .<McpSchema .JSONRPCMessage >createError ().doOnError (e -> {
181
- sink .error (new RuntimeException ("Sending request failed" , e ));
182
- }).flux ();
154
+ .uri (this .endpoint )
155
+ .accept (MediaType .TEXT_EVENT_STREAM , MediaType .APPLICATION_JSON )
156
+ .headers (httpHeaders -> {
157
+ if (sessionId .get () != null ) {
158
+ httpHeaders .add ("mcp-session-id" , sessionId .get ());
159
+ }
160
+ })
161
+ .bodyValue (message )
162
+ .exchangeToFlux (response -> {
163
+ // TODO: this goes into the request phase
164
+ if (!initialized .compareAndExchange (false , true )) {
165
+ if (!response .headers ().header ("mcp-session-id" ).isEmpty ()) {
166
+ sessionId .set (response .headers ().asHttpHeaders ().getFirst ("mcp-session-id" ));
167
+ // Once we have a session, we try to open an async stream for
168
+ // the server to send notifications and requests out-of-band.
169
+ startOrResumeSession (null ).contextWrite (sink .contextView ()).subscribe ();
183
170
}
184
-
185
- // Existing SDKs consume notifications with no response body nor content type
186
- if (response .headers ().contentType ().isEmpty ()) {
187
- sink .success ();
188
- return Flux .empty ();
189
- // return response.<McpSchema.JSONRPCMessage>createError().doOnError(e -> {
190
- //// sink.error(new RuntimeException("Response has no content type"));
191
- // }).flux();
171
+ }
172
+
173
+ // The spec mentions only ACCEPTED, but the existing SDKs can return
174
+ // 200 OK for notifications
175
+ // if (!response.statusCode().isSameCodeAs(HttpStatus.ACCEPTED)) {
176
+ if (!response .statusCode ().is2xxSuccessful ()) {
177
+ if (response .statusCode ().isSameCodeAs (HttpStatus .NOT_FOUND )) {
178
+ logger .info ("Session {} was not found on the MCP server" , sessionId .get ());
179
+
180
+ McpSessionNotFoundException notFoundException = new McpSessionNotFoundException (
181
+ "Session " + sessionId .get () + " not found" );
182
+ // inform the caller of sendMessage
183
+ sink .error (notFoundException );
184
+ // inform the stream/connection subscriber
185
+ return Flux .error (notFoundException );
192
186
}
193
-
194
- MediaType contentType = response .headers ().contentType ().get ();
195
-
196
- if (contentType .isCompatibleWith (MediaType .TEXT_EVENT_STREAM )) {
197
- sink .success ();
198
- McpStream sessionStream = new McpStream (this .resumableStreams );
199
-
200
- Flux <Tuple2 <Optional <String >, Iterable <McpSchema .JSONRPCMessage >>> idWithMessages =
201
- response .bodyToFlux (new ParameterizedTypeReference <ServerSentEvent <String >>() {
202
- }).map (this ::parse );
203
-
204
- return sessionStream .consumeSseStream (idWithMessages );
205
- } else if (contentType .isCompatibleWith (MediaType .APPLICATION_JSON )) {
206
- sink .success ();
207
- // return response.bodyToMono(new ParameterizedTypeReference<Iterable<McpSchema.JSONRPCMessage>>() {});
208
- return response .bodyToMono (String .class )
209
- .<Iterable <McpSchema .JSONRPCMessage >>handle ((responseMessage , s ) -> {
210
- try {
211
- McpSchema .JSONRPCMessage jsonRpcResponse = McpSchema .deserializeJsonRpcMessage (objectMapper , responseMessage );
212
- s .next (List .of (jsonRpcResponse ));
213
- } catch (IOException e ) {
214
- s .error (e );
215
- }
187
+ return response .<McpSchema .JSONRPCMessage >createError ().doOnError (e -> {
188
+ sink .error (new RuntimeException ("Sending request failed" , e ));
189
+ }).flux ();
190
+ }
191
+
192
+ // Existing SDKs consume notifications with no response body nor
193
+ // content type
194
+ if (response .headers ().contentType ().isEmpty ()) {
195
+ sink .success ();
196
+ return Flux .empty ();
197
+ // return
198
+ // response.<McpSchema.JSONRPCMessage>createError().doOnError(e ->
199
+ // {
200
+ //// sink.error(new RuntimeException("Response has no content
201
+ // type"));
202
+ // }).flux();
203
+ }
204
+
205
+ MediaType contentType = response .headers ().contentType ().get ();
206
+
207
+ if (contentType .isCompatibleWith (MediaType .TEXT_EVENT_STREAM )) {
208
+ sink .success ();
209
+ McpStream sessionStream = new McpStream (this .resumableStreams );
210
+
211
+ Flux <Tuple2 <Optional <String >, Iterable <McpSchema .JSONRPCMessage >>> idWithMessages = response
212
+ .bodyToFlux (new ParameterizedTypeReference <ServerSentEvent <String >>() {
216
213
})
217
- .flatMapIterable (Function .identity ());
218
- // .map(Mono::just)
219
- // .flatMap(this.handler.get());
220
- } else {
221
- sink .error (new RuntimeException ("Unknown media type" ));
222
- return Flux .empty ();
223
- }
224
- })
225
- .map (Mono ::just )
226
- .flatMap (this .handler .get ())
227
- // TODO: Consider retries - examine cause to decide whether a retry is needed.
228
- .contextWrite (sink .contextView ())
229
- .subscribe ();
214
+ .map (this ::parse );
215
+
216
+ return sessionStream .consumeSseStream (idWithMessages );
217
+ }
218
+ else if (contentType .isCompatibleWith (MediaType .APPLICATION_JSON )) {
219
+ sink .success ();
220
+ // return response.bodyToMono(new
221
+ // ParameterizedTypeReference<Iterable<McpSchema.JSONRPCMessage>>()
222
+ // {});
223
+ return response .bodyToMono (
224
+ String .class ).<Iterable <McpSchema .JSONRPCMessage >>handle ((responseMessage , s ) -> {
225
+ try {
226
+ McpSchema .JSONRPCMessage jsonRpcResponse = McpSchema
227
+ .deserializeJsonRpcMessage (objectMapper , responseMessage );
228
+ s .next (List .of (jsonRpcResponse ));
229
+ }
230
+ catch (IOException e ) {
231
+ s .error (e );
232
+ }
233
+ })
234
+ .flatMapIterable (Function .identity ());
235
+ // .map(Mono::just)
236
+ // .flatMap(this.handler.get());
237
+ }
238
+ else {
239
+ sink .error (new RuntimeException ("Unknown media type" ));
240
+ return Flux .empty ();
241
+ }
242
+ })
243
+ .map (Mono ::just )
244
+ .flatMap (this .handler .get ())
245
+ // TODO: Consider retries - examine cause to decide whether a retry is
246
+ // needed.
247
+ .contextWrite (sink .contextView ())
248
+ .subscribe ();
230
249
this .openConnections .add (connection );
231
250
});
232
251
}
@@ -259,33 +278,33 @@ private class McpStream {
259
278
private final AtomicReference <String > lastId = new AtomicReference <>();
260
279
261
280
private final long streamId ;
281
+
262
282
private final boolean resumable ;
263
283
264
- McpStream (boolean resumable ) {
265
- this .streamId = counter .getAndIncrement ();
284
+ McpStream (boolean resumable ) {
285
+ this .streamId = counter .getAndIncrement ();
266
286
this .resumable = resumable ;
267
287
}
268
288
269
289
String lastId () {
270
290
return this .lastId .get ();
271
291
}
272
292
273
- Flux <McpSchema .JSONRPCMessage > consumeSseStream (Publisher <Tuple2 <Optional <String >, Iterable <McpSchema .JSONRPCMessage >>> eventStream ) {
274
- return Flux .deferContextual (ctx ->
275
- Flux .from (eventStream )
276
- .doOnError (e -> {
277
- // TODO: examine which error :)
278
- if (resumable ) {
279
- Disposable connection = WebClientStreamableHttpTransport .this .startOrResumeSession (this )
280
- .contextWrite (ctx )
281
- .subscribe ();
282
- WebClientStreamableHttpTransport .this .openConnections .add (connection );
283
- }
284
- })
285
- .doOnNext (idAndMessage -> idAndMessage .getT1 ().ifPresent (this .lastId ::set ))
286
- .flatMapIterable (Tuple2 ::getT2 )
287
- );
293
+ Flux <McpSchema .JSONRPCMessage > consumeSseStream (
294
+ Publisher <Tuple2 <Optional <String >, Iterable <McpSchema .JSONRPCMessage >>> eventStream ) {
295
+ return Flux .deferContextual (ctx -> Flux .from (eventStream ).doOnError (e -> {
296
+ // TODO: examine which error :)
297
+ if (resumable ) {
298
+ Disposable connection = WebClientStreamableHttpTransport .this .startOrResumeSession (this )
299
+ .contextWrite (ctx )
300
+ .subscribe ();
301
+ WebClientStreamableHttpTransport .this .openConnections .add (connection );
302
+ }
303
+ })
304
+ .doOnNext (idAndMessage -> idAndMessage .getT1 ().ifPresent (this .lastId ::set ))
305
+ .flatMapIterable (Tuple2 ::getT2 ));
288
306
}
289
307
290
308
}
309
+
291
310
}
0 commit comments