Skip to content

Commit 064514e

Browse files
committed
Add a connection state machine and a connection generation counter to backed_pipe for better error tracking
1 parent 2c2db8e commit 064514e

File tree

6 files changed

+452
-114
lines changed

6 files changed

+452
-114
lines changed

agent/immortalstreams/backedpipe/backed_pipe.go

Lines changed: 98 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,27 @@ var (
1919
ErrReconnectWriterFailed = xerrors.New("reconnect writer failed")
2020
)
2121

22+
// connectionState represents the current state of the BackedPipe connection.
23+
type connectionState int
24+
25+
const (
26+
// connected indicates the pipe is connected and operational.
27+
connected connectionState = iota
28+
// disconnected indicates the pipe is not connected but not closed.
29+
disconnected
30+
// reconnecting indicates a reconnection attempt is in progress.
31+
reconnecting
32+
// closed indicates the pipe is permanently closed.
33+
closed
34+
)
35+
36+
// ErrorEvent represents an error from a reader or writer with connection generation info.
37+
type ErrorEvent struct {
38+
Err error
39+
Component string // "reader" or "writer"
40+
Generation uint64 // connection generation when error occurred
41+
}
42+
2243
const (
2344
// Default buffer capacity used by the writer - 64MB
2445
DefaultBufferSize = 64 * 1024 * 1024
@@ -50,37 +71,41 @@ type BackedPipe struct {
5071
writer *BackedWriter
5172
reconnector Reconnector
5273
conn io.ReadWriteCloser
53-
connected bool
54-
closed bool
5574

56-
// Reconnection state
57-
reconnecting bool
75+
// State machine
76+
state connectionState
77+
connGen uint64 // Increments on each successful reconnection
5878

59-
// Error channels for receiving connection errors from reader/writer separately
60-
readerErrorChan chan error
61-
writerErrorChan chan error
79+
// Unified error handling with generation filtering
80+
errorChan chan ErrorEvent
6281

6382
// singleflight group to dedupe concurrent ForceReconnect calls
6483
sf singleflight.Group
84+
85+
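// Newest connection generation whose error already triggered a reconnection attempt; errors from that or any older generation are skipped to avoid duplicate reconnections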
86+
lastErrorGen uint64
6587
}
6688

6789
// NewBackedPipe creates a new BackedPipe with default options and the specified reconnector.
6890
// The pipe starts disconnected and must be connected using Connect().
6991
func NewBackedPipe(ctx context.Context, reconnector Reconnector) *BackedPipe {
7092
pipeCtx, cancel := context.WithCancel(ctx)
7193

72-
readerErrorChan := make(chan error, 1) // Buffer for reader errors
73-
writerErrorChan := make(chan error, 1) // Buffer for writer errors
94+
errorChan := make(chan ErrorEvent, 10) // Buffered for async error reporting
95+
7496
bp := &BackedPipe{
75-
ctx: pipeCtx,
76-
cancel: cancel,
77-
reader: NewBackedReader(readerErrorChan),
78-
writer: NewBackedWriter(DefaultBufferSize, writerErrorChan),
79-
reconnector: reconnector,
80-
readerErrorChan: readerErrorChan,
81-
writerErrorChan: writerErrorChan,
97+
ctx: pipeCtx,
98+
cancel: cancel,
99+
reconnector: reconnector,
100+
state: disconnected,
101+
connGen: 0, // Start with generation 0
102+
errorChan: errorChan,
82103
}
83104

105+
// Create reader and writer with typed error channel for generation-aware error reporting
106+
bp.reader = NewBackedReader(errorChan)
107+
bp.writer = NewBackedWriter(DefaultBufferSize, errorChan)
108+
84109
// Start error handler goroutine
85110
go bp.handleErrors()
86111

@@ -92,11 +117,11 @@ func (bp *BackedPipe) Connect() error {
92117
bp.mu.Lock()
93118
defer bp.mu.Unlock()
94119

95-
if bp.closed {
120+
if bp.state == closed {
96121
return ErrPipeClosed
97122
}
98123

99-
if bp.connected {
124+
if bp.state == connected {
100125
return ErrPipeAlreadyConnected
101126
}
102127

@@ -114,10 +139,10 @@ func (bp *BackedPipe) Read(p []byte) (int, error) {
114139
func (bp *BackedPipe) Write(p []byte) (int, error) {
115140
bp.mu.RLock()
116141
writer := bp.writer
117-
closed := bp.closed
142+
state := bp.state
118143
bp.mu.RUnlock()
119144

120-
if closed {
145+
if state == closed {
121146
return 0, io.EOF
122147
}
123148

@@ -129,11 +154,11 @@ func (bp *BackedPipe) Close() error {
129154
bp.mu.Lock()
130155
defer bp.mu.Unlock()
131156

132-
if bp.closed {
157+
if bp.state == closed {
133158
return nil
134159
}
135160

136-
bp.closed = true
161+
bp.state = closed
137162
bp.cancel() // Cancel main context
138163

139164
// Close all components in parallel to avoid deadlocks
@@ -164,8 +189,6 @@ func (bp *BackedPipe) Close() error {
164189
})
165190
}
166191

167-
bp.connected = false
168-
169192
// Wait for all close operations to complete and return any error
170193
return g.Wait()
171194
}
@@ -174,18 +197,22 @@ func (bp *BackedPipe) Close() error {
174197
func (bp *BackedPipe) Connected() bool {
175198
bp.mu.RLock()
176199
defer bp.mu.RUnlock()
177-
return bp.connected
200+
return bp.state == connected
178201
}
179202

180203
// reconnectLocked handles the reconnection logic. Must be called with write lock held.
181204
func (bp *BackedPipe) reconnectLocked() error {
182-
if bp.reconnecting {
205+
if bp.state == reconnecting {
183206
return ErrReconnectionInProgress
184207
}
185208

186-
bp.reconnecting = true
209+
bp.state = reconnecting
187210
defer func() {
188-
bp.reconnecting = false
211+
// Only reset to disconnected if we're still in reconnecting state
212+
// (successful reconnection will set state to connected)
213+
if bp.state == reconnecting {
214+
bp.state = disconnected
215+
}
189216
}()
190217

191218
// Close existing connection if any
@@ -194,8 +221,6 @@ func (bp *BackedPipe) reconnectLocked() error {
194221
bp.conn = nil
195222
}
196223

197-
bp.connected = false
198-
199224
// Get current writer sequence number to send to remote
200225
writerSeqNum := bp.writer.SequenceNum()
201226

@@ -226,55 +251,78 @@ func (bp *BackedPipe) reconnectLocked() error {
226251
return ErrReconnectWriterFailed
227252
}
228253

229-
// Success - update state
254+
// Success - update state and increment connection generation
230255
bp.conn = conn
231-
bp.connected = true
256+
bp.connGen++
257+
bp.state = connected
258+
259+
// Update the generation on reader and writer for error reporting
260+
bp.reader.SetGeneration(bp.connGen)
261+
bp.writer.SetGeneration(bp.connGen)
232262

233263
return nil
234264
}
235265

236266
// handleErrors listens for connection errors from reader/writer and triggers reconnection.
267+
// It filters errors from old connections and ensures only the first error per generation
268+
// triggers reconnection.
237269
func (bp *BackedPipe) handleErrors() {
238270
for {
239271
select {
240272
case <-bp.ctx.Done():
241273
return
242-
case err := <-bp.readerErrorChan:
243-
// Reader connection error occurred
244-
bp.handleConnectionError(err, "reader")
245-
case err := <-bp.writerErrorChan:
246-
// Writer connection error occurred
247-
bp.handleConnectionError(err, "writer")
274+
case errorEvt := <-bp.errorChan:
275+
bp.handleConnectionError(errorEvt)
248276
}
249277
}
250278
}
251279

252280
// handleConnectionError handles errors from either reader or writer components.
253-
func (bp *BackedPipe) handleConnectionError(err error, component string) {
281+
// It filters errors from old connections and ensures only one reconnection per generation.
282+
func (bp *BackedPipe) handleConnectionError(errorEvt ErrorEvent) {
254283
bp.mu.Lock()
255284
defer bp.mu.Unlock()
256285

257-
// Skip if already closed or not connected
258-
if bp.closed || !bp.connected {
286+
// Skip if already closed
287+
if bp.state == closed {
259288
return
260289
}
261290

291+
// Filter errors from old connections (lower generation)
292+
if errorEvt.Generation < bp.connGen {
293+
return
294+
}
295+
296+
// Skip if not connected (already disconnected or reconnecting)
297+
if bp.state != connected {
298+
return
299+
}
300+
301+
// Skip if we've already seen an error for this generation
302+
if bp.lastErrorGen >= errorEvt.Generation {
303+
return
304+
}
305+
306+
// This is the first error for this generation
307+
bp.lastErrorGen = errorEvt.Generation
308+
262309
// Mark as disconnected
263-
bp.connected = false
310+
bp.state = disconnected
264311

265312
// Try to reconnect using internal context
266313
reconnectErr := bp.reconnectLocked()
267314

268315
if reconnectErr != nil {
269316
// Reconnection failed - log or handle as needed
270317
// For now, we just continue: the pipe stays disconnected until the caller reconnects it (e.g. via ForceReconnect)
271-
_ = err // Use the original error from the component
272-
_ = component // Component info available for potential logging by higher layers
318+
_ = errorEvt.Err // Use the original error from the component
319+
_ = errorEvt.Component // Component info available for potential logging by higher layers
273320
}
274321
}
275322

276323
// ForceReconnect forces a reconnection attempt immediately.
277324
// This can be used to force a reconnection if a new connection is established.
325+
// It prevents duplicate reconnections when called concurrently.
278326
func (bp *BackedPipe) ForceReconnect() error {
279327
// Deduplicate concurrent ForceReconnect calls so only one reconnection
280328
// attempt runs at a time from this API. Use the pipe's internal context
@@ -283,10 +331,15 @@ func (bp *BackedPipe) ForceReconnect() error {
283331
bp.mu.Lock()
284332
defer bp.mu.Unlock()
285333

286-
if bp.closed {
334+
if bp.state == closed {
287335
return nil, io.EOF
288336
}
289337

338+
// Don't force reconnect if already reconnecting
339+
if bp.state == reconnecting {
340+
return nil, ErrReconnectionInProgress
341+
}
342+
290343
return nil, bp.reconnectLocked()
291344
})
292345
return err

0 commit comments

Comments
 (0)