Skip to content

Commit 9b6a0b5

Browse files
committed
refactor(mcp): improve StdioServerTransport shutdown and error handling
- Add specific error categorization for pipe closed errors - Enhance shutdown sequence with proper stream and scheduler cleanup - Add debug logging for shutdown-related stream errors - Improve isClosing state management
1 parent 2de1d63 commit 9b6a0b5

File tree

1 file changed

+48
-25
lines changed

1 file changed

+48
-25
lines changed

mcp/src/main/java/org/springframework/ai/mcp/server/transport/StdioServerTransport.java

Lines changed: 48 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -171,9 +171,18 @@ private void startInboundProcessing() {
171171
}
172172
}
173173
catch (IOException e) {
174-
if (!isClosing) {
174+
// Check isClosing before the error occurs to properly categorize it
175+
boolean wasClosing = isClosing;
176+
isClosing = true;
177+
if (!wasClosing && e.getMessage().equals("Pipe closed")) {
178+
logger.debug("Stream closed during shutdown", e);
179+
}
180+
else if (!wasClosing) {
175181
logger.error("Error reading from stdin", e);
176182
}
183+
else {
184+
logger.debug("Stream error during shutdown", e);
185+
}
177186
}
178187
finally {
179188
isClosing = true;
@@ -209,6 +218,9 @@ private void startOutboundProcessing() {
209218
logger.error("Error writing message", e);
210219
sink.error(new RuntimeException(e));
211220
}
221+
else {
222+
logger.debug("Stream closed during shutdown", e);
223+
}
212224
}
213225
}
214226
else if (isClosing) {
@@ -236,35 +248,46 @@ public Mono<Void> closeGracefully() {
236248
return Mono.fromRunnable(() -> {
237249
isClosing = true;
238250
logger.debug("Initiating graceful shutdown");
239-
})
240-
// .then(Mono.defer(() -> {
241-
// inboundSink.tryEmitComplete();
242-
// outboundSink.tryEmitComplete();
243-
// return Mono.delay(Duration.ofMillis(100));
244-
// }))
245-
246-
.then(Mono.fromRunnable(() -> {
247-
try {
248-
249-
inboundScheduler.dispose();
250-
outboundScheduler.dispose();
251+
}).then(Mono.defer(() -> {
252+
// First complete the sinks to stop processing
253+
inboundSink.tryEmitComplete();
254+
outboundSink.tryEmitComplete();
255+
return Mono.delay(Duration.ofMillis(100));
256+
})).then(Mono.fromRunnable(() -> {
257+
try {
258+
// Dispose schedulers first
259+
inboundScheduler.dispose();
260+
outboundScheduler.dispose();
261+
262+
// Wait for schedulers to terminate
263+
if (!inboundScheduler.isDisposed()) {
264+
inboundScheduler.disposeGracefully().block(Duration.ofSeconds(5));
265+
}
266+
if (!outboundScheduler.isDisposed()) {
267+
outboundScheduler.disposeGracefully().block(Duration.ofSeconds(5));
268+
}
251269

252-
// Wait for schedulers to terminate
253-
if (!inboundScheduler.isDisposed()) {
254-
inboundScheduler.disposeGracefully().block(Duration.ofSeconds(5));
270+
// Only after schedulers are disposed, close the streams
271+
try {
272+
if (inputStream != System.in) {
273+
inputStream.close();
255274
}
256-
if (!outboundScheduler.isDisposed()) {
257-
outboundScheduler.disposeGracefully().block(Duration.ofSeconds(5));
275+
if (outputStream != System.out) {
276+
outputStream.flush();
277+
outputStream.close();
258278
}
259-
260-
logger.info("Graceful shutdown completed");
261279
}
262-
catch (Exception e) {
263-
logger.error("Error during graceful shutdown", e);
280+
catch (IOException e) {
281+
// Log but don't throw since we're shutting down
282+
logger.debug("Error closing streams during shutdown", e);
264283
}
265-
}))
266-
.then()
267-
.subscribeOn(Schedulers.boundedElastic());
284+
285+
logger.info("Graceful shutdown completed");
286+
}
287+
catch (Exception e) {
288+
logger.error("Error during graceful shutdown", e);
289+
}
290+
})).then().subscribeOn(Schedulers.boundedElastic());
268291
}
269292

270293
@Override

0 commit comments

Comments
 (0)