Skip to content

Commit 84f7ae0

Browse files
committed
Starting with resiliency tests
1 parent e249145 commit 84f7ae0

File tree

7 files changed

+181
-2
lines changed

7 files changed

+181
-2
lines changed

mcp-spring/mcp-spring-webflux/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,12 @@
9999
<version>${testcontainers.version}</version>
100100
<scope>test</scope>
101101
</dependency>
102+
<dependency>
103+
<groupId>org.testcontainers</groupId>
104+
<artifactId>toxiproxy</artifactId>
105+
<version>${toxiproxy.version}</version>
106+
<scope>test</scope>
107+
</dependency>
102108

103109
<dependency>
104110
<groupId>org.awaitility</groupId>

mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,9 @@ public WebFluxSseClientTransport(WebClient.Builder webClientBuilder, ObjectMappe
190190
*/
191191
@Override
192192
public Mono<Void> connect(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> handler) {
193+
// TODO: Avoid eager connection opening and enable resilience
194+
// -> upon disconnects, re-establish connection
195+
// -> allow optimizing for eager connection start using a constructor flag
193196
Flux<ServerSentEvent<String>> events = eventStream();
194197
this.inboundSubscription = events.concatMap(event -> Mono.just(event).<JSONRPCMessage>handle((e, s) -> {
195198
if (ENDPOINT_EVENT_TYPE.equals(event.event())) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package io.modelcontextprotocol.client;
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import io.modelcontextprotocol.client.transport.WebClientStreamableHttpTransport;
5+
import io.modelcontextprotocol.spec.McpClientTransport;
6+
import org.junit.jupiter.api.AfterAll;
7+
import org.junit.jupiter.api.BeforeAll;
8+
import org.springframework.web.reactive.function.client.WebClient;
9+
import org.testcontainers.containers.GenericContainer;
10+
import org.testcontainers.containers.wait.strategy.Wait;
11+
12+
public class WebClientStreamableHttpAsyncClientResiliencyTests extends AbstractMcpAsyncClientResiliencyTests {
13+
14+
@Override
15+
protected McpClientTransport createMcpTransport() {
16+
return new WebClientStreamableHttpTransport(new ObjectMapper(), WebClient.builder().baseUrl(host), "/mcp", true,
17+
false);
18+
}
19+
20+
}

mcp-test/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,11 @@
6868
<artifactId>junit-jupiter</artifactId>
6969
<version>${testcontainers.version}</version>
7070
</dependency>
71+
<dependency>
72+
<groupId>org.testcontainers</groupId>
73+
<artifactId>toxiproxy</artifactId>
74+
<version>${toxiproxy.version}</version>
75+
</dependency>
7176

7277
<dependency>
7378
<groupId>org.awaitility</groupId>
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package io.modelcontextprotocol.client;
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import eu.rekawek.toxiproxy.Proxy;
5+
import eu.rekawek.toxiproxy.ToxiproxyClient;
6+
import eu.rekawek.toxiproxy.model.ToxicDirection;
7+
import io.modelcontextprotocol.spec.McpClientTransport;
8+
import io.modelcontextprotocol.spec.McpSchema;
9+
import org.awaitility.Awaitility;
10+
import org.junit.jupiter.api.AfterAll;
11+
import org.junit.jupiter.api.BeforeAll;
12+
import org.junit.jupiter.api.Test;
13+
import org.testcontainers.containers.GenericContainer;
14+
import org.testcontainers.containers.Network;
15+
import org.testcontainers.containers.ToxiproxyContainer;
16+
import org.testcontainers.containers.wait.strategy.Wait;
17+
import reactor.test.StepVerifier;
18+
19+
import java.io.IOException;
20+
import java.time.Duration;
21+
import java.util.concurrent.atomic.AtomicReference;
22+
import java.util.function.Consumer;
23+
import java.util.function.Function;
24+
25+
import static org.assertj.core.api.Assertions.assertThat;
26+
import static org.assertj.core.api.Assertions.assertThatCode;
27+
28+
public abstract class AbstractMcpAsyncClientResiliencyTests {
29+
30+
static Network network = Network.newNetwork();
31+
static String host = "http://localhost:3001";
32+
33+
// Uses the https://github.com/tzolov/mcp-everything-server-docker-image
34+
@SuppressWarnings("resource")
35+
static GenericContainer<?> container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v2")
36+
.withCommand("node dist/index.js streamableHttp")
37+
.withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String()))
38+
.withNetwork(network)
39+
.withNetworkAliases("everything-server")
40+
.withExposedPorts(3001)
41+
.waitingFor(Wait.forHttp("/").forStatusCode(404));
42+
43+
static ToxiproxyContainer toxiproxy = new ToxiproxyContainer("ghcr.io/shopify/toxiproxy:2.5.0").withNetwork(network)
44+
.withExposedPorts(8474, 3000);
45+
46+
static Proxy proxy;
47+
48+
static {
49+
container.start();
50+
51+
toxiproxy.start();
52+
53+
final ToxiproxyClient toxiproxyClient = new ToxiproxyClient(toxiproxy.getHost(), toxiproxy.getControlPort());
54+
try {
55+
proxy = toxiproxyClient.createProxy("everything-server", "0.0.0.0:3000", "everything-server:3001");
56+
}
57+
catch (IOException e) {
58+
throw new RuntimeException("Can't create proxy!", e);
59+
}
60+
61+
final String ipAddressViaToxiproxy = toxiproxy.getHost();
62+
final int portViaToxiproxy = toxiproxy.getMappedPort(3000);
63+
64+
// int port = container.getMappedPort(3001);
65+
host = "http://" + ipAddressViaToxiproxy + ":" + portViaToxiproxy;
66+
}
67+
68+
abstract McpClientTransport createMcpTransport();
69+
70+
protected Duration getRequestTimeout() {
71+
return Duration.ofSeconds(14);
72+
}
73+
74+
protected Duration getInitializationTimeout() {
75+
return Duration.ofSeconds(2);
76+
}
77+
78+
McpAsyncClient client(McpClientTransport transport) {
79+
return client(transport, Function.identity());
80+
}
81+
82+
McpAsyncClient client(McpClientTransport transport, Function<McpClient.AsyncSpec, McpClient.AsyncSpec> customizer) {
83+
AtomicReference<McpAsyncClient> client = new AtomicReference<>();
84+
85+
assertThatCode(() -> {
86+
McpClient.AsyncSpec builder = McpClient.async(transport)
87+
.requestTimeout(getRequestTimeout())
88+
.initializationTimeout(getInitializationTimeout())
89+
.capabilities(McpSchema.ClientCapabilities.builder().roots(true).build());
90+
builder = customizer.apply(builder);
91+
client.set(builder.build());
92+
}).doesNotThrowAnyException();
93+
94+
return client.get();
95+
}
96+
97+
void withClient(McpClientTransport transport, Consumer<McpAsyncClient> c) {
98+
withClient(transport, Function.identity(), c);
99+
}
100+
101+
void withClient(McpClientTransport transport, Function<McpClient.AsyncSpec, McpClient.AsyncSpec> customizer,
102+
Consumer<McpAsyncClient> c) {
103+
var client = client(transport, customizer);
104+
try {
105+
c.accept(client);
106+
}
107+
finally {
108+
StepVerifier.create(client.closeGracefully()).expectComplete().verify(Duration.ofSeconds(10));
109+
}
110+
}
111+
112+
@Test
113+
void testPing() {
114+
withClient(createMcpTransport(), mcpAsyncClient -> {
115+
try {
116+
StepVerifier.create(mcpAsyncClient.initialize()).expectNextCount(1).verifyComplete();
117+
118+
// disconnect
119+
// proxy.toxics().bandwidth("CUT_CONNECTION_DOWNSTREAM",
120+
// ToxicDirection.DOWNSTREAM, 0);
121+
// proxy.toxics().bandwidth("CUT_CONNECTION_UPSTREAM",
122+
// ToxicDirection.UPSTREAM, 0);
123+
proxy.toxics().resetPeer("RESET_DOWNSTREAM", ToxicDirection.DOWNSTREAM, 0);
124+
proxy.toxics().resetPeer("RESET_UPSTREAM", ToxicDirection.UPSTREAM, 0);
125+
126+
StepVerifier.create(mcpAsyncClient.ping()).expectError().verify();
127+
128+
proxy.toxics().get("RESET_UPSTREAM").remove();
129+
proxy.toxics().get("RESET_DOWNSTREAM").remove();
130+
// proxy.toxics().get("CUT_CONNECTION_DOWNSTREAM").remove();
131+
// proxy.toxics().get("CUT_CONNECTION_UPSTREAM").remove();
132+
133+
StepVerifier.create(mcpAsyncClient.ping()).expectNextCount(1).verifyComplete();
134+
}
135+
catch (IOException e) {
136+
throw new RuntimeException(e);
137+
}
138+
});
139+
}
140+
141+
}

mcp/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,10 @@ public McpClientSession(Duration requestTimeout, McpClientTransport transport, C
125125
// consumer
126126
this.connection = this.transport.connect(mono -> mono.doOnNext(this::handle)).subscribe();
127127
this.transport.handleException(t -> {
128-
this.pendingResponses.clear();
128+
// 🤔 let's think for a moment - we only clear when the session is invalidated
129+
if (t instanceof McpSessionNotFoundException) {
130+
this.pendingResponses.clear();
131+
}
129132
exceptionHandler.accept(t);
130133
});
131134
}

pom.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@
6363
<junit.version>5.10.2</junit.version>
6464
<mockito.version>5.17.0</mockito.version>
6565
<testcontainers.version>1.20.4</testcontainers.version>
66-
<byte-buddy.version>1.17.5</byte-buddy.version>
66+
<byte-buddy.version>1.17.5</byte-buddy.version>
67+
<toxiproxy.version>1.21.0</toxiproxy.version>
6768

6869
<slf4j-api.version>2.0.16</slf4j-api.version>
6970
<logback.version>1.5.15</logback.version>

0 commit comments

Comments
 (0)