Skip to content

Commit 6e13d4a

Browse files
committed
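Return io.EOF instead of io.ErrClosedPipe once the pipe is closed, and make BackedWriter.Write block until reconnection (rather than return an error) when the underlying write fails or is short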
1 parent e1a6e78 commit 6e13d4a

File tree

4 files changed

+154
-43
lines changed

4 files changed

+154
-43
lines changed

agent/immortalstreams/backedpipe/backed_pipe.go

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -110,7 +110,7 @@ func (bp *BackedPipe) Read(p []byte) (int, error) {
110110
bp.mu.RUnlock()
111111

112112
if closed {
113-
return 0, io.ErrClosedPipe
113+
return 0, io.EOF
114114
}
115115

116116
return reader.Read(p)
@@ -124,7 +124,7 @@ func (bp *BackedPipe) Write(p []byte) (int, error) {
124124
bp.mu.RUnlock()
125125

126126
if closed {
127-
return 0, io.ErrClosedPipe
127+
return 0, io.EOF
128128
}
129129

130130
return writer.Write(p)
@@ -294,7 +294,7 @@ func (bp *BackedPipe) ForceReconnect() error {
294294
defer bp.mu.Unlock()
295295

296296
if bp.closed {
297-
return nil, io.ErrClosedPipe
297+
return nil, io.EOF
298298
}
299299

300300
return nil, bp.reconnectLocked()

agent/immortalstreams/backedpipe/backed_pipe_test.go

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -382,10 +382,10 @@ func TestBackedPipe_Close(t *testing.T) {
382382

383383
// Operations after close should fail
384384
_, err = bp.Read(make([]byte, 10))
385-
require.Equal(t, io.ErrClosedPipe, err)
385+
require.Equal(t, io.EOF, err)
386386

387387
_, err = bp.Write([]byte("test"))
388-
require.Equal(t, io.ErrClosedPipe, err)
388+
require.Equal(t, io.EOF, err)
389389
}
390390

391391
func TestBackedPipe_CloseIdempotent(t *testing.T) {
@@ -567,7 +567,7 @@ func TestBackedPipe_ForceReconnectWhenClosed(t *testing.T) {
567567
// Try to force reconnect when closed
568568
err = bp.ForceReconnect()
569569
require.Error(t, err)
570-
require.Equal(t, io.ErrClosedPipe, err)
570+
require.Equal(t, io.EOF, err)
571571
}
572572

573573
func TestBackedPipe_ForceReconnectWhenDisconnected(t *testing.T) {

agent/immortalstreams/backedpipe/backed_writer.go

Lines changed: 38 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -40,8 +40,21 @@ func NewBackedWriter(capacity int, errorChan chan<- error) *BackedWriter {
4040
return bw
4141
}
4242

43+
// blockUntilConnectedOrClosed blocks until either a writer is available or the BackedWriter is closed.
44+
// Returns io.EOF if closed while waiting, nil if connected.
45+
func (bw *BackedWriter) blockUntilConnectedOrClosed() error {
46+
for bw.writer == nil && !bw.closed {
47+
bw.cond.Wait()
48+
}
49+
if bw.closed {
50+
return io.EOF
51+
}
52+
return nil
53+
}
54+
4355
// Write implements io.Writer.
44-
// When connected, it writes to both the ring buffer and the underlying writer.
56+
// When connected, it writes to both the ring buffer (to preserve data in case we need to replay it)
57+
// and the underlying writer.
4558
// If the underlying write fails, the writer is marked as disconnected and the write blocks
4659
// until reconnection occurs.
4760
func (bw *BackedWriter) Write(p []byte) (int, error) {
@@ -53,25 +66,19 @@ func (bw *BackedWriter) Write(p []byte) (int, error) {
5366
defer bw.mu.Unlock()
5467

5568
if bw.closed {
56-
return 0, io.ErrClosedPipe
69+
return 0, io.EOF
5770
}
5871

5972
// Block until connected
60-
for bw.writer == nil && !bw.closed {
61-
bw.cond.Wait()
62-
}
63-
64-
// Check if we were closed while waiting
65-
if bw.closed {
66-
return 0, io.ErrClosedPipe
73+
if err := bw.blockUntilConnectedOrClosed(); err != nil {
74+
return 0, err
6775
}
6876

69-
// Always write to buffer first
77+
// Write to buffer
7078
bw.buffer.Write(p)
71-
// Always advance sequence number by the full length
7279
bw.sequenceNum += uint64(len(p))
7380

74-
// Write to underlying writer
81+
// Try to write to underlying writer
7582
n, err := bw.writer.Write(p)
7683
if err != nil {
7784
// Connection failed, mark as disconnected
@@ -82,19 +89,35 @@ func (bw *BackedWriter) Write(p []byte) (int, error) {
8289
case bw.errorChan <- err:
8390
default:
8491
}
85-
return 0, err
92+
93+
// Block until reconnected - reconnection will replay this data
94+
if err := bw.blockUntilConnectedOrClosed(); err != nil {
95+
return 0, err
96+
}
97+
98+
// Don't retry - reconnection replay handled it
99+
return len(p), nil
86100
}
87101

88102
if n != len(p) {
103+
// Partial write - treat as failure
89104
bw.writer = nil
90105
err = xerrors.Errorf("short write: %d bytes written, %d expected", n, len(p))
91106
select {
92107
case bw.errorChan <- err:
93108
default:
94109
}
95-
return 0, err
110+
111+
// Block until reconnected - reconnection will replay this data
112+
if err := bw.blockUntilConnectedOrClosed(); err != nil {
113+
return 0, err
114+
}
115+
116+
// Don't retry - reconnection replay handled it
117+
return len(p), nil
96118
}
97119

120+
// Write succeeded
98121
return len(p), nil
99122
}
100123

@@ -169,7 +192,7 @@ func (bw *BackedWriter) Reconnect(replayFromSeq uint64, newWriter io.Writer) err
169192
}
170193

171194
// Close closes the writer and prevents further writes.
172-
// After closing, all Write calls will return io.ErrClosedPipe.
195+
// After closing, all Write calls will return io.EOF.
173196
// This code keeps the Close() signature consistent with io.Closer,
174197
// but it never actually returns an error.
175198
func (bw *BackedWriter) Close() error {

agent/immortalstreams/backedpipe/backed_writer_test.go

Lines changed: 110 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -143,8 +143,9 @@ func TestBackedWriter_WriteToUnderlyingWhenConnected(t *testing.T) {
143143
require.Equal(t, []byte("hello"), writer.buffer.Bytes())
144144
}
145145

146-
func TestBackedWriter_DisconnectOnWriteFailure(t *testing.T) {
146+
func TestBackedWriter_BlockOnWriteFailure(t *testing.T) {
147147
t.Parallel()
148+
ctx := testutil.Context(t, testutil.WaitShort)
148149

149150
errorChan := make(chan error, 1)
150151
bw := backedpipe.NewBackedWriter(backedpipe.DefaultBufferSize, errorChan)
@@ -157,11 +158,26 @@ func TestBackedWriter_DisconnectOnWriteFailure(t *testing.T) {
157158
// Cause write to fail
158159
writer.setError(xerrors.New("write failed"))
159160

160-
// Write should fail and disconnect
161-
n, err := bw.Write([]byte("hello"))
162-
require.Error(t, err) // Write should fail
163-
require.Equal(t, 0, n)
164-
require.False(t, bw.Connected()) // Should be disconnected
161+
// Write should block when underlying writer fails
162+
writeComplete := make(chan struct{})
163+
var writeErr error
164+
var n int
165+
166+
go func() {
167+
defer close(writeComplete)
168+
n, writeErr = bw.Write([]byte("hello"))
169+
}()
170+
171+
// Verify write is blocked
172+
select {
173+
case <-writeComplete:
174+
t.Fatal("Write should have blocked when underlying writer fails")
175+
case <-time.After(50 * time.Millisecond):
176+
// Expected - write is blocked
177+
}
178+
179+
// Should be disconnected
180+
require.False(t, bw.Connected())
165181

166182
// Error should be sent to error channel
167183
select {
@@ -170,6 +186,19 @@ func TestBackedWriter_DisconnectOnWriteFailure(t *testing.T) {
170186
default:
171187
t.Fatal("Expected error to be sent to error channel")
172188
}
189+
190+
// Reconnect with working writer and verify write completes
191+
writer2 := newMockWriter()
192+
err = bw.Reconnect(0, writer2) // Replay from beginning
193+
require.NoError(t, err)
194+
195+
// Write should now complete
196+
testutil.TryReceive(ctx, t, writeComplete)
197+
198+
require.NoError(t, writeErr)
199+
require.Equal(t, 5, n)
200+
require.Equal(t, uint64(5), bw.SequenceNum())
201+
require.Equal(t, []byte("hello"), writer2.buffer.Bytes())
173202
}
174203

175204
func TestBackedWriter_ReplayOnReconnect(t *testing.T) {
@@ -193,15 +222,43 @@ func TestBackedWriter_ReplayOnReconnect(t *testing.T) {
193222

194223
// Disconnect by causing a write failure
195224
writer1.setError(xerrors.New("connection lost"))
196-
_, err = bw.Write([]byte("test"))
197-
require.Error(t, err)
225+
226+
// Write should block when underlying writer fails
227+
writeComplete := make(chan struct{})
228+
var writeErr error
229+
var n int
230+
231+
go func() {
232+
defer close(writeComplete)
233+
n, writeErr = bw.Write([]byte("test"))
234+
}()
235+
236+
// Verify write is blocked
237+
select {
238+
case <-writeComplete:
239+
t.Fatal("Write should have blocked when underlying writer fails")
240+
case <-time.After(50 * time.Millisecond):
241+
// Expected - write is blocked
242+
}
243+
198244
require.False(t, bw.Connected())
199245

200246
// Reconnect with new writer and request replay from beginning
201247
writer2 := newMockWriter()
202248
err = bw.Reconnect(0, writer2)
203249
require.NoError(t, err)
204250

251+
// Write should now complete
252+
select {
253+
case <-writeComplete:
254+
// Expected - write completed
255+
case <-time.After(100 * time.Millisecond):
256+
t.Fatal("Write should have completed after reconnection")
257+
}
258+
259+
require.NoError(t, writeErr)
260+
require.Equal(t, 4, n)
261+
205262
// Should have replayed all data including the failed write that was buffered
206263
require.Equal(t, []byte("hello worldtest"), writer2.buffer.Bytes())
207264

@@ -319,7 +376,7 @@ func TestBackedWriter_Close(t *testing.T) {
319376

320377
// Writes after close should fail
321378
_, err = bw.Write([]byte("test"))
322-
require.Equal(t, io.ErrClosedPipe, err)
379+
require.Equal(t, io.EOF, err)
323380

324381
// Reconnect after close should fail
325382
err = bw.Reconnect(0, newMockWriter())
@@ -401,8 +458,9 @@ func TestBackedWriter_ReconnectDuringReplay(t *testing.T) {
401458
require.False(t, bw.Connected())
402459
}
403460

404-
func TestBackedWriter_PartialWriteToUnderlying(t *testing.T) {
461+
func TestBackedWriter_BlockOnPartialWrite(t *testing.T) {
405462
t.Parallel()
463+
ctx := testutil.Context(t, testutil.WaitShort)
406464

407465
errorChan := make(chan error, 1)
408466
bw := backedpipe.NewBackedWriter(backedpipe.DefaultBufferSize, errorChan)
@@ -419,12 +477,26 @@ func TestBackedWriter_PartialWriteToUnderlying(t *testing.T) {
419477

420478
bw.Reconnect(0, writer)
421479

422-
// Write should fail due to partial write
423-
n, err := bw.Write([]byte("hello"))
424-
require.Error(t, err)
425-
require.Equal(t, 0, n)
480+
// Write should block due to partial write
481+
writeComplete := make(chan struct{})
482+
var writeErr error
483+
var n int
484+
485+
go func() {
486+
defer close(writeComplete)
487+
n, writeErr = bw.Write([]byte("hello"))
488+
}()
489+
490+
// Verify write is blocked
491+
select {
492+
case <-writeComplete:
493+
t.Fatal("Write should have blocked when underlying writer does partial write")
494+
case <-time.After(50 * time.Millisecond):
495+
// Expected - write is blocked
496+
}
497+
498+
// Should be disconnected
426499
require.False(t, bw.Connected())
427-
require.Contains(t, err.Error(), "short write")
428500

429501
// Error should be sent to error channel
430502
select {
@@ -433,6 +505,19 @@ func TestBackedWriter_PartialWriteToUnderlying(t *testing.T) {
433505
default:
434506
t.Fatal("Expected error to be sent to error channel")
435507
}
508+
509+
// Reconnect with working writer and verify write completes
510+
writer2 := newMockWriter()
511+
err := bw.Reconnect(0, writer2) // Replay from beginning
512+
require.NoError(t, err)
513+
514+
// Write should now complete
515+
testutil.TryReceive(ctx, t, writeComplete)
516+
517+
require.NoError(t, writeErr)
518+
require.Equal(t, 5, n)
519+
require.Equal(t, uint64(5), bw.SequenceNum())
520+
require.Equal(t, []byte("hello"), writer2.buffer.Bytes())
436521
}
437522

438523
func TestBackedWriter_WriteUnblocksOnReconnect(t *testing.T) {
@@ -498,7 +583,7 @@ func TestBackedWriter_CloseUnblocksWaitingWrites(t *testing.T) {
498583

499584
// Write should now complete with error
500585
err = testutil.RequireReceive(ctx, t, writeComplete)
501-
require.Equal(t, io.ErrClosedPipe, err)
586+
require.Equal(t, io.EOF, err)
502587
}
503588

504589
func TestBackedWriter_WriteBlocksAfterDisconnection(t *testing.T) {
@@ -517,16 +602,13 @@ func TestBackedWriter_WriteBlocksAfterDisconnection(t *testing.T) {
517602
_, err = bw.Write([]byte("hello"))
518603
require.NoError(t, err)
519604

520-
// Cause disconnection
605+
// Cause disconnection - the write should now block instead of returning an error
521606
writer.setError(xerrors.New("connection lost"))
522-
_, err = bw.Write([]byte("world"))
523-
require.Error(t, err)
524-
require.False(t, bw.Connected())
525607

526-
// Subsequent write should block
608+
// This write should block
527609
writeComplete := make(chan error, 1)
528610
go func() {
529-
_, err := bw.Write([]byte("blocked"))
611+
_, err := bw.Write([]byte("world"))
530612
writeComplete <- err
531613
}()
532614

@@ -538,13 +620,19 @@ func TestBackedWriter_WriteBlocksAfterDisconnection(t *testing.T) {
538620
// Expected - write is blocked
539621
}
540622

623+
// Should be disconnected
624+
require.False(t, bw.Connected())
625+
541626
// Reconnect and verify write completes
542627
writer2 := newMockWriter()
543628
err = bw.Reconnect(5, writer2) // Replay from after "hello"
544629
require.NoError(t, err)
545630

546631
err = testutil.RequireReceive(ctx, t, writeComplete)
547632
require.NoError(t, err)
633+
634+
// Check that only "world" was written during replay (not duplicated)
635+
require.Equal(t, []byte("world"), writer2.buffer.Bytes()) // Only "world" since we replayed from sequence 5
548636
}
549637

550638
func BenchmarkBackedWriter_Write(b *testing.B) {

0 commit comments

Comments
 (0)