Skip to content

Commit a08f7b8

Browse files
authored
fix: catch missing output with reconnecting PTY (#9094)
I forgot that waiting on the cond releases the lock, so it was possible to get pty output after writing the buffer but before adding the pty to the map. To fix, add the pty to the map under the same lock where we read from the buffer. The rest does not need to be behind the lock, so I moved it out of doAttach; that also means we no longer need waitForStateOrContextLocked. Separately, this path can log an error saying the attach failed, which fails the tests — but it is not that the attach failed, just that the process already ran and exited. So when the process exits, do not set an error; instead, for now, assume this is an expected close.
1 parent 6ea82c5 commit a08f7b8

File tree

4 files changed

+36
-41
lines changed

4 files changed

+36
-41
lines changed

agent/agent.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1134,7 +1134,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
11341134
rpty.Wait()
11351135
a.reconnectingPTYs.Delete(msg.ID)
11361136
}); err != nil {
1137-
rpty.Close(err.Error())
1137+
rpty.Close(err)
11381138
return xerrors.Errorf("start routine: %w", err)
11391139
}
11401140

agent/reconnectingpty/buffered.go

+28-28
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func newBuffered(ctx context.Context, cmd *pty.Cmd, options *Options, logger slo
5151
// Default to buffer 64KiB.
5252
circularBuffer, err := circbuf.NewBuffer(64 << 10)
5353
if err != nil {
54-
rpty.state.setState(StateDone, xerrors.Errorf("generate screen id: %w", err))
54+
rpty.state.setState(StateDone, xerrors.Errorf("create circular buffer: %w", err))
5555
return rpty
5656
}
5757
rpty.circularBuffer = circularBuffer
@@ -63,7 +63,7 @@ func newBuffered(ctx context.Context, cmd *pty.Cmd, options *Options, logger slo
6363
cmdWithEnv.Dir = rpty.command.Dir
6464
ptty, process, err := pty.Start(cmdWithEnv)
6565
if err != nil {
66-
rpty.state.setState(StateDone, xerrors.Errorf("generate screen id: %w", err))
66+
rpty.state.setState(StateDone, xerrors.Errorf("start pty: %w", err))
6767
return rpty
6868
}
6969
rpty.ptty = ptty
@@ -92,7 +92,7 @@ func newBuffered(ctx context.Context, cmd *pty.Cmd, options *Options, logger slo
9292
// not found for example).
9393
// TODO: Should we check the process's exit code in case the command was
9494
// invalid?
95-
rpty.Close("unable to read pty output, command might have exited")
95+
rpty.Close(nil)
9696
break
9797
}
9898
part := buffer[:read]
@@ -126,7 +126,7 @@ func newBuffered(ctx context.Context, cmd *pty.Cmd, options *Options, logger slo
126126
// or the reconnecting pty closes the pty will be shut down.
127127
func (rpty *bufferedReconnectingPTY) lifecycle(ctx context.Context, logger slog.Logger) {
128128
rpty.timer = time.AfterFunc(attachTimeout, func() {
129-
rpty.Close("reconnecting pty timeout")
129+
rpty.Close(xerrors.New("reconnecting pty timeout"))
130130
})
131131

132132
logger.Debug(ctx, "reconnecting pty ready")
@@ -136,7 +136,7 @@ func (rpty *bufferedReconnectingPTY) lifecycle(ctx context.Context, logger slog.
136136
if state < StateClosing {
137137
// If we have not closed yet then the context is what unblocked us (which
138138
// means the agent is shutting down) so move into the closing phase.
139-
rpty.Close(reasonErr.Error())
139+
rpty.Close(reasonErr)
140140
}
141141
rpty.timer.Stop()
142142

@@ -168,7 +168,7 @@ func (rpty *bufferedReconnectingPTY) lifecycle(ctx context.Context, logger slog.
168168
}
169169

170170
logger.Info(ctx, "closed reconnecting pty")
171-
rpty.state.setState(StateDone, xerrors.Errorf("reconnecting pty closed: %w", reasonErr))
171+
rpty.state.setState(StateDone, reasonErr)
172172
}
173173

174174
func (rpty *bufferedReconnectingPTY) Attach(ctx context.Context, connID string, conn net.Conn, height, width uint16, logger slog.Logger) error {
@@ -178,7 +178,7 @@ func (rpty *bufferedReconnectingPTY) Attach(ctx context.Context, connID string,
178178
ctx, cancel := context.WithCancel(ctx)
179179
defer cancel()
180180

181-
err := rpty.doAttach(ctx, connID, conn, height, width, logger)
181+
err := rpty.doAttach(connID, conn)
182182
if err != nil {
183183
return err
184184
}
@@ -189,15 +189,30 @@ func (rpty *bufferedReconnectingPTY) Attach(ctx context.Context, connID string,
189189
delete(rpty.activeConns, connID)
190190
}()
191191

192+
state, err := rpty.state.waitForStateOrContext(ctx, StateReady)
193+
if state != StateReady {
194+
return err
195+
}
196+
197+
go heartbeat(ctx, rpty.timer, rpty.timeout)
198+
199+
// Resize the PTY to initial height + width.
200+
err = rpty.ptty.Resize(height, width)
201+
if err != nil {
202+
// We can continue after this, it's not fatal!
203+
logger.Warn(ctx, "reconnecting PTY initial resize failed, but will continue", slog.Error(err))
204+
rpty.metrics.WithLabelValues("resize").Add(1)
205+
}
206+
192207
// Pipe conn -> pty and block. pty -> conn is handled in newBuffered().
193208
readConnLoop(ctx, conn, rpty.ptty, rpty.metrics, logger)
194209
return nil
195210
}
196211

197-
// doAttach adds the connection to the map, replays the buffer, and starts the
198-
// heartbeat. It exists separately only so we can defer the mutex unlock which
199-
// is not possible in Attach since it blocks.
200-
func (rpty *bufferedReconnectingPTY) doAttach(ctx context.Context, connID string, conn net.Conn, height, width uint16, logger slog.Logger) error {
212+
// doAttach adds the connection to the map and replays the buffer. It exists
213+
// separately only for convenience to defer the mutex unlock which is not
214+
// possible in Attach since it blocks.
215+
func (rpty *bufferedReconnectingPTY) doAttach(connID string, conn net.Conn) error {
201216
rpty.state.cond.L.Lock()
202217
defer rpty.state.cond.L.Unlock()
203218

@@ -211,21 +226,6 @@ func (rpty *bufferedReconnectingPTY) doAttach(ctx context.Context, connID string
211226
return xerrors.Errorf("write buffer to conn: %w", err)
212227
}
213228

214-
state, err := rpty.state.waitForStateOrContextLocked(ctx, StateReady)
215-
if state != StateReady {
216-
return xerrors.Errorf("reconnecting pty ready wait: %w", err)
217-
}
218-
219-
go heartbeat(ctx, rpty.timer, rpty.timeout)
220-
221-
// Resize the PTY to initial height + width.
222-
err = rpty.ptty.Resize(height, width)
223-
if err != nil {
224-
// We can continue after this, it's not fatal!
225-
logger.Warn(ctx, "reconnecting PTY initial resize failed, but will continue", slog.Error(err))
226-
rpty.metrics.WithLabelValues("resize").Add(1)
227-
}
228-
229229
rpty.activeConns[connID] = conn
230230

231231
return nil
@@ -235,7 +235,7 @@ func (rpty *bufferedReconnectingPTY) Wait() {
235235
_, _ = rpty.state.waitForState(StateClosing)
236236
}
237237

238-
func (rpty *bufferedReconnectingPTY) Close(reason string) {
238+
func (rpty *bufferedReconnectingPTY) Close(error error) {
239239
// The closing state change will be handled by the lifecycle.
240-
rpty.state.setState(StateClosing, xerrors.Errorf("reconnecting pty closing: %s", reason))
240+
rpty.state.setState(StateClosing, error)
241241
}

agent/reconnectingpty/reconnectingpty.go

+1-6
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ type ReconnectingPTY interface {
4848
// still be exiting.
4949
Wait()
5050
// Close kills the reconnecting pty process.
51-
Close(reason string)
51+
Close(err error)
5252
}
5353

5454
// New sets up a new reconnecting pty that wraps the provided command. Any
@@ -171,12 +171,7 @@ func (s *ptyState) waitForState(state State) (State, error) {
171171
func (s *ptyState) waitForStateOrContext(ctx context.Context, state State) (State, error) {
172172
s.cond.L.Lock()
173173
defer s.cond.L.Unlock()
174-
return s.waitForStateOrContextLocked(ctx, state)
175-
}
176174

177-
// waitForStateOrContextLocked is the same as waitForStateOrContext except it
178-
// assumes the caller has already locked cond.
179-
func (s *ptyState) waitForStateOrContextLocked(ctx context.Context, state State) (State, error) {
180175
nevermind := make(chan struct{})
181176
defer close(nevermind)
182177
go func() {

agent/reconnectingpty/screen.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ func newScreen(ctx context.Context, cmd *pty.Cmd, options *Options, logger slog.
124124
// the reconnecting pty will be closed.
125125
func (rpty *screenReconnectingPTY) lifecycle(ctx context.Context, logger slog.Logger) {
126126
rpty.timer = time.AfterFunc(attachTimeout, func() {
127-
rpty.Close("reconnecting pty timeout")
127+
rpty.Close(xerrors.New("reconnecting pty timeout"))
128128
})
129129

130130
logger.Debug(ctx, "reconnecting pty ready")
@@ -134,7 +134,7 @@ func (rpty *screenReconnectingPTY) lifecycle(ctx context.Context, logger slog.Lo
134134
if state < StateClosing {
135135
// If we have not closed yet then the context is what unblocked us (which
136136
// means the agent is shutting down) so move into the closing phase.
137-
rpty.Close(reasonErr.Error())
137+
rpty.Close(reasonErr)
138138
}
139139
rpty.timer.Stop()
140140

@@ -145,7 +145,7 @@ func (rpty *screenReconnectingPTY) lifecycle(ctx context.Context, logger slog.Lo
145145
}
146146

147147
logger.Info(ctx, "closed reconnecting pty")
148-
rpty.state.setState(StateDone, xerrors.Errorf("reconnecting pty closed: %w", reasonErr))
148+
rpty.state.setState(StateDone, reasonErr)
149149
}
150150

151151
func (rpty *screenReconnectingPTY) Attach(ctx context.Context, _ string, conn net.Conn, height, width uint16, logger slog.Logger) error {
@@ -157,7 +157,7 @@ func (rpty *screenReconnectingPTY) Attach(ctx context.Context, _ string, conn ne
157157

158158
state, err := rpty.state.waitForStateOrContext(ctx, StateReady)
159159
if state != StateReady {
160-
return xerrors.Errorf("reconnecting pty ready wait: %w", err)
160+
return err
161161
}
162162

163163
go heartbeat(ctx, rpty.timer, rpty.timeout)
@@ -382,7 +382,7 @@ func (rpty *screenReconnectingPTY) Wait() {
382382
_, _ = rpty.state.waitForState(StateClosing)
383383
}
384384

385-
func (rpty *screenReconnectingPTY) Close(reason string) {
385+
func (rpty *screenReconnectingPTY) Close(err error) {
386386
// The closing state change will be handled by the lifecycle.
387-
rpty.state.setState(StateClosing, xerrors.Errorf("reconnecting pty closing: %s", reason))
387+
rpty.state.setState(StateClosing, err)
388388
}

0 commit comments

Comments (0)