6
6
"sync"
7
7
"time"
8
8
9
+ "golang.org/x/sync/singleflight"
9
10
"golang.org/x/xerrors"
10
11
)
11
12
@@ -49,6 +50,9 @@ type BackedPipe struct {
49
50
50
51
// Connection state notification
51
52
connectionChanged chan struct {}
53
+
54
+ // singleflight group to dedupe concurrent ForceReconnect calls
55
+ sf singleflight.Group
52
56
}
53
57
54
58
// NewBackedPipe creates a new BackedPipe with default options and the specified reconnect function.
@@ -88,7 +92,7 @@ func NewBackedPipe(ctx context.Context, reconnectFn ReconnectFunc) *BackedPipe {
88
92
}
89
93
90
94
// Connect establishes the initial connection using the reconnect function.
91
- func (bp * BackedPipe ) Connect (ctx context.Context ) error {
95
+ func (bp * BackedPipe ) Connect (_ context.Context ) error { // external ctx ignored; internal ctx used
92
96
bp .mu .Lock ()
93
97
defer bp .mu .Unlock ()
94
98
@@ -100,7 +104,9 @@ func (bp *BackedPipe) Connect(ctx context.Context) error {
100
104
return xerrors .New ("pipe is already connected" )
101
105
}
102
106
103
- return bp .reconnectLocked (ctx )
107
+ // Use internal context for the actual reconnect operation to ensure
108
+ // Close() reliably cancels any in-flight attempt.
109
+ return bp .reconnectLocked ()
104
110
}
105
111
106
112
// Read implements io.Reader by delegating to the BackedReader.
@@ -189,7 +195,7 @@ func (bp *BackedPipe) signalConnectionChange() {
189
195
}
190
196
191
197
// reconnectLocked handles the reconnection logic. Must be called with write lock held.
192
- func (bp * BackedPipe ) reconnectLocked (ctx context. Context ) error {
198
+ func (bp * BackedPipe ) reconnectLocked () error {
193
199
if bp .reconnecting {
194
200
return xerrors .New ("reconnection already in progress" )
195
201
}
@@ -213,7 +219,7 @@ func (bp *BackedPipe) reconnectLocked(ctx context.Context) error {
213
219
214
220
// Unlock during reconnect attempt to avoid blocking reads/writes
215
221
bp .mu .Unlock ()
216
- conn , readerSeqNum , err := bp .reconnectFn (ctx , writerSeqNum )
222
+ conn , readerSeqNum , err := bp .reconnectFn (bp . ctx , writerSeqNum )
217
223
bp .mu .Lock ()
218
224
219
225
if err != nil {
@@ -281,8 +287,8 @@ func (bp *BackedPipe) handleErrors() {
281
287
bp .connected = false
282
288
bp .signalConnectionChange ()
283
289
284
- // Try to reconnect
285
- reconnectErr := bp .reconnectLocked (bp . ctx )
290
+ // Try to reconnect using internal context
291
+ reconnectErr := bp .reconnectLocked ()
286
292
bp .mu .Unlock ()
287
293
288
294
if reconnectErr != nil {
@@ -323,16 +329,19 @@ func (bp *BackedPipe) WaitForConnection(ctx context.Context) error {
323
329
324
330
// ForceReconnect forces a reconnection attempt immediately.
325
331
// This can be used to force a reconnection if a new connection is established.
326
- func (bp * BackedPipe ) ForceReconnect (ctx context.Context ) error {
327
- bp .mu .Lock ()
328
- defer bp .mu .Unlock ()
329
-
330
- if bp .closed {
331
- return io .ErrClosedPipe
332
- }
332
+ func (bp * BackedPipe ) ForceReconnect () error {
333
+ // Deduplicate concurrent ForceReconnect calls so only one reconnection
334
+ // attempt runs at a time from this API. Use the pipe's internal context
335
+ // to ensure Close() cancels any in-flight attempt.
336
+ _ , err , _ := bp .sf .Do ("backedpipe-reconnect" , func () (interface {}, error ) {
337
+ bp .mu .Lock ()
338
+ defer bp .mu .Unlock ()
339
+
340
+ if bp .closed {
341
+ return nil , io .ErrClosedPipe
342
+ }
333
343
334
- // Use the pipe's internal context so that Close() reliably cancels any
335
- // in-flight reconnection attempts. An external context here can outlive
336
- // the pipe and cause goroutines to block indefinitely.
337
- return bp .reconnectLocked (bp .ctx )
344
+ return nil , bp .reconnectLocked ()
345
+ })
346
+ return err
338
347
}
0 commit comments