Skip to content

Commit 16c9105

Browse files
committed
Fix reconnecting PTY flake
I forgot that waiting on the cond releases the lock so it was possible to get pty output after writing the buffer but before adding the pty to the map. To fix, add the pty to the map while under the same lock where we write to the buffer. The rest does not need to be behind the lock so I moved it out of doAttach, and that also means we no longer need waitForStateOrContextLocked.
1 parent 47b8bf6 commit 16c9105

File tree

2 files changed

+20
-25
lines changed

2 files changed

+20
-25
lines changed

agent/reconnectingpty/buffered.go

+20-20
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ func (rpty *bufferedReconnectingPTY) Attach(ctx context.Context, connID string,
178178
ctx, cancel := context.WithCancel(ctx)
179179
defer cancel()
180180

181-
err := rpty.doAttach(ctx, connID, conn, height, width, logger)
181+
err := rpty.doAttach(connID, conn)
182182
if err != nil {
183183
return err
184184
}
@@ -189,15 +189,30 @@ func (rpty *bufferedReconnectingPTY) Attach(ctx context.Context, connID string,
189189
delete(rpty.activeConns, connID)
190190
}()
191191

192+
state, err := rpty.state.waitForStateOrContext(ctx, StateReady)
193+
if state != StateReady {
194+
return xerrors.Errorf("reconnecting pty ready wait: %w", err)
195+
}
196+
197+
go heartbeat(ctx, rpty.timer, rpty.timeout)
198+
199+
// Resize the PTY to initial height + width.
200+
err = rpty.ptty.Resize(height, width)
201+
if err != nil {
202+
// We can continue after this, it's not fatal!
203+
logger.Warn(ctx, "reconnecting PTY initial resize failed, but will continue", slog.Error(err))
204+
rpty.metrics.WithLabelValues("resize").Add(1)
205+
}
206+
192207
// Pipe conn -> pty and block. pty -> conn is handled in newBuffered().
193208
readConnLoop(ctx, conn, rpty.ptty, rpty.metrics, logger)
194209
return nil
195210
}
196211

197-
// doAttach adds the connection to the map, replays the buffer, and starts the
198-
// heartbeat. It exists separately only so we can defer the mutex unlock which
199-
// is not possible in Attach since it blocks.
200-
func (rpty *bufferedReconnectingPTY) doAttach(ctx context.Context, connID string, conn net.Conn, height, width uint16, logger slog.Logger) error {
212+
// doAttach adds the connection to the map and replays the buffer. It exists
213+
// separately only for convenience to defer the mutex unlock which is not
214+
// possible in Attach since it blocks.
215+
func (rpty *bufferedReconnectingPTY) doAttach(connID string, conn net.Conn) error {
201216
rpty.state.cond.L.Lock()
202217
defer rpty.state.cond.L.Unlock()
203218

@@ -211,21 +226,6 @@ func (rpty *bufferedReconnectingPTY) doAttach(ctx context.Context, connID string
211226
return xerrors.Errorf("write buffer to conn: %w", err)
212227
}
213228

214-
state, err := rpty.state.waitForStateOrContextLocked(ctx, StateReady)
215-
if state != StateReady {
216-
return xerrors.Errorf("reconnecting pty ready wait: %w", err)
217-
}
218-
219-
go heartbeat(ctx, rpty.timer, rpty.timeout)
220-
221-
// Resize the PTY to initial height + width.
222-
err = rpty.ptty.Resize(height, width)
223-
if err != nil {
224-
// We can continue after this, it's not fatal!
225-
logger.Warn(ctx, "reconnecting PTY initial resize failed, but will continue", slog.Error(err))
226-
rpty.metrics.WithLabelValues("resize").Add(1)
227-
}
228-
229229
rpty.activeConns[connID] = conn
230230

231231
return nil

agent/reconnectingpty/reconnectingpty.go

-5
Original file line numberDiff line numberDiff line change
@@ -171,12 +171,7 @@ func (s *ptyState) waitForState(state State) (State, error) {
171171
func (s *ptyState) waitForStateOrContext(ctx context.Context, state State) (State, error) {
172172
s.cond.L.Lock()
173173
defer s.cond.L.Unlock()
174-
return s.waitForStateOrContextLocked(ctx, state)
175-
}
176174

177-
// waitForStateOrContextLocked is the same as waitForStateOrContext except it
178-
// assumes the caller has already locked cond.
179-
func (s *ptyState) waitForStateOrContextLocked(ctx context.Context, state State) (State, error) {
180175
nevermind := make(chan struct{})
181176
defer close(nevermind)
182177
go func() {

0 commit comments

Comments
 (0)