Skip to content

Commit 94f172a

Browse files
committed
Enhance roots functionality and improve error handling
- Replace Utils.isEmpty check with null check for roots map - Add specialized error messages for method not found errors - Extend McpError to include JSONRPCError information - Add comprehensive tests for roots functionality including: - Empty roots list handling - Missing capability scenarios - Multiple consumers - Server closure with active subscriptions
1 parent 1e2a176 commit 94f172a

File tree

4 files changed

+153
-26
lines changed

4 files changed

+153
-26
lines changed

mcp/src/main/java/org/springframework/ai/mcp/client/McpAsyncClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,13 +150,13 @@ public McpAsyncClient(ClientMcpTransport transport, Duration requestTimeout, Imp
150150

151151
this.transport = transport;
152152

153-
this.roots = !Utils.isEmpty(roots) ? new ConcurrentHashMap<>(roots) : new ConcurrentHashMap<>();
153+
this.roots = roots != null ? new ConcurrentHashMap<>(roots) : new ConcurrentHashMap<>();
154154

155155
// Request Handlers
156156
Map<String, RequestHandler> requestHanlers = new HashMap<>();
157157

158158
// Roots List Request Handler
159-
if (!Utils.isEmpty(this.roots) && this.clientCapabilities.roots() != null) {
159+
if (this.roots != null && this.clientCapabilities.roots() != null) {
160160
requestHanlers.put(McpSchema.METHOD_ROOTS_LIST, rootsListRequestHandler());
161161
}
162162

mcp/src/main/java/org/springframework/ai/mcp/spec/DefaultMcpSession.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,11 @@ public DefaultMcpSession(Duration requestTimeout, McpTransport transport,
128128
this.requestHandlers.putAll(requestHandlers);
129129
this.notificationHandlers.putAll(notificationHandlers);
130130

131-
// TODO: consider mono.transformDeferredContextual where the Context contains the
131+
// TODO: consider mono.transformDeferredContextual where the Context contains
132+
// the
132133
// Observation associated with the individual message - it can be used to
133-
// create child Observation and emit it together with the message to the consumer
134+
// create child Observation and emit it together with the message to the
135+
// consumer
134136
this.connection = this.transport.connect(mono -> mono.doOnNext(message -> {
135137
if (message instanceof McpSchema.JSONRPCResponse response) {
136138
logger.info("Received Response: {}", response);
@@ -169,9 +171,10 @@ private Mono<McpSchema.JSONRPCResponse> handleIncomingRequest(McpSchema.JSONRPCR
169171
return Mono.defer(() -> {
170172
var handler = this.requestHandlers.get(request.method());
171173
if (handler == null) {
174+
MethodNotFoundError error = getMethodNotFoundError(request.method());
172175
return Mono.just(new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, request.id(), null,
173176
new McpSchema.JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.METHOD_NOT_FOUND,
174-
"Method not found: " + request.method(), null)));
177+
error.message(), error.data())));
175178
}
176179

177180
return handler.handle(request.params())
@@ -183,6 +186,19 @@ private Mono<McpSchema.JSONRPCResponse> handleIncomingRequest(McpSchema.JSONRPCR
183186
});
184187
}
185188

189+
record MethodNotFoundError(String method, String message, Object data) {
190+
}
191+
192+
public static MethodNotFoundError getMethodNotFoundError(String method) {
193+
switch (method) {
194+
case McpSchema.METHOD_ROOTS_LIST:
195+
return new MethodNotFoundError(method, "Roots not supported",
196+
Map.of("reason", "Client does not have roots capability"));
197+
default:
198+
return new MethodNotFoundError(method, "Method not found: " + method, null);
199+
}
200+
}
201+
186202
/**
187203
* Handles an incoming JSON-RPC notification by routing it to the appropriate handler.
188204
* @param notification The incoming JSON-RPC notification

mcp/src/main/java/org/springframework/ai/mcp/spec/McpError.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,23 @@
1515
*/
1616
package org.springframework.ai.mcp.spec;
1717

18+
import org.springframework.ai.mcp.spec.McpSchema.JSONRPCResponse.JSONRPCError;
19+
1820
public class McpError extends RuntimeException {
1921

22+
private JSONRPCError jsonRpcError;
23+
24+
public McpError(JSONRPCError jsonRpcError) {
25+
super(jsonRpcError.message());
26+
this.jsonRpcError = jsonRpcError;
27+
}
28+
2029
public McpError(Object error) {
2130
super(error.toString());
2231
}
2332

33+
public JSONRPCError getJsonRpcError() {
34+
return jsonRpcError;
35+
}
36+
2437
}

mcp/src/test/java/org/springframework/ai/mcp/server/SseAsyncIntegrationTests.java

Lines changed: 119 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
/*
2-
* Copyright 2024 - 2024 the original author or authors.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
7-
*
8-
* https://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS,
12-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specific language governing permissions and
14-
* limitations under the License.
15-
*/
2+
* Copyright 2024 - 2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
1616
package org.springframework.ai.mcp.server;
1717

1818
import java.time.Duration;
@@ -46,6 +46,7 @@
4646
import org.springframework.web.reactive.function.server.RouterFunctions;
4747

4848
import static org.assertj.core.api.Assertions.assertThat;
49+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
4950
import static org.awaitility.Awaitility.await;
5051

5152
public class SseAsyncIntegrationTests {
@@ -165,20 +166,13 @@ void testCreateMessageSuccess() throws InterruptedException {
165166
// ---------------------------------------
166167
@Test
167168
void testRootsSuccess() {
168-
169169
List<Root> roots = List.of(new Root("uri1://", "root1"), new Root("uri2://", "root2"));
170170

171171
AtomicReference<List<Root>> rootsRef = new AtomicReference<>();
172172
var mcpServer = McpServer.using(mcpServerTransport)
173173
.rootsChangeConsumer(rootsUpdate -> rootsRef.set(rootsUpdate))
174174
.sync();
175175

176-
// HttpHandler httpHandler =
177-
// RouterFunctions.toHttpHandler(mcpServerTransport.getRouterFunction());
178-
// ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
179-
// HttpServer httpServer = HttpServer.create().port(8080).handle(adapter);
180-
// DisposableServer d = httpServer.bindNow();
181-
182176
var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build())
183177
.roots(roots)
184178
.sync();
@@ -212,8 +206,112 @@ void testRootsSuccess() {
212206
});
213207

214208
mcpClient.close();
209+
mcpServer.close();
210+
}
211+
212+
@Test
213+
void testRootsWithoutCapability() {
214+
var mcpServer = McpServer.using(mcpServerTransport).rootsChangeConsumer(rootsUpdate -> {
215+
}).sync();
216+
217+
// Create client without roots capability
218+
var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().build()) // No
219+
// roots
220+
// capability
221+
.sync();
222+
223+
InitializeResult initResult = mcpClient.initialize();
224+
assertThat(initResult).isNotNull();
225+
226+
// Attempt to list roots should fail
227+
assertThatThrownBy(() -> mcpServer.listRoots().roots()).isInstanceOf(McpError.class)
228+
.hasMessage("Roots not supported");
229+
230+
mcpClient.close();
231+
mcpServer.close();
232+
}
233+
234+
@Test
235+
void testRootsWithEmptyRootsList() {
236+
AtomicReference<List<Root>> rootsRef = new AtomicReference<>();
237+
var mcpServer = McpServer.using(mcpServerTransport)
238+
.rootsChangeConsumer(rootsUpdate -> rootsRef.set(rootsUpdate))
239+
.sync();
240+
241+
var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build())
242+
.roots(List.of()) // Empty roots list
243+
.sync();
244+
245+
InitializeResult initResult = mcpClient.initialize();
246+
assertThat(initResult).isNotNull();
247+
248+
mcpClient.rootsListChangedNotification();
249+
250+
await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> {
251+
assertThat(rootsRef.get()).isEmpty();
252+
});
253+
254+
mcpClient.close();
255+
mcpServer.close();
256+
}
257+
258+
@Test
259+
void testRootsWithMultipleConsumers() {
260+
List<Root> roots = List.of(new Root("uri1://", "root1"));
261+
262+
AtomicReference<List<Root>> rootsRef1 = new AtomicReference<>();
263+
AtomicReference<List<Root>> rootsRef2 = new AtomicReference<>();
264+
265+
var mcpServer = McpServer.using(mcpServerTransport)
266+
.rootsChangeConsumer(rootsUpdate -> rootsRef1.set(rootsUpdate))
267+
.rootsChangeConsumer(rootsUpdate -> rootsRef2.set(rootsUpdate))
268+
.sync();
269+
270+
var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build())
271+
.roots(roots)
272+
.sync();
273+
274+
InitializeResult initResult = mcpClient.initialize();
275+
assertThat(initResult).isNotNull();
276+
277+
mcpClient.rootsListChangedNotification();
278+
279+
await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> {
280+
assertThat(rootsRef1.get()).containsAll(roots);
281+
assertThat(rootsRef2.get()).containsAll(roots);
282+
});
283+
284+
mcpClient.close();
285+
mcpServer.close();
286+
}
287+
288+
@Test
289+
void testRootsServerCloseWithActiveSubscription() {
290+
List<Root> roots = List.of(new Root("uri1://", "root1"));
215291

292+
AtomicReference<List<Root>> rootsRef = new AtomicReference<>();
293+
var mcpServer = McpServer.using(mcpServerTransport)
294+
.rootsChangeConsumer(rootsUpdate -> rootsRef.set(rootsUpdate))
295+
.sync();
296+
297+
var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build())
298+
.roots(roots)
299+
.sync();
300+
301+
InitializeResult initResult = mcpClient.initialize();
302+
assertThat(initResult).isNotNull();
303+
304+
mcpClient.rootsListChangedNotification();
305+
306+
await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> {
307+
assertThat(rootsRef.get()).containsAll(roots);
308+
});
309+
310+
// Close server while subscription is active
216311
mcpServer.close();
312+
313+
// Verify client can handle server closure gracefully
314+
mcpClient.close();
217315
}
218316

219317
}

0 commit comments

Comments
 (0)