@@ -31,13 +31,16 @@ type Stream struct {
31
31
connected bool
32
32
closed bool
33
33
34
+ // Indicates a reconnection attempt is in progress (single-flight)
35
+ reconnecting bool
36
+
34
37
// goroutines manages the copy goroutines
35
38
goroutines sync.WaitGroup
36
39
37
40
// Reconnection coordination
38
41
pendingReconnect * reconnectRequest
39
- // Event to signal that a reconnect request is pending
40
- reconnectRequested chan struct {}
42
+ // Condition variable to wait for pendingReconnect changes
43
+ reconnectCond * sync. Cond
41
44
42
45
// Disconnection detection
43
46
disconnectChan chan struct {}
@@ -62,15 +65,15 @@ type reconnectResponse struct {
62
65
// NewStream creates a new immortal stream
63
66
func NewStream (id uuid.UUID , name string , port int , logger slog.Logger ) * Stream {
64
67
stream := & Stream {
65
- id : id ,
66
- name : name ,
67
- port : port ,
68
- createdAt : time .Now (),
69
- logger : logger ,
70
- disconnectChan : make (chan struct {}, 1 ),
71
- shutdownChan : make (chan struct {}),
72
- reconnectRequested : make (chan struct {}),
68
+ id : id ,
69
+ name : name ,
70
+ port : port ,
71
+ createdAt : time .Now (),
72
+ logger : logger ,
73
+ disconnectChan : make (chan struct {}, 1 ),
74
+ shutdownChan : make (chan struct {}),
73
75
}
76
+ stream .reconnectCond = sync .NewCond (& stream .mu )
74
77
75
78
// Create a reconnect function that waits for a client connection
76
79
reconnectFn := func (ctx context.Context , writerSeqNum uint64 ) (io.ReadWriteCloser , uint64 , error ) {
@@ -82,10 +85,8 @@ func NewStream(id uuid.UUID, name string, port int, logger slog.Logger) *Stream
82
85
writerSeqNum : writerSeqNum ,
83
86
response : responseChan ,
84
87
}
85
- // Replace any previous pending request and signal waiters
86
- // Close and recreate the event channel to broadcast the event
87
- close (stream .reconnectRequested )
88
- stream .reconnectRequested = make (chan struct {})
88
+ // Signal waiters a reconnect request is pending
89
+ stream .reconnectCond .Broadcast ()
89
90
stream .mu .Unlock ()
90
91
91
92
// Fast path: if the stream is already shutting down, abort immediately
@@ -183,59 +184,77 @@ func (s *Stream) HandleReconnect(clientConn io.ReadWriteCloser, readSeqNum uint6
183
184
return nil
184
185
}
185
186
186
- // No pending request - we need to trigger a reconnection
187
+ // No pending request - we need to trigger a reconnection (single-flight)
187
188
s .logger .Debug (context .Background (), "no pending request, will trigger reconnection" )
188
189
189
- // Wait for the reconnect function to post a pending request
190
- reconnectWait := s .reconnectRequested
191
- connectDone := make (chan error , 1 )
192
- s .mu .Unlock ()
193
-
194
- // Trigger the reconnection - this will call our reconnect function
195
- go func () {
196
- s .logger .Debug (context .Background (), "calling ForceReconnect" )
197
- err := s .pipe .ForceReconnect (context .Background ())
198
- s .logger .Debug (context .Background (), "force reconnect returned" , slog .Error (err ))
199
- connectDone <- err
200
- }()
190
+ for {
191
+ // Ensure only one goroutine kicks off ForceReconnect
192
+ if ! s .reconnecting {
193
+ s .reconnecting = true
194
+ go func () {
195
+ s .logger .Debug (context .Background (), "calling ForceReconnect" )
196
+ err := s .pipe .ForceReconnect (context .Background ())
197
+ s .logger .Debug (context .Background (), "force reconnect returned" , slog .Error (err ))
198
+ s .mu .Lock ()
199
+ s .reconnecting = false
200
+ // Notify any waiters in case we need to retry
201
+ if s .reconnectCond != nil {
202
+ s .reconnectCond .Broadcast ()
203
+ }
204
+ s .mu .Unlock ()
205
+ }()
206
+ }
201
207
202
- // Wait for reconnectFn to signal the request, then fulfill it
203
- var earlyDone bool
204
- var earlyErr error
205
- select {
206
- case <- reconnectWait :
207
- s .mu .Lock ()
208
+ // If a reconnect request is pending, respond and break
208
209
if s .pendingReconnect != nil {
209
210
s .pendingReconnect .response <- reconnectResponse {conn : clientConn , readSeq : readSeqNum , err : nil }
210
211
s .pendingReconnect = nil
212
+ break
211
213
}
212
- s .mu .Unlock ()
213
- case err := <- connectDone :
214
- // Reconnect returned before we got a pending request
215
- earlyDone = true
216
- earlyErr = err
217
- }
218
214
219
- // Wait for the connection to complete if we didn't already
220
- var err error
221
- if earlyDone {
222
- err = earlyErr
223
- } else {
224
- err = <- connectDone
215
+ // If the stream has been closed, exit
216
+ if s .closed {
217
+ s .mu .Unlock ()
218
+ return xerrors .New ("stream is closed" )
219
+ }
220
+
221
+ // If already connected (another goroutine handled it), we're done
222
+ if s .connected {
223
+ s .mu .Unlock ()
224
+ s .logger .Debug (context .Background (), "another goroutine completed reconnection" )
225
+ return nil
226
+ }
227
+
228
+ // Wait until something changes (pending request posted, reconnect attempt finishes, or close)
229
+ s .reconnectCond .Wait ()
225
230
}
226
231
227
- s .mu .Lock ()
228
- defer s .mu .Unlock ()
232
+ s .mu .Unlock ()
229
233
230
- if err != nil {
234
+ // Wait until the pipe reports a connected state
235
+ // This ensures the reconnection handshake fully completes regardless of
236
+ // which goroutine initiated it.
237
+ if err := s .pipe .WaitForConnection (context .Background ()); err != nil {
238
+ s .mu .Lock ()
231
239
s .connected = false
240
+ // Notify any other waiters to re-check state or exit
241
+ if s .reconnectCond != nil {
242
+ s .reconnectCond .Broadcast ()
243
+ }
244
+ s .mu .Unlock ()
232
245
s .logger .Warn (context .Background (), "failed to connect backed pipe" , slog .Error (err ))
233
246
return xerrors .Errorf ("failed to establish connection: %w" , err )
234
247
}
235
248
236
- // Success
249
+ s . mu . Lock ()
237
250
s .lastConnectionAt = time .Now ()
238
251
s .connected = true
252
+ // Wake any concurrent HandleReconnect callers waiting for connection
253
+ if s .reconnectCond != nil {
254
+ s .reconnectCond .Broadcast ()
255
+ }
256
+ s .mu .Unlock ()
257
+
239
258
s .logger .Debug (context .Background (), "client reconnection successful" )
240
259
return nil
241
260
}
@@ -261,6 +280,12 @@ func (s *Stream) Close() error {
261
280
close (s .shutdownChan )
262
281
}
263
282
283
+ // Wake any goroutines waiting for a pending reconnect request so they
284
+ // observe the closed state and exit promptly.
285
+ if s .reconnectCond != nil {
286
+ s .reconnectCond .Broadcast ()
287
+ }
288
+
264
289
// Clear any pending reconnect request
265
290
if s .pendingReconnect != nil {
266
291
s .pendingReconnect .response <- reconnectResponse {
0 commit comments