@@ -69,11 +69,16 @@ public WebClientStreamableHttpTransport(ObjectMapper objectMapper, WebClient.Bui
69
69
70
70
@ Override
71
71
public Mono <Void > connect (Function <Mono <McpSchema .JSONRPCMessage >, Mono <McpSchema .JSONRPCMessage >> handler ) {
72
- if (this .openConnections .isDisposed ()) {
73
- return Mono .error (new RuntimeException ("Transport already disposed" ));
74
- }
75
- this .handler .set (handler );
76
- return openConnectionOnStartup ? startOrResumeSession (null ) : Mono .empty ();
72
+ return Mono .deferContextual (ctx -> {
73
+ if (this .openConnections .isDisposed ()) {
74
+ return Mono .error (new RuntimeException ("Transport already disposed" ));
75
+ }
76
+ this .handler .set (handler );
77
+ if (openConnectionOnStartup ) {
78
+ this .reconnect (null , ctx );
79
+ }
80
+ return Mono .empty ();
81
+ });
77
82
}
78
83
79
84
@ Override
@@ -82,63 +87,58 @@ public Mono<Void> closeGracefully() {
82
87
}
83
88
84
89
private void reconnect (McpStream stream , ContextView ctx ) {
85
- Disposable connection = this .startOrResumeSession (stream ).contextWrite (ctx ).subscribe ();
86
- this .openConnections .add (connection );
87
- }
88
-
89
- private Mono <Void > startOrResumeSession (McpStream stream ) {
90
- return Mono .create (sink -> {
91
- // Here we attempt to initialize the client.
92
- // In case the server supports SSE, we will establish a long-running session
93
- // here and
94
- // listen for messages.
95
- // If it doesn't, nothing actually happens here, that's just the way it is...
96
-
97
- Disposable connection = webClient .get ()
98
- .uri (this .endpoint )
99
- .accept (MediaType .TEXT_EVENT_STREAM )
100
- .headers (httpHeaders -> {
101
- if (sessionId .get () != null ) {
102
- httpHeaders .add ("mcp-session-id" , sessionId .get ());
103
- }
104
- if (stream != null && stream .lastId () != null ) {
105
- httpHeaders .add ("last-event-id" , stream .lastId ());
106
- }
107
- })
108
- .exchangeToFlux (response -> {
109
- // Per spec, we are not checking whether it's 2xx, but only if the
110
- // Accept header is proper.
111
- if (response .headers ().contentType ().isPresent ()
112
- && response .headers ().contentType ().get ().isCompatibleWith (MediaType .TEXT_EVENT_STREAM )) {
113
-
114
- sink .success ();
90
+ // Here we attempt to initialize the client.
91
+ // In case the server supports SSE, we will establish a long-running session
92
+ // here and
93
+ // listen for messages.
94
+ // If it doesn't, nothing actually happens here, that's just the way it is...
95
+ final AtomicReference <Disposable > disposableRef = new AtomicReference <>();
96
+ Disposable connection = webClient .get ()
97
+ .uri (this .endpoint )
98
+ .accept (MediaType .TEXT_EVENT_STREAM )
99
+ .headers (httpHeaders -> {
100
+ if (sessionId .get () != null ) {
101
+ httpHeaders .add ("mcp-session-id" , sessionId .get ());
102
+ }
103
+ if (stream != null && stream .lastId () != null ) {
104
+ httpHeaders .add ("last-event-id" , stream .lastId ());
105
+ }
106
+ })
107
+ .exchangeToFlux (response -> {
108
+ // Per spec, we are not checking whether it's 2xx, but only if the
109
+ // Accept header is proper.
110
+ if (response .headers ().contentType ().isPresent ()
111
+ && response .headers ().contentType ().get ().isCompatibleWith (MediaType .TEXT_EVENT_STREAM )) {
115
112
116
- McpStream sessionStream = stream != null ? stream : new McpStream (this .resumableStreams );
113
+ McpStream sessionStream = stream != null ? stream : new McpStream (this .resumableStreams );
117
114
118
- Flux <Tuple2 <Optional <String >, Iterable <McpSchema .JSONRPCMessage >>> idWithMessages = response
119
- .bodyToFlux (new ParameterizedTypeReference <ServerSentEvent <String >>() {
120
- })
121
- .map (this ::parse );
115
+ Flux <Tuple2 <Optional <String >, Iterable <McpSchema .JSONRPCMessage >>> idWithMessages = response
116
+ .bodyToFlux (new ParameterizedTypeReference <ServerSentEvent <String >>() {
117
+ })
118
+ .map (this ::parse );
122
119
123
- return sessionStream .consumeSseStream (idWithMessages );
124
- }
125
- else if (response .statusCode ().isSameCodeAs (HttpStatus .METHOD_NOT_ALLOWED )) {
126
- sink .success ();
127
- logger .info ("The server does not support SSE streams, using request-response mode." );
128
- return Flux .empty ();
129
- }
130
- else {
131
- return response .<McpSchema .JSONRPCMessage >createError ().doOnError (e -> {
132
- sink .error (new RuntimeException ("Connection on client startup failed" , e ));
133
- }).flux ();
134
- }
135
- })
136
- // TODO: Consider retries - examine cause to decide whether a retry is
137
- // needed.
138
- .contextWrite (sink .contextView ())
139
- .subscribe ();
140
- this .openConnections .add (connection );
141
- });
120
+ return sessionStream .consumeSseStream (idWithMessages );
121
+ }
122
+ else if (response .statusCode ().isSameCodeAs (HttpStatus .METHOD_NOT_ALLOWED )) {
123
+ logger .info ("The server does not support SSE streams, using request-response mode." );
124
+ return Flux .empty ();
125
+ }
126
+ else {
127
+ return response .<McpSchema .JSONRPCMessage >createError ().doOnError (e -> {
128
+ logger .info ("Opening an SSE stream failed. This can be safely ignored." , e );
129
+ }).flux ();
130
+ }
131
+ })
132
+ .doFinally (s -> {
133
+ Disposable ref = disposableRef .getAndSet (null );
134
+ if (ref != null ) {
135
+ this .openConnections .remove (ref );
136
+ }
137
+ })
138
+ .contextWrite (ctx )
139
+ .subscribe ();
140
+ disposableRef .set (connection );
141
+ this .openConnections .add (connection );
142
142
}
143
143
144
144
@ Override
@@ -150,6 +150,7 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
150
150
// here and
151
151
// listen for messages.
152
152
// If it doesn't, nothing actually happens here, that's just the way it is...
153
+ final AtomicReference <Disposable > disposableRef = new AtomicReference <>();
153
154
Disposable connection = webClient .post ()
154
155
.uri (this .endpoint )
155
156
.accept (MediaType .TEXT_EVENT_STREAM , MediaType .APPLICATION_JSON )
@@ -166,7 +167,7 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
166
167
sessionId .set (response .headers ().asHttpHeaders ().getFirst ("mcp-session-id" ));
167
168
// Once we have a session, we try to open an async stream for
168
169
// the server to send notifications and requests out-of-band.
169
- startOrResumeSession (null ). contextWrite ( sink .contextView ()). subscribe ( );
170
+ reconnect (null , sink .contextView ());
170
171
}
171
172
}
172
173
@@ -242,10 +243,15 @@ else if (contentType.isCompatibleWith(MediaType.APPLICATION_JSON)) {
242
243
})
243
244
.map (Mono ::just )
244
245
.flatMap (this .handler .get ())
245
- // TODO: Consider retries - examine cause to decide whether a retry is
246
- // needed.
246
+ .doFinally (s -> {
247
+ Disposable ref = disposableRef .getAndSet (null );
248
+ if (ref != null ) {
249
+ this .openConnections .remove (ref );
250
+ }
251
+ })
247
252
.contextWrite (sink .contextView ())
248
253
.subscribe ();
254
+ disposableRef .set (connection );
249
255
this .openConnections .add (connection );
250
256
});
251
257
}
@@ -295,10 +301,7 @@ Flux<McpSchema.JSONRPCMessage> consumeSseStream(
295
301
return Flux .deferContextual (ctx -> Flux .from (eventStream ).doOnError (e -> {
296
302
// TODO: examine which error :)
297
303
if (resumable ) {
298
- Disposable connection = WebClientStreamableHttpTransport .this .startOrResumeSession (this )
299
- .contextWrite (ctx )
300
- .subscribe ();
301
- WebClientStreamableHttpTransport .this .openConnections .add (connection );
304
+ reconnect (this , ctx );
302
305
}
303
306
})
304
307
.doOnNext (idAndMessage -> idAndMessage .getT1 ().ifPresent (this .lastId ::set ))
0 commit comments