Skip to content

Commit 3af1086

Browse files
committed
WIP
1 parent cf9ff39 commit 3af1086

File tree

3 files changed

+35
-24
lines changed

3 files changed

+35
-24
lines changed

agent/immortalstreams/stream.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -168,11 +168,15 @@ func (s *Stream) HandleReconnect(clientConn io.ReadWriteCloser, readSeqNum uint6
168168

169169
// Helper to start a reconnect attempt once (single-flight)
170170
startReconnectIfNeeded := func() {
171-
if !s.reconnecting {
171+
// Only trigger reconnect if:
172+
// - No reconnect goroutine is in-flight
173+
// - There is no pending reconnect request waiting for a client
174+
// - The pipe is not already connected
175+
if !s.reconnecting && s.pendingReconnect == nil && (s.pipe == nil || !s.pipe.Connected()) {
172176
s.reconnecting = true
173177
go func() {
174178
s.logger.Debug(context.Background(), "calling ForceReconnect")
175-
err := s.pipe.ForceReconnect(context.Background())
179+
err := s.pipe.ForceReconnect()
176180
s.logger.Debug(context.Background(), "force reconnect returned", slog.Error(err))
177181
s.mu.Lock()
178182
s.reconnecting = false
@@ -245,8 +249,6 @@ func (s *Stream) HandleReconnect(clientConn io.ReadWriteCloser, readSeqNum uint6
245249
s.reconnectCond.Wait()
246250
// Loop will re-check conditions under lock to avoid lost wakeups.
247251
}
248-
249-
// Unreachable
250252
}
251253

252254
// Close closes the stream

coderd/agentapi/backedpipe/backed_pipe.go

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"sync"
77
"time"
88

9+
"golang.org/x/sync/singleflight"
910
"golang.org/x/xerrors"
1011
)
1112

@@ -49,6 +50,9 @@ type BackedPipe struct {
4950

5051
// Connection state notification
5152
connectionChanged chan struct{}
53+
54+
// singleflight group to dedupe concurrent ForceReconnect calls
55+
sf singleflight.Group
5256
}
5357

5458
// NewBackedPipe creates a new BackedPipe with default options and the specified reconnect function.
@@ -88,7 +92,7 @@ func NewBackedPipe(ctx context.Context, reconnectFn ReconnectFunc) *BackedPipe {
8892
}
8993

9094
// Connect establishes the initial connection using the reconnect function.
91-
func (bp *BackedPipe) Connect(ctx context.Context) error {
95+
func (bp *BackedPipe) Connect(_ context.Context) error { // external ctx ignored; internal ctx used
9296
bp.mu.Lock()
9397
defer bp.mu.Unlock()
9498

@@ -100,7 +104,9 @@ func (bp *BackedPipe) Connect(ctx context.Context) error {
100104
return xerrors.New("pipe is already connected")
101105
}
102106

103-
return bp.reconnectLocked(ctx)
107+
// Use internal context for the actual reconnect operation to ensure
108+
// Close() reliably cancels any in-flight attempt.
109+
return bp.reconnectLocked()
104110
}
105111

106112
// Read implements io.Reader by delegating to the BackedReader.
@@ -189,7 +195,7 @@ func (bp *BackedPipe) signalConnectionChange() {
189195
}
190196

191197
// reconnectLocked handles the reconnection logic. Must be called with write lock held.
192-
func (bp *BackedPipe) reconnectLocked(ctx context.Context) error {
198+
func (bp *BackedPipe) reconnectLocked() error {
193199
if bp.reconnecting {
194200
return xerrors.New("reconnection already in progress")
195201
}
@@ -213,7 +219,7 @@ func (bp *BackedPipe) reconnectLocked(ctx context.Context) error {
213219

214220
// Unlock during reconnect attempt to avoid blocking reads/writes
215221
bp.mu.Unlock()
216-
conn, readerSeqNum, err := bp.reconnectFn(ctx, writerSeqNum)
222+
conn, readerSeqNum, err := bp.reconnectFn(bp.ctx, writerSeqNum)
217223
bp.mu.Lock()
218224

219225
if err != nil {
@@ -281,8 +287,8 @@ func (bp *BackedPipe) handleErrors() {
281287
bp.connected = false
282288
bp.signalConnectionChange()
283289

284-
// Try to reconnect
285-
reconnectErr := bp.reconnectLocked(bp.ctx)
290+
// Try to reconnect using internal context
291+
reconnectErr := bp.reconnectLocked()
286292
bp.mu.Unlock()
287293

288294
if reconnectErr != nil {
@@ -323,16 +329,19 @@ func (bp *BackedPipe) WaitForConnection(ctx context.Context) error {
323329

324330
// ForceReconnect forces a reconnection attempt immediately.
325331
// This can be used to force a reconnection if a new connection is established.
326-
func (bp *BackedPipe) ForceReconnect(ctx context.Context) error {
327-
bp.mu.Lock()
328-
defer bp.mu.Unlock()
329-
330-
if bp.closed {
331-
return io.ErrClosedPipe
332-
}
332+
func (bp *BackedPipe) ForceReconnect() error {
333+
// Deduplicate concurrent ForceReconnect calls so only one reconnection
334+
// attempt runs at a time from this API. Use the pipe's internal context
335+
// to ensure Close() cancels any in-flight attempt.
336+
_, err, _ := bp.sf.Do("backedpipe-reconnect", func() (interface{}, error) {
337+
bp.mu.Lock()
338+
defer bp.mu.Unlock()
339+
340+
if bp.closed {
341+
return nil, io.ErrClosedPipe
342+
}
333343

334-
// Use the pipe's internal context so that Close() reliably cancels any
335-
// in-flight reconnection attempts. An external context here can outlive
336-
// the pipe and cause goroutines to block indefinitely.
337-
return bp.reconnectLocked(bp.ctx)
344+
return nil, bp.reconnectLocked()
345+
})
346+
return err
338347
}

coderd/agentapi/backedpipe/backed_pipe_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,7 @@ func TestBackedPipe_ForceReconnect(t *testing.T) {
554554
require.Equal(t, "test data", conn1.ReadString())
555555

556556
// Force a reconnection
557-
err = bp.ForceReconnect(ctx)
557+
err = bp.ForceReconnect()
558558
require.NoError(t, err)
559559
require.True(t, bp.Connected())
560560
require.Equal(t, 2, *callCount)
@@ -591,7 +591,7 @@ func TestBackedPipe_ForceReconnectWhenClosed(t *testing.T) {
591591
require.NoError(t, err)
592592

593593
// Try to force reconnect when closed
594-
err = bp.ForceReconnect(ctx)
594+
err = bp.ForceReconnect()
595595
require.Error(t, err)
596596
require.Equal(t, io.ErrClosedPipe, err)
597597
}
@@ -607,7 +607,7 @@ func TestBackedPipe_ForceReconnectWhenDisconnected(t *testing.T) {
607607
defer bp.Close()
608608

609609
// Don't connect initially, just force reconnect
610-
err := bp.ForceReconnect(ctx)
610+
err := bp.ForceReconnect()
611611
require.NoError(t, err)
612612
require.True(t, bp.Connected())
613613
require.Equal(t, 1, *callCount)

0 commit comments

Comments
 (0)