Skip to content

Commit 2a84799

Browse files
committed
PR feedback implemented
1 parent dde9516 commit 2a84799

File tree

9 files changed

+880
-939
lines changed

9 files changed

+880
-939
lines changed

agent/immortalstreams/backedpipe/backed_pipe.go

Lines changed: 5 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,13 @@ import (
44
"context"
55
"io"
66
"sync"
7-
"time"
87

98
"golang.org/x/sync/singleflight"
109
"golang.org/x/xerrors"
1110
)
1211

1312
const (
14-
// DefaultBufferSize is the default buffer size for the BackedWriter (64MB)
13+
// DefaultBufferSize is the default buffer capacity used by the writer (64MB).
1514
DefaultBufferSize = 64 * 1024 * 1024
1615
)
1716

@@ -60,31 +59,25 @@ type BackedPipe struct {
6059
func NewBackedPipe(ctx context.Context, reconnectFn ReconnectFunc) *BackedPipe {
6160
pipeCtx, cancel := context.WithCancel(ctx)
6261

62+
errorChan := make(chan error, 2) // Buffer for reader and writer errors
6363
bp := &BackedPipe{
6464
ctx: pipeCtx,
6565
cancel: cancel,
6666
reader: NewBackedReader(),
67-
writer: NewBackedWriterWithCapacity(DefaultBufferSize), // 64MB default buffer
67+
writer: NewBackedWriter(DefaultBufferSize, errorChan),
6868
reconnectFn: reconnectFn,
69-
errorChan: make(chan error, 2), // Buffer for reader and writer errors
69+
errorChan: errorChan,
7070
connectionChanged: make(chan struct{}, 1),
7171
}
7272

73-
// Set up error callbacks
73+
// Set up error callback for reader only (writer uses error channel directly)
7474
bp.reader.SetErrorCallback(func(err error) {
7575
select {
7676
case bp.errorChan <- err:
7777
case <-bp.ctx.Done():
7878
}
7979
})
8080

81-
bp.writer.SetErrorCallback(func(err error) {
82-
select {
83-
case bp.errorChan <- err:
84-
case <-bp.ctx.Done():
85-
}
86-
})
87-
8881
// Start error handler goroutine
8982
go bp.handleErrors()
9083

@@ -233,16 +226,6 @@ func (bp *BackedPipe) reconnectLocked() error {
233226
readerSeqNum, writerSeqNum)
234227
}
235228

236-
// Validate writer can replay from the requested sequence
237-
if !bp.writer.CanReplayFrom(readerSeqNum) {
238-
_ = conn.Close()
239-
// Calculate data loss
240-
currentSeq := bp.writer.SequenceNum()
241-
dataLoss := currentSeq - DefaultBufferSize - readerSeqNum
242-
return xerrors.Errorf("cannot replay from sequence %d (current: %d, data loss: ~%d bytes)",
243-
readerSeqNum, currentSeq, dataLoss)
244-
}
245-
246229
// Reconnect reader and writer
247230
seqNum := make(chan uint64, 1)
248231
newR := make(chan io.Reader, 1)
@@ -300,33 +283,6 @@ func (bp *BackedPipe) handleErrors() {
300283
}
301284
}
302285

303-
// WaitForConnection blocks until the pipe is connected or the context is canceled.
304-
func (bp *BackedPipe) WaitForConnection(ctx context.Context) error {
305-
for {
306-
bp.mu.RLock()
307-
connected := bp.connected
308-
closed := bp.closed
309-
bp.mu.RUnlock()
310-
311-
if closed {
312-
return io.ErrClosedPipe
313-
}
314-
315-
if connected {
316-
return nil
317-
}
318-
319-
select {
320-
case <-ctx.Done():
321-
return ctx.Err()
322-
case <-bp.connectionChanged:
323-
// Connection state changed, check again
324-
case <-time.After(10 * time.Millisecond):
325-
// Periodically re-check to avoid missed notifications
326-
}
327-
}
328-
}
329-
330286
// ForceReconnect forces a reconnection attempt immediately.
331287
// This can be used to force a reconnection if a new connection is established.
332288
func (bp *BackedPipe) ForceReconnect() error {

agent/immortalstreams/backedpipe/backed_pipe_test.go

Lines changed: 32 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -240,21 +240,35 @@ func TestBackedPipe_BasicReadWrite(t *testing.T) {
240240

241241
func TestBackedPipe_WriteBeforeConnect(t *testing.T) {
242242
t.Parallel()
243+
ctx := testutil.Context(t, testutil.WaitShort)
243244

244-
ctx := context.Background()
245245
conn := newMockConnection()
246246
reconnectFn, _, _ := mockReconnectFunc(conn)
247247

248248
bp := backedpipe.NewBackedPipe(ctx, reconnectFn)
249249
defer bp.Close()
250250

251-
// Write before connecting should succeed (buffered)
252-
n, err := bp.Write([]byte("hello"))
251+
// Write before connecting should block
252+
writeComplete := make(chan error, 1)
253+
go func() {
254+
_, err := bp.Write([]byte("hello"))
255+
writeComplete <- err
256+
}()
257+
258+
// Verify write is blocked
259+
select {
260+
case <-writeComplete:
261+
t.Fatal("Write should have blocked when disconnected")
262+
case <-time.After(100 * time.Millisecond):
263+
// Expected - write is blocked
264+
}
265+
266+
// Connect should unblock the write
267+
err := bp.Connect(ctx)
253268
require.NoError(t, err)
254-
require.Equal(t, 5, n)
255269

256-
// Connect should replay the buffered data
257-
err = bp.Connect(ctx)
270+
// Write should now complete
271+
err = testutil.RequireReceive(ctx, t, writeComplete)
258272
require.NoError(t, err)
259273

260274
// Check that data was replayed to connection
@@ -265,6 +279,7 @@ func TestBackedPipe_ReadBlocksWhenDisconnected(t *testing.T) {
265279
t.Parallel()
266280

267281
ctx := context.Background()
282+
testCtx := testutil.Context(t, testutil.WaitShort)
268283
reconnectFn, _, _ := mockReconnectFunc(newMockConnection())
269284

270285
bp := backedpipe.NewBackedPipe(ctx, reconnectFn)
@@ -283,7 +298,7 @@ func TestBackedPipe_ReadBlocksWhenDisconnected(t *testing.T) {
283298
}()
284299

285300
// Wait for the goroutine to start
286-
<-readStarted
301+
testutil.TryReceive(testCtx, t, readStarted)
287302

288303
// Give a brief moment for the read to actually block
289304
time.Sleep(time.Millisecond)
@@ -299,18 +314,15 @@ func TestBackedPipe_ReadBlocksWhenDisconnected(t *testing.T) {
299314
// Close should unblock the read
300315
bp.Close()
301316

302-
select {
303-
case <-readDone:
304-
require.Equal(t, io.ErrClosedPipe, readErr)
305-
case <-time.After(time.Second):
306-
t.Fatal("Read did not unblock after close")
307-
}
317+
testutil.TryReceive(testCtx, t, readDone)
318+
require.Equal(t, io.EOF, readErr)
308319
}
309320

310321
func TestBackedPipe_Reconnection(t *testing.T) {
311322
t.Parallel()
312323

313324
ctx := context.Background()
325+
testCtx := testutil.Context(t, testutil.WaitShort)
314326
conn1 := newMockConnection()
315327
conn2 := newMockConnection()
316328
conn2.seqNum = 17 // Remote has received 17 bytes, so replay from sequence 17
@@ -333,10 +345,12 @@ func TestBackedPipe_Reconnection(t *testing.T) {
333345
// Trigger a write to cause the pipe to notice the failure
334346
_, _ = bp.Write([]byte("trigger failure "))
335347

336-
<-signalChan
348+
testutil.RequireReceive(testCtx, t, signalChan)
337349

338-
err = bp.WaitForConnection(ctx)
339-
require.NoError(t, err)
350+
// Wait for reconnection to complete
351+
require.Eventually(t, func() bool {
352+
return bp.Connected()
353+
}, testutil.WaitShort, testutil.IntervalFast, "pipe should reconnect")
340354

341355
replayedData := conn2.ReadString()
342356
require.Equal(t, "***trigger failure ", replayedData, "Should replay exactly the data written after sequence 17")
@@ -391,45 +405,10 @@ func TestBackedPipe_CloseIdempotent(t *testing.T) {
391405
require.NoError(t, err)
392406
}
393407

394-
func TestBackedPipe_WaitForConnection(t *testing.T) {
395-
t.Parallel()
396-
397-
ctx := context.Background()
398-
conn := newMockConnection()
399-
reconnectFn, _, _ := mockReconnectFunc(conn)
400-
401-
bp := backedpipe.NewBackedPipe(ctx, reconnectFn)
402-
defer bp.Close()
403-
404-
// Should timeout when not connected
405-
// Use a shorter timeout for this test to speed up test runs
406-
timeoutCtx, cancel := context.WithTimeout(ctx, testutil.WaitSuperShort)
407-
defer cancel()
408-
409-
err := bp.WaitForConnection(timeoutCtx)
410-
require.Equal(t, context.DeadlineExceeded, err)
411-
412-
// Connect in background after a brief delay
413-
connectionStarted := make(chan struct{})
414-
go func() {
415-
close(connectionStarted)
416-
// Small delay to ensure WaitForConnection is called first
417-
time.Sleep(time.Millisecond)
418-
bp.Connect(context.Background())
419-
}()
420-
421-
// Wait for connection goroutine to start
422-
<-connectionStarted
423-
424-
// Should succeed once connected
425-
err = bp.WaitForConnection(context.Background())
426-
require.NoError(t, err)
427-
}
428-
429408
func TestBackedPipe_ConcurrentReadWrite(t *testing.T) {
430409
t.Parallel()
410+
ctx := testutil.Context(t, testutil.WaitShort)
431411

432-
ctx := context.Background()
433412
conn := newMockConnection()
434413
reconnectFn, _, _ := mockReconnectFunc(conn)
435414

@@ -487,12 +466,7 @@ func TestBackedPipe_ConcurrentReadWrite(t *testing.T) {
487466
wg.Wait()
488467
}()
489468

490-
select {
491-
case <-done:
492-
// Success
493-
case <-time.After(5 * time.Second):
494-
t.Fatal("Test timed out")
495-
}
469+
testutil.TryReceive(ctx, t, done)
496470

497471
// Close the channel and collect all written data
498472
close(writtenData)

agent/immortalstreams/backedpipe/backed_reader.go

Lines changed: 12 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -34,35 +34,29 @@ func NewBackedReader() *BackedReader {
3434
// When connected, it reads from the underlying reader and updates sequence numbers.
3535
// Connection failures are automatically detected and reported to the higher layer via callback.
3636
func (br *BackedReader) Read(p []byte) (int, error) {
37+
br.mu.Lock()
38+
defer br.mu.Unlock()
39+
3740
for {
3841
// Step 1: Wait until we have a reader or are closed
39-
br.mu.Lock()
4042
for br.reader == nil && !br.closed {
4143
br.cond.Wait()
4244
}
4345

4446
if br.closed {
45-
br.mu.Unlock()
46-
return 0, io.ErrClosedPipe
47+
return 0, io.EOF
4748
}
4849

49-
// Capture the current reader and release the lock while performing
50-
// the potentially blocking I/O operation to avoid deadlocks with Close().
51-
r := br.reader
52-
br.mu.Unlock()
50+
// Step 2: Perform the read while holding the mutex
51+
// This serializes Read with Reconnect and Close; note that both must wait for the mutex until the underlying Read returns.
52+
n, err := br.reader.Read(p)
53+
br.sequenceNum += uint64(n) // #nosec G115 -- n is always >= 0 per io.Reader contract
5354

54-
// Step 2: Perform the read without holding the mutex
55-
n, err := r.Read(p)
56-
57-
// Step 3: Reacquire the lock to update state based on the result
58-
br.mu.Lock()
5955
if err == nil {
60-
br.sequenceNum += uint64(n) // #nosec G115 -- n is always >= 0 per io.Reader contract
61-
br.mu.Unlock()
6256
return n, nil
6357
}
6458

65-
// Mark disconnected so future reads will wait for reconnection
59+
// Mark reader as disconnected so future reads will wait for reconnection
6660
br.reader = nil
6761

6862
if br.onError != nil {
@@ -71,13 +65,8 @@ func (br *BackedReader) Read(p []byte) (int, error) {
7165

7266
// If we got some data before the error, return it now
7367
if n > 0 {
74-
br.sequenceNum += uint64(n)
75-
br.mu.Unlock()
7668
return n, nil
7769
}
78-
79-
// Otherwise loop and wait for reconnection or close
80-
br.mu.Unlock()
8170
}
8271
}
8372

@@ -91,8 +80,7 @@ func (br *BackedReader) Reconnect(seqNum chan<- uint64, newR <-chan io.Reader) {
9180
defer br.mu.Unlock()
9281

9382
if br.closed {
94-
// Send 0 sequence number and close the channel to indicate closed state
95-
seqNum <- 0
83+
// Close the channel to indicate closed state
9684
close(seqNum)
9785
return
9886
}
@@ -117,8 +105,8 @@ func (br *BackedReader) Reconnect(seqNum chan<- uint64, newR <-chan io.Reader) {
117105
br.cond.Broadcast()
118106
}
119107

120-
// Closes the reader and wakes up any blocked reads.
121-
// After closing, all Read calls will return io.ErrClosedPipe.
108+
// Close the reader and wake up any blocked reads.
109+
// After closing, all Read calls will return io.EOF.
122110
func (br *BackedReader) Close() error {
123111
br.mu.Lock()
124112
defer br.mu.Unlock()

0 commit comments

Comments
 (0)