Skip to content

Commit fcf0ff0

Browse files
committed
refactor: simplify StdioServerTransport to only use System streams
Removes custom stream injection capability from StdioServerTransport and simplifies it to only use System.in/out. Updates related tests to use System streams instead of piped streams.
1 parent dfa7435 commit fcf0ff0

File tree

4 files changed

+34
-115
lines changed

4 files changed

+34
-115
lines changed

mcp/src/main/java/org/springframework/ai/mcp/server/transport/StdioServerTransport.java

Lines changed: 26 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public class StdioServerTransport implements McpTransport {
7777
* Creates a new StdioServerTransport with a default ObjectMapper and System streams.
7878
*/
7979
public StdioServerTransport() {
80-
this(new ObjectMapper(), System.in, System.out);
80+
this(new ObjectMapper());
8181
}
8282

8383
/**
@@ -86,26 +86,15 @@ public StdioServerTransport() {
8686
* @param objectMapper The ObjectMapper to use for JSON serialization/deserialization
8787
*/
8888
public StdioServerTransport(ObjectMapper objectMapper) {
89-
this(objectMapper, System.in, System.out);
90-
}
9189

92-
/**
93-
* Creates a new StdioServerTransport with the specified ObjectMapper and streams.
94-
* @param objectMapper The ObjectMapper to use for JSON serialization/deserialization
95-
* @param inputStream The input stream to read from
96-
* @param outputStream The output stream to write to
97-
*/
98-
public StdioServerTransport(ObjectMapper objectMapper, InputStream inputStream, OutputStream outputStream) {
9990
Assert.notNull(objectMapper, "The ObjectMapper can not be null");
100-
Assert.notNull(inputStream, "The InputStream can not be null");
101-
Assert.notNull(outputStream, "The OutputStream can not be null");
10291

10392
this.inboundSink = Sinks.many().unicast().onBackpressureBuffer();
10493
this.outboundSink = Sinks.many().unicast().onBackpressureBuffer();
10594

10695
this.objectMapper = objectMapper;
107-
this.inputStream = inputStream;
108-
this.outputStream = outputStream;
96+
this.inputStream = System.in;
97+
this.outputStream = System.out;
10998

11099
// Use bounded schedulers for better resource management
111100
this.inboundScheduler = Schedulers.newBoundedElastic(1, 1, "inbound");
@@ -248,50 +237,31 @@ public Mono<Void> closeGracefully() {
248237
return Mono.fromRunnable(() -> {
249238
isClosing = true;
250239
logger.debug("Initiating graceful shutdown");
251-
})
252-
// .then(Mono.defer(() -> {
253-
// // First complete the sinks to stop processing
254-
// inboundSink.tryEmitComplete();
255-
// outboundSink.tryEmitComplete();
256-
// return Mono.delay(Duration.ofMillis(100));
257-
// }))
258-
.then(Mono.fromRunnable(() -> {
259-
try {
260-
// Dispose schedulers first
261-
inboundScheduler.dispose();
262-
outboundScheduler.dispose();
263-
264-
// Wait for schedulers to terminate
265-
if (!inboundScheduler.isDisposed()) {
266-
inboundScheduler.disposeGracefully().block(Duration.ofSeconds(5));
267-
}
268-
if (!outboundScheduler.isDisposed()) {
269-
outboundScheduler.disposeGracefully().block(Duration.ofSeconds(5));
270-
}
271-
272-
// Only after schedulers are disposed, close the streams
273-
try {
274-
if (inputStream != System.in) {
275-
inputStream.close();
276-
}
277-
if (outputStream != System.out) {
278-
outputStream.flush();
279-
outputStream.close();
280-
}
281-
}
282-
catch (IOException e) {
283-
// Log but don't throw since we're shutting down
284-
logger.debug("Error closing streams during shutdown", e);
285-
}
286-
287-
logger.info("Graceful shutdown completed");
240+
}).then(Mono.defer(() -> {
241+
// First complete the sinks to stop processing
242+
inboundSink.tryEmitComplete();
243+
outboundSink.tryEmitComplete();
244+
return Mono.delay(Duration.ofMillis(100));
245+
})).then(Mono.fromRunnable(() -> {
246+
try {
247+
// Dispose schedulers first
248+
inboundScheduler.dispose();
249+
outboundScheduler.dispose();
250+
251+
// Wait for schedulers to terminate
252+
if (!inboundScheduler.isDisposed()) {
253+
inboundScheduler.disposeGracefully().block(Duration.ofSeconds(5));
288254
}
289-
catch (Exception e) {
290-
logger.error("Error during graceful shutdown", e);
255+
if (!outboundScheduler.isDisposed()) {
256+
outboundScheduler.disposeGracefully().block(Duration.ofSeconds(5));
291257
}
292-
}))
293-
.then()
294-
.subscribeOn(Schedulers.boundedElastic());
258+
259+
logger.info("Graceful shutdown completed");
260+
}
261+
catch (Exception e) {
262+
logger.error("Error during graceful shutdown", e);
263+
}
264+
})).then().subscribeOn(Schedulers.boundedElastic());
295265
}
296266

297267
@Override

mcp/src/test/java/org/springframework/ai/mcp/server/StdioMcpAsyncServerTests.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616

1717
package org.springframework.ai.mcp.server;
1818

19-
import com.fasterxml.jackson.databind.ObjectMapper;
20-
2119
import org.springframework.ai.mcp.server.transport.StdioServerTransport;
2220
import org.springframework.ai.mcp.spec.McpTransport;
2321

@@ -30,7 +28,7 @@ class StdioMcpAsyncServerTests extends AbstractMcpAsyncServerTests {
3028

3129
@Override
3230
protected McpTransport createMcpTransport() {
33-
return new StdioServerTransport(new ObjectMapper());
31+
return new StdioServerTransport();
3432
}
3533

3634
}

mcp/src/test/java/org/springframework/ai/mcp/server/StdioMcpSyncServerTests.java

Lines changed: 1 addition & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,6 @@
1616

1717
package org.springframework.ai.mcp.server;
1818

19-
import java.io.PipedInputStream;
20-
import java.io.PipedOutputStream;
21-
22-
import com.fasterxml.jackson.databind.ObjectMapper;
23-
2419
import org.springframework.ai.mcp.server.transport.StdioServerTransport;
2520
import org.springframework.ai.mcp.spec.McpTransport;
2621

@@ -31,53 +26,9 @@
3126
*/
3227
class StdioMcpSyncServerTests extends AbstractMcpSyncServerTests {
3328

34-
private PipedInputStream testInput;
35-
36-
private PipedOutputStream testOutput;
37-
38-
private PipedOutputStream writeToInput;
39-
40-
private PipedInputStream readFromOutput;
41-
4229
@Override
4330
protected McpTransport createMcpTransport() {
44-
try {
45-
testInput = new PipedInputStream();
46-
writeToInput = new PipedOutputStream(testInput);
47-
testOutput = new PipedOutputStream();
48-
readFromOutput = new PipedInputStream(testOutput);
49-
return new StdioServerTransport(new ObjectMapper(), testInput, testOutput);
50-
}
51-
catch (Exception e) {
52-
throw new RuntimeException("Failed to initialize test streams", e);
53-
}
54-
}
55-
56-
@Override
57-
protected void onStart() {
58-
// No special setup needed for stdio transport
59-
}
60-
61-
@Override
62-
protected void onClose() {
63-
try {
64-
if (testInput != null) {
65-
testInput.close();
66-
}
67-
if (testOutput != null) {
68-
testOutput.close();
69-
}
70-
if (writeToInput != null) {
71-
writeToInput.close();
72-
}
73-
if (readFromOutput != null) {
74-
readFromOutput.close();
75-
}
76-
}
77-
catch (Exception e) {
78-
// Log but don't throw since this is cleanup
79-
e.printStackTrace();
80-
}
31+
return new StdioServerTransport();
8132
}
8233

8334
}

mcp/src/test/java/org/springframework/ai/mcp/server/transport/StdioServerTransportTests.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ void shouldHandleIncomingMessages() throws Exception {
9292
testIn = new ByteArrayInputStream((jsonMessage + "\n").getBytes(StandardCharsets.UTF_8));
9393

9494
// Create transport with test streams
95-
transport = new StdioServerTransport(objectMapper, testIn, testOutPrintStream);
95+
transport = new StdioServerTransport(objectMapper);
9696

9797
// Parse expected message
9898
McpSchema.JSONRPCRequest expected = objectMapper.readValue(jsonMessage, McpSchema.JSONRPCRequest.class);
@@ -109,9 +109,9 @@ void shouldHandleIncomingMessages() throws Exception {
109109
@Disabled
110110
void shouldHandleOutgoingMessages() throws Exception {
111111
// Create transport with test streams
112-
transport = new StdioServerTransport(objectMapper, new BlockingInputStream(), testOutPrintStream);
113-
// transport = new StdioServerTransport(objectMapper, new ByteArrayInputStream(new
114-
// byte[0]), testOutPrintStream);
112+
transport = new StdioServerTransport(objectMapper);
113+
// transport = new StdioServerTransport(objectMapper, new BlockingInputStream(),
114+
// testOutPrintStream);
115115

116116
// Create test messages
117117
JSONRPCRequest initMessage = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "init", "init-id",
@@ -137,7 +137,7 @@ void shouldHandleOutgoingMessages() throws Exception {
137137
@Test
138138
void shouldWaitForProcessorsBeforeSendingMessage() {
139139
// Create transport with test streams
140-
transport = new StdioServerTransport(objectMapper, new ByteArrayInputStream(new byte[0]), testOutPrintStream);
140+
transport = new StdioServerTransport(objectMapper);
141141

142142
// Create test message
143143
JSONRPCRequest testMessage = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "test", "test-id",
@@ -154,7 +154,7 @@ void shouldWaitForProcessorsBeforeSendingMessage() {
154154
@Test
155155
void shouldCloseGracefully() {
156156
// Create transport with test streams
157-
transport = new StdioServerTransport(objectMapper, new ByteArrayInputStream(new byte[0]), testOutPrintStream);
157+
transport = new StdioServerTransport(objectMapper);
158158

159159
// Create test message
160160
JSONRPCRequest initMessage = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "init", "init-id",

0 commit comments

Comments
 (0)