@@ -171,9 +171,18 @@ private void startInboundProcessing() {
171
171
}
172
172
}
173
173
catch (IOException e ) {
174
- if (!isClosing ) {
174
+ // Check isClosing before the error occurs to properly categorize it
175
+ boolean wasClosing = isClosing ;
176
+ isClosing = true ;
177
+ if (!wasClosing && e .getMessage ().equals ("Pipe closed" )) {
178
+ logger .debug ("Stream closed during shutdown" , e );
179
+ }
180
+ else if (!wasClosing ) {
175
181
logger .error ("Error reading from stdin" , e );
176
182
}
183
+ else {
184
+ logger .debug ("Stream error during shutdown" , e );
185
+ }
177
186
}
178
187
finally {
179
188
isClosing = true ;
@@ -209,6 +218,9 @@ private void startOutboundProcessing() {
209
218
logger .error ("Error writing message" , e );
210
219
sink .error (new RuntimeException (e ));
211
220
}
221
+ else {
222
+ logger .debug ("Stream closed during shutdown" , e );
223
+ }
212
224
}
213
225
}
214
226
else if (isClosing ) {
@@ -236,35 +248,46 @@ public Mono<Void> closeGracefully() {
236
248
return Mono .fromRunnable (() -> {
237
249
isClosing = true ;
238
250
logger .debug ("Initiating graceful shutdown" );
239
- })
240
- // .then(Mono.defer(() -> {
241
- // inboundSink.tryEmitComplete();
242
- // outboundSink.tryEmitComplete();
243
- // return Mono.delay(Duration.ofMillis(100));
244
- // }))
245
-
246
- .then (Mono .fromRunnable (() -> {
247
- try {
248
-
249
- inboundScheduler .dispose ();
250
- outboundScheduler .dispose ();
251
+ }).then (Mono .defer (() -> {
252
+ // First complete the sinks to stop processing
253
+ inboundSink .tryEmitComplete ();
254
+ outboundSink .tryEmitComplete ();
255
+ return Mono .delay (Duration .ofMillis (100 ));
256
+ })).then (Mono .fromRunnable (() -> {
257
+ try {
258
+ // Dispose schedulers first
259
+ inboundScheduler .dispose ();
260
+ outboundScheduler .dispose ();
261
+
262
+ // Wait for schedulers to terminate
263
+ if (!inboundScheduler .isDisposed ()) {
264
+ inboundScheduler .disposeGracefully ().block (Duration .ofSeconds (5 ));
265
+ }
266
+ if (!outboundScheduler .isDisposed ()) {
267
+ outboundScheduler .disposeGracefully ().block (Duration .ofSeconds (5 ));
268
+ }
251
269
252
- // Wait for schedulers to terminate
253
- if (!inboundScheduler .isDisposed ()) {
254
- inboundScheduler .disposeGracefully ().block (Duration .ofSeconds (5 ));
270
+ // Only after schedulers are disposed, close the streams
271
+ try {
272
+ if (inputStream != System .in ) {
273
+ inputStream .close ();
255
274
}
256
- if (!outboundScheduler .isDisposed ()) {
257
- outboundScheduler .disposeGracefully ().block (Duration .ofSeconds (5 ));
275
+ if (outputStream != System .out ) {
276
+ outputStream .flush ();
277
+ outputStream .close ();
258
278
}
259
-
260
- logger .info ("Graceful shutdown completed" );
261
279
}
262
- catch (Exception e ) {
263
- logger .error ("Error during graceful shutdown" , e );
280
+ catch (IOException e ) {
281
+ // Log but don't throw since we're shutting down
282
+ logger .debug ("Error closing streams during shutdown" , e );
264
283
}
265
- }))
266
- .then ()
267
- .subscribeOn (Schedulers .boundedElastic ());
284
+
285
+ logger .info ("Graceful shutdown completed" );
286
+ }
287
+ catch (Exception e ) {
288
+ logger .error ("Error during graceful shutdown" , e );
289
+ }
290
+ })).then ().subscribeOn (Schedulers .boundedElastic ());
268
291
}
269
292
270
293
@ Override
0 commit comments