19
19
ErrReconnectWriterFailed = xerrors .New ("reconnect writer failed" )
20
20
)
21
21
22
+ // connectionState represents the current state of the BackedPipe connection.
23
+ type connectionState int
24
+
25
+ const (
26
+ // connected indicates the pipe is connected and operational.
27
+ connected connectionState = iota
28
+ // disconnected indicates the pipe is not connected but not closed.
29
+ disconnected
30
+ // reconnecting indicates a reconnection attempt is in progress.
31
+ reconnecting
32
+ // closed indicates the pipe is permanently closed.
33
+ closed
34
+ )
35
+
36
+ // ErrorEvent represents an error from a reader or writer with connection generation info.
37
+ type ErrorEvent struct {
38
+ Err error
39
+ Component string // "reader" or "writer"
40
+ Generation uint64 // connection generation when error occurred
41
+ }
42
+
22
43
const (
23
44
// Default buffer capacity used by the writer - 64MB
24
45
DefaultBufferSize = 64 * 1024 * 1024
@@ -50,37 +71,41 @@ type BackedPipe struct {
50
71
writer * BackedWriter
51
72
reconnector Reconnector
52
73
conn io.ReadWriteCloser
53
- connected bool
54
- closed bool
55
74
56
- // Reconnection state
57
- reconnecting bool
75
+ // State machine
76
+ state connectionState
77
+ connGen uint64 // Increments on each successful reconnection
58
78
59
- // Error channels for receiving connection errors from reader/writer separately
60
- readerErrorChan chan error
61
- writerErrorChan chan error
79
+ // Unified error handling with generation filtering
80
+ errorChan chan ErrorEvent
62
81
63
82
// singleflight group to dedupe concurrent ForceReconnect calls
64
83
sf singleflight.Group
84
+
85
+ // Track first error per generation to avoid duplicate reconnections
86
+ lastErrorGen uint64
65
87
}
66
88
67
89
// NewBackedPipe creates a new BackedPipe with default options and the specified reconnector.
68
90
// The pipe starts disconnected and must be connected using Connect().
69
91
func NewBackedPipe (ctx context.Context , reconnector Reconnector ) * BackedPipe {
70
92
pipeCtx , cancel := context .WithCancel (ctx )
71
93
72
- readerErrorChan := make (chan error , 1 ) // Buffer for reader errors
73
- writerErrorChan := make ( chan error , 1 ) // Buffer for writer errors
94
+ errorChan := make (chan ErrorEvent , 10 ) // Buffered for async error reporting
95
+
74
96
bp := & BackedPipe {
75
- ctx : pipeCtx ,
76
- cancel : cancel ,
77
- reader : NewBackedReader (readerErrorChan ),
78
- writer : NewBackedWriter (DefaultBufferSize , writerErrorChan ),
79
- reconnector : reconnector ,
80
- readerErrorChan : readerErrorChan ,
81
- writerErrorChan : writerErrorChan ,
97
+ ctx : pipeCtx ,
98
+ cancel : cancel ,
99
+ reconnector : reconnector ,
100
+ state : disconnected ,
101
+ connGen : 0 , // Start with generation 0
102
+ errorChan : errorChan ,
82
103
}
83
104
105
+ // Create reader and writer with typed error channel for generation-aware error reporting
106
+ bp .reader = NewBackedReader (errorChan )
107
+ bp .writer = NewBackedWriter (DefaultBufferSize , errorChan )
108
+
84
109
// Start error handler goroutine
85
110
go bp .handleErrors ()
86
111
@@ -92,11 +117,11 @@ func (bp *BackedPipe) Connect() error {
92
117
bp .mu .Lock ()
93
118
defer bp .mu .Unlock ()
94
119
95
- if bp .closed {
120
+ if bp .state == closed {
96
121
return ErrPipeClosed
97
122
}
98
123
99
- if bp .connected {
124
+ if bp .state == connected {
100
125
return ErrPipeAlreadyConnected
101
126
}
102
127
@@ -114,10 +139,10 @@ func (bp *BackedPipe) Read(p []byte) (int, error) {
114
139
func (bp * BackedPipe ) Write (p []byte ) (int , error ) {
115
140
bp .mu .RLock ()
116
141
writer := bp .writer
117
- closed := bp .closed
142
+ state := bp .state
118
143
bp .mu .RUnlock ()
119
144
120
- if closed {
145
+ if state == closed {
121
146
return 0 , io .EOF
122
147
}
123
148
@@ -129,11 +154,11 @@ func (bp *BackedPipe) Close() error {
129
154
bp .mu .Lock ()
130
155
defer bp .mu .Unlock ()
131
156
132
- if bp .closed {
157
+ if bp .state == closed {
133
158
return nil
134
159
}
135
160
136
- bp .closed = true
161
+ bp .state = closed
137
162
bp .cancel () // Cancel main context
138
163
139
164
// Close all components in parallel to avoid deadlocks
@@ -164,8 +189,6 @@ func (bp *BackedPipe) Close() error {
164
189
})
165
190
}
166
191
167
- bp .connected = false
168
-
169
192
// Wait for all close operations to complete and return any error
170
193
return g .Wait ()
171
194
}
@@ -174,18 +197,22 @@ func (bp *BackedPipe) Close() error {
174
197
func (bp * BackedPipe ) Connected () bool {
175
198
bp .mu .RLock ()
176
199
defer bp .mu .RUnlock ()
177
- return bp .connected
200
+ return bp .state == connected
178
201
}
179
202
180
203
// reconnectLocked handles the reconnection logic. Must be called with write lock held.
181
204
func (bp * BackedPipe ) reconnectLocked () error {
182
- if bp .reconnecting {
205
+ if bp .state == reconnecting {
183
206
return ErrReconnectionInProgress
184
207
}
185
208
186
- bp .reconnecting = true
209
+ bp .state = reconnecting
187
210
defer func () {
188
- bp .reconnecting = false
211
+ // Only reset to disconnected if we're still in reconnecting state
212
+ // (successful reconnection will set state to connected)
213
+ if bp .state == reconnecting {
214
+ bp .state = disconnected
215
+ }
189
216
}()
190
217
191
218
// Close existing connection if any
@@ -194,8 +221,6 @@ func (bp *BackedPipe) reconnectLocked() error {
194
221
bp .conn = nil
195
222
}
196
223
197
- bp .connected = false
198
-
199
224
// Get current writer sequence number to send to remote
200
225
writerSeqNum := bp .writer .SequenceNum ()
201
226
@@ -226,55 +251,78 @@ func (bp *BackedPipe) reconnectLocked() error {
226
251
return ErrReconnectWriterFailed
227
252
}
228
253
229
- // Success - update state
254
+ // Success - update state and increment connection generation
230
255
bp .conn = conn
231
- bp .connected = true
256
+ bp .connGen ++
257
+ bp .state = connected
258
+
259
+ // Update the generation on reader and writer for error reporting
260
+ bp .reader .SetGeneration (bp .connGen )
261
+ bp .writer .SetGeneration (bp .connGen )
232
262
233
263
return nil
234
264
}
235
265
236
266
// handleErrors listens for connection errors from reader/writer and triggers reconnection.
267
+ // It filters errors from old connections and ensures only the first error per generation
268
+ // triggers reconnection.
237
269
func (bp * BackedPipe ) handleErrors () {
238
270
for {
239
271
select {
240
272
case <- bp .ctx .Done ():
241
273
return
242
- case err := <- bp .readerErrorChan :
243
- // Reader connection error occurred
244
- bp .handleConnectionError (err , "reader" )
245
- case err := <- bp .writerErrorChan :
246
- // Writer connection error occurred
247
- bp .handleConnectionError (err , "writer" )
274
+ case errorEvt := <- bp .errorChan :
275
+ bp .handleConnectionError (errorEvt )
248
276
}
249
277
}
250
278
}
251
279
252
280
// handleConnectionError handles errors from either reader or writer components.
253
- func (bp * BackedPipe ) handleConnectionError (err error , component string ) {
281
+ // It filters errors from old connections and ensures only one reconnection per generation.
282
+ func (bp * BackedPipe ) handleConnectionError (errorEvt ErrorEvent ) {
254
283
bp .mu .Lock ()
255
284
defer bp .mu .Unlock ()
256
285
257
- // Skip if already closed or not connected
258
- if bp .closed || ! bp . connected {
286
+ // Skip if already closed
287
+ if bp .state == closed {
259
288
return
260
289
}
261
290
291
+ // Filter errors from old connections (lower generation)
292
+ if errorEvt .Generation < bp .connGen {
293
+ return
294
+ }
295
+
296
+ // Skip if not connected (already disconnected or reconnecting)
297
+ if bp .state != connected {
298
+ return
299
+ }
300
+
301
+ // Skip if we've already seen an error for this generation
302
+ if bp .lastErrorGen >= errorEvt .Generation {
303
+ return
304
+ }
305
+
306
+ // This is the first error for this generation
307
+ bp .lastErrorGen = errorEvt .Generation
308
+
262
309
// Mark as disconnected
263
- bp .connected = false
310
+ bp .state = disconnected
264
311
265
312
// Try to reconnect using internal context
266
313
reconnectErr := bp .reconnectLocked ()
267
314
268
315
if reconnectErr != nil {
269
316
// Reconnection failed - log or handle as needed
270
317
// For now, we'll just continue and wait for manual reconnection
271
- _ = err // Use the original error from the component
272
- _ = component // Component info available for potential logging by higher layers
318
+ _ = errorEvt . Err // Use the original error from the component
319
+ _ = errorEvt . Component // Component info available for potential logging by higher layers
273
320
}
274
321
}
275
322
276
323
// ForceReconnect forces a reconnection attempt immediately.
277
324
// This can be used to force a reconnection if a new connection is established.
325
+ // It prevents duplicate reconnections when called concurrently.
278
326
func (bp * BackedPipe ) ForceReconnect () error {
279
327
// Deduplicate concurrent ForceReconnect calls so only one reconnection
280
328
// attempt runs at a time from this API. Use the pipe's internal context
@@ -283,10 +331,15 @@ func (bp *BackedPipe) ForceReconnect() error {
283
331
bp .mu .Lock ()
284
332
defer bp .mu .Unlock ()
285
333
286
- if bp .closed {
334
+ if bp .state == closed {
287
335
return nil , io .EOF
288
336
}
289
337
338
+ // Don't force reconnect if already reconnecting
339
+ if bp .state == reconnecting {
340
+ return nil , ErrReconnectionInProgress
341
+ }
342
+
290
343
return nil , bp .reconnectLocked ()
291
344
})
292
345
return err
0 commit comments