@@ -143,8 +143,9 @@ func TestBackedWriter_WriteToUnderlyingWhenConnected(t *testing.T) {
143
143
require .Equal (t , []byte ("hello" ), writer .buffer .Bytes ())
144
144
}
145
145
146
- func TestBackedWriter_DisconnectOnWriteFailure (t * testing.T ) {
146
+ func TestBackedWriter_BlockOnWriteFailure (t * testing.T ) {
147
147
t .Parallel ()
148
+ ctx := testutil .Context (t , testutil .WaitShort )
148
149
149
150
errorChan := make (chan error , 1 )
150
151
bw := backedpipe .NewBackedWriter (backedpipe .DefaultBufferSize , errorChan )
@@ -157,11 +158,26 @@ func TestBackedWriter_DisconnectOnWriteFailure(t *testing.T) {
157
158
// Cause write to fail
158
159
writer .setError (xerrors .New ("write failed" ))
159
160
160
- // Write should fail and disconnect
161
- n , err := bw .Write ([]byte ("hello" ))
162
- require .Error (t , err ) // Write should fail
163
- require .Equal (t , 0 , n )
164
- require .False (t , bw .Connected ()) // Should be disconnected
161
+ // Write should block when underlying writer fails
162
+ writeComplete := make (chan struct {})
163
+ var writeErr error
164
+ var n int
165
+
166
+ go func () {
167
+ defer close (writeComplete )
168
+ n , writeErr = bw .Write ([]byte ("hello" ))
169
+ }()
170
+
171
+ // Verify write is blocked
172
+ select {
173
+ case <- writeComplete :
174
+ t .Fatal ("Write should have blocked when underlying writer fails" )
175
+ case <- time .After (50 * time .Millisecond ):
176
+ // Expected - write is blocked
177
+ }
178
+
179
+ // Should be disconnected
180
+ require .False (t , bw .Connected ())
165
181
166
182
// Error should be sent to error channel
167
183
select {
@@ -170,6 +186,19 @@ func TestBackedWriter_DisconnectOnWriteFailure(t *testing.T) {
170
186
default :
171
187
t .Fatal ("Expected error to be sent to error channel" )
172
188
}
189
+
190
+ // Reconnect with working writer and verify write completes
191
+ writer2 := newMockWriter ()
192
+ err = bw .Reconnect (0 , writer2 ) // Replay from beginning
193
+ require .NoError (t , err )
194
+
195
+ // Write should now complete
196
+ testutil .TryReceive (ctx , t , writeComplete )
197
+
198
+ require .NoError (t , writeErr )
199
+ require .Equal (t , 5 , n )
200
+ require .Equal (t , uint64 (5 ), bw .SequenceNum ())
201
+ require .Equal (t , []byte ("hello" ), writer2 .buffer .Bytes ())
173
202
}
174
203
175
204
func TestBackedWriter_ReplayOnReconnect (t * testing.T ) {
@@ -193,15 +222,43 @@ func TestBackedWriter_ReplayOnReconnect(t *testing.T) {
193
222
194
223
// Disconnect by causing a write failure
195
224
writer1 .setError (xerrors .New ("connection lost" ))
196
- _ , err = bw .Write ([]byte ("test" ))
197
- require .Error (t , err )
225
+
226
+ // Write should block when underlying writer fails
227
+ writeComplete := make (chan struct {})
228
+ var writeErr error
229
+ var n int
230
+
231
+ go func () {
232
+ defer close (writeComplete )
233
+ n , writeErr = bw .Write ([]byte ("test" ))
234
+ }()
235
+
236
+ // Verify write is blocked
237
+ select {
238
+ case <- writeComplete :
239
+ t .Fatal ("Write should have blocked when underlying writer fails" )
240
+ case <- time .After (50 * time .Millisecond ):
241
+ // Expected - write is blocked
242
+ }
243
+
198
244
require .False (t , bw .Connected ())
199
245
200
246
// Reconnect with new writer and request replay from beginning
201
247
writer2 := newMockWriter ()
202
248
err = bw .Reconnect (0 , writer2 )
203
249
require .NoError (t , err )
204
250
251
+ // Write should now complete
252
+ select {
253
+ case <- writeComplete :
254
+ // Expected - write completed
255
+ case <- time .After (100 * time .Millisecond ):
256
+ t .Fatal ("Write should have completed after reconnection" )
257
+ }
258
+
259
+ require .NoError (t , writeErr )
260
+ require .Equal (t , 4 , n )
261
+
205
262
// Should have replayed all data including the failed write that was buffered
206
263
require .Equal (t , []byte ("hello worldtest" ), writer2 .buffer .Bytes ())
207
264
@@ -319,7 +376,7 @@ func TestBackedWriter_Close(t *testing.T) {
319
376
320
377
// Writes after close should fail
321
378
_ , err = bw .Write ([]byte ("test" ))
322
- require .Equal (t , io .ErrClosedPipe , err )
379
+ require .Equal (t , io .EOF , err )
323
380
324
381
// Reconnect after close should fail
325
382
err = bw .Reconnect (0 , newMockWriter ())
@@ -401,8 +458,9 @@ func TestBackedWriter_ReconnectDuringReplay(t *testing.T) {
401
458
require .False (t , bw .Connected ())
402
459
}
403
460
404
- func TestBackedWriter_PartialWriteToUnderlying (t * testing.T ) {
461
+ func TestBackedWriter_BlockOnPartialWrite (t * testing.T ) {
405
462
t .Parallel ()
463
+ ctx := testutil .Context (t , testutil .WaitShort )
406
464
407
465
errorChan := make (chan error , 1 )
408
466
bw := backedpipe .NewBackedWriter (backedpipe .DefaultBufferSize , errorChan )
@@ -419,12 +477,26 @@ func TestBackedWriter_PartialWriteToUnderlying(t *testing.T) {
419
477
420
478
bw .Reconnect (0 , writer )
421
479
422
- // Write should fail due to partial write
423
- n , err := bw .Write ([]byte ("hello" ))
424
- require .Error (t , err )
425
- require .Equal (t , 0 , n )
480
+ // Write should block due to partial write
481
+ writeComplete := make (chan struct {})
482
+ var writeErr error
483
+ var n int
484
+
485
+ go func () {
486
+ defer close (writeComplete )
487
+ n , writeErr = bw .Write ([]byte ("hello" ))
488
+ }()
489
+
490
+ // Verify write is blocked
491
+ select {
492
+ case <- writeComplete :
493
+ t .Fatal ("Write should have blocked when underlying writer does partial write" )
494
+ case <- time .After (50 * time .Millisecond ):
495
+ // Expected - write is blocked
496
+ }
497
+
498
+ // Should be disconnected
426
499
require .False (t , bw .Connected ())
427
- require .Contains (t , err .Error (), "short write" )
428
500
429
501
// Error should be sent to error channel
430
502
select {
@@ -433,6 +505,19 @@ func TestBackedWriter_PartialWriteToUnderlying(t *testing.T) {
433
505
default :
434
506
t .Fatal ("Expected error to be sent to error channel" )
435
507
}
508
+
509
+ // Reconnect with working writer and verify write completes
510
+ writer2 := newMockWriter ()
511
+ err := bw .Reconnect (0 , writer2 ) // Replay from beginning
512
+ require .NoError (t , err )
513
+
514
+ // Write should now complete
515
+ testutil .TryReceive (ctx , t , writeComplete )
516
+
517
+ require .NoError (t , writeErr )
518
+ require .Equal (t , 5 , n )
519
+ require .Equal (t , uint64 (5 ), bw .SequenceNum ())
520
+ require .Equal (t , []byte ("hello" ), writer2 .buffer .Bytes ())
436
521
}
437
522
438
523
func TestBackedWriter_WriteUnblocksOnReconnect (t * testing.T ) {
@@ -498,7 +583,7 @@ func TestBackedWriter_CloseUnblocksWaitingWrites(t *testing.T) {
498
583
499
584
// Write should now complete with error
500
585
err = testutil .RequireReceive (ctx , t , writeComplete )
501
- require .Equal (t , io .ErrClosedPipe , err )
586
+ require .Equal (t , io .EOF , err )
502
587
}
503
588
504
589
func TestBackedWriter_WriteBlocksAfterDisconnection (t * testing.T ) {
@@ -517,16 +602,13 @@ func TestBackedWriter_WriteBlocksAfterDisconnection(t *testing.T) {
517
602
_ , err = bw .Write ([]byte ("hello" ))
518
603
require .NoError (t , err )
519
604
520
- // Cause disconnection
605
+ // Cause disconnection - the write should now block instead of returning an error
521
606
writer .setError (xerrors .New ("connection lost" ))
522
- _ , err = bw .Write ([]byte ("world" ))
523
- require .Error (t , err )
524
- require .False (t , bw .Connected ())
525
607
526
- // Subsequent write should block
608
+ // This write should block
527
609
writeComplete := make (chan error , 1 )
528
610
go func () {
529
- _ , err := bw .Write ([]byte ("blocked " ))
611
+ _ , err := bw .Write ([]byte ("world " ))
530
612
writeComplete <- err
531
613
}()
532
614
@@ -538,13 +620,19 @@ func TestBackedWriter_WriteBlocksAfterDisconnection(t *testing.T) {
538
620
// Expected - write is blocked
539
621
}
540
622
623
+ // Should be disconnected
624
+ require .False (t , bw .Connected ())
625
+
541
626
// Reconnect and verify write completes
542
627
writer2 := newMockWriter ()
543
628
err = bw .Reconnect (5 , writer2 ) // Replay from after "hello"
544
629
require .NoError (t , err )
545
630
546
631
err = testutil .RequireReceive (ctx , t , writeComplete )
547
632
require .NoError (t , err )
633
+
634
+ // Check that only "world" was written during replay (not duplicated)
635
+ require .Equal (t , []byte ("world" ), writer2 .buffer .Bytes ()) // Only "world" since we replayed from sequence 5
548
636
}
549
637
550
638
func BenchmarkBackedWriter_Write (b * testing.B ) {
0 commit comments