@@ -240,21 +240,35 @@ func TestBackedPipe_BasicReadWrite(t *testing.T) {
240
240
241
241
func TestBackedPipe_WriteBeforeConnect (t * testing.T ) {
242
242
t .Parallel ()
243
+ ctx := testutil .Context (t , testutil .WaitShort )
243
244
244
- ctx := context .Background ()
245
245
conn := newMockConnection ()
246
246
reconnectFn , _ , _ := mockReconnectFunc (conn )
247
247
248
248
bp := backedpipe .NewBackedPipe (ctx , reconnectFn )
249
249
defer bp .Close ()
250
250
251
- // Write before connecting should succeed (buffered)
252
- n , err := bp .Write ([]byte ("hello" ))
251
+ // Write before connecting should block
252
+ writeComplete := make (chan error , 1 )
253
+ go func () {
254
+ _ , err := bp .Write ([]byte ("hello" ))
255
+ writeComplete <- err
256
+ }()
257
+
258
+ // Verify write is blocked
259
+ select {
260
+ case <- writeComplete :
261
+ t .Fatal ("Write should have blocked when disconnected" )
262
+ case <- time .After (100 * time .Millisecond ):
263
+ // Expected - write is blocked
264
+ }
265
+
266
+ // Connect should unblock the write
267
+ err := bp .Connect (ctx )
253
268
require .NoError (t , err )
254
- require .Equal (t , 5 , n )
255
269
256
- // Connect should replay the buffered data
257
- err = bp . Connect (ctx )
270
+ // Write should now complete
271
+ err = testutil . RequireReceive (ctx , t , writeComplete )
258
272
require .NoError (t , err )
259
273
260
274
// Check that data was replayed to connection
@@ -265,6 +279,7 @@ func TestBackedPipe_ReadBlocksWhenDisconnected(t *testing.T) {
265
279
t .Parallel ()
266
280
267
281
ctx := context .Background ()
282
+ testCtx := testutil .Context (t , testutil .WaitShort )
268
283
reconnectFn , _ , _ := mockReconnectFunc (newMockConnection ())
269
284
270
285
bp := backedpipe .NewBackedPipe (ctx , reconnectFn )
@@ -283,7 +298,7 @@ func TestBackedPipe_ReadBlocksWhenDisconnected(t *testing.T) {
283
298
}()
284
299
285
300
// Wait for the goroutine to start
286
- <- readStarted
301
+ testutil . TryReceive ( testCtx , t , readStarted )
287
302
288
303
// Give a brief moment for the read to actually block
289
304
time .Sleep (time .Millisecond )
@@ -299,18 +314,15 @@ func TestBackedPipe_ReadBlocksWhenDisconnected(t *testing.T) {
299
314
// Close should unblock the read
300
315
bp .Close ()
301
316
302
- select {
303
- case <- readDone :
304
- require .Equal (t , io .ErrClosedPipe , readErr )
305
- case <- time .After (time .Second ):
306
- t .Fatal ("Read did not unblock after close" )
307
- }
317
+ testutil .TryReceive (testCtx , t , readDone )
318
+ require .Equal (t , io .EOF , readErr )
308
319
}
309
320
310
321
func TestBackedPipe_Reconnection (t * testing.T ) {
311
322
t .Parallel ()
312
323
313
324
ctx := context .Background ()
325
+ testCtx := testutil .Context (t , testutil .WaitShort )
314
326
conn1 := newMockConnection ()
315
327
conn2 := newMockConnection ()
316
328
conn2 .seqNum = 17 // Remote has received 17 bytes, so replay from sequence 17
@@ -333,10 +345,12 @@ func TestBackedPipe_Reconnection(t *testing.T) {
333
345
// Trigger a write to cause the pipe to notice the failure
334
346
_ , _ = bp .Write ([]byte ("trigger failure " ))
335
347
336
- <- signalChan
348
+ testutil . RequireReceive ( testCtx , t , signalChan )
337
349
338
- err = bp .WaitForConnection (ctx )
339
- require .NoError (t , err )
350
+ // Wait for reconnection to complete
351
+ require .Eventually (t , func () bool {
352
+ return bp .Connected ()
353
+ }, testutil .WaitShort , testutil .IntervalFast , "pipe should reconnect" )
340
354
341
355
replayedData := conn2 .ReadString ()
342
356
require .Equal (t , "***trigger failure " , replayedData , "Should replay exactly the data written after sequence 17" )
@@ -391,45 +405,10 @@ func TestBackedPipe_CloseIdempotent(t *testing.T) {
391
405
require .NoError (t , err )
392
406
}
393
407
394
- func TestBackedPipe_WaitForConnection (t * testing.T ) {
395
- t .Parallel ()
396
-
397
- ctx := context .Background ()
398
- conn := newMockConnection ()
399
- reconnectFn , _ , _ := mockReconnectFunc (conn )
400
-
401
- bp := backedpipe .NewBackedPipe (ctx , reconnectFn )
402
- defer bp .Close ()
403
-
404
- // Should timeout when not connected
405
- // Use a shorter timeout for this test to speed up test runs
406
- timeoutCtx , cancel := context .WithTimeout (ctx , testutil .WaitSuperShort )
407
- defer cancel ()
408
-
409
- err := bp .WaitForConnection (timeoutCtx )
410
- require .Equal (t , context .DeadlineExceeded , err )
411
-
412
- // Connect in background after a brief delay
413
- connectionStarted := make (chan struct {})
414
- go func () {
415
- close (connectionStarted )
416
- // Small delay to ensure WaitForConnection is called first
417
- time .Sleep (time .Millisecond )
418
- bp .Connect (context .Background ())
419
- }()
420
-
421
- // Wait for connection goroutine to start
422
- <- connectionStarted
423
-
424
- // Should succeed once connected
425
- err = bp .WaitForConnection (context .Background ())
426
- require .NoError (t , err )
427
- }
428
-
429
408
func TestBackedPipe_ConcurrentReadWrite (t * testing.T ) {
430
409
t .Parallel ()
410
+ ctx := testutil .Context (t , testutil .WaitShort )
431
411
432
- ctx := context .Background ()
433
412
conn := newMockConnection ()
434
413
reconnectFn , _ , _ := mockReconnectFunc (conn )
435
414
@@ -487,12 +466,7 @@ func TestBackedPipe_ConcurrentReadWrite(t *testing.T) {
487
466
wg .Wait ()
488
467
}()
489
468
490
- select {
491
- case <- done :
492
- // Success
493
- case <- time .After (5 * time .Second ):
494
- t .Fatal ("Test timed out" )
495
- }
469
+ testutil .TryReceive (ctx , t , done )
496
470
497
471
// Close the channel and collect all written data
498
472
close (writtenData )
0 commit comments