@@ -51,7 +51,7 @@ func newBuffered(ctx context.Context, cmd *pty.Cmd, options *Options, logger slo
51
51
// Default to buffer 64KiB.
52
52
circularBuffer , err := circbuf .NewBuffer (64 << 10 )
53
53
if err != nil {
54
- rpty .state .setState (StateDone , xerrors .Errorf ("generate screen id : %w" , err ))
54
+ rpty .state .setState (StateDone , xerrors .Errorf ("create circular buffer : %w" , err ))
55
55
return rpty
56
56
}
57
57
rpty .circularBuffer = circularBuffer
@@ -63,7 +63,7 @@ func newBuffered(ctx context.Context, cmd *pty.Cmd, options *Options, logger slo
63
63
cmdWithEnv .Dir = rpty .command .Dir
64
64
ptty , process , err := pty .Start (cmdWithEnv )
65
65
if err != nil {
66
- rpty .state .setState (StateDone , xerrors .Errorf ("generate screen id : %w" , err ))
66
+ rpty .state .setState (StateDone , xerrors .Errorf ("start pty : %w" , err ))
67
67
return rpty
68
68
}
69
69
rpty .ptty = ptty
@@ -92,7 +92,7 @@ func newBuffered(ctx context.Context, cmd *pty.Cmd, options *Options, logger slo
92
92
// not found for example).
93
93
// TODO: Should we check the process's exit code in case the command was
94
94
// invalid?
95
- rpty .Close ("unable to read pty output, command might have exited" )
95
+ rpty .Close (nil )
96
96
break
97
97
}
98
98
part := buffer [:read ]
@@ -126,7 +126,7 @@ func newBuffered(ctx context.Context, cmd *pty.Cmd, options *Options, logger slo
126
126
// or the reconnecting pty closes the pty will be shut down.
127
127
func (rpty * bufferedReconnectingPTY ) lifecycle (ctx context.Context , logger slog.Logger ) {
128
128
rpty .timer = time .AfterFunc (attachTimeout , func () {
129
- rpty .Close ("reconnecting pty timeout" )
129
+ rpty .Close (xerrors . New ( "reconnecting pty timeout" ) )
130
130
})
131
131
132
132
logger .Debug (ctx , "reconnecting pty ready" )
@@ -136,7 +136,7 @@ func (rpty *bufferedReconnectingPTY) lifecycle(ctx context.Context, logger slog.
136
136
if state < StateClosing {
137
137
// If we have not closed yet then the context is what unblocked us (which
138
138
// means the agent is shutting down) so move into the closing phase.
139
- rpty .Close (reasonErr . Error () )
139
+ rpty .Close (reasonErr )
140
140
}
141
141
rpty .timer .Stop ()
142
142
@@ -168,7 +168,7 @@ func (rpty *bufferedReconnectingPTY) lifecycle(ctx context.Context, logger slog.
168
168
}
169
169
170
170
logger .Info (ctx , "closed reconnecting pty" )
171
- rpty .state .setState (StateDone , xerrors . Errorf ( "reconnecting pty closed: %w" , reasonErr ) )
171
+ rpty .state .setState (StateDone , reasonErr )
172
172
}
173
173
174
174
func (rpty * bufferedReconnectingPTY ) Attach (ctx context.Context , connID string , conn net.Conn , height , width uint16 , logger slog.Logger ) error {
@@ -178,7 +178,7 @@ func (rpty *bufferedReconnectingPTY) Attach(ctx context.Context, connID string,
178
178
ctx , cancel := context .WithCancel (ctx )
179
179
defer cancel ()
180
180
181
- err := rpty .doAttach (ctx , connID , conn , height , width , logger )
181
+ err := rpty .doAttach (connID , conn )
182
182
if err != nil {
183
183
return err
184
184
}
@@ -189,15 +189,30 @@ func (rpty *bufferedReconnectingPTY) Attach(ctx context.Context, connID string,
189
189
delete (rpty .activeConns , connID )
190
190
}()
191
191
192
+ state , err := rpty .state .waitForStateOrContext (ctx , StateReady )
193
+ if state != StateReady {
194
+ return err
195
+ }
196
+
197
+ go heartbeat (ctx , rpty .timer , rpty .timeout )
198
+
199
+ // Resize the PTY to initial height + width.
200
+ err = rpty .ptty .Resize (height , width )
201
+ if err != nil {
202
+ // We can continue after this, it's not fatal!
203
+ logger .Warn (ctx , "reconnecting PTY initial resize failed, but will continue" , slog .Error (err ))
204
+ rpty .metrics .WithLabelValues ("resize" ).Add (1 )
205
+ }
206
+
192
207
// Pipe conn -> pty and block. pty -> conn is handled in newBuffered().
193
208
readConnLoop (ctx , conn , rpty .ptty , rpty .metrics , logger )
194
209
return nil
195
210
}
196
211
197
- // doAttach adds the connection to the map, replays the buffer, and starts the
198
- // heartbeat. It exists separately only so we can defer the mutex unlock which
199
- // is not possible in Attach since it blocks.
200
- func (rpty * bufferedReconnectingPTY ) doAttach (ctx context. Context , connID string , conn net.Conn , height , width uint16 , logger slog. Logger ) error {
212
+ // doAttach adds the connection to the map and replays the buffer. It exists
213
+ // separately only for convenience to defer the mutex unlock which is not
214
+ // possible in Attach since it blocks.
215
+ func (rpty * bufferedReconnectingPTY ) doAttach (connID string , conn net.Conn ) error {
201
216
rpty .state .cond .L .Lock ()
202
217
defer rpty .state .cond .L .Unlock ()
203
218
@@ -211,21 +226,6 @@ func (rpty *bufferedReconnectingPTY) doAttach(ctx context.Context, connID string
211
226
return xerrors .Errorf ("write buffer to conn: %w" , err )
212
227
}
213
228
214
- state , err := rpty .state .waitForStateOrContextLocked (ctx , StateReady )
215
- if state != StateReady {
216
- return xerrors .Errorf ("reconnecting pty ready wait: %w" , err )
217
- }
218
-
219
- go heartbeat (ctx , rpty .timer , rpty .timeout )
220
-
221
- // Resize the PTY to initial height + width.
222
- err = rpty .ptty .Resize (height , width )
223
- if err != nil {
224
- // We can continue after this, it's not fatal!
225
- logger .Warn (ctx , "reconnecting PTY initial resize failed, but will continue" , slog .Error (err ))
226
- rpty .metrics .WithLabelValues ("resize" ).Add (1 )
227
- }
228
-
229
229
rpty .activeConns [connID ] = conn
230
230
231
231
return nil
@@ -235,7 +235,7 @@ func (rpty *bufferedReconnectingPTY) Wait() {
235
235
_ , _ = rpty .state .waitForState (StateClosing )
236
236
}
237
237
238
- func (rpty * bufferedReconnectingPTY ) Close (reason string ) {
238
+ func (rpty * bufferedReconnectingPTY ) Close (error error ) {
239
239
// The closing state change will be handled by the lifecycle.
240
- rpty .state .setState (StateClosing , xerrors . Errorf ( "reconnecting pty closing: %s" , reason ) )
240
+ rpty .state .setState (StateClosing , error )
241
241
}
0 commit comments