Skip to content

Commit 16abe05

Browse files
committed
improvement to backed pipe concurrency and tests
1 parent 77e912f commit 16abe05

File tree

4 files changed

+81
-25
lines changed

4 files changed

+81
-25
lines changed

coderd/agentapi/backedpipe/backed_pipe.go

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ import (
44
"context"
55
"io"
66
"sync"
7+
"time"
78

9+
"golang.org/x/sync/singleflight"
810
"golang.org/x/xerrors"
911
)
1012

@@ -48,6 +50,9 @@ type BackedPipe struct {
4850

4951
// Connection state notification
5052
connectionChanged chan struct{}
53+
54+
// singleflight group to dedupe concurrent ForceReconnect calls
55+
sf singleflight.Group
5156
}
5257

5358
// NewBackedPipe creates a new BackedPipe with default options and the specified reconnect function.
@@ -87,7 +92,7 @@ func NewBackedPipe(ctx context.Context, reconnectFn ReconnectFunc) *BackedPipe {
8792
}
8893

8994
// Connect establishes the initial connection using the reconnect function.
90-
func (bp *BackedPipe) Connect(ctx context.Context) error {
95+
func (bp *BackedPipe) Connect(_ context.Context) error { // external ctx ignored; internal ctx used
9196
bp.mu.Lock()
9297
defer bp.mu.Unlock()
9398

@@ -99,7 +104,9 @@ func (bp *BackedPipe) Connect(ctx context.Context) error {
99104
return xerrors.New("pipe is already connected")
100105
}
101106

102-
return bp.reconnectLocked(ctx)
107+
// Use internal context for the actual reconnect operation to ensure
108+
// Close() reliably cancels any in-flight attempt.
109+
return bp.reconnectLocked()
103110
}
104111

105112
// Read implements io.Reader by delegating to the BackedReader.
@@ -188,7 +195,7 @@ func (bp *BackedPipe) signalConnectionChange() {
188195
}
189196

190197
// reconnectLocked handles the reconnection logic. Must be called with write lock held.
191-
func (bp *BackedPipe) reconnectLocked(ctx context.Context) error {
198+
func (bp *BackedPipe) reconnectLocked() error {
192199
if bp.reconnecting {
193200
return xerrors.New("reconnection already in progress")
194201
}
@@ -212,7 +219,7 @@ func (bp *BackedPipe) reconnectLocked(ctx context.Context) error {
212219

213220
// Unlock during reconnect attempt to avoid blocking reads/writes
214221
bp.mu.Unlock()
215-
conn, readerSeqNum, err := bp.reconnectFn(ctx, writerSeqNum)
222+
conn, readerSeqNum, err := bp.reconnectFn(bp.ctx, writerSeqNum)
216223
bp.mu.Lock()
217224

218225
if err != nil {
@@ -280,8 +287,8 @@ func (bp *BackedPipe) handleErrors() {
280287
bp.connected = false
281288
bp.signalConnectionChange()
282289

283-
// Try to reconnect
284-
reconnectErr := bp.reconnectLocked(bp.ctx)
290+
// Try to reconnect using internal context
291+
reconnectErr := bp.reconnectLocked()
285292
bp.mu.Unlock()
286293

287294
if reconnectErr != nil {
@@ -314,19 +321,27 @@ func (bp *BackedPipe) WaitForConnection(ctx context.Context) error {
314321
return ctx.Err()
315322
case <-bp.connectionChanged:
316323
// Connection state changed, check again
324+
case <-time.After(10 * time.Millisecond):
325+
// Periodically re-check to avoid missed notifications
317326
}
318327
}
319328
}
320329

321330
// ForceReconnect forces a reconnection attempt immediately.
322331
// This can be used to force a reconnection if a new connection is established.
323-
func (bp *BackedPipe) ForceReconnect(ctx context.Context) error {
324-
bp.mu.Lock()
325-
defer bp.mu.Unlock()
326-
327-
if bp.closed {
328-
return io.ErrClosedPipe
329-
}
332+
func (bp *BackedPipe) ForceReconnect() error {
333+
// Deduplicate concurrent ForceReconnect calls so only one reconnection
334+
// attempt runs at a time from this API. Use the pipe's internal context
335+
// to ensure Close() cancels any in-flight attempt.
336+
_, err, _ := bp.sf.Do("backedpipe-reconnect", func() (interface{}, error) {
337+
bp.mu.Lock()
338+
defer bp.mu.Unlock()
339+
340+
if bp.closed {
341+
return nil, io.ErrClosedPipe
342+
}
330343

331-
return bp.reconnectLocked(ctx)
344+
return nil, bp.reconnectLocked()
345+
})
346+
return err
332347
}

coderd/agentapi/backedpipe/backed_pipe_test.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ func TestBackedPipe_NewBackedPipe(t *testing.T) {
152152
reconnectFn, _, _ := mockReconnectFunc(newMockConnection())
153153

154154
bp := backedpipe.NewBackedPipe(ctx, reconnectFn)
155+
defer bp.Close()
155156
require.NotNil(t, bp)
156157
require.False(t, bp.Connected())
157158
}
@@ -164,6 +165,7 @@ func TestBackedPipe_Connect(t *testing.T) {
164165
reconnectFn, callCount, _ := mockReconnectFunc(conn)
165166

166167
bp := backedpipe.NewBackedPipe(ctx, reconnectFn)
168+
defer bp.Close()
167169

168170
err := bp.Connect(ctx)
169171
require.NoError(t, err)
@@ -179,6 +181,7 @@ func TestBackedPipe_ConnectAlreadyConnected(t *testing.T) {
179181
reconnectFn, _, _ := mockReconnectFunc(conn)
180182

181183
bp := backedpipe.NewBackedPipe(ctx, reconnectFn)
184+
defer bp.Close()
182185

183186
err := bp.Connect(ctx)
184187
require.NoError(t, err)
@@ -214,6 +217,7 @@ func TestBackedPipe_BasicReadWrite(t *testing.T) {
214217
reconnectFn, _, _ := mockReconnectFunc(conn)
215218

216219
bp := backedpipe.NewBackedPipe(ctx, reconnectFn)
220+
defer bp.Close()
217221

218222
err := bp.Connect(ctx)
219223
require.NoError(t, err)
@@ -242,6 +246,7 @@ func TestBackedPipe_WriteBeforeConnect(t *testing.T) {
242246
reconnectFn, _, _ := mockReconnectFunc(conn)
243247

244248
bp := backedpipe.NewBackedPipe(ctx, reconnectFn)
249+
defer bp.Close()
245250

246251
// Write before connecting should succeed (buffered)
247252
n, err := bp.Write([]byte("hello"))
@@ -263,6 +268,7 @@ func TestBackedPipe_ReadBlocksWhenDisconnected(t *testing.T) {
263268
reconnectFn, _, _ := mockReconnectFunc(newMockConnection())
264269

265270
bp := backedpipe.NewBackedPipe(ctx, reconnectFn)
271+
defer bp.Close()
266272

267273
// Start a read that should block
268274
readDone := make(chan struct{})
@@ -311,6 +317,7 @@ func TestBackedPipe_Reconnection(t *testing.T) {
311317
reconnectFn, _, signalChan := mockReconnectFunc(conn1, conn2)
312318

313319
bp := backedpipe.NewBackedPipe(ctx, reconnectFn)
320+
defer bp.Close()
314321

315322
// Initial connect
316323
err := bp.Connect(ctx)
@@ -392,6 +399,7 @@ func TestBackedPipe_WaitForConnection(t *testing.T) {
392399
reconnectFn, _, _ := mockReconnectFunc(conn)
393400

394401
bp := backedpipe.NewBackedPipe(ctx, reconnectFn)
402+
defer bp.Close()
395403

396404
// Should timeout when not connected
397405
// Use a shorter timeout for this test to speed up test runs
@@ -426,6 +434,7 @@ func TestBackedPipe_ConcurrentReadWrite(t *testing.T) {
426434
reconnectFn, _, _ := mockReconnectFunc(conn)
427435

428436
bp := backedpipe.NewBackedPipe(ctx, reconnectFn)
437+
defer bp.Close()
429438

430439
err := bp.Connect(ctx)
431440
require.NoError(t, err)
@@ -514,6 +523,7 @@ func TestBackedPipe_ReconnectFunctionFailure(t *testing.T) {
514523
}
515524

516525
bp := backedpipe.NewBackedPipe(ctx, failingReconnectFn)
526+
defer bp.Close()
517527

518528
err := bp.Connect(ctx)
519529
require.Error(t, err)
@@ -530,6 +540,7 @@ func TestBackedPipe_ForceReconnect(t *testing.T) {
530540
reconnectFn, callCount, _ := mockReconnectFunc(conn1, conn2)
531541

532542
bp := backedpipe.NewBackedPipe(ctx, reconnectFn)
543+
defer bp.Close()
533544

534545
// Initial connect
535546
err := bp.Connect(ctx)
@@ -543,7 +554,7 @@ func TestBackedPipe_ForceReconnect(t *testing.T) {
543554
require.Equal(t, "test data", conn1.ReadString())
544555

545556
// Force a reconnection
546-
err = bp.ForceReconnect(ctx)
557+
err = bp.ForceReconnect()
547558
require.NoError(t, err)
548559
require.True(t, bp.Connected())
549560
require.Equal(t, 2, *callCount)
@@ -580,7 +591,7 @@ func TestBackedPipe_ForceReconnectWhenClosed(t *testing.T) {
580591
require.NoError(t, err)
581592

582593
// Try to force reconnect when closed
583-
err = bp.ForceReconnect(ctx)
594+
err = bp.ForceReconnect()
584595
require.Error(t, err)
585596
require.Equal(t, io.ErrClosedPipe, err)
586597
}
@@ -593,9 +604,10 @@ func TestBackedPipe_ForceReconnectWhenDisconnected(t *testing.T) {
593604
reconnectFn, callCount, _ := mockReconnectFunc(conn)
594605

595606
bp := backedpipe.NewBackedPipe(ctx, reconnectFn)
607+
defer bp.Close()
596608

597609
// Don't connect initially, just force reconnect
598-
err := bp.ForceReconnect(ctx)
610+
err := bp.ForceReconnect()
599611
require.NoError(t, err)
600612
require.True(t, bp.Connected())
601613
require.Equal(t, 1, *callCount)
@@ -655,6 +667,7 @@ func TestBackedPipe_EOFTriggersReconnection(t *testing.T) {
655667
}
656668

657669
bp := backedpipe.NewBackedPipe(ctx, reconnectFn)
670+
defer bp.Close()
658671

659672
// Initial connect
660673
err := bp.Connect(ctx)
@@ -699,6 +712,9 @@ func BenchmarkBackedPipe_Write(b *testing.B) {
699712

700713
bp := backedpipe.NewBackedPipe(ctx, reconnectFn)
701714
bp.Connect(ctx)
715+
b.Cleanup(func() {
716+
_ = bp.Close()
717+
})
702718

703719
data := make([]byte, 1024) // 1KB writes
704720

@@ -715,6 +731,9 @@ func BenchmarkBackedPipe_Read(b *testing.B) {
715731

716732
bp := backedpipe.NewBackedPipe(ctx, reconnectFn)
717733
bp.Connect(ctx)
734+
b.Cleanup(func() {
735+
_ = bp.Close()
736+
})
718737

719738
buf := make([]byte, 1024)
720739

coderd/agentapi/backedpipe/backed_reader.go

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,41 +34,50 @@ func NewBackedReader() *BackedReader {
3434
// When connected, it reads from the underlying reader and updates sequence numbers.
3535
// Connection failures are automatically detected and reported to the higher layer via callback.
3636
func (br *BackedReader) Read(p []byte) (int, error) {
37-
br.mu.Lock()
38-
defer br.mu.Unlock()
39-
4037
for {
38+
// Step 1: Wait until we have a reader or are closed
39+
br.mu.Lock()
4140
for br.reader == nil && !br.closed {
4241
br.cond.Wait()
4342
}
4443

45-
// Check if closed
4644
if br.closed {
45+
br.mu.Unlock()
4746
return 0, io.ErrClosedPipe
4847
}
4948

49+
// Capture the current reader and release the lock while performing
50+
// the potentially blocking I/O operation to avoid deadlocks with Close().
51+
r := br.reader
5052
br.mu.Unlock()
51-
n, err := br.reader.Read(p)
52-
br.mu.Lock()
5353

54+
// Step 2: Perform the read without holding the mutex
55+
n, err := r.Read(p)
56+
57+
// Step 3: Reacquire the lock to update state based on the result
58+
br.mu.Lock()
5459
if err == nil {
5560
br.sequenceNum += uint64(n) // #nosec G115 -- n is always >= 0 per io.Reader contract
61+
br.mu.Unlock()
5662
return n, nil
5763
}
5864

65+
// Mark disconnected so future reads will wait for reconnection
5966
br.reader = nil
6067

6168
if br.onError != nil {
6269
br.onError(err)
6370
}
6471

65-
// If we got some data before the error, return it
72+
// If we got some data before the error, return it now
6673
if n > 0 {
6774
br.sequenceNum += uint64(n)
75+
br.mu.Unlock()
6876
return n, nil
6977
}
7078

71-
// Return to Step 2 (continue the loop)
79+
// Otherwise loop and wait for reconnection or close
80+
br.mu.Unlock()
7281
}
7382
}
7483

coderd/agentapi/backedpipe/ring_buffer_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,27 @@ package backedpipe_test
33
import (
44
"bytes"
55
"fmt"
6+
"os"
7+
"runtime"
68
"sync"
79
"testing"
810

911
"github.com/stretchr/testify/require"
12+
"go.uber.org/goleak"
1013

1114
"github.com/coder/coder/v2/coderd/agentapi/backedpipe"
15+
"github.com/coder/coder/v2/testutil"
1216
)
1317

18+
func TestMain(m *testing.M) {
19+
if runtime.GOOS == "windows" {
20+
// Don't run goleak on Windows tests; they're super flaky right now.
21+
// See: https://github.com/coder/coder/issues/8954
22+
os.Exit(m.Run())
23+
}
24+
goleak.VerifyTestMain(m, testutil.GoleakOptions...)
25+
}
26+
1427
func TestRingBuffer_NewRingBuffer(t *testing.T) {
1528
t.Parallel()
1629

0 commit comments

Comments
 (0)