@@ -167,7 +167,7 @@ func TestBackedPipe_Connect(t *testing.T) {
167
167
bp := backedpipe .NewBackedPipe (ctx , reconnectFn )
168
168
defer bp .Close ()
169
169
170
- err := bp .Connect (ctx )
170
+ err := bp .Connect ()
171
171
require .NoError (t , err )
172
172
require .True (t , bp .Connected ())
173
173
require .Equal (t , 1 , * callCount )
@@ -183,11 +183,11 @@ func TestBackedPipe_ConnectAlreadyConnected(t *testing.T) {
183
183
bp := backedpipe .NewBackedPipe (ctx , reconnectFn )
184
184
defer bp .Close ()
185
185
186
- err := bp .Connect (ctx )
186
+ err := bp .Connect ()
187
187
require .NoError (t , err )
188
188
189
189
// Second connect should fail
190
- err = bp .Connect (ctx )
190
+ err = bp .Connect ()
191
191
require .Error (t , err )
192
192
require .ErrorIs (t , err , backedpipe .ErrPipeAlreadyConnected )
193
193
}
@@ -204,7 +204,7 @@ func TestBackedPipe_ConnectAfterClose(t *testing.T) {
204
204
err := bp .Close ()
205
205
require .NoError (t , err )
206
206
207
- err = bp .Connect (ctx )
207
+ err = bp .Connect ()
208
208
require .Error (t , err )
209
209
require .ErrorIs (t , err , backedpipe .ErrPipeClosed )
210
210
}
@@ -219,7 +219,7 @@ func TestBackedPipe_BasicReadWrite(t *testing.T) {
219
219
bp := backedpipe .NewBackedPipe (ctx , reconnectFn )
220
220
defer bp .Close ()
221
221
222
- err := bp .Connect (ctx )
222
+ err := bp .Connect ()
223
223
require .NoError (t , err )
224
224
225
225
// Write data
@@ -264,7 +264,7 @@ func TestBackedPipe_WriteBeforeConnect(t *testing.T) {
264
264
}
265
265
266
266
// Connect should unblock the write
267
- err := bp .Connect (ctx )
267
+ err := bp .Connect ()
268
268
require .NoError (t , err )
269
269
270
270
// Write should now complete
@@ -332,7 +332,7 @@ func TestBackedPipe_Reconnection(t *testing.T) {
332
332
defer bp .Close ()
333
333
334
334
// Initial connect
335
- err := bp .Connect (ctx )
335
+ err := bp .Connect ()
336
336
require .NoError (t , err )
337
337
338
338
// Write some data before failure
@@ -373,7 +373,7 @@ func TestBackedPipe_Close(t *testing.T) {
373
373
374
374
bp := backedpipe .NewBackedPipe (ctx , reconnectFn )
375
375
376
- err := bp .Connect (ctx )
376
+ err := bp .Connect ()
377
377
require .NoError (t , err )
378
378
379
379
err = bp .Close ()
@@ -405,88 +405,6 @@ func TestBackedPipe_CloseIdempotent(t *testing.T) {
405
405
require .NoError (t , err )
406
406
}
407
407
408
- func TestBackedPipe_ConcurrentReadWrite (t * testing.T ) {
409
- t .Parallel ()
410
- ctx := testutil .Context (t , testutil .WaitShort )
411
-
412
- conn := newMockConnection ()
413
- reconnectFn , _ , _ := mockReconnectFunc (conn )
414
-
415
- bp := backedpipe .NewBackedPipe (ctx , reconnectFn )
416
- defer bp .Close ()
417
-
418
- err := bp .Connect (ctx )
419
- require .NoError (t , err )
420
-
421
- var wg sync.WaitGroup
422
- numWriters := 3
423
- writesPerWriter := 10
424
-
425
- // Fill read buffer with test data first
426
- testData := make ([]byte , 1000 )
427
- for i := range testData {
428
- testData [i ] = 'A'
429
- }
430
- conn .WriteString (string (testData ))
431
-
432
- // Channel to collect all written data
433
- writtenData := make (chan byte , numWriters * writesPerWriter )
434
-
435
- // Start a few readers
436
- for i := 0 ; i < 2 ; i ++ {
437
- wg .Add (1 )
438
- go func () {
439
- defer wg .Done ()
440
- buf := make ([]byte , 10 )
441
- for j := 0 ; j < 10 ; j ++ {
442
- bp .Read (buf )
443
- time .Sleep (time .Millisecond ) // Small delay to avoid busy waiting
444
- }
445
- }()
446
- }
447
-
448
- // Start writers
449
- for i := 0 ; i < numWriters ; i ++ {
450
- wg .Add (1 )
451
- go func (id int ) {
452
- defer wg .Done ()
453
- for j := 0 ; j < writesPerWriter ; j ++ {
454
- data := []byte {byte (id + '0' )}
455
- bp .Write (data )
456
- writtenData <- byte (id + '0' )
457
- time .Sleep (time .Millisecond ) // Small delay
458
- }
459
- }(i )
460
- }
461
-
462
- // Wait with timeout
463
- done := make (chan struct {})
464
- go func () {
465
- defer close (done )
466
- wg .Wait ()
467
- }()
468
-
469
- testutil .TryReceive (ctx , t , done )
470
-
471
- // Close the channel and collect all written data
472
- close (writtenData )
473
- var allWritten []byte
474
- for b := range writtenData {
475
- allWritten = append (allWritten , b )
476
- }
477
-
478
- // Verify that all written data was received by the connection
479
- // Note: Since this test uses the old mock that returns readerSeqNum = 0,
480
- // all data will be replayed, so we expect to receive all written data
481
- receivedData := conn .ReadString ()
482
- require .GreaterOrEqual (t , len (receivedData ), len (allWritten ), "Connection should have received at least all written data" )
483
-
484
- // Check that all written bytes appear in the received data
485
- for _ , writtenByte := range allWritten {
486
- require .Contains (t , receivedData , string (writtenByte ), "Written byte %c should be present in received data" , writtenByte )
487
- }
488
- }
489
-
490
408
func TestBackedPipe_ReconnectFunctionFailure (t * testing.T ) {
491
409
t .Parallel ()
492
410
@@ -499,7 +417,7 @@ func TestBackedPipe_ReconnectFunctionFailure(t *testing.T) {
499
417
bp := backedpipe .NewBackedPipe (ctx , failingReconnectFn )
500
418
defer bp .Close ()
501
419
502
- err := bp .Connect (ctx )
420
+ err := bp .Connect ()
503
421
require .Error (t , err )
504
422
require .ErrorIs (t , err , backedpipe .ErrReconnectFailed )
505
423
require .False (t , bp .Connected ())
@@ -517,7 +435,7 @@ func TestBackedPipe_ForceReconnect(t *testing.T) {
517
435
defer bp .Close ()
518
436
519
437
// Initial connect
520
- err := bp .Connect (ctx )
438
+ err := bp .Connect ()
521
439
require .NoError (t , err )
522
440
require .True (t , bp .Connected ())
523
441
require .Equal (t , 1 , * callCount )
@@ -644,7 +562,7 @@ func TestBackedPipe_EOFTriggersReconnection(t *testing.T) {
644
562
defer bp .Close ()
645
563
646
564
// Initial connect
647
- err := bp .Connect (ctx )
565
+ err := bp .Connect ()
648
566
require .NoError (t , err )
649
567
require .Equal (t , 1 , callCount )
650
568
@@ -685,7 +603,7 @@ func BenchmarkBackedPipe_Write(b *testing.B) {
685
603
reconnectFn , _ , _ := mockReconnectFunc (conn )
686
604
687
605
bp := backedpipe .NewBackedPipe (ctx , reconnectFn )
688
- bp .Connect (ctx )
606
+ bp .Connect ()
689
607
b .Cleanup (func () {
690
608
_ = bp .Close ()
691
609
})
@@ -704,7 +622,7 @@ func BenchmarkBackedPipe_Read(b *testing.B) {
704
622
reconnectFn , _ , _ := mockReconnectFunc (conn )
705
623
706
624
bp := backedpipe .NewBackedPipe (ctx , reconnectFn )
707
- bp .Connect (ctx )
625
+ bp .Connect ()
708
626
b .Cleanup (func () {
709
627
_ = bp .Close ()
710
628
})
0 commit comments