@@ -31,8 +31,10 @@ type Stream struct {
31
31
connected bool
32
32
closed bool
33
33
34
- // Indicates a reconnection attempt is in progress (single-flight)
35
- reconnecting bool
34
+ // Indicates a reconnect handshake is in progress (from pending request
35
+ // until the pipe reports connected). Prevents a second ForceReconnect
36
+ // from racing and closing the just-provided connection.
37
+ handshakePending bool
36
38
37
39
// goroutines manages the copy goroutines
38
40
goroutines sync.WaitGroup
@@ -42,6 +44,9 @@ type Stream struct {
42
44
// Condition variable to wait for pendingReconnect changes
43
45
reconnectCond * sync.Cond
44
46
47
+ // Reconnect worker signaling (coalesced pokes)
48
+ reconnectReq chan struct {}
49
+
45
50
// Disconnection detection
46
51
disconnectChan chan struct {}
47
52
@@ -72,6 +77,7 @@ func NewStream(id uuid.UUID, name string, port int, logger slog.Logger) *Stream
72
77
logger : logger ,
73
78
disconnectChan : make (chan struct {}, 1 ),
74
79
shutdownChan : make (chan struct {}),
80
+ reconnectReq : make (chan struct {}, 1 ),
75
81
}
76
82
stream .reconnectCond = sync .NewCond (& stream .mu )
77
83
@@ -85,6 +91,12 @@ func NewStream(id uuid.UUID, name string, port int, logger slog.Logger) *Stream
85
91
writerSeqNum : writerSeqNum ,
86
92
response : responseChan ,
87
93
}
94
+ stream .handshakePending = true
95
+ // Mark disconnected if we previously had a client connection
96
+ if stream .connected {
97
+ stream .connected = false
98
+ stream .lastDisconnectionAt = time .Now ()
99
+ }
88
100
stream .logger .Info (context .Background (), "pending reconnect set" ,
89
101
slog .F ("writer_seq" , writerSeqNum ))
90
102
// Signal waiters a reconnect request is pending
@@ -117,12 +129,14 @@ func NewStream(id uuid.UUID, name string, port int, logger slog.Logger) *Stream
117
129
// Context was canceled, clear pending request and return error
118
130
stream .mu .Lock ()
119
131
stream .pendingReconnect = nil
132
+ stream .handshakePending = false
120
133
stream .mu .Unlock ()
121
134
return nil , 0 , ctx .Err ()
122
135
case <- stream .shutdownChan :
123
136
// Stream is being shut down, clear pending request and return error
124
137
stream .mu .Lock ()
125
138
stream .pendingReconnect = nil
139
+ stream .handshakePending = false
126
140
stream .mu .Unlock ()
127
141
return nil , 0 , xerrors .New ("stream is shutting down" )
128
142
}
@@ -131,6 +145,45 @@ func NewStream(id uuid.UUID, name string, port int, logger slog.Logger) *Stream
131
145
// Create BackedPipe with background context
132
146
stream .pipe = backedpipe .NewBackedPipe (context .Background (), reconnectFn )
133
147
148
+ // Start reconnect worker: dedupe pokes and call ForceReconnect when safe.
149
+ go func () {
150
+ for {
151
+ select {
152
+ case <- stream .shutdownChan :
153
+ return
154
+ case <- stream .reconnectReq :
155
+ // Drain extra pokes to coalesce
156
+ for {
157
+ select {
158
+ case <- stream .reconnectReq :
159
+ default :
160
+ goto drained
161
+ }
162
+ }
163
+ drained:
164
+ stream .mu .Lock ()
165
+ closed := stream .closed
166
+ handshaking := stream .handshakePending
167
+ canReconnect := stream .pipe != nil && ! stream .pipe .Connected ()
168
+ stream .mu .Unlock ()
169
+ if closed || handshaking || ! canReconnect {
170
+ // Nothing to do now; wait for a future poke.
171
+ continue
172
+ }
173
+ // BackedPipe handles singleflight internally.
174
+ stream .logger .Debug (context .Background (), "worker calling ForceReconnect" )
175
+ err := stream .pipe .ForceReconnect ()
176
+ stream .logger .Debug (context .Background (), "worker ForceReconnect returned" , slog .Error (err ))
177
+ // Wake any waiters to re-check state after attempt completes.
178
+ stream .mu .Lock ()
179
+ if stream .reconnectCond != nil {
180
+ stream .reconnectCond .Broadcast ()
181
+ }
182
+ stream .mu .Unlock ()
183
+ }
184
+ }
185
+ }()
186
+
134
187
return stream
135
188
}
136
189
@@ -166,25 +219,12 @@ func (s *Stream) HandleReconnect(clientConn io.ReadWriteCloser, readSeqNum uint6
166
219
slog .F ("read_seq_num" , readSeqNum ),
167
220
slog .F ("has_pending" , s .pendingReconnect != nil ))
168
221
169
- // Helper to start a reconnect attempt once (single-flight)
170
- startReconnectIfNeeded := func () {
171
- // Only trigger reconnect if:
172
- // - No reconnect goroutine is in-flight
173
- // - There is no pending reconnect request waiting for a client
174
- // - The pipe is not already connected
175
- if ! s .reconnecting && s .pendingReconnect == nil && (s .pipe == nil || ! s .pipe .Connected ()) {
176
- s .reconnecting = true
177
- go func () {
178
- s .logger .Debug (context .Background (), "calling ForceReconnect" )
179
- err := s .pipe .ForceReconnect ()
180
- s .logger .Debug (context .Background (), "force reconnect returned" , slog .Error (err ))
181
- s .mu .Lock ()
182
- s .reconnecting = false
183
- if s .reconnectCond != nil {
184
- s .reconnectCond .Broadcast ()
185
- }
186
- s .mu .Unlock ()
187
- }()
222
+ // Helper: request a reconnect attempt by poking the worker
223
+ requestReconnect := func () {
224
+ select {
225
+ case s .reconnectReq <- struct {}{}:
226
+ default :
227
+ // already requested; coalesced
188
228
}
189
229
}
190
230
@@ -201,7 +241,11 @@ func (s *Stream) HandleReconnect(clientConn io.ReadWriteCloser, readSeqNum uint6
201
241
respCh <- reconnectResponse {conn : clientConn , readSeq : readSeqNum , err : nil }
202
242
203
243
// Wait until the pipe reports a connected state so the handshake fully completes.
204
- if err := s .pipe .WaitForConnection (context .Background ()); err != nil {
244
+ // Use a bounded timeout to avoid hanging forever in pathological cases.
245
+ ctx , cancel := context .WithTimeout (context .Background (), 10 * time .Second )
246
+ err := s .pipe .WaitForConnection (ctx )
247
+ cancel ()
248
+ if err != nil {
205
249
s .mu .Lock ()
206
250
s .connected = false
207
251
if s .reconnectCond != nil {
@@ -215,6 +259,7 @@ func (s *Stream) HandleReconnect(clientConn io.ReadWriteCloser, readSeqNum uint6
215
259
s .mu .Lock ()
216
260
s .lastConnectionAt = time .Now ()
217
261
s .connected = true
262
+ s .handshakePending = false
218
263
if s .reconnectCond != nil {
219
264
s .reconnectCond .Broadcast ()
220
265
}
@@ -237,12 +282,11 @@ func (s *Stream) HandleReconnect(clientConn io.ReadWriteCloser, readSeqNum uint6
237
282
return xerrors .New ("stream is already connected" )
238
283
}
239
284
240
- // Ensure a reconnect attempt is in-flight while we wait.
241
- startReconnectIfNeeded ()
285
+ // Ensure a reconnect attempt is requested while we wait.
286
+ requestReconnect ()
242
287
243
288
// Wait until state changes: pendingReconnect set, connection established, or closed.
244
289
s .logger .Debug (context .Background (), "waiting for pending request or connection change" ,
245
- slog .F ("reconnecting" , s .reconnecting ),
246
290
slog .F ("pending" , s .pendingReconnect != nil ),
247
291
slog .F ("connected" , s .connected ),
248
292
slog .F ("closed" , s .closed ))
@@ -286,6 +330,7 @@ func (s *Stream) Close() error {
286
330
err : xerrors .New ("stream is shutting down" ),
287
331
}
288
332
s .pendingReconnect = nil
333
+ s .handshakePending = false
289
334
}
290
335
291
336
// Close the backed pipe
@@ -377,15 +422,17 @@ func (s *Stream) startCopyingLocked() {
377
422
s .logger .Debug (context .Background (), "starting copy from pipe to local goroutine" )
378
423
// Keep copying until the stream is closed
379
424
// The BackedPipe will block when no client is connected
425
+ buf := make ([]byte , 32 * 1024 )
380
426
for {
381
427
// Use a buffer for copying
382
- buf := make ([]byte , 32 * 1024 )
383
428
n , err := s .pipe .Read (buf )
384
429
// Log significant events
385
430
if errors .Is (err , io .EOF ) {
386
- s .logger .Debug (context .Background (), "got EOF from pipe, will continue" )
431
+ s .logger .Debug (context .Background (), "got EOF from pipe" )
432
+ s .SignalDisconnect ()
387
433
} else if err != nil && ! errors .Is (err , io .ErrClosedPipe ) {
388
434
s .logger .Debug (context .Background (), "error reading from pipe" , slog .Error (err ))
435
+ s .SignalDisconnect ()
389
436
}
390
437
391
438
if n > 0 {
@@ -407,11 +454,7 @@ func (s *Stream) startCopyingLocked() {
407
454
s .SignalDisconnect ()
408
455
return
409
456
}
410
- // Any other error (including EOF) is not fatal - the BackedPipe will handle it
411
- // Just continue the loop
412
- if ! xerrors .Is (err , io .EOF ) {
413
- s .logger .Debug (context .Background (), "non-fatal error reading from pipe, continuing" , slog .Error (err ))
414
- }
457
+ // Any other error (including EOF) is handled by BackedPipe; continue
415
458
}
416
459
}
417
460
}()
0 commit comments