Skip to content

Commit 26e8af0

Browse files
committed
Use new docker images
1 parent 53012ad commit 26e8af0

File tree

12 files changed

+265
-193
lines changed

12 files changed

+265
-193
lines changed

mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java

Lines changed: 165 additions & 146 deletions
Original file line numberDiff line numberDiff line change
@@ -41,24 +41,25 @@ public class WebClientStreamableHttpTransport implements McpClientTransport {
4141
private static final String MESSAGE_EVENT_TYPE = "message";
4242

4343
private final ObjectMapper objectMapper;
44+
4445
private final WebClient webClient;
46+
4547
private final String endpoint;
48+
4649
private final boolean openConnectionOnStartup;
50+
4751
private final boolean resumableStreams;
4852

49-
private AtomicReference<Function<Mono<McpSchema.JSONRPCMessage>,
50-
Mono<McpSchema.JSONRPCMessage>>> handler = new AtomicReference<>();
53+
private AtomicReference<Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>>> handler = new AtomicReference<>();
5154

5255
private final Disposable.Composite openConnections = Disposables.composite();
56+
5357
private final AtomicBoolean initialized = new AtomicBoolean();
58+
5459
private final AtomicReference<String> sessionId = new AtomicReference<>();
5560

56-
public WebClientStreamableHttpTransport(
57-
ObjectMapper objectMapper,
58-
WebClient.Builder webClientBuilder,
59-
String endpoint,
60-
boolean resumableStreams,
61-
boolean openConnectionOnStartup) {
61+
public WebClientStreamableHttpTransport(ObjectMapper objectMapper, WebClient.Builder webClientBuilder,
62+
String endpoint, boolean resumableStreams, boolean openConnectionOnStartup) {
6263
this.objectMapper = objectMapper;
6364
this.webClient = webClientBuilder.build();
6465
this.endpoint = endpoint;
@@ -81,57 +82,61 @@ public Mono<Void> closeGracefully() {
8182
}
8283

8384
private void reconnect(McpStream stream, ContextView ctx) {
84-
Disposable connection = this.startOrResumeSession(stream)
85-
.contextWrite(ctx)
86-
.subscribe();
85+
Disposable connection = this.startOrResumeSession(stream).contextWrite(ctx).subscribe();
8786
this.openConnections.add(connection);
8887
}
8988

9089
private Mono<Void> startOrResumeSession(McpStream stream) {
9190
return Mono.create(sink -> {
9291
// Here we attempt to initialize the client.
93-
// In case the server supports SSE, we will establish a long-running session here and
92+
// In case the server supports SSE, we will establish a long-running session
93+
// here and
9494
// listen for messages.
9595
// If it doesn't, nothing actually happens here, that's just the way it is...
9696

9797
Disposable connection = webClient.get()
98-
.uri(this.endpoint)
99-
.accept(MediaType.TEXT_EVENT_STREAM)
100-
.headers(httpHeaders -> {
101-
if (sessionId.get() != null) {
102-
httpHeaders.add("mcp-session-id", sessionId.get());
103-
}
104-
if (stream != null && stream.lastId() != null) {
105-
httpHeaders.add("last-event-id", stream.lastId());
106-
}
107-
})
108-
.exchangeToFlux(response -> {
109-
// Per spec, we are not checking whether it's 2xx, but only if the Accept header is proper.
110-
if (response.headers().contentType().isPresent()
111-
&& response.headers().contentType().get().isCompatibleWith(MediaType.TEXT_EVENT_STREAM)) {
112-
113-
sink.success();
114-
115-
McpStream sessionStream = stream != null ? stream : new McpStream(this.resumableStreams);
116-
117-
Flux<Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>>> idWithMessages =
118-
response.bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {
119-
}).map(this::parse);
120-
121-
return sessionStream.consumeSseStream(idWithMessages);
122-
} else if (response.statusCode().isSameCodeAs(HttpStatus.METHOD_NOT_ALLOWED)) {
123-
sink.success();
124-
logger.info("The server does not support SSE streams, using request-response mode.");
125-
return Flux.empty();
126-
} else {
127-
return response.<McpSchema.JSONRPCMessage>createError().doOnError(e -> {
128-
sink.error(new RuntimeException("Connection on client startup failed", e));
129-
}).flux();
130-
}
131-
})
132-
// TODO: Consider retries - examine cause to decide whether a retry is needed.
133-
.contextWrite(sink.contextView())
134-
.subscribe();
98+
.uri(this.endpoint)
99+
.accept(MediaType.TEXT_EVENT_STREAM)
100+
.headers(httpHeaders -> {
101+
if (sessionId.get() != null) {
102+
httpHeaders.add("mcp-session-id", sessionId.get());
103+
}
104+
if (stream != null && stream.lastId() != null) {
105+
httpHeaders.add("last-event-id", stream.lastId());
106+
}
107+
})
108+
.exchangeToFlux(response -> {
109+
// Per spec, we are not checking whether it's 2xx, but only if the
110+
// Accept header is proper.
111+
if (response.headers().contentType().isPresent()
112+
&& response.headers().contentType().get().isCompatibleWith(MediaType.TEXT_EVENT_STREAM)) {
113+
114+
sink.success();
115+
116+
McpStream sessionStream = stream != null ? stream : new McpStream(this.resumableStreams);
117+
118+
Flux<Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>>> idWithMessages = response
119+
.bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {
120+
})
121+
.map(this::parse);
122+
123+
return sessionStream.consumeSseStream(idWithMessages);
124+
}
125+
else if (response.statusCode().isSameCodeAs(HttpStatus.METHOD_NOT_ALLOWED)) {
126+
sink.success();
127+
logger.info("The server does not support SSE streams, using request-response mode.");
128+
return Flux.empty();
129+
}
130+
else {
131+
return response.<McpSchema.JSONRPCMessage>createError().doOnError(e -> {
132+
sink.error(new RuntimeException("Connection on client startup failed", e));
133+
}).flux();
134+
}
135+
})
136+
// TODO: Consider retries - examine cause to decide whether a retry is
137+
// needed.
138+
.contextWrite(sink.contextView())
139+
.subscribe();
135140
this.openConnections.add(connection);
136141
});
137142
}
@@ -141,92 +146,106 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
141146
return Mono.create(sink -> {
142147
System.out.println("Sending message " + message);
143148
// Here we attempt to initialize the client.
144-
// In case the server supports SSE, we will establish a long-running session here and
149+
// In case the server supports SSE, we will establish a long-running session
150+
// here and
145151
// listen for messages.
146152
// If it doesn't, nothing actually happens here, that's just the way it is...
147153
Disposable connection = webClient.post()
148-
.uri(this.endpoint)
149-
.accept(MediaType.TEXT_EVENT_STREAM, MediaType.APPLICATION_JSON)
150-
.headers(httpHeaders -> {
151-
if (sessionId.get() != null) {
152-
httpHeaders.add("mcp-session-id", sessionId.get());
153-
}
154-
})
155-
.bodyValue(message)
156-
.exchangeToFlux(response -> {
157-
// TODO: this goes into the request phase
158-
if (!initialized.compareAndExchange(false, true)) {
159-
if (!response.headers().header("mcp-session-id").isEmpty()) {
160-
sessionId.set(response.headers().asHttpHeaders().getFirst("mcp-session-id"));
161-
// Once we have a session, we try to open an async stream for the server to send notifications and requests out-of-band.
162-
startOrResumeSession(null)
163-
.contextWrite(sink.contextView())
164-
.subscribe();
165-
}
166-
}
167-
168-
// The spec mentions only ACCEPTED, but the existing SDKs can return 200 OK for notifications
169-
// if (!response.statusCode().isSameCodeAs(HttpStatus.ACCEPTED)) {
170-
if (!response.statusCode().is2xxSuccessful()) {
171-
if (response.statusCode().isSameCodeAs(HttpStatus.NOT_FOUND)) {
172-
logger.info("Session {} was not found on the MCP server", sessionId.get());
173-
174-
McpSessionNotFoundException notFoundException = new McpSessionNotFoundException("Session " + sessionId.get() + " not found");
175-
// inform the caller of sendMessage
176-
sink.error(notFoundException);
177-
// inform the stream/connection subscriber
178-
return Flux.error(notFoundException);
179-
}
180-
return response.<McpSchema.JSONRPCMessage>createError().doOnError(e -> {
181-
sink.error(new RuntimeException("Sending request failed", e));
182-
}).flux();
154+
.uri(this.endpoint)
155+
.accept(MediaType.TEXT_EVENT_STREAM, MediaType.APPLICATION_JSON)
156+
.headers(httpHeaders -> {
157+
if (sessionId.get() != null) {
158+
httpHeaders.add("mcp-session-id", sessionId.get());
159+
}
160+
})
161+
.bodyValue(message)
162+
.exchangeToFlux(response -> {
163+
// TODO: this goes into the request phase
164+
if (!initialized.compareAndExchange(false, true)) {
165+
if (!response.headers().header("mcp-session-id").isEmpty()) {
166+
sessionId.set(response.headers().asHttpHeaders().getFirst("mcp-session-id"));
167+
// Once we have a session, we try to open an async stream for
168+
// the server to send notifications and requests out-of-band.
169+
startOrResumeSession(null).contextWrite(sink.contextView()).subscribe();
183170
}
184-
185-
// Existing SDKs consume notifications with no response body nor content type
186-
if (response.headers().contentType().isEmpty()) {
187-
sink.success();
188-
return Flux.empty();
189-
// return response.<McpSchema.JSONRPCMessage>createError().doOnError(e -> {
190-
//// sink.error(new RuntimeException("Response has no content type"));
191-
// }).flux();
171+
}
172+
173+
// The spec mentions only ACCEPTED, but the existing SDKs can return
174+
// 200 OK for notifications
175+
// if (!response.statusCode().isSameCodeAs(HttpStatus.ACCEPTED)) {
176+
if (!response.statusCode().is2xxSuccessful()) {
177+
if (response.statusCode().isSameCodeAs(HttpStatus.NOT_FOUND)) {
178+
logger.info("Session {} was not found on the MCP server", sessionId.get());
179+
180+
McpSessionNotFoundException notFoundException = new McpSessionNotFoundException(
181+
"Session " + sessionId.get() + " not found");
182+
// inform the caller of sendMessage
183+
sink.error(notFoundException);
184+
// inform the stream/connection subscriber
185+
return Flux.error(notFoundException);
192186
}
193-
194-
MediaType contentType = response.headers().contentType().get();
195-
196-
if (contentType.isCompatibleWith(MediaType.TEXT_EVENT_STREAM)) {
197-
sink.success();
198-
McpStream sessionStream = new McpStream(this.resumableStreams);
199-
200-
Flux<Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>>> idWithMessages =
201-
response.bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {
202-
}).map(this::parse);
203-
204-
return sessionStream.consumeSseStream(idWithMessages);
205-
} else if (contentType.isCompatibleWith(MediaType.APPLICATION_JSON)) {
206-
sink.success();
207-
// return response.bodyToMono(new ParameterizedTypeReference<Iterable<McpSchema.JSONRPCMessage>>() {});
208-
return response.bodyToMono(String.class)
209-
.<Iterable<McpSchema.JSONRPCMessage>>handle((responseMessage, s) -> {
210-
try {
211-
McpSchema.JSONRPCMessage jsonRpcResponse = McpSchema.deserializeJsonRpcMessage(objectMapper, responseMessage);
212-
s.next(List.of(jsonRpcResponse));
213-
} catch (IOException e) {
214-
s.error(e);
215-
}
187+
return response.<McpSchema.JSONRPCMessage>createError().doOnError(e -> {
188+
sink.error(new RuntimeException("Sending request failed", e));
189+
}).flux();
190+
}
191+
192+
// Existing SDKs consume notifications with no response body nor
193+
// content type
194+
if (response.headers().contentType().isEmpty()) {
195+
sink.success();
196+
return Flux.empty();
197+
// return
198+
// response.<McpSchema.JSONRPCMessage>createError().doOnError(e ->
199+
// {
200+
//// sink.error(new RuntimeException("Response has no content
201+
// type"));
202+
// }).flux();
203+
}
204+
205+
MediaType contentType = response.headers().contentType().get();
206+
207+
if (contentType.isCompatibleWith(MediaType.TEXT_EVENT_STREAM)) {
208+
sink.success();
209+
McpStream sessionStream = new McpStream(this.resumableStreams);
210+
211+
Flux<Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>>> idWithMessages = response
212+
.bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {
216213
})
217-
.flatMapIterable(Function.identity());
218-
// .map(Mono::just)
219-
// .flatMap(this.handler.get());
220-
} else {
221-
sink.error(new RuntimeException("Unknown media type"));
222-
return Flux.empty();
223-
}
224-
})
225-
.map(Mono::just)
226-
.flatMap(this.handler.get())
227-
// TODO: Consider retries - examine cause to decide whether a retry is needed.
228-
.contextWrite(sink.contextView())
229-
.subscribe();
214+
.map(this::parse);
215+
216+
return sessionStream.consumeSseStream(idWithMessages);
217+
}
218+
else if (contentType.isCompatibleWith(MediaType.APPLICATION_JSON)) {
219+
sink.success();
220+
// return response.bodyToMono(new
221+
// ParameterizedTypeReference<Iterable<McpSchema.JSONRPCMessage>>()
222+
// {});
223+
return response.bodyToMono(
224+
String.class).<Iterable<McpSchema.JSONRPCMessage>>handle((responseMessage, s) -> {
225+
try {
226+
McpSchema.JSONRPCMessage jsonRpcResponse = McpSchema
227+
.deserializeJsonRpcMessage(objectMapper, responseMessage);
228+
s.next(List.of(jsonRpcResponse));
229+
}
230+
catch (IOException e) {
231+
s.error(e);
232+
}
233+
})
234+
.flatMapIterable(Function.identity());
235+
// .map(Mono::just)
236+
// .flatMap(this.handler.get());
237+
}
238+
else {
239+
sink.error(new RuntimeException("Unknown media type"));
240+
return Flux.empty();
241+
}
242+
})
243+
.map(Mono::just)
244+
.flatMap(this.handler.get())
245+
// TODO: Consider retries - examine cause to decide whether a retry is
246+
// needed.
247+
.contextWrite(sink.contextView())
248+
.subscribe();
230249
this.openConnections.add(connection);
231250
});
232251
}
@@ -259,33 +278,33 @@ private class McpStream {
259278
private final AtomicReference<String> lastId = new AtomicReference<>();
260279

261280
private final long streamId;
281+
262282
private final boolean resumable;
263283

264-
McpStream(boolean resumable) {
265-
this.streamId = counter.getAndIncrement();
284+
McpStream(boolean resumable) {
285+
this.streamId = counter.getAndIncrement();
266286
this.resumable = resumable;
267287
}
268288

269289
String lastId() {
270290
return this.lastId.get();
271291
}
272292

273-
Flux<McpSchema.JSONRPCMessage> consumeSseStream(Publisher<Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>>> eventStream) {
274-
return Flux.deferContextual(ctx ->
275-
Flux.from(eventStream)
276-
.doOnError(e -> {
277-
// TODO: examine which error :)
278-
if (resumable) {
279-
Disposable connection = WebClientStreamableHttpTransport.this.startOrResumeSession(this)
280-
.contextWrite(ctx)
281-
.subscribe();
282-
WebClientStreamableHttpTransport.this.openConnections.add(connection);
283-
}
284-
})
285-
.doOnNext(idAndMessage -> idAndMessage.getT1().ifPresent(this.lastId::set))
286-
.flatMapIterable(Tuple2::getT2)
287-
);
293+
Flux<McpSchema.JSONRPCMessage> consumeSseStream(
294+
Publisher<Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>>> eventStream) {
295+
return Flux.deferContextual(ctx -> Flux.from(eventStream).doOnError(e -> {
296+
// TODO: examine which error :)
297+
if (resumable) {
298+
Disposable connection = WebClientStreamableHttpTransport.this.startOrResumeSession(this)
299+
.contextWrite(ctx)
300+
.subscribe();
301+
WebClientStreamableHttpTransport.this.openConnections.add(connection);
302+
}
303+
})
304+
.doOnNext(idAndMessage -> idAndMessage.getT1().ifPresent(this.lastId::set))
305+
.flatMapIterable(Tuple2::getT2));
288306
}
289307

290308
}
309+
291310
}

0 commit comments

Comments
 (0)