@@ -36,6 +36,8 @@ type Stream struct {
36
36
37
37
// Reconnection coordination
38
38
pendingReconnect * reconnectRequest
39
+ // Event to signal that a reconnect request is pending
40
+ reconnectRequested chan struct {}
39
41
40
42
// Disconnection detection
41
43
disconnectChan chan struct {}
@@ -60,13 +62,14 @@ type reconnectResponse struct {
60
62
// NewStream creates a new immortal stream
61
63
func NewStream (id uuid.UUID , name string , port int , logger slog.Logger ) * Stream {
62
64
stream := & Stream {
63
- id : id ,
64
- name : name ,
65
- port : port ,
66
- createdAt : time .Now (),
67
- logger : logger ,
68
- disconnectChan : make (chan struct {}, 1 ),
69
- shutdownChan : make (chan struct {}, 1 ),
65
+ id : id ,
66
+ name : name ,
67
+ port : port ,
68
+ createdAt : time .Now (),
69
+ logger : logger ,
70
+ disconnectChan : make (chan struct {}, 1 ),
71
+ shutdownChan : make (chan struct {}),
72
+ reconnectRequested : make (chan struct {}),
70
73
}
71
74
72
75
// Create a reconnect function that waits for a client connection
@@ -79,8 +82,25 @@ func NewStream(id uuid.UUID, name string, port int, logger slog.Logger) *Stream
79
82
writerSeqNum : writerSeqNum ,
80
83
response : responseChan ,
81
84
}
85
+ // Replace any previous pending request and signal waiters
86
+ // Close and recreate the event channel to broadcast the event
87
+ close (stream .reconnectRequested )
88
+ stream .reconnectRequested = make (chan struct {})
82
89
stream .mu .Unlock ()
83
90
91
+ // Fast path: if the stream is already shutting down, abort immediately
92
+ select {
93
+ case <- stream .shutdownChan :
94
+ stream .mu .Lock ()
95
+ // Clear the pending request since we're aborting
96
+ if stream .pendingReconnect != nil {
97
+ stream .pendingReconnect = nil
98
+ }
99
+ stream .mu .Unlock ()
100
+ return nil , 0 , xerrors .New ("stream is shutting down" )
101
+ default :
102
+ }
103
+
84
104
// Wait for response from HandleReconnect or context cancellation
85
105
stream .logger .Debug (context .Background (), "reconnect function waiting for response" )
86
106
select {
@@ -108,14 +128,6 @@ func NewStream(id uuid.UUID, name string, port int, logger slog.Logger) *Stream
108
128
// Create BackedPipe with background context
109
129
stream .pipe = backedpipe .NewBackedPipe (context .Background (), reconnectFn )
110
130
111
- // Immediately initiate a background connection so the BackedPipe
112
- // is provided with an io.ReadWriteCloser as soon as one is available.
113
- go func () {
114
- if err := stream .pipe .ForceReconnect (context .Background ()); err != nil {
115
- stream .logger .Debug (context .Background (), "initial backed pipe connect returned" , slog .Error (err ))
116
- }
117
- }()
118
-
119
131
return stream
120
132
}
121
133
@@ -174,55 +186,43 @@ func (s *Stream) HandleReconnect(clientConn io.ReadWriteCloser, readSeqNum uint6
174
186
// No pending request - we need to trigger a reconnection
175
187
s .logger .Debug (context .Background (), "no pending request, will trigger reconnection" )
176
188
177
- // Use a channel to coordinate with the reconnect function
178
- readyChan := make ( chan struct {})
189
+ // Wait for the reconnect function to post a pending request
190
+ reconnectWait := s . reconnectRequested
179
191
connectDone := make (chan error , 1 )
180
-
181
- // Prepare to intercept the next pending request
182
- interceptConn := clientConn
183
- interceptReadSeq := readSeqNum
184
-
185
192
s .mu .Unlock ()
186
193
187
- // Start a goroutine that will wait for the pending request and fulfill it
188
- go func () {
189
- // Signal when we're ready to intercept
190
- close (readyChan )
191
-
192
- // Poll for the pending request
193
- for {
194
- s .mu .Lock ()
195
- if s .pendingReconnect != nil {
196
- // Found the pending request, fulfill it
197
- s .pendingReconnect .response <- reconnectResponse {
198
- conn : interceptConn ,
199
- readSeq : interceptReadSeq ,
200
- err : nil ,
201
- }
202
- s .pendingReconnect = nil
203
- s .mu .Unlock ()
204
- return
205
- }
206
- s .mu .Unlock ()
207
-
208
- // Small sleep to avoid busy waiting
209
- time .Sleep (1 * time .Millisecond )
210
- }
211
- }()
212
-
213
- // Wait for the interceptor to be ready
214
- <- readyChan
215
-
216
- // Now trigger the reconnection - this will call our reconnect function
194
+ // Trigger the reconnection - this will call our reconnect function
217
195
go func () {
218
196
s .logger .Debug (context .Background (), "calling ForceReconnect" )
219
197
err := s .pipe .ForceReconnect (context .Background ())
220
198
s .logger .Debug (context .Background (), "force reconnect returned" , slog .Error (err ))
221
199
connectDone <- err
222
200
}()
223
201
224
- // Wait for the connection to complete
225
- err := <- connectDone
202
+ // Wait for reconnectFn to signal the request, then fulfill it
203
+ var earlyDone bool
204
+ var earlyErr error
205
+ select {
206
+ case <- reconnectWait :
207
+ s .mu .Lock ()
208
+ if s .pendingReconnect != nil {
209
+ s .pendingReconnect .response <- reconnectResponse {conn : clientConn , readSeq : readSeqNum , err : nil }
210
+ s .pendingReconnect = nil
211
+ }
212
+ s .mu .Unlock ()
213
+ case err := <- connectDone :
214
+ // Reconnect returned before we got a pending request
215
+ earlyDone = true
216
+ earlyErr = err
217
+ }
218
+
219
+ // Wait for the connection to complete if we didn't already
220
+ var err error
221
+ if earlyDone {
222
+ err = earlyErr
223
+ } else {
224
+ err = <- connectDone
225
+ }
226
226
227
227
s .mu .Lock ()
228
228
defer s .mu .Unlock ()
@@ -252,12 +252,13 @@ func (s *Stream) Close() error {
252
252
s .closed = true
253
253
s .connected = false
254
254
255
- // Signal shutdown to any pending reconnect attempts
255
+ // Signal shutdown to any pending reconnect attempts and listeners
256
+ // Closing the channel wakes all waiters exactly once
256
257
select {
257
- case s .shutdownChan <- struct {}{} :
258
- // Signal sent successfully
258
+ case <- s .shutdownChan :
259
+ // already closed
259
260
default :
260
- // Channel is full or already closed, which is fine
261
+ close ( s . shutdownChan )
261
262
}
262
263
263
264
// Clear any pending reconnect request
@@ -429,10 +430,16 @@ func (s *Stream) handleDisconnect() {
429
430
430
431
// SignalDisconnect signals that the connection has been lost
431
432
func (s * Stream ) SignalDisconnect () {
433
+ s .mu .RLock ()
434
+ closed := s .closed
435
+ s .mu .RUnlock ()
436
+ if closed {
437
+ return
438
+ }
432
439
select {
433
440
case s .disconnectChan <- struct {}{}:
434
441
default :
435
- // Channel is full or closed , ignore
442
+ // Channel is full, ignore
436
443
}
437
444
}
438
445
0 commit comments