Skip to content

Commit 6e48486

Browse files
committed
WIP
1 parent 3af1086 commit 6e48486

File tree

3 files changed

+211
-53
lines changed

3 files changed

+211
-53
lines changed

agent/immortalstreams/stream.go

Lines changed: 75 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,10 @@ type Stream struct {
3131
connected bool
3232
closed bool
3333

34-
// Indicates a reconnection attempt is in progress (single-flight)
35-
reconnecting bool
34+
// Indicates a reconnect handshake is in progress (from pending request
35+
// until the pipe reports connected). Prevents a second ForceReconnect
36+
// from racing and closing the just-provided connection.
37+
handshakePending bool
3638

3739
// goroutines manages the copy goroutines
3840
goroutines sync.WaitGroup
@@ -42,6 +44,9 @@ type Stream struct {
4244
// Condition variable to wait for pendingReconnect changes
4345
reconnectCond *sync.Cond
4446

47+
// Pokes the reconnect worker; capacity 1 with non-blocking sends, so extra pokes coalesce
48+
reconnectReq chan struct{}
49+
4550
// Disconnection detection
4651
disconnectChan chan struct{}
4752

@@ -72,6 +77,7 @@ func NewStream(id uuid.UUID, name string, port int, logger slog.Logger) *Stream
7277
logger: logger,
7378
disconnectChan: make(chan struct{}, 1),
7479
shutdownChan: make(chan struct{}),
80+
reconnectReq: make(chan struct{}, 1),
7581
}
7682
stream.reconnectCond = sync.NewCond(&stream.mu)
7783

@@ -85,6 +91,12 @@ func NewStream(id uuid.UUID, name string, port int, logger slog.Logger) *Stream
8591
writerSeqNum: writerSeqNum,
8692
response: responseChan,
8793
}
94+
stream.handshakePending = true
95+
// Mark disconnected if we previously had a client connection
96+
if stream.connected {
97+
stream.connected = false
98+
stream.lastDisconnectionAt = time.Now()
99+
}
88100
stream.logger.Info(context.Background(), "pending reconnect set",
89101
slog.F("writer_seq", writerSeqNum))
90102
// Signal waiters a reconnect request is pending
@@ -117,12 +129,14 @@ func NewStream(id uuid.UUID, name string, port int, logger slog.Logger) *Stream
117129
// Context was canceled, clear pending request and return error
118130
stream.mu.Lock()
119131
stream.pendingReconnect = nil
132+
stream.handshakePending = false
120133
stream.mu.Unlock()
121134
return nil, 0, ctx.Err()
122135
case <-stream.shutdownChan:
123136
// Stream is being shut down, clear pending request and return error
124137
stream.mu.Lock()
125138
stream.pendingReconnect = nil
139+
stream.handshakePending = false
126140
stream.mu.Unlock()
127141
return nil, 0, xerrors.New("stream is shutting down")
128142
}
@@ -131,6 +145,45 @@ func NewStream(id uuid.UUID, name string, port int, logger slog.Logger) *Stream
131145
// Create BackedPipe with background context
132146
stream.pipe = backedpipe.NewBackedPipe(context.Background(), reconnectFn)
133147

148+
// Start reconnect worker: dedupe pokes and call ForceReconnect when safe.
149+
go func() {
150+
for {
151+
select {
152+
case <-stream.shutdownChan:
153+
return
154+
case <-stream.reconnectReq:
155+
// Drain extra pokes to coalesce
156+
for {
157+
select {
158+
case <-stream.reconnectReq:
159+
default:
160+
goto drained
161+
}
162+
}
163+
drained:
164+
stream.mu.Lock()
165+
closed := stream.closed
166+
handshaking := stream.handshakePending
167+
canReconnect := stream.pipe != nil && !stream.pipe.Connected()
168+
stream.mu.Unlock()
169+
if closed || handshaking || !canReconnect {
170+
// Nothing to do now; wait for a future poke.
171+
continue
172+
}
173+
// BackedPipe handles singleflight internally.
174+
stream.logger.Debug(context.Background(), "worker calling ForceReconnect")
175+
err := stream.pipe.ForceReconnect()
176+
stream.logger.Debug(context.Background(), "worker ForceReconnect returned", slog.Error(err))
177+
// Wake any waiters to re-check state after attempt completes.
178+
stream.mu.Lock()
179+
if stream.reconnectCond != nil {
180+
stream.reconnectCond.Broadcast()
181+
}
182+
stream.mu.Unlock()
183+
}
184+
}
185+
}()
186+
134187
return stream
135188
}
136189

@@ -166,25 +219,12 @@ func (s *Stream) HandleReconnect(clientConn io.ReadWriteCloser, readSeqNum uint6
166219
slog.F("read_seq_num", readSeqNum),
167220
slog.F("has_pending", s.pendingReconnect != nil))
168221

169-
// Helper to start a reconnect attempt once (single-flight)
170-
startReconnectIfNeeded := func() {
171-
// Only trigger reconnect if:
172-
// - No reconnect goroutine is in-flight
173-
// - There is no pending reconnect request waiting for a client
174-
// - The pipe is not already connected
175-
if !s.reconnecting && s.pendingReconnect == nil && (s.pipe == nil || !s.pipe.Connected()) {
176-
s.reconnecting = true
177-
go func() {
178-
s.logger.Debug(context.Background(), "calling ForceReconnect")
179-
err := s.pipe.ForceReconnect()
180-
s.logger.Debug(context.Background(), "force reconnect returned", slog.Error(err))
181-
s.mu.Lock()
182-
s.reconnecting = false
183-
if s.reconnectCond != nil {
184-
s.reconnectCond.Broadcast()
185-
}
186-
s.mu.Unlock()
187-
}()
222+
// Helper: request a reconnect attempt by poking the worker
223+
requestReconnect := func() {
224+
select {
225+
case s.reconnectReq <- struct{}{}:
226+
default:
227+
// already requested; coalesced
188228
}
189229
}
190230

@@ -201,7 +241,11 @@ func (s *Stream) HandleReconnect(clientConn io.ReadWriteCloser, readSeqNum uint6
201241
respCh <- reconnectResponse{conn: clientConn, readSeq: readSeqNum, err: nil}
202242

203243
// Wait until the pipe reports a connected state so the handshake fully completes.
204-
if err := s.pipe.WaitForConnection(context.Background()); err != nil {
244+
// Use a bounded timeout to avoid hanging forever in pathological cases.
245+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
246+
err := s.pipe.WaitForConnection(ctx)
247+
cancel()
248+
if err != nil {
205249
s.mu.Lock()
206250
s.connected = false
207251
if s.reconnectCond != nil {
@@ -215,6 +259,7 @@ func (s *Stream) HandleReconnect(clientConn io.ReadWriteCloser, readSeqNum uint6
215259
s.mu.Lock()
216260
s.lastConnectionAt = time.Now()
217261
s.connected = true
262+
s.handshakePending = false
218263
if s.reconnectCond != nil {
219264
s.reconnectCond.Broadcast()
220265
}
@@ -237,12 +282,11 @@ func (s *Stream) HandleReconnect(clientConn io.ReadWriteCloser, readSeqNum uint6
237282
return xerrors.New("stream is already connected")
238283
}
239284

240-
// Ensure a reconnect attempt is in-flight while we wait.
241-
startReconnectIfNeeded()
285+
// Ensure a reconnect attempt is requested while we wait.
286+
requestReconnect()
242287

243288
// Wait until state changes: pendingReconnect set, connection established, or closed.
244289
s.logger.Debug(context.Background(), "waiting for pending request or connection change",
245-
slog.F("reconnecting", s.reconnecting),
246290
slog.F("pending", s.pendingReconnect != nil),
247291
slog.F("connected", s.connected),
248292
slog.F("closed", s.closed))
@@ -286,6 +330,7 @@ func (s *Stream) Close() error {
286330
err: xerrors.New("stream is shutting down"),
287331
}
288332
s.pendingReconnect = nil
333+
s.handshakePending = false
289334
}
290335

291336
// Close the backed pipe
@@ -377,15 +422,17 @@ func (s *Stream) startCopyingLocked() {
377422
s.logger.Debug(context.Background(), "starting copy from pipe to local goroutine")
378423
// Keep copying until the stream is closed
379424
// The BackedPipe will block when no client is connected
425+
buf := make([]byte, 32*1024)
380426
for {
381427
// Read into the buffer allocated once above the loop
382-
buf := make([]byte, 32*1024)
383428
n, err := s.pipe.Read(buf)
384429
// Log significant events
385430
if errors.Is(err, io.EOF) {
386-
s.logger.Debug(context.Background(), "got EOF from pipe, will continue")
431+
s.logger.Debug(context.Background(), "got EOF from pipe")
432+
s.SignalDisconnect()
387433
} else if err != nil && !errors.Is(err, io.ErrClosedPipe) {
388434
s.logger.Debug(context.Background(), "error reading from pipe", slog.Error(err))
435+
s.SignalDisconnect()
389436
}
390437

391438
if n > 0 {
@@ -407,11 +454,7 @@ func (s *Stream) startCopyingLocked() {
407454
s.SignalDisconnect()
408455
return
409456
}
410-
// Any other error (including EOF) is not fatal - the BackedPipe will handle it
411-
// Just continue the loop
412-
if !xerrors.Is(err, io.EOF) {
413-
s.logger.Debug(context.Background(), "non-fatal error reading from pipe, continuing", slog.Error(err))
414-
}
457+
// Any other error (including EOF) is handled by BackedPipe; continue
415458
}
416459
}
417460
}()

0 commit comments

Comments
 (0)