Skip to content

Commit 28927dc

Browse files
committed
PR review fixes
1 parent 85c505d commit 28927dc

File tree

4 files changed

+131
-125
lines changed

4 files changed

+131
-125
lines changed

agent/immortalstreams/backedpipe/backed_pipe.go

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -49,15 +49,17 @@ const (
4949
// Implementations should:
5050
// 1. Establish a new connection to the remote side
5151
// 2. Exchange sequence numbers with the remote side
52-
// 3. Return the new connection and the remote's current sequence number
52+
// 3. Return the new connection and the remote's reader sequence number
5353
//
54-
// The writerSeqNum parameter is the local writer's current sequence number,
55-
// which should be sent to the remote side so it knows where to resume reading from.
54+
// The readerSeqNum parameter is the local reader's current sequence number
55+
// (total bytes successfully read from the remote). This must be sent to the
56+
// remote so it can replay its data to us starting from this number.
5657
//
57-
// The returned readerSeqNum should be the remote side's current sequence number,
58-
// which indicates where the local reader should resume from.
58+
// The returned remoteReaderSeqNum should be the remote side's reader sequence
59+
// number (how many bytes of our outbound data it has successfully read). This
60+
// informs our writer where to resume (i.e., which bytes to replay to the remote).
5961
type Reconnector interface {
60-
Reconnect(ctx context.Context, writerSeqNum uint64) (conn io.ReadWriteCloser, readerSeqNum uint64, err error)
62+
Reconnect(ctx context.Context, readerSeqNum uint64) (conn io.ReadWriteCloser, remoteReaderSeqNum uint64, err error)
6163
}
6264

6365
// BackedPipe provides a reliable bidirectional byte stream over unreliable network connections.
@@ -77,7 +79,7 @@ type BackedPipe struct {
7779
connGen uint64 // Increments on each successful reconnection
7880

7981
// Unified error handling with generation filtering
80-
errorChan chan ErrorEvent
82+
errChan chan ErrorEvent
8183

8284
// singleflight group to dedupe concurrent ForceReconnect calls
8385
sf singleflight.Group
@@ -91,20 +93,20 @@ type BackedPipe struct {
9193
func NewBackedPipe(ctx context.Context, reconnector Reconnector) *BackedPipe {
9294
pipeCtx, cancel := context.WithCancel(ctx)
9395

94-
errorChan := make(chan ErrorEvent, 10) // Buffered for async error reporting
96+
errChan := make(chan ErrorEvent, 1)
9597

9698
bp := &BackedPipe{
9799
ctx: pipeCtx,
98100
cancel: cancel,
99101
reconnector: reconnector,
100102
state: disconnected,
101103
connGen: 0, // Start with generation 0
102-
errorChan: errorChan,
104+
errChan: errChan,
103105
}
104106

105107
// Create reader and writer with typed error channel for generation-aware error reporting
106-
bp.reader = NewBackedReader(errorChan)
107-
bp.writer = NewBackedWriter(DefaultBufferSize, errorChan)
108+
bp.reader = NewBackedReader(errChan)
109+
bp.writer = NewBackedWriter(DefaultBufferSize, errChan)
108110

109111
// Start error handler goroutine
110112
go bp.handleErrors()
@@ -221,16 +223,29 @@ func (bp *BackedPipe) reconnectLocked() error {
221223
bp.conn = nil
222224
}
223225

224-
// Get current writer sequence number to send to remote
226+
// Gather local sequence numbers:
227+
// - readerSeqNum: how many bytes we've successfully read from remote (send to remote)
228+
// - writerSeqNum: how many bytes we've written so far (used for validation)
229+
readerSeqNum := bp.reader.SequenceNum()
225230
writerSeqNum := bp.writer.SequenceNum()
226231

227-
conn, readerSeqNum, err := bp.reconnector.Reconnect(bp.ctx, writerSeqNum)
232+
// Increment the generation and update both reader and writer.
233+
// We do it now to track even the connections that fail during
234+
// Reconnect.
235+
bp.connGen++
236+
bp.reader.SetGeneration(bp.connGen)
237+
bp.writer.SetGeneration(bp.connGen)
238+
239+
// Send our reader sequence number to the remote and receive its reader sequence
240+
// number. We will replay our outbound data from that point and backed pipe on the other
241+
// side will replay from our reader sequence number.
242+
conn, remoteReaderSeqNum, err := bp.reconnector.Reconnect(bp.ctx, readerSeqNum)
228243
if err != nil {
229244
return ErrReconnectFailed
230245
}
231246

232247
// Validate sequence numbers
233-
if readerSeqNum > writerSeqNum {
248+
if remoteReaderSeqNum > writerSeqNum {
234249
_ = conn.Close()
235250
return ErrInvalidSequenceNumber
236251
}
@@ -245,21 +260,17 @@ func (bp *BackedPipe) reconnectLocked() error {
245260
<-seqNum
246261
newR <- conn
247262

248-
err = bp.writer.Reconnect(readerSeqNum, conn)
263+
// Replay our outbound data from the remote's reader sequence number
264+
err = bp.writer.Reconnect(remoteReaderSeqNum, conn)
249265
if err != nil {
250266
_ = conn.Close()
251267
return ErrReconnectWriterFailed
252268
}
253269

254-
// Success - update state and increment connection generation
270+
// Success - update state
255271
bp.conn = conn
256-
bp.connGen++
257272
bp.state = connected
258273

259-
// Update the generation on reader and writer for error reporting
260-
bp.reader.SetGeneration(bp.connGen)
261-
bp.writer.SetGeneration(bp.connGen)
262-
263274
return nil
264275
}
265276

@@ -271,7 +282,7 @@ func (bp *BackedPipe) handleErrors() {
271282
select {
272283
case <-bp.ctx.Done():
273284
return
274-
case errorEvt := <-bp.errorChan:
285+
case errorEvt := <-bp.errChan:
275286
bp.handleConnectionError(errorEvt)
276287
}
277288
}

agent/immortalstreams/backedpipe/backed_pipe_test.go

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ type mockReconnector struct {
113113
}
114114

115115
// Reconnect implements the Reconnector interface
116-
func (m *mockReconnector) Reconnect(ctx context.Context, writerSeqNum uint64) (io.ReadWriteCloser, uint64, error) {
116+
func (m *mockReconnector) Reconnect(ctx context.Context, readerSeqNum uint64) (io.ReadWriteCloser, uint64, error) {
117117
m.callCount++
118118

119119
if m.connectionIndex >= len(m.connections) {
@@ -131,18 +131,19 @@ func (m *mockReconnector) Reconnect(ctx context.Context, writerSeqNum uint64) (i
131131
}
132132
}
133133

134-
// Determine readerSeqNum based on call count
135-
var readerSeqNum uint64
134+
// Determine remoteReaderSeqNum (how many bytes of our outbound data the remote has read)
135+
var remoteReaderSeqNum uint64
136136
switch {
137137
case m.callCount == 1:
138-
readerSeqNum = 0
138+
remoteReaderSeqNum = 0
139139
case conn.seqNum != 0:
140-
readerSeqNum = conn.seqNum
140+
remoteReaderSeqNum = conn.seqNum
141141
default:
142-
readerSeqNum = writerSeqNum
142+
// Default to 0 if unspecified
143+
remoteReaderSeqNum = 0
143144
}
144145

145-
return conn, readerSeqNum, nil
146+
return conn, remoteReaderSeqNum, nil
146147
}
147148

148149
// mockReconnectFunc creates a unified reconnector with all behaviors enabled
@@ -168,7 +169,7 @@ type blockingReconnector struct {
168169
signalOnce sync.Once // Ensure we only signal once for the first actual reconnect
169170
}
170171

171-
func (b *blockingReconnector) Reconnect(ctx context.Context, writerSeqNum uint64) (io.ReadWriteCloser, uint64, error) {
172+
func (b *blockingReconnector) Reconnect(ctx context.Context, readerSeqNum uint64) (io.ReadWriteCloser, uint64, error) {
172173
b.mu.Lock()
173174
b.callCount++
174175
currentCall := b.callCount
@@ -197,7 +198,7 @@ func (b *blockingReconnector) Reconnect(ctx context.Context, writerSeqNum uint64
197198
return nil, 0, ctx.Err()
198199
}
199200

200-
return b.conn2, writerSeqNum, nil
201+
return b.conn2, 0, nil
201202
}
202203

203204
func mockBlockingReconnectFunc(conn1, conn2 *mockConnection, blockChan <-chan struct{}) (backedpipe.Reconnector, *int, chan struct{}) {
@@ -219,15 +220,16 @@ type eofTestReconnector struct {
219220
callCount *int
220221
}
221222

222-
func (e *eofTestReconnector) Reconnect(ctx context.Context, writerSeqNum uint64) (io.ReadWriteCloser, uint64, error) {
223+
func (e *eofTestReconnector) Reconnect(ctx context.Context, readerSeqNum uint64) (io.ReadWriteCloser, uint64, error) {
223224
*e.callCount++
224225

225226
if *e.callCount == 1 {
226227
return e.conn1, 0, nil
227228
}
228229
if *e.callCount == 2 {
229230
// Second call is the reconnection after EOF
230-
return e.conn2, writerSeqNum, nil // conn2 already has the reader sequence at writerSeqNum
231+
// Return 5 to indicate remote has read all 5 bytes of "hello"
232+
return e.conn2, 5, nil
231233
}
232234

233235
return nil, 0, xerrors.New("no more connections")
@@ -518,6 +520,8 @@ func TestBackedPipe_ForceReconnect(t *testing.T) {
518520
ctx := context.Background()
519521
conn1 := newMockConnection()
520522
conn2 := newMockConnection()
523+
// Set conn2 sequence number to 9 to indicate remote has read all 9 bytes of "test data"
524+
conn2.seqNum = 9
521525
reconnectFn, callCount, _ := mockReconnectFunc(conn1, conn2)
522526

523527
bp := backedpipe.NewBackedPipe(ctx, reconnectFn)
@@ -540,7 +544,7 @@ func TestBackedPipe_ForceReconnect(t *testing.T) {
540544
require.True(t, bp.Connected())
541545
require.Equal(t, 2, *callCount)
542546

543-
// Since the mock now returns the proper sequence number, no data should be replayed
547+
// Since the mock returns the proper sequence number, no data should be replayed
544548
// The new connection should be empty
545549
require.Equal(t, "", conn2.ReadString())
546550

0 commit comments

Comments
 (0)