Skip to content

Commit 2c2db8e

Browse files
committed
coordinating tests with channels
separate channels for writer/reader errors; moved to an interface for reconnection; coordination logic for closing a pipe
1 parent f4907b6 commit 2c2db8e

File tree

7 files changed

+427
-280
lines changed

7 files changed

+427
-280
lines changed

agent/immortalstreams/backedpipe/backed_pipe.go

Lines changed: 76 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"io"
66
"sync"
77

8+
"golang.org/x/sync/errgroup"
89
"golang.org/x/sync/singleflight"
910
"golang.org/x/xerrors"
1011
)
@@ -23,8 +24,8 @@ const (
2324
DefaultBufferSize = 64 * 1024 * 1024
2425
)
2526

26-
// ReconnectFunc is called when the BackedPipe needs to establish a new connection.
27-
// It should:
27+
// Reconnector is an interface for establishing connections when the BackedPipe needs to reconnect.
28+
// Implementations should:
2829
// 1. Establish a new connection to the remote side
2930
// 2. Exchange sequence numbers with the remote side
3031
// 3. Return the new connection and the remote's current sequence number
@@ -34,7 +35,9 @@ const (
3435
//
3536
// The returned readerSeqNum should be the remote side's current sequence number,
3637
// which indicates where the local reader should resume from.
37-
type ReconnectFunc func(ctx context.Context, writerSeqNum uint64) (conn io.ReadWriteCloser, readerSeqNum uint64, err error)
38+
type Reconnector interface {
39+
Reconnect(ctx context.Context, writerSeqNum uint64) (conn io.ReadWriteCloser, readerSeqNum uint64, err error)
40+
}
3841

3942
// BackedPipe provides a reliable bidirectional byte stream over unreliable network connections.
4043
// It orchestrates a BackedReader and BackedWriter to provide transparent reconnection
@@ -45,44 +48,39 @@ type BackedPipe struct {
4548
mu sync.RWMutex
4649
reader *BackedReader
4750
writer *BackedWriter
48-
reconnectFn ReconnectFunc
51+
reconnector Reconnector
4952
conn io.ReadWriteCloser
5053
connected bool
5154
closed bool
5255

5356
// Reconnection state
5457
reconnecting bool
5558

56-
// Error channel for receiving connection errors from reader/writer
57-
errorChan chan error
59+
// Error channels for receiving connection errors from reader/writer separately
60+
readerErrorChan chan error
61+
writerErrorChan chan error
5862

5963
// singleflight group to dedupe concurrent ForceReconnect calls
6064
sf singleflight.Group
6165
}
6266

63-
// NewBackedPipe creates a new BackedPipe with default options and the specified reconnect function.
67+
// NewBackedPipe creates a new BackedPipe with default options and the specified reconnector.
6468
// The pipe starts disconnected and must be connected using Connect().
65-
func NewBackedPipe(ctx context.Context, reconnectFn ReconnectFunc) *BackedPipe {
69+
func NewBackedPipe(ctx context.Context, reconnector Reconnector) *BackedPipe {
6670
pipeCtx, cancel := context.WithCancel(ctx)
6771

68-
errorChan := make(chan error, 2) // Buffer for reader and writer errors
72+
readerErrorChan := make(chan error, 1) // Buffer for reader errors
73+
writerErrorChan := make(chan error, 1) // Buffer for writer errors
6974
bp := &BackedPipe{
70-
ctx: pipeCtx,
71-
cancel: cancel,
72-
reader: NewBackedReader(),
73-
writer: NewBackedWriter(DefaultBufferSize, errorChan),
74-
reconnectFn: reconnectFn,
75-
errorChan: errorChan,
75+
ctx: pipeCtx,
76+
cancel: cancel,
77+
reader: NewBackedReader(readerErrorChan),
78+
writer: NewBackedWriter(DefaultBufferSize, writerErrorChan),
79+
reconnector: reconnector,
80+
readerErrorChan: readerErrorChan,
81+
writerErrorChan: writerErrorChan,
7682
}
7783

78-
// Set up error callback for reader only (writer uses error channel directly)
79-
bp.reader.SetErrorCallback(func(err error) {
80-
select {
81-
case bp.errorChan <- err:
82-
case <-bp.ctx.Done():
83-
}
84-
})
85-
8684
// Start error handler goroutine
8785
go bp.handleErrors()
8886

@@ -109,16 +107,7 @@ func (bp *BackedPipe) Connect() error {
109107

110108
// Read implements io.Reader by delegating to the BackedReader.
111109
func (bp *BackedPipe) Read(p []byte) (int, error) {
112-
bp.mu.RLock()
113-
reader := bp.reader
114-
closed := bp.closed
115-
bp.mu.RUnlock()
116-
117-
if closed {
118-
return 0, io.EOF
119-
}
120-
121-
return reader.Read(p)
110+
return bp.reader.Read(p)
122111
}
123112

124113
// Write implements io.Writer by delegating to the BackedWriter.
@@ -147,32 +136,38 @@ func (bp *BackedPipe) Close() error {
147136
bp.closed = true
148137
bp.cancel() // Cancel main context
149138

150-
// Close underlying components
151-
var readerErr, writerErr, connErr error
139+
// Close all components in parallel to avoid deadlocks
140+
//
141+
// IMPORTANT: The connection close is launched first (the closes still run concurrently) to unblock any
142+
// readers or writers that might be holding the mutex on Read/Write
143+
var g errgroup.Group
152144

153-
if bp.reader != nil {
154-
readerErr = bp.reader.Close()
145+
if bp.conn != nil {
146+
conn := bp.conn
147+
g.Go(func() error {
148+
return conn.Close()
149+
})
150+
bp.conn = nil
155151
}
156152

157-
if bp.writer != nil {
158-
writerErr = bp.writer.Close()
153+
if bp.reader != nil {
154+
reader := bp.reader
155+
g.Go(func() error {
156+
return reader.Close()
157+
})
159158
}
160159

161-
if bp.conn != nil {
162-
connErr = bp.conn.Close()
163-
bp.conn = nil
160+
if bp.writer != nil {
161+
writer := bp.writer
162+
g.Go(func() error {
163+
return writer.Close()
164+
})
164165
}
165166

166167
bp.connected = false
167168

168-
// Return first error encountered
169-
if readerErr != nil {
170-
return readerErr
171-
}
172-
if writerErr != nil {
173-
return writerErr
174-
}
175-
return connErr
169+
// Wait for all close operations to complete and return any error
170+
return g.Wait()
176171
}
177172

178173
// Connected returns whether the pipe is currently connected.
@@ -204,7 +199,7 @@ func (bp *BackedPipe) reconnectLocked() error {
204199
// Get current writer sequence number to send to remote
205200
writerSeqNum := bp.writer.SequenceNum()
206201

207-
conn, readerSeqNum, err := bp.reconnectFn(bp.ctx, writerSeqNum)
202+
conn, readerSeqNum, err := bp.reconnector.Reconnect(bp.ctx, writerSeqNum)
208203
if err != nil {
209204
return ErrReconnectFailed
210205
}
@@ -244,32 +239,40 @@ func (bp *BackedPipe) handleErrors() {
244239
select {
245240
case <-bp.ctx.Done():
246241
return
247-
case err := <-bp.errorChan:
248-
// Connection error occurred
249-
bp.mu.Lock()
250-
251-
// Skip if already closed or not connected
252-
if bp.closed || !bp.connected {
253-
bp.mu.Unlock()
254-
continue
255-
}
256-
257-
// Mark as disconnected
258-
bp.connected = false
259-
260-
// Try to reconnect using internal context
261-
reconnectErr := bp.reconnectLocked()
262-
bp.mu.Unlock()
263-
264-
if reconnectErr != nil {
265-
// Reconnection failed - log or handle as needed
266-
// For now, we'll just continue and wait for manual reconnection
267-
_ = err // Use the original error
268-
}
242+
case err := <-bp.readerErrorChan:
243+
// Reader connection error occurred
244+
bp.handleConnectionError(err, "reader")
245+
case err := <-bp.writerErrorChan:
246+
// Writer connection error occurred
247+
bp.handleConnectionError(err, "writer")
269248
}
270249
}
271250
}
272251

252+
// handleConnectionError handles errors from either reader or writer components.
253+
func (bp *BackedPipe) handleConnectionError(err error, component string) {
254+
bp.mu.Lock()
255+
defer bp.mu.Unlock()
256+
257+
// Skip if already closed or not connected
258+
if bp.closed || !bp.connected {
259+
return
260+
}
261+
262+
// Mark as disconnected
263+
bp.connected = false
264+
265+
// Try to reconnect using internal context
266+
reconnectErr := bp.reconnectLocked()
267+
268+
if reconnectErr != nil {
269+
// Reconnection failed - log or handle as needed
270+
// For now, we'll just continue and wait for manual reconnection
271+
_ = err // Use the original error from the component
272+
_ = component // Component info available for potential logging by higher layers
273+
}
274+
}
275+
273276
// ForceReconnect forces a reconnection attempt immediately.
274277
// This can be used to force a reconnection if a new connection is established.
275278
func (bp *BackedPipe) ForceReconnect() error {

0 commit comments

Comments
 (0)