Skip to content

Commit f4907b6

Browse files
committed
improvements to backed pipe
1 parent 1327c97 commit f4907b6

File tree

3 files changed

+20
-124
lines changed

3 files changed

+20
-124
lines changed

agent/immortalstreams/backedpipe/backed_pipe.go

Lines changed: 7 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,6 @@ type BackedPipe struct {
5656
// Error channel for receiving connection errors from reader/writer
5757
errorChan chan error
5858

59-
// Connection state notification
60-
connectionChanged chan struct{}
61-
6259
// singleflight group to dedupe concurrent ForceReconnect calls
6360
sf singleflight.Group
6461
}
@@ -70,13 +67,12 @@ func NewBackedPipe(ctx context.Context, reconnectFn ReconnectFunc) *BackedPipe {
7067

7168
errorChan := make(chan error, 2) // Buffer for reader and writer errors
7269
bp := &BackedPipe{
73-
ctx: pipeCtx,
74-
cancel: cancel,
75-
reader: NewBackedReader(),
76-
writer: NewBackedWriter(DefaultBufferSize, errorChan),
77-
reconnectFn: reconnectFn,
78-
errorChan: errorChan,
79-
connectionChanged: make(chan struct{}, 1),
70+
ctx: pipeCtx,
71+
cancel: cancel,
72+
reader: NewBackedReader(),
73+
writer: NewBackedWriter(DefaultBufferSize, errorChan),
74+
reconnectFn: reconnectFn,
75+
errorChan: errorChan,
8076
}
8177

8278
// Set up error callback for reader only (writer uses error channel directly)
@@ -94,7 +90,7 @@ func NewBackedPipe(ctx context.Context, reconnectFn ReconnectFunc) *BackedPipe {
9490
}
9591

9692
// Connect establishes the initial connection using the reconnect function.
97-
func (bp *BackedPipe) Connect(_ context.Context) error { // external ctx ignored; internal ctx used
93+
func (bp *BackedPipe) Connect() error {
9894
bp.mu.Lock()
9995
defer bp.mu.Unlock()
10096

@@ -168,7 +164,6 @@ func (bp *BackedPipe) Close() error {
168164
}
169165

170166
bp.connected = false
171-
bp.signalConnectionChange()
172167

173168
// Return first error encountered
174169
if readerErr != nil {
@@ -187,15 +182,6 @@ func (bp *BackedPipe) Connected() bool {
187182
return bp.connected
188183
}
189184

190-
// signalConnectionChange signals that the connection state has changed.
191-
func (bp *BackedPipe) signalConnectionChange() {
192-
select {
193-
case bp.connectionChanged <- struct{}{}:
194-
default:
195-
// Channel is full, which is fine - we just want to signal that something changed
196-
}
197-
}
198-
199185
// reconnectLocked handles the reconnection logic. Must be called with write lock held.
200186
func (bp *BackedPipe) reconnectLocked() error {
201187
if bp.reconnecting {
@@ -214,16 +200,11 @@ func (bp *BackedPipe) reconnectLocked() error {
214200
}
215201

216202
bp.connected = false
217-
bp.signalConnectionChange()
218203

219204
// Get current writer sequence number to send to remote
220205
writerSeqNum := bp.writer.SequenceNum()
221206

222-
// Unlock during reconnect attempt to avoid blocking reads/writes
223-
bp.mu.Unlock()
224207
conn, readerSeqNum, err := bp.reconnectFn(bp.ctx, writerSeqNum)
225-
bp.mu.Lock()
226-
227208
if err != nil {
228209
return ErrReconnectFailed
229210
}
@@ -253,7 +234,6 @@ func (bp *BackedPipe) reconnectLocked() error {
253234
// Success - update state
254235
bp.conn = conn
255236
bp.connected = true
256-
bp.signalConnectionChange()
257237

258238
return nil
259239
}
@@ -276,7 +256,6 @@ func (bp *BackedPipe) handleErrors() {
276256

277257
// Mark as disconnected
278258
bp.connected = false
279-
bp.signalConnectionChange()
280259

281260
// Try to reconnect using internal context
282261
reconnectErr := bp.reconnectLocked()

agent/immortalstreams/backedpipe/backed_pipe_test.go

Lines changed: 13 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ func TestBackedPipe_Connect(t *testing.T) {
167167
bp := backedpipe.NewBackedPipe(ctx, reconnectFn)
168168
defer bp.Close()
169169

170-
err := bp.Connect(ctx)
170+
err := bp.Connect()
171171
require.NoError(t, err)
172172
require.True(t, bp.Connected())
173173
require.Equal(t, 1, *callCount)
@@ -183,11 +183,11 @@ func TestBackedPipe_ConnectAlreadyConnected(t *testing.T) {
183183
bp := backedpipe.NewBackedPipe(ctx, reconnectFn)
184184
defer bp.Close()
185185

186-
err := bp.Connect(ctx)
186+
err := bp.Connect()
187187
require.NoError(t, err)
188188

189189
// Second connect should fail
190-
err = bp.Connect(ctx)
190+
err = bp.Connect()
191191
require.Error(t, err)
192192
require.ErrorIs(t, err, backedpipe.ErrPipeAlreadyConnected)
193193
}
@@ -204,7 +204,7 @@ func TestBackedPipe_ConnectAfterClose(t *testing.T) {
204204
err := bp.Close()
205205
require.NoError(t, err)
206206

207-
err = bp.Connect(ctx)
207+
err = bp.Connect()
208208
require.Error(t, err)
209209
require.ErrorIs(t, err, backedpipe.ErrPipeClosed)
210210
}
@@ -219,7 +219,7 @@ func TestBackedPipe_BasicReadWrite(t *testing.T) {
219219
bp := backedpipe.NewBackedPipe(ctx, reconnectFn)
220220
defer bp.Close()
221221

222-
err := bp.Connect(ctx)
222+
err := bp.Connect()
223223
require.NoError(t, err)
224224

225225
// Write data
@@ -264,7 +264,7 @@ func TestBackedPipe_WriteBeforeConnect(t *testing.T) {
264264
}
265265

266266
// Connect should unblock the write
267-
err := bp.Connect(ctx)
267+
err := bp.Connect()
268268
require.NoError(t, err)
269269

270270
// Write should now complete
@@ -332,7 +332,7 @@ func TestBackedPipe_Reconnection(t *testing.T) {
332332
defer bp.Close()
333333

334334
// Initial connect
335-
err := bp.Connect(ctx)
335+
err := bp.Connect()
336336
require.NoError(t, err)
337337

338338
// Write some data before failure
@@ -373,7 +373,7 @@ func TestBackedPipe_Close(t *testing.T) {
373373

374374
bp := backedpipe.NewBackedPipe(ctx, reconnectFn)
375375

376-
err := bp.Connect(ctx)
376+
err := bp.Connect()
377377
require.NoError(t, err)
378378

379379
err = bp.Close()
@@ -405,88 +405,6 @@ func TestBackedPipe_CloseIdempotent(t *testing.T) {
405405
require.NoError(t, err)
406406
}
407407

408-
func TestBackedPipe_ConcurrentReadWrite(t *testing.T) {
409-
t.Parallel()
410-
ctx := testutil.Context(t, testutil.WaitShort)
411-
412-
conn := newMockConnection()
413-
reconnectFn, _, _ := mockReconnectFunc(conn)
414-
415-
bp := backedpipe.NewBackedPipe(ctx, reconnectFn)
416-
defer bp.Close()
417-
418-
err := bp.Connect(ctx)
419-
require.NoError(t, err)
420-
421-
var wg sync.WaitGroup
422-
numWriters := 3
423-
writesPerWriter := 10
424-
425-
// Fill read buffer with test data first
426-
testData := make([]byte, 1000)
427-
for i := range testData {
428-
testData[i] = 'A'
429-
}
430-
conn.WriteString(string(testData))
431-
432-
// Channel to collect all written data
433-
writtenData := make(chan byte, numWriters*writesPerWriter)
434-
435-
// Start a few readers
436-
for i := 0; i < 2; i++ {
437-
wg.Add(1)
438-
go func() {
439-
defer wg.Done()
440-
buf := make([]byte, 10)
441-
for j := 0; j < 10; j++ {
442-
bp.Read(buf)
443-
time.Sleep(time.Millisecond) // Small delay to avoid busy waiting
444-
}
445-
}()
446-
}
447-
448-
// Start writers
449-
for i := 0; i < numWriters; i++ {
450-
wg.Add(1)
451-
go func(id int) {
452-
defer wg.Done()
453-
for j := 0; j < writesPerWriter; j++ {
454-
data := []byte{byte(id + '0')}
455-
bp.Write(data)
456-
writtenData <- byte(id + '0')
457-
time.Sleep(time.Millisecond) // Small delay
458-
}
459-
}(i)
460-
}
461-
462-
// Wait with timeout
463-
done := make(chan struct{})
464-
go func() {
465-
defer close(done)
466-
wg.Wait()
467-
}()
468-
469-
testutil.TryReceive(ctx, t, done)
470-
471-
// Close the channel and collect all written data
472-
close(writtenData)
473-
var allWritten []byte
474-
for b := range writtenData {
475-
allWritten = append(allWritten, b)
476-
}
477-
478-
// Verify that all written data was received by the connection
479-
// Note: Since this test uses the old mock that returns readerSeqNum = 0,
480-
// all data will be replayed, so we expect to receive all written data
481-
receivedData := conn.ReadString()
482-
require.GreaterOrEqual(t, len(receivedData), len(allWritten), "Connection should have received at least all written data")
483-
484-
// Check that all written bytes appear in the received data
485-
for _, writtenByte := range allWritten {
486-
require.Contains(t, receivedData, string(writtenByte), "Written byte %c should be present in received data", writtenByte)
487-
}
488-
}
489-
490408
func TestBackedPipe_ReconnectFunctionFailure(t *testing.T) {
491409
t.Parallel()
492410

@@ -499,7 +417,7 @@ func TestBackedPipe_ReconnectFunctionFailure(t *testing.T) {
499417
bp := backedpipe.NewBackedPipe(ctx, failingReconnectFn)
500418
defer bp.Close()
501419

502-
err := bp.Connect(ctx)
420+
err := bp.Connect()
503421
require.Error(t, err)
504422
require.ErrorIs(t, err, backedpipe.ErrReconnectFailed)
505423
require.False(t, bp.Connected())
@@ -517,7 +435,7 @@ func TestBackedPipe_ForceReconnect(t *testing.T) {
517435
defer bp.Close()
518436

519437
// Initial connect
520-
err := bp.Connect(ctx)
438+
err := bp.Connect()
521439
require.NoError(t, err)
522440
require.True(t, bp.Connected())
523441
require.Equal(t, 1, *callCount)
@@ -644,7 +562,7 @@ func TestBackedPipe_EOFTriggersReconnection(t *testing.T) {
644562
defer bp.Close()
645563

646564
// Initial connect
647-
err := bp.Connect(ctx)
565+
err := bp.Connect()
648566
require.NoError(t, err)
649567
require.Equal(t, 1, callCount)
650568

@@ -685,7 +603,7 @@ func BenchmarkBackedPipe_Write(b *testing.B) {
685603
reconnectFn, _, _ := mockReconnectFunc(conn)
686604

687605
bp := backedpipe.NewBackedPipe(ctx, reconnectFn)
688-
bp.Connect(ctx)
606+
bp.Connect()
689607
b.Cleanup(func() {
690608
_ = bp.Close()
691609
})
@@ -704,7 +622,7 @@ func BenchmarkBackedPipe_Read(b *testing.B) {
704622
reconnectFn, _, _ := mockReconnectFunc(conn)
705623

706624
bp := backedpipe.NewBackedPipe(ctx, reconnectFn)
707-
bp.Connect(ctx)
625+
bp.Connect()
708626
b.Cleanup(func() {
709627
_ = bp.Close()
710628
})

agent/immortalstreams/backedpipe/backed_writer.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,6 @@ func (bw *BackedWriter) Reconnect(replayFromSeq uint64, newWriter io.Writer) err
176176
// no concurrent operations can interfere with the reconnection process.
177177
if len(replayData) > 0 {
178178
n, err := newWriter.Write(replayData)
179-
180179
if err != nil {
181180
// Reconnect failed, writer remains nil
182181
return ErrReplayFailed

0 commit comments

Comments
 (0)