Skip to content

Commit ad3cdd2

Browse files
committed
fix: Improve MCP server stability and test reliability
- Replace bounded elastic schedulers with single thread executors - Add timeouts to MCP server tests - Enhance error handling in StdioServerTransport - Comment out unused PaginatedRequest unmarshalling - Simplify shutdown logic in StdioServerTransport
1 parent fcf0ff0 commit ad3cdd2

File tree

9 files changed

+50
-41
lines changed

9 files changed

+50
-41
lines changed

mcp/src/main/java/org/springframework/ai/mcp/server/McpAsyncServer.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -349,9 +349,9 @@ private DefaultMcpSession.RequestHandler toolsCallRequestHandler() {
349349

350350
private DefaultMcpSession.RequestHandler resourcesListRequestHandler() {
351351
return params -> {
352-
McpSchema.PaginatedRequest request = transport.unmarshalFrom(params,
353-
new TypeReference<McpSchema.PaginatedRequest>() {
354-
});
352+
// McpSchema.PaginatedRequest request = transport.unmarshalFrom(params,
353+
// new TypeReference<McpSchema.PaginatedRequest>() {
354+
// });
355355

356356
var resourceList = this.resources.values().stream().map(ResourceRegistration::resource).toList();
357357

@@ -361,9 +361,9 @@ private DefaultMcpSession.RequestHandler resourcesListRequestHandler() {
361361

362362
private DefaultMcpSession.RequestHandler resourceTemplateListRequestHandler() {
363363
return params -> {
364-
McpSchema.PaginatedRequest request = transport.unmarshalFrom(params,
365-
new TypeReference<McpSchema.PaginatedRequest>() {
366-
});
364+
// McpSchema.PaginatedRequest request = transport.unmarshalFrom(params,
365+
// new TypeReference<McpSchema.PaginatedRequest>() {
366+
// });
367367

368368
return Mono.just(new McpSchema.ListResourceTemplatesResult(this.resourceTemplates, null));
369369
};

mcp/src/main/java/org/springframework/ai/mcp/server/transport/StdioServerTransport.java

Lines changed: 30 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.io.OutputStream;
2424
import java.nio.charset.StandardCharsets;
2525
import java.time.Duration;
26+
import java.util.concurrent.Executors;
2627
import java.util.function.Function;
2728

2829
import com.fasterxml.jackson.core.type.TypeReference;
@@ -97,8 +98,8 @@ public StdioServerTransport(ObjectMapper objectMapper) {
9798
this.outputStream = System.out;
9899

99100
// Use bounded schedulers for better resource management
100-
this.inboundScheduler = Schedulers.newBoundedElastic(1, 1, "inbound");
101-
this.outboundScheduler = Schedulers.newBoundedElastic(1, 1, "outbound");
101+
this.inboundScheduler = Schedulers.fromExecutorService(Executors.newSingleThreadExecutor(), "inbound");
102+
this.outboundScheduler = Schedulers.fromExecutorService(Executors.newSingleThreadExecutor(), "outbound");
102103
}
103104

104105
@Override
@@ -139,38 +140,43 @@ public Mono<Void> sendMessage(JSONRPCMessage message) {
139140
private void startInboundProcessing() {
140141
this.inboundScheduler.schedule(() -> {
141142
inboundReady.tryEmitValue(null);
142-
try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
143-
String line;
144-
while (!isClosing && (line = reader.readLine()) != null) {
143+
BufferedReader reader = null;
144+
try {
145+
reader = new BufferedReader(new InputStreamReader(inputStream));
146+
while (!isClosing) {
145147
try {
146-
JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.objectMapper, line);
147-
if (!this.inboundSink.tryEmitNext(message).isSuccess()) {
148+
String line = reader.readLine();
149+
if (line == null || isClosing) {
150+
break;
151+
}
152+
153+
try {
154+
JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.objectMapper, line);
155+
if (!this.inboundSink.tryEmitNext(message).isSuccess()) {
156+
if (!isClosing) {
157+
logger.error("Failed to enqueue message");
158+
}
159+
break;
160+
}
161+
}
162+
catch (Exception e) {
148163
if (!isClosing) {
149-
logger.error("Failed to enqueue message");
164+
logger.error("Error processing inbound message", e);
150165
}
151166
break;
152167
}
153168
}
154-
catch (Exception e) {
169+
catch (IOException e) {
155170
if (!isClosing) {
156-
logger.error("Error processing inbound message", e);
171+
logger.error("Error reading from stdin", e);
157172
}
158173
break;
159174
}
160175
}
161176
}
162-
catch (IOException e) {
163-
// Check isClosing before the error occurs to properly categorize it
164-
boolean wasClosing = isClosing;
165-
isClosing = true;
166-
if (!wasClosing && e.getMessage().equals("Pipe closed")) {
167-
logger.debug("Stream closed during shutdown", e);
168-
}
169-
else if (!wasClosing) {
170-
logger.error("Error reading from stdin", e);
171-
}
172-
else {
173-
logger.debug("Stream error during shutdown", e);
177+
catch (Exception e) {
178+
if (!isClosing) {
179+
logger.error("Error in inbound processing", e);
174180
}
175181
}
176182
finally {
@@ -234,6 +240,7 @@ else if (isClosing) {
234240

235241
@Override
236242
public Mono<Void> closeGracefully() {
243+
237244
return Mono.fromRunnable(() -> {
238245
isClosing = true;
239246
logger.debug("Initiating graceful shutdown");
@@ -244,18 +251,10 @@ public Mono<Void> closeGracefully() {
244251
return Mono.delay(Duration.ofMillis(100));
245252
})).then(Mono.fromRunnable(() -> {
246253
try {
247-
// Dispose schedulers first
254+
// Dispose schedulers with longer timeout
248255
inboundScheduler.dispose();
249256
outboundScheduler.dispose();
250257

251-
// Wait for schedulers to terminate
252-
if (!inboundScheduler.isDisposed()) {
253-
inboundScheduler.disposeGracefully().block(Duration.ofSeconds(5));
254-
}
255-
if (!outboundScheduler.isDisposed()) {
256-
outboundScheduler.disposeGracefully().block(Duration.ofSeconds(5));
257-
}
258-
259258
logger.info("Graceful shutdown completed");
260259
}
261260
catch (Exception e) {

mcp/src/test/java/org/springframework/ai/mcp/server/AbstractMcpSyncServerTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ protected void onClose() {
6666

6767
@BeforeEach
6868
void setUp() {
69+
// onStart();
6970
}
7071

7172
@AfterEach

mcp/src/test/java/org/springframework/ai/mcp/server/SseMcpAsyncServerTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package org.springframework.ai.mcp.server;
1818

1919
import com.fasterxml.jackson.databind.ObjectMapper;
20+
import org.junit.jupiter.api.Timeout;
21+
2022
import org.springframework.http.server.reactive.HttpHandler;
2123
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
2224
import org.springframework.web.reactive.function.server.RouterFunctions;
@@ -31,6 +33,7 @@
3133
*
3234
* @author Christian Tzolov
3335
*/
36+
@Timeout(15) // Giving extra time beyond the client timeout
3437
class SseMcpAsyncServerTests extends AbstractMcpAsyncServerTests {
3538

3639
private static final int PORT = 8181;

mcp/src/test/java/org/springframework/ai/mcp/server/SseMcpSyncServerTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package org.springframework.ai.mcp.server;
1818

1919
import com.fasterxml.jackson.databind.ObjectMapper;
20+
import org.junit.jupiter.api.Timeout;
21+
2022
import org.springframework.http.server.reactive.HttpHandler;
2123
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
2224
import org.springframework.web.reactive.function.server.RouterFunctions;
@@ -31,6 +33,7 @@
3133
*
3234
* @author Christian Tzolov
3335
*/
36+
@Timeout(15) // Giving extra time beyond the client timeout
3437
class SseMcpSyncServerTests extends AbstractMcpSyncServerTests {
3538

3639
private static final int PORT = 8182;

mcp/src/test/java/org/springframework/ai/mcp/server/StdioMcpAsyncServerTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.ai.mcp.server;
1818

19+
import org.junit.jupiter.api.Timeout;
20+
1921
import org.springframework.ai.mcp.server.transport.StdioServerTransport;
2022
import org.springframework.ai.mcp.spec.McpTransport;
2123

@@ -24,6 +26,7 @@
2426
*
2527
* @author Christian Tzolov
2628
*/
29+
@Timeout(15) // Giving extra time beyond the client timeout
2730
class StdioMcpAsyncServerTests extends AbstractMcpAsyncServerTests {
2831

2932
@Override

mcp/src/test/java/org/springframework/ai/mcp/server/StdioMcpSyncServerTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.ai.mcp.server;
1818

19+
import org.junit.jupiter.api.Timeout;
20+
1921
import org.springframework.ai.mcp.server.transport.StdioServerTransport;
2022
import org.springframework.ai.mcp.spec.McpTransport;
2123

@@ -24,6 +26,7 @@
2426
*
2527
* @author Christian Tzolov
2628
*/
29+
@Timeout(15) // Giving extra time beyond the client timeout
2730
class StdioMcpSyncServerTests extends AbstractMcpSyncServerTests {
2831

2932
@Override

mcp/src/test/java/org/springframework/ai/mcp/server/transport/StdioServerTransportTests.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package org.springframework.ai.mcp.server.transport;
1818

19-
import java.io.ByteArrayInputStream;
2019
import java.io.ByteArrayOutputStream;
2120
import java.io.InputStream;
2221
import java.io.PrintStream;
@@ -51,8 +50,6 @@ class StdioServerTransportTests {
5150

5251
private ByteArrayOutputStream testOut;
5352

54-
private ByteArrayInputStream testIn;
55-
5653
private ByteArrayOutputStream testErr;
5754

5855
private PrintStream testOutPrintStream;
@@ -89,7 +86,6 @@ void tearDown() {
8986
void shouldHandleIncomingMessages() throws Exception {
9087
// Prepare test input
9188
String jsonMessage = "{\"jsonrpc\":\"2.0\",\"method\":\"test\",\"params\":{},\"id\":1}";
92-
testIn = new ByteArrayInputStream((jsonMessage + "\n").getBytes(StandardCharsets.UTF_8));
9389

9490
// Create transport with test streams
9591
transport = new StdioServerTransport(objectMapper);

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@
174174

175175
<!-- Output test execution times in the logs -->
176176
<redirectTestOutputToFile>false</redirectTestOutputToFile>
177+
177178
</configuration>
178179
</plugin>
179180
<plugin>

0 commit comments

Comments
 (0)