Skip to content

Commit 7180426

Browse files
tzolovchemicL
andauthored
refactor(client): improve validation and remove server methods (modelcontextprotocol#14)
Add client initialization and capability validation checks - New isInitialized() method to check client state - Validate server capabilities before tool/resource operations - Add clear error messages for common failure cases - Remove server-side notification methods from client: sendResourcesListChanged(), promptListChangedNotification() - Improve protocol version handling - Testing improvements and new initialization tests - Redesign MockMcpTransport internals and adapt tests - Correct McpError messages - clean unused imports Resolves modelcontextprotocol#13 Signed-off-by: Christian Tzolov <christian.tzolov@broadcom.com> Signed-off-by: Dariusz Jędrzejczyk <dariusz.jedrzejczyk@broadcom.com> Co-authored-by: Dariusz Jędrzejczyk <dariusz.jedrzejczyk@broadcom.com>
1 parent d9c4818 commit 7180426

File tree

10 files changed

+184
-112
lines changed

10 files changed

+184
-112
lines changed

mcp-test/src/main/java/io/modelcontextprotocol/MockMcpTransport.java

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44

55
package io.modelcontextprotocol;
66

7-
import java.util.concurrent.atomic.AtomicInteger;
7+
import java.util.ArrayList;
8+
import java.util.List;
9+
import java.util.function.BiConsumer;
810
import java.util.function.Function;
911

1012
import com.fasterxml.jackson.core.type.TypeReference;
@@ -14,47 +16,53 @@
1416
import io.modelcontextprotocol.spec.ServerMcpTransport;
1517
import io.modelcontextprotocol.spec.McpSchema.JSONRPCNotification;
1618
import io.modelcontextprotocol.spec.McpSchema.JSONRPCRequest;
17-
import reactor.core.publisher.Flux;
1819
import reactor.core.publisher.Mono;
1920
import reactor.core.publisher.Sinks;
20-
import reactor.core.scheduler.Schedulers;
2121

22-
@SuppressWarnings("unused")
22+
/**
23+
* A mock implementation of the {@link ClientMcpTransport} and {@link ServerMcpTransport}
24+
* interfaces.
25+
*/
2326
public class MockMcpTransport implements ClientMcpTransport, ServerMcpTransport {
2427

25-
private final AtomicInteger inboundMessageCount = new AtomicInteger(0);
28+
private final Sinks.Many<McpSchema.JSONRPCMessage> inbound = Sinks.many().unicast().onBackpressureBuffer();
29+
30+
private final List<McpSchema.JSONRPCMessage> sent = new ArrayList<>();
2631

27-
private final Sinks.Many<McpSchema.JSONRPCMessage> outgoing = Sinks.many().multicast().onBackpressureBuffer();
32+
private final BiConsumer<MockMcpTransport, McpSchema.JSONRPCMessage> interceptor;
2833

29-
private final Sinks.Many<McpSchema.JSONRPCMessage> inbound = Sinks.many().unicast().onBackpressureBuffer();
34+
public MockMcpTransport() {
35+
this((t, msg) -> {
36+
});
37+
}
3038

31-
private final Flux<McpSchema.JSONRPCMessage> outboundView = outgoing.asFlux().cache(1);
39+
public MockMcpTransport(BiConsumer<MockMcpTransport, McpSchema.JSONRPCMessage> interceptor) {
40+
this.interceptor = interceptor;
41+
}
3242

3343
public void simulateIncomingMessage(McpSchema.JSONRPCMessage message) {
3444
if (inbound.tryEmitNext(message).isFailure()) {
35-
throw new RuntimeException("Failed to emit message " + message);
45+
throw new RuntimeException("Failed to process incoming message " + message);
3646
}
37-
inboundMessageCount.incrementAndGet();
3847
}
3948

4049
@Override
4150
public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
42-
if (outgoing.tryEmitNext(message).isFailure()) {
43-
return Mono.error(new RuntimeException("Can't emit outgoing message " + message));
44-
}
51+
sent.add(message);
52+
interceptor.accept(this, message);
4553
return Mono.empty();
4654
}
4755

4856
public McpSchema.JSONRPCRequest getLastSentMessageAsRequest() {
49-
return (JSONRPCRequest) outboundView.blockFirst();
57+
return (JSONRPCRequest) getLastSentMessage();
5058
}
5159

52-
public McpSchema.JSONRPCNotification getLastSentMessageAsNotifiation() {
53-
return (JSONRPCNotification) outboundView.blockFirst();
60+
public McpSchema.JSONRPCNotification getLastSentMessageAsNotification() {
61+
return (JSONRPCNotification) getLastSentMessage();
5462
}
5563

5664
public McpSchema.JSONRPCMessage getLastSentMessage() {
57-
return outboundView.blockFirst();
65+
return !sent.isEmpty() ? sent.get(sent.size() - 1) : null;
5866
}
5967

6068
private volatile boolean connected = false;
@@ -66,7 +74,6 @@ public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchem
6674
}
6775
connected = true;
6876
return inbound.asFlux()
69-
.publishOn(Schedulers.boundedElastic())
7077
.flatMap(message -> Mono.just(message).transform(handler))
7178
.doFinally(signal -> connected = false)
7279
.then();
@@ -76,8 +83,8 @@ public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchem
7683
public Mono<Void> closeGracefully() {
7784
return Mono.defer(() -> {
7885
connected = false;
79-
outgoing.tryEmitComplete();
8086
inbound.tryEmitComplete();
87+
// Wait for all subscribers to complete
8188
return Mono.empty();
8289
});
8390
}
@@ -87,4 +94,4 @@ public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
8794
return new ObjectMapper().convertValue(data, typeRef);
8895
}
8996

90-
}
97+
}

mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -274,9 +274,6 @@ void testNotificationHandlers() {
274274

275275
assertThatCode(() -> {
276276
client.initialize().block();
277-
// Trigger notifications
278-
client.sendResourcesListChanged().block();
279-
client.promptListChangedNotification().block();
280277
client.closeGracefully().block();
281278
}).doesNotThrowAnyException();
282279
}

mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpSyncClientTests.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -258,9 +258,6 @@ void testNotificationHandlers() {
258258

259259
assertThatCode(() -> {
260260
client.initialize();
261-
// Trigger notifications
262-
client.sendResourcesListChanged();
263-
client.promptListChangedNotification();
264261
client.close();
265262
}).doesNotThrowAnyException();
266263
}

mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java

Lines changed: 38 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,14 @@ public McpSchema.Implementation getServerInfo() {
296296
return this.serverInfo;
297297
}
298298

299+
/**
300+
* Check if the client-server connection is initialized.
301+
* @return true if the client-server connection is initialized
302+
*/
303+
public boolean isInitialized() {
304+
return this.serverCapabilities != null;
305+
}
306+
299307
/**
300308
* Get the client capabilities that define the supported features and functionality.
301309
* @return The client capabilities
@@ -456,6 +464,12 @@ private RequestHandler<CreateMessageResult> samplingCreateMessageHandler() {
456464
* (false/absent)
457465
*/
458466
public Mono<McpSchema.CallToolResult> callTool(McpSchema.CallToolRequest callToolRequest) {
467+
if (!this.isInitialized()) {
468+
return Mono.error(new McpError("Client must be initialized before calling tools"));
469+
}
470+
if (this.serverCapabilities.tools() == null) {
471+
return Mono.error(new McpError("Server does not provide tools capability"));
472+
}
459473
return this.mcpSession.sendRequest(McpSchema.METHOD_TOOLS_CALL, callToolRequest, CALL_TOOL_RESULT_TYPE_REF);
460474
}
461475

@@ -477,6 +491,12 @@ public Mono<McpSchema.ListToolsResult> listTools() {
477491
* Optional cursor for pagination if more tools are available
478492
*/
479493
public Mono<McpSchema.ListToolsResult> listTools(String cursor) {
494+
if (!this.isInitialized()) {
495+
return Mono.error(new McpError("Client must be initialized before listing tools"));
496+
}
497+
if (this.serverCapabilities.tools() == null) {
498+
return Mono.error(new McpError("Server does not provide tools capability"));
499+
}
480500
return this.mcpSession.sendRequest(McpSchema.METHOD_TOOLS_LIST, new McpSchema.PaginatedRequest(cursor),
481501
LIST_TOOLS_RESULT_TYPE_REF);
482502
}
@@ -532,6 +552,12 @@ public Mono<McpSchema.ListResourcesResult> listResources() {
532552
* @return A Mono that completes with the list of resources result
533553
*/
534554
public Mono<McpSchema.ListResourcesResult> listResources(String cursor) {
555+
if (!this.isInitialized()) {
556+
return Mono.error(new McpError("Client must be initialized before listing resources"));
557+
}
558+
if (this.serverCapabilities.resources() == null) {
559+
return Mono.error(new McpError("Server does not provide the resources capability"));
560+
}
535561
return this.mcpSession.sendRequest(McpSchema.METHOD_RESOURCES_LIST, new McpSchema.PaginatedRequest(cursor),
536562
LIST_RESOURCES_RESULT_TYPE_REF);
537563
}
@@ -551,6 +577,12 @@ public Mono<McpSchema.ReadResourceResult> readResource(McpSchema.Resource resour
551577
* @return A Mono that completes with the resource content
552578
*/
553579
public Mono<McpSchema.ReadResourceResult> readResource(McpSchema.ReadResourceRequest readResourceRequest) {
580+
if (!this.isInitialized()) {
581+
return Mono.error(new McpError("Client must be initialized before reading resources"));
582+
}
583+
if (this.serverCapabilities.resources() == null) {
584+
return Mono.error(new McpError("Server does not provide the resources capability"));
585+
}
554586
return this.mcpSession.sendRequest(McpSchema.METHOD_RESOURCES_READ, readResourceRequest,
555587
READ_RESOURCE_RESULT_TYPE_REF);
556588
}
@@ -575,19 +607,16 @@ public Mono<McpSchema.ListResourceTemplatesResult> listResourceTemplates() {
575607
* @return A Mono that completes with the list of resource templates result
576608
*/
577609
public Mono<McpSchema.ListResourceTemplatesResult> listResourceTemplates(String cursor) {
610+
if (!this.isInitialized()) {
611+
return Mono.error(new McpError("Client must be initialized before listing resource templates"));
612+
}
613+
if (this.serverCapabilities.resources() == null) {
614+
return Mono.error(new McpError("Server does not provide the resources capability"));
615+
}
578616
return this.mcpSession.sendRequest(McpSchema.METHOD_RESOURCES_TEMPLATES_LIST,
579617
new McpSchema.PaginatedRequest(cursor), LIST_RESOURCE_TEMPLATES_RESULT_TYPE_REF);
580618
}
581619

582-
/**
583-
* List Changed Notification. When the list of available resources changes, servers
584-
* that declared the listChanged capability SHOULD send a notification.
585-
* @return A Mono that completes when the notification is sent
586-
*/
587-
public Mono<Void> sendResourcesListChanged() {
588-
return this.mcpSession.sendNotification(McpSchema.METHOD_NOTIFICATION_RESOURCES_LIST_CHANGED);
589-
}
590-
591620
/**
592621
* Subscriptions. The protocol supports optional subscriptions to resource changes.
593622
* Clients can subscribe to specific resources and receive notifications when they
@@ -660,16 +689,6 @@ public Mono<GetPromptResult> getPrompt(GetPromptRequest getPromptRequest) {
660689
return this.mcpSession.sendRequest(McpSchema.METHOD_PROMPT_GET, getPromptRequest, GET_PROMPT_RESULT_TYPE_REF);
661690
}
662691

663-
/**
664-
* (Server) An optional notification from the server to the client, informing it that
665-
* the list of prompts it offers has changed. This may be issued by servers without
666-
* any previous subscription from the client.
667-
* @return A Mono that completes when the notification is sent
668-
*/
669-
public Mono<Void> promptListChangedNotification() {
670-
return this.mcpSession.sendNotification(McpSchema.METHOD_NOTIFICATION_PROMPTS_LIST_CHANGED);
671-
}
672-
673692
private NotificationHandler asyncPromptsChangeNotificationHandler(
674693
List<Function<List<McpSchema.Prompt>, Mono<Void>>> promptsChangeConsumers) {
675694
return params -> listPrompts().flatMap(listPromptsResult -> Flux.fromIterable(promptsChangeConsumers)

mcp/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -284,14 +284,6 @@ public McpSchema.ListResourceTemplatesResult listResourceTemplates() {
284284
return this.delegate.listResourceTemplates().block();
285285
}
286286

287-
/**
288-
* List Changed Notification. When the list of available resources changes, servers
289-
* that declared the listChanged capability SHOULD send a notification:
290-
*/
291-
public void sendResourcesListChanged() {
292-
this.delegate.sendResourcesListChanged().block();
293-
}
294-
295287
/**
296288
* Subscriptions. The protocol supports optional subscriptions to resource changes.
297289
* Clients can subscribe to specific resources and receive notifications when they
@@ -329,15 +321,6 @@ public GetPromptResult getPrompt(GetPromptRequest getPromptRequest) {
329321
return this.delegate.getPrompt(getPromptRequest).block();
330322
}
331323

332-
/**
333-
* (Server) An optional notification from the server to the client, informing it that
334-
* the list of prompts it offers has changed. This may be issued by servers without
335-
* any previous subscription from the client.
336-
*/
337-
public void promptListChangedNotification() {
338-
this.delegate.promptListChangedNotification().block();
339-
}
340-
341324
/**
342325
* Client can set the minimum logging level it wants to receive from the server.
343326
* @param loggingLevel the min logging level

mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,9 +188,13 @@ private DefaultMcpSession.RequestHandler<McpSchema.InitializeResult> asyncInitia
188188
initializeRequest.protocolVersion(), initializeRequest.capabilities(),
189189
initializeRequest.clientInfo());
190190

191+
// The server MUST respond with the highest protocol version it supports if
192+
// it does not support the requested (e.g. Client) version.
191193
String serverProtocolVersion = this.protocolVersions.get(this.protocolVersions.size() - 1);
192194

193195
if (this.protocolVersions.contains(initializeRequest.protocolVersion())) {
196+
// If the server supports the requested protocol version, it MUST respond
197+
// with the same version.
194198
serverProtocolVersion = initializeRequest.protocolVersion();
195199
}
196200
else {

mcp/src/test/java/io/modelcontextprotocol/MockMcpTransport.java

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44

55
package io.modelcontextprotocol;
66

7-
import java.util.concurrent.atomic.AtomicInteger;
7+
import java.util.ArrayList;
8+
import java.util.List;
9+
import java.util.function.BiConsumer;
810
import java.util.function.Function;
911

1012
import com.fasterxml.jackson.core.type.TypeReference;
@@ -14,50 +16,53 @@
1416
import io.modelcontextprotocol.spec.ServerMcpTransport;
1517
import io.modelcontextprotocol.spec.McpSchema.JSONRPCNotification;
1618
import io.modelcontextprotocol.spec.McpSchema.JSONRPCRequest;
17-
import reactor.core.publisher.Flux;
1819
import reactor.core.publisher.Mono;
1920
import reactor.core.publisher.Sinks;
20-
import reactor.core.scheduler.Schedulers;
2121

2222
/**
2323
* A mock implementation of the {@link ClientMcpTransport} and {@link ServerMcpTransport}
2424
* interfaces.
2525
*/
2626
public class MockMcpTransport implements ClientMcpTransport, ServerMcpTransport {
2727

28-
private final AtomicInteger inboundMessageCount = new AtomicInteger(0);
28+
private final Sinks.Many<McpSchema.JSONRPCMessage> inbound = Sinks.many().unicast().onBackpressureBuffer();
2929

30-
private final Sinks.Many<McpSchema.JSONRPCMessage> outgoing = Sinks.many().multicast().onBackpressureBuffer();
30+
private final List<McpSchema.JSONRPCMessage> sent = new ArrayList<>();
3131

32-
private final Sinks.Many<McpSchema.JSONRPCMessage> inbound = Sinks.many().unicast().onBackpressureBuffer();
32+
private final BiConsumer<MockMcpTransport, McpSchema.JSONRPCMessage> interceptor;
3333

34-
private final Flux<McpSchema.JSONRPCMessage> outboundView = outgoing.asFlux().cache(1);
34+
public MockMcpTransport() {
35+
this((t, msg) -> {
36+
});
37+
}
38+
39+
public MockMcpTransport(BiConsumer<MockMcpTransport, McpSchema.JSONRPCMessage> interceptor) {
40+
this.interceptor = interceptor;
41+
}
3542

3643
public void simulateIncomingMessage(McpSchema.JSONRPCMessage message) {
3744
if (inbound.tryEmitNext(message).isFailure()) {
38-
throw new RuntimeException("Failed to emit message " + message);
45+
throw new RuntimeException("Failed to process incoming message " + message);
3946
}
40-
inboundMessageCount.incrementAndGet();
4147
}
4248

4349
@Override
4450
public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
45-
if (outgoing.tryEmitNext(message).isFailure()) {
46-
return Mono.error(new RuntimeException("Can't emit outgoing message " + message));
47-
}
51+
sent.add(message);
52+
interceptor.accept(this, message);
4853
return Mono.empty();
4954
}
5055

5156
public McpSchema.JSONRPCRequest getLastSentMessageAsRequest() {
52-
return (JSONRPCRequest) outboundView.blockFirst();
57+
return (JSONRPCRequest) getLastSentMessage();
5358
}
5459

55-
public McpSchema.JSONRPCNotification getLastSentMessageAsNotifiation() {
56-
return (JSONRPCNotification) outboundView.blockFirst();
60+
public McpSchema.JSONRPCNotification getLastSentMessageAsNotification() {
61+
return (JSONRPCNotification) getLastSentMessage();
5762
}
5863

5964
public McpSchema.JSONRPCMessage getLastSentMessage() {
60-
return outboundView.blockFirst();
65+
return !sent.isEmpty() ? sent.get(sent.size() - 1) : null;
6166
}
6267

6368
private volatile boolean connected = false;
@@ -69,7 +74,6 @@ public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchem
6974
}
7075
connected = true;
7176
return inbound.asFlux()
72-
.publishOn(Schedulers.boundedElastic())
7377
.flatMap(message -> Mono.just(message).transform(handler))
7478
.doFinally(signal -> connected = false)
7579
.then();
@@ -79,7 +83,6 @@ public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchem
7983
public Mono<Void> closeGracefully() {
8084
return Mono.defer(() -> {
8185
connected = false;
82-
outgoing.tryEmitComplete();
8386
inbound.tryEmitComplete();
8487
// Wait for all subscribers to complete
8588
return Mono.empty();

mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -275,9 +275,6 @@ void testNotificationHandlers() {
275275

276276
assertThatCode(() -> {
277277
client.initialize().block();
278-
// Trigger notifications
279-
client.sendResourcesListChanged().block();
280-
client.promptListChangedNotification().block();
281278
client.closeGracefully().block();
282279
}).doesNotThrowAnyException();
283280
}

mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpSyncClientTests.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -259,9 +259,6 @@ void testNotificationHandlers() {
259259

260260
assertThatCode(() -> {
261261
client.initialize();
262-
// Trigger notifications
263-
client.sendResourcesListChanged();
264-
client.promptListChangedNotification();
265262
client.close();
266263
}).doesNotThrowAnyException();
267264
}

0 commit comments

Comments
 (0)