@@ -49,15 +49,17 @@ const (
49
49
// Implementations should:
50
50
// 1. Establish a new connection to the remote side
51
51
// 2. Exchange sequence numbers with the remote side
52
- // 3. Return the new connection and the remote's current sequence number
52
+ // 3. Return the new connection and the remote's reader sequence number
53
53
//
54
- // The writerSeqNum parameter is the local writer's current sequence number,
55
- // which should be sent to the remote side so it knows where to resume reading from.
54
+ // The readerSeqNum parameter is the local reader's current sequence number
55
+ // (total bytes successfully read from the remote). This must be sent to the
56
+ // remote so it can replay its data to us starting from this number.
56
57
//
57
- // The returned readerSeqNum should be the remote side's current sequence number,
58
- // which indicates where the local reader should resume from.
58
+ // The returned remoteReaderSeqNum should be the remote side's reader sequence
59
+ // number (how many bytes of our outbound data it has successfully read). This
60
+ // informs our writer where to resume (i.e., which bytes to replay to the remote).
59
61
type Reconnector interface {
60
- Reconnect (ctx context.Context , writerSeqNum uint64 ) (conn io.ReadWriteCloser , readerSeqNum uint64 , err error )
62
+ Reconnect (ctx context.Context , readerSeqNum uint64 ) (conn io.ReadWriteCloser , remoteReaderSeqNum uint64 , err error )
61
63
}
62
64
63
65
// BackedPipe provides a reliable bidirectional byte stream over unreliable network connections.
@@ -77,7 +79,7 @@ type BackedPipe struct {
77
79
connGen uint64 // Increments on each successful reconnection
78
80
79
81
// Unified error handling with generation filtering
80
- errorChan chan ErrorEvent
82
+ errChan chan ErrorEvent
81
83
82
84
// singleflight group to dedupe concurrent ForceReconnect calls
83
85
sf singleflight.Group
@@ -91,20 +93,20 @@ type BackedPipe struct {
91
93
func NewBackedPipe (ctx context.Context , reconnector Reconnector ) * BackedPipe {
92
94
pipeCtx , cancel := context .WithCancel (ctx )
93
95
94
- errorChan := make (chan ErrorEvent , 10 ) // Buffered for async error reporting
96
+ errChan := make (chan ErrorEvent , 1 )
95
97
96
98
bp := & BackedPipe {
97
99
ctx : pipeCtx ,
98
100
cancel : cancel ,
99
101
reconnector : reconnector ,
100
102
state : disconnected ,
101
103
connGen : 0 , // Start with generation 0
102
- errorChan : errorChan ,
104
+ errChan : errChan ,
103
105
}
104
106
105
107
// Create reader and writer with typed error channel for generation-aware error reporting
106
- bp .reader = NewBackedReader (errorChan )
107
- bp .writer = NewBackedWriter (DefaultBufferSize , errorChan )
108
+ bp .reader = NewBackedReader (errChan )
109
+ bp .writer = NewBackedWriter (DefaultBufferSize , errChan )
108
110
109
111
// Start error handler goroutine
110
112
go bp .handleErrors ()
@@ -221,16 +223,29 @@ func (bp *BackedPipe) reconnectLocked() error {
221
223
bp .conn = nil
222
224
}
223
225
224
- // Get current writer sequence number to send to remote
226
+ // Gather local sequence numbers:
227
+ // - readerSeqNum: how many bytes we've successfully read from remote (send to remote)
228
+ // - writerSeqNum: how many bytes we've written so far (used for validation)
229
+ readerSeqNum := bp .reader .SequenceNum ()
225
230
writerSeqNum := bp .writer .SequenceNum ()
226
231
227
- conn , readerSeqNum , err := bp .reconnector .Reconnect (bp .ctx , writerSeqNum )
232
+ // Increment the generation and update both reader and writer.
233
+ // We do it now to track even the connections that fail during
234
+ // Reconnect.
235
+ bp .connGen ++
236
+ bp .reader .SetGeneration (bp .connGen )
237
+ bp .writer .SetGeneration (bp .connGen )
238
+
239
+ // Send our reader sequence number to the remote and receive its reader sequence
240
+ // number. We will replay our outbound data from that point and backed pipe on the other
241
+ // side will replay from our reader sequence number.
242
+ conn , remoteReaderSeqNum , err := bp .reconnector .Reconnect (bp .ctx , readerSeqNum )
228
243
if err != nil {
229
244
return ErrReconnectFailed
230
245
}
231
246
232
247
// Validate sequence numbers
233
- if readerSeqNum > writerSeqNum {
248
+ if remoteReaderSeqNum > writerSeqNum {
234
249
_ = conn .Close ()
235
250
return ErrInvalidSequenceNumber
236
251
}
@@ -245,21 +260,17 @@ func (bp *BackedPipe) reconnectLocked() error {
245
260
<- seqNum
246
261
newR <- conn
247
262
248
- err = bp .writer .Reconnect (readerSeqNum , conn )
263
+ // Replay our outbound data from the remote's reader sequence number
264
+ err = bp .writer .Reconnect (remoteReaderSeqNum , conn )
249
265
if err != nil {
250
266
_ = conn .Close ()
251
267
return ErrReconnectWriterFailed
252
268
}
253
269
254
- // Success - update state and increment connection generation
270
+ // Success - update state
255
271
bp .conn = conn
256
- bp .connGen ++
257
272
bp .state = connected
258
273
259
- // Update the generation on reader and writer for error reporting
260
- bp .reader .SetGeneration (bp .connGen )
261
- bp .writer .SetGeneration (bp .connGen )
262
-
263
274
return nil
264
275
}
265
276
@@ -271,7 +282,7 @@ func (bp *BackedPipe) handleErrors() {
271
282
select {
272
283
case <- bp .ctx .Done ():
273
284
return
274
- case errorEvt := <- bp .errorChan :
285
+ case errorEvt := <- bp .errChan :
275
286
bp .handleConnectionError (errorEvt )
276
287
}
277
288
}
0 commit comments