@@ -21,7 +21,6 @@ import (
21
21
"sync"
22
22
"time"
23
23
24
- "github.com/armon/circbuf"
25
24
"github.com/go-chi/chi/v5"
26
25
"github.com/google/uuid"
27
26
"github.com/prometheus/client_golang/prometheus"
@@ -36,12 +35,12 @@ import (
36
35
37
36
"cdr.dev/slog"
38
37
"github.com/coder/coder/agent/agentssh"
38
+ "github.com/coder/coder/agent/reconnectingpty"
39
39
"github.com/coder/coder/buildinfo"
40
40
"github.com/coder/coder/coderd/database"
41
41
"github.com/coder/coder/coderd/gitauth"
42
42
"github.com/coder/coder/codersdk"
43
43
"github.com/coder/coder/codersdk/agentsdk"
44
- "github.com/coder/coder/pty"
45
44
"github.com/coder/coder/tailnet"
46
45
"github.com/coder/retry"
47
46
)
@@ -92,9 +91,6 @@ type Agent interface {
92
91
}
93
92
94
93
func New (options Options ) Agent {
95
- if options .ReconnectingPTYTimeout == 0 {
96
- options .ReconnectingPTYTimeout = 5 * time .Minute
97
- }
98
94
if options .Filesystem == nil {
99
95
options .Filesystem = afero .NewOsFs ()
100
96
}
@@ -1075,8 +1071,8 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
1075
1071
defer a .connCountReconnectingPTY .Add (- 1 )
1076
1072
1077
1073
connectionID := uuid .NewString ()
1078
- logger = logger .With (slog .F ("message_id" , msg .ID ), slog .F ("connection_id" , connectionID ))
1079
- logger .Debug (ctx , "starting handler" )
1074
+ connLogger : = logger .With (slog .F ("message_id" , msg .ID ), slog .F ("connection_id" , connectionID ))
1075
+ connLogger .Debug (ctx , "starting handler" )
1080
1076
1081
1077
defer func () {
1082
1078
if err := retErr ; err != nil {
@@ -1087,22 +1083,22 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
1087
1083
// If the agent is closed, we don't want to
1088
1084
// log this as an error since it's expected.
1089
1085
if closed {
1090
- logger .Debug (ctx , "reconnecting PTY failed with session error (agent closed)" , slog .Error (err ))
1086
+ connLogger .Debug (ctx , "reconnecting pty failed with attach error (agent closed)" , slog .Error (err ))
1091
1087
} else {
1092
- logger .Error (ctx , "reconnecting PTY failed with session error" , slog .Error (err ))
1088
+ connLogger .Error (ctx , "reconnecting pty failed with attach error" , slog .Error (err ))
1093
1089
}
1094
1090
}
1095
- logger .Debug (ctx , "session closed" )
1091
+ connLogger .Debug (ctx , "reconnecting pty connection closed" )
1096
1092
}()
1097
1093
1098
- var rpty * reconnectingPTY
1099
- sendConnected := make (chan * reconnectingPTY , 1 )
1094
+ var rpty reconnectingpty. ReconnectingPTY
1095
+ sendConnected := make (chan reconnectingpty. ReconnectingPTY , 1 )
1100
1096
// On store, reserve this ID to prevent multiple concurrent new connections.
1101
1097
waitReady , ok := a .reconnectingPTYs .LoadOrStore (msg .ID , sendConnected )
1102
1098
if ok {
1103
1099
close (sendConnected ) // Unused.
1104
- logger .Debug (ctx , "connecting to existing session " )
1105
- c , ok := waitReady .(chan * reconnectingPTY )
1100
+ connLogger .Debug (ctx , "connecting to existing reconnecting pty " )
1101
+ c , ok := waitReady .(chan reconnectingpty. ReconnectingPTY )
1106
1102
if ! ok {
1107
1103
return xerrors .Errorf ("found invalid type in reconnecting pty map: %T" , waitReady )
1108
1104
}
@@ -1112,7 +1108,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
1112
1108
}
1113
1109
c <- rpty // Put it back for the next reconnect.
1114
1110
} else {
1115
- logger .Debug (ctx , "creating new session " )
1111
+ connLogger .Debug (ctx , "creating new reconnecting pty " )
1116
1112
1117
1113
connected := false
1118
1114
defer func () {
@@ -1128,169 +1124,24 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
1128
1124
a .metrics .reconnectingPTYErrors .WithLabelValues ("create_command" ).Add (1 )
1129
1125
return xerrors .Errorf ("create command: %w" , err )
1130
1126
}
1131
- cmd .Env = append (cmd .Env , "TERM=xterm-256color" )
1132
-
1133
- // Default to buffer 64KiB.
1134
- circularBuffer , err := circbuf .NewBuffer (64 << 10 )
1135
- if err != nil {
1136
- return xerrors .Errorf ("create circular buffer: %w" , err )
1137
- }
1138
1127
1139
- ptty , process , err := pty .Start (cmd )
1140
- if err != nil {
1141
- a .metrics .reconnectingPTYErrors .WithLabelValues ("start_command" ).Add (1 )
1142
- return xerrors .Errorf ("start command: %w" , err )
1143
- }
1128
+ rpty = reconnectingpty .New (ctx , cmd , & reconnectingpty.Options {
1129
+ Timeout : a .reconnectingPTYTimeout ,
1130
+ Metrics : a .metrics .reconnectingPTYErrors ,
1131
+ }, logger .With (slog .F ("message_id" , msg .ID )))
1144
1132
1145
- ctx , cancel := context .WithCancel (ctx )
1146
- rpty = & reconnectingPTY {
1147
- activeConns : map [string ]net.Conn {
1148
- // We have to put the connection in the map instantly otherwise
1149
- // the connection won't be closed if the process instantly dies.
1150
- connectionID : conn ,
1151
- },
1152
- ptty : ptty ,
1153
- // Timeouts created with an after func can be reset!
1154
- timeout : time .AfterFunc (a .reconnectingPTYTimeout , cancel ),
1155
- circularBuffer : circularBuffer ,
1156
- }
1157
- // We don't need to separately monitor for the process exiting.
1158
- // When it exits, our ptty.OutputReader() will return EOF after
1159
- // reading all process output.
1160
1133
if err = a .trackConnGoroutine (func () {
1161
- buffer := make ([]byte , 1024 )
1162
- for {
1163
- read , err := rpty .ptty .OutputReader ().Read (buffer )
1164
- if err != nil {
1165
- // When the PTY is closed, this is triggered.
1166
- // Error is typically a benign EOF, so only log for debugging.
1167
- if errors .Is (err , io .EOF ) {
1168
- logger .Debug (ctx , "unable to read pty output, command might have exited" , slog .Error (err ))
1169
- } else {
1170
- logger .Warn (ctx , "unable to read pty output, command might have exited" , slog .Error (err ))
1171
- a .metrics .reconnectingPTYErrors .WithLabelValues ("output_reader" ).Add (1 )
1172
- }
1173
- break
1174
- }
1175
- part := buffer [:read ]
1176
- rpty .circularBufferMutex .Lock ()
1177
- _ , err = rpty .circularBuffer .Write (part )
1178
- rpty .circularBufferMutex .Unlock ()
1179
- if err != nil {
1180
- logger .Error (ctx , "write to circular buffer" , slog .Error (err ))
1181
- break
1182
- }
1183
- rpty .activeConnsMutex .Lock ()
1184
- for cid , conn := range rpty .activeConns {
1185
- _ , err = conn .Write (part )
1186
- if err != nil {
1187
- logger .Warn (ctx ,
1188
- "error writing to active conn" ,
1189
- slog .F ("other_conn_id" , cid ),
1190
- slog .Error (err ),
1191
- )
1192
- a .metrics .reconnectingPTYErrors .WithLabelValues ("write" ).Add (1 )
1193
- }
1194
- }
1195
- rpty .activeConnsMutex .Unlock ()
1196
- }
1197
-
1198
- // Cleanup the process, PTY, and delete it's
1199
- // ID from memory.
1200
- _ = process .Kill ()
1201
- rpty .Close ()
1134
+ rpty .Wait ()
1202
1135
a .reconnectingPTYs .Delete (msg .ID )
1203
1136
}); err != nil {
1204
- _ = process .Kill ()
1205
- _ = ptty .Close ()
1137
+ rpty .Close (err .Error ())
1206
1138
return xerrors .Errorf ("start routine: %w" , err )
1207
1139
}
1140
+
1208
1141
connected = true
1209
1142
sendConnected <- rpty
1210
1143
}
1211
- // Resize the PTY to initial height + width.
1212
- err := rpty .ptty .Resize (msg .Height , msg .Width )
1213
- if err != nil {
1214
- // We can continue after this, it's not fatal!
1215
- logger .Error (ctx , "reconnecting PTY initial resize failed, but will continue" , slog .Error (err ))
1216
- a .metrics .reconnectingPTYErrors .WithLabelValues ("resize" ).Add (1 )
1217
- }
1218
- // Write any previously stored data for the TTY.
1219
- rpty .circularBufferMutex .RLock ()
1220
- prevBuf := slices .Clone (rpty .circularBuffer .Bytes ())
1221
- rpty .circularBufferMutex .RUnlock ()
1222
- // Note that there is a small race here between writing buffered
1223
- // data and storing conn in activeConns. This is likely a very minor
1224
- // edge case, but we should look into ways to avoid it. Holding
1225
- // activeConnsMutex would be one option, but holding this mutex
1226
- // while also holding circularBufferMutex seems dangerous.
1227
- _ , err = conn .Write (prevBuf )
1228
- if err != nil {
1229
- a .metrics .reconnectingPTYErrors .WithLabelValues ("write" ).Add (1 )
1230
- return xerrors .Errorf ("write buffer to conn: %w" , err )
1231
- }
1232
- // Multiple connections to the same TTY are permitted.
1233
- // This could easily be used for terminal sharing, but
1234
- // we do it because it's a nice user experience to
1235
- // copy/paste a terminal URL and have it _just work_.
1236
- rpty .activeConnsMutex .Lock ()
1237
- rpty .activeConns [connectionID ] = conn
1238
- rpty .activeConnsMutex .Unlock ()
1239
- // Resetting this timeout prevents the PTY from exiting.
1240
- rpty .timeout .Reset (a .reconnectingPTYTimeout )
1241
-
1242
- ctx , cancelFunc := context .WithCancel (ctx )
1243
- defer cancelFunc ()
1244
- heartbeat := time .NewTicker (a .reconnectingPTYTimeout / 2 )
1245
- defer heartbeat .Stop ()
1246
- go func () {
1247
- // Keep updating the activity while this
1248
- // connection is alive!
1249
- for {
1250
- select {
1251
- case <- ctx .Done ():
1252
- return
1253
- case <- heartbeat .C :
1254
- }
1255
- rpty .timeout .Reset (a .reconnectingPTYTimeout )
1256
- }
1257
- }()
1258
- defer func () {
1259
- // After this connection ends, remove it from
1260
- // the PTYs active connections. If it isn't
1261
- // removed, all PTY data will be sent to it.
1262
- rpty .activeConnsMutex .Lock ()
1263
- delete (rpty .activeConns , connectionID )
1264
- rpty .activeConnsMutex .Unlock ()
1265
- }()
1266
- decoder := json .NewDecoder (conn )
1267
- var req codersdk.ReconnectingPTYRequest
1268
- for {
1269
- err = decoder .Decode (& req )
1270
- if xerrors .Is (err , io .EOF ) {
1271
- return nil
1272
- }
1273
- if err != nil {
1274
- logger .Warn (ctx , "reconnecting PTY failed with read error" , slog .Error (err ))
1275
- return nil
1276
- }
1277
- _ , err = rpty .ptty .InputWriter ().Write ([]byte (req .Data ))
1278
- if err != nil {
1279
- logger .Warn (ctx , "reconnecting PTY failed with write error" , slog .Error (err ))
1280
- a .metrics .reconnectingPTYErrors .WithLabelValues ("input_writer" ).Add (1 )
1281
- return nil
1282
- }
1283
- // Check if a resize needs to happen!
1284
- if req .Height == 0 || req .Width == 0 {
1285
- continue
1286
- }
1287
- err = rpty .ptty .Resize (req .Height , req .Width )
1288
- if err != nil {
1289
- // We can continue after this, it's not fatal!
1290
- logger .Error (ctx , "reconnecting PTY resize failed, but will continue" , slog .Error (err ))
1291
- a .metrics .reconnectingPTYErrors .WithLabelValues ("resize" ).Add (1 )
1292
- }
1293
- }
1144
+ return rpty .Attach (ctx , connectionID , conn , msg .Height , msg .Width , connLogger )
1294
1145
}
1295
1146
1296
1147
// startReportingConnectionStats runs the connection stats reporting goroutine.
@@ -1541,31 +1392,6 @@ lifecycleWaitLoop:
1541
1392
return nil
1542
1393
}
1543
1394
1544
- type reconnectingPTY struct {
1545
- activeConnsMutex sync.Mutex
1546
- activeConns map [string ]net.Conn
1547
-
1548
- circularBuffer * circbuf.Buffer
1549
- circularBufferMutex sync.RWMutex
1550
- timeout * time.Timer
1551
- ptty pty.PTYCmd
1552
- }
1553
-
1554
- // Close ends all connections to the reconnecting
1555
- // PTY and clear the circular buffer.
1556
- func (r * reconnectingPTY ) Close () {
1557
- r .activeConnsMutex .Lock ()
1558
- defer r .activeConnsMutex .Unlock ()
1559
- for _ , conn := range r .activeConns {
1560
- _ = conn .Close ()
1561
- }
1562
- _ = r .ptty .Close ()
1563
- r .circularBufferMutex .Lock ()
1564
- r .circularBuffer .Reset ()
1565
- r .circularBufferMutex .Unlock ()
1566
- r .timeout .Stop ()
1567
- }
1568
-
1569
1395
// userHomeDir returns the home directory of the current user, giving
1570
1396
// priority to the $HOME environment variable.
1571
1397
func userHomeDir () (string , error ) {
0 commit comments