Skip to content

Commit 40bc356

Browse files
committed
Provide disposables cleanup
1 parent 26e8af0 commit 40bc356

File tree

1 file changed

+69
-66
lines changed

1 file changed

+69
-66
lines changed

mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java

Lines changed: 69 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,16 @@ public WebClientStreamableHttpTransport(ObjectMapper objectMapper, WebClient.Bui
6969

7070
@Override
7171
public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> handler) {
72-
if (this.openConnections.isDisposed()) {
73-
return Mono.error(new RuntimeException("Transport already disposed"));
74-
}
75-
this.handler.set(handler);
76-
return openConnectionOnStartup ? startOrResumeSession(null) : Mono.empty();
72+
return Mono.deferContextual(ctx -> {
73+
if (this.openConnections.isDisposed()) {
74+
return Mono.error(new RuntimeException("Transport already disposed"));
75+
}
76+
this.handler.set(handler);
77+
if (openConnectionOnStartup) {
78+
this.reconnect(null, ctx);
79+
}
80+
return Mono.empty();
81+
});
7782
}
7883

7984
@Override
@@ -82,63 +87,58 @@ public Mono<Void> closeGracefully() {
8287
}
8388

8489
private void reconnect(McpStream stream, ContextView ctx) {
85-
Disposable connection = this.startOrResumeSession(stream).contextWrite(ctx).subscribe();
86-
this.openConnections.add(connection);
87-
}
88-
89-
private Mono<Void> startOrResumeSession(McpStream stream) {
90-
return Mono.create(sink -> {
91-
// Here we attempt to initialize the client.
92-
// In case the server supports SSE, we will establish a long-running session
93-
// here and
94-
// listen for messages.
95-
// If it doesn't, nothing actually happens here, that's just the way it is...
96-
97-
Disposable connection = webClient.get()
98-
.uri(this.endpoint)
99-
.accept(MediaType.TEXT_EVENT_STREAM)
100-
.headers(httpHeaders -> {
101-
if (sessionId.get() != null) {
102-
httpHeaders.add("mcp-session-id", sessionId.get());
103-
}
104-
if (stream != null && stream.lastId() != null) {
105-
httpHeaders.add("last-event-id", stream.lastId());
106-
}
107-
})
108-
.exchangeToFlux(response -> {
109-
// Per spec, we are not checking whether it's 2xx, but only if the
110-
// Accept header is proper.
111-
if (response.headers().contentType().isPresent()
112-
&& response.headers().contentType().get().isCompatibleWith(MediaType.TEXT_EVENT_STREAM)) {
113-
114-
sink.success();
90+
// Here we attempt to initialize the client.
91+
// In case the server supports SSE, we will establish a long-running session
92+
// here and
93+
// listen for messages.
94+
// If it doesn't, nothing actually happens here, that's just the way it is...
95+
final AtomicReference<Disposable> disposableRef = new AtomicReference<>();
96+
Disposable connection = webClient.get()
97+
.uri(this.endpoint)
98+
.accept(MediaType.TEXT_EVENT_STREAM)
99+
.headers(httpHeaders -> {
100+
if (sessionId.get() != null) {
101+
httpHeaders.add("mcp-session-id", sessionId.get());
102+
}
103+
if (stream != null && stream.lastId() != null) {
104+
httpHeaders.add("last-event-id", stream.lastId());
105+
}
106+
})
107+
.exchangeToFlux(response -> {
108+
// Per spec, we are not checking whether it's 2xx, but only if the
109+
// Accept header is proper.
110+
if (response.headers().contentType().isPresent()
111+
&& response.headers().contentType().get().isCompatibleWith(MediaType.TEXT_EVENT_STREAM)) {
115112

116-
McpStream sessionStream = stream != null ? stream : new McpStream(this.resumableStreams);
113+
McpStream sessionStream = stream != null ? stream : new McpStream(this.resumableStreams);
117114

118-
Flux<Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>>> idWithMessages = response
119-
.bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {
120-
})
121-
.map(this::parse);
115+
Flux<Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>>> idWithMessages = response
116+
.bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {
117+
})
118+
.map(this::parse);
122119

123-
return sessionStream.consumeSseStream(idWithMessages);
124-
}
125-
else if (response.statusCode().isSameCodeAs(HttpStatus.METHOD_NOT_ALLOWED)) {
126-
sink.success();
127-
logger.info("The server does not support SSE streams, using request-response mode.");
128-
return Flux.empty();
129-
}
130-
else {
131-
return response.<McpSchema.JSONRPCMessage>createError().doOnError(e -> {
132-
sink.error(new RuntimeException("Connection on client startup failed", e));
133-
}).flux();
134-
}
135-
})
136-
// TODO: Consider retries - examine cause to decide whether a retry is
137-
// needed.
138-
.contextWrite(sink.contextView())
139-
.subscribe();
140-
this.openConnections.add(connection);
141-
});
120+
return sessionStream.consumeSseStream(idWithMessages);
121+
}
122+
else if (response.statusCode().isSameCodeAs(HttpStatus.METHOD_NOT_ALLOWED)) {
123+
logger.info("The server does not support SSE streams, using request-response mode.");
124+
return Flux.empty();
125+
}
126+
else {
127+
return response.<McpSchema.JSONRPCMessage>createError().doOnError(e -> {
128+
logger.info("Opening an SSE stream failed. This can be safely ignored.", e);
129+
}).flux();
130+
}
131+
})
132+
.doFinally(s -> {
133+
Disposable ref = disposableRef.getAndSet(null);
134+
if (ref != null) {
135+
this.openConnections.remove(ref);
136+
}
137+
})
138+
.contextWrite(ctx)
139+
.subscribe();
140+
disposableRef.set(connection);
141+
this.openConnections.add(connection);
142142
}
143143

144144
@Override
@@ -150,6 +150,7 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
150150
// here and
151151
// listen for messages.
152152
// If it doesn't, nothing actually happens here, that's just the way it is...
153+
final AtomicReference<Disposable> disposableRef = new AtomicReference<>();
153154
Disposable connection = webClient.post()
154155
.uri(this.endpoint)
155156
.accept(MediaType.TEXT_EVENT_STREAM, MediaType.APPLICATION_JSON)
@@ -166,7 +167,7 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
166167
sessionId.set(response.headers().asHttpHeaders().getFirst("mcp-session-id"));
167168
// Once we have a session, we try to open an async stream for
168169
// the server to send notifications and requests out-of-band.
169-
startOrResumeSession(null).contextWrite(sink.contextView()).subscribe();
170+
reconnect(null, sink.contextView());
170171
}
171172
}
172173

@@ -242,10 +243,15 @@ else if (contentType.isCompatibleWith(MediaType.APPLICATION_JSON)) {
242243
})
243244
.map(Mono::just)
244245
.flatMap(this.handler.get())
245-
// TODO: Consider retries - examine cause to decide whether a retry is
246-
// needed.
246+
.doFinally(s -> {
247+
Disposable ref = disposableRef.getAndSet(null);
248+
if (ref != null) {
249+
this.openConnections.remove(ref);
250+
}
251+
})
247252
.contextWrite(sink.contextView())
248253
.subscribe();
254+
disposableRef.set(connection);
249255
this.openConnections.add(connection);
250256
});
251257
}
@@ -295,10 +301,7 @@ Flux<McpSchema.JSONRPCMessage> consumeSseStream(
295301
return Flux.deferContextual(ctx -> Flux.from(eventStream).doOnError(e -> {
296302
// TODO: examine which error :)
297303
if (resumable) {
298-
Disposable connection = WebClientStreamableHttpTransport.this.startOrResumeSession(this)
299-
.contextWrite(ctx)
300-
.subscribe();
301-
WebClientStreamableHttpTransport.this.openConnections.add(connection);
304+
reconnect(this, ctx);
302305
}
303306
})
304307
.doOnNext(idAndMessage -> idAndMessage.getT1().ifPresent(this.lastId::set))

0 commit comments

Comments
 (0)