Skip to content

Commit b993cab

Browse files
authored
fix: use screen for reconnecting terminal sessions on Linux if available (#8640)
* Add screen backend for reconnecting ptys The screen portion is a port from wsep. There is an interface that lets you choose between screen and the previous method. By default it will choose screen if it is installed but this can be overidden (mostly for tests). The tests use a scanner instead of a reader now because the reader will loop infinitely at the end of a stream. Replace /bin/bash with bash since bash is not always in /bin. * Remove connection_id from reconnecting PTY logger This serves multiple connections so it makes no sense to scope it to a single connection. Also lets us use "connection_id" when logging write errors instead of "other_conn_id". * Use PATH to test buffered reconnecting pty
1 parent 878315d commit b993cab

File tree

9 files changed

+1092
-313
lines changed

9 files changed

+1092
-313
lines changed

agent/agent.go

+19-193
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"sync"
2222
"time"
2323

24-
"github.com/armon/circbuf"
2524
"github.com/go-chi/chi/v5"
2625
"github.com/google/uuid"
2726
"github.com/prometheus/client_golang/prometheus"
@@ -36,12 +35,12 @@ import (
3635

3736
"cdr.dev/slog"
3837
"github.com/coder/coder/agent/agentssh"
38+
"github.com/coder/coder/agent/reconnectingpty"
3939
"github.com/coder/coder/buildinfo"
4040
"github.com/coder/coder/coderd/database"
4141
"github.com/coder/coder/coderd/gitauth"
4242
"github.com/coder/coder/codersdk"
4343
"github.com/coder/coder/codersdk/agentsdk"
44-
"github.com/coder/coder/pty"
4544
"github.com/coder/coder/tailnet"
4645
"github.com/coder/retry"
4746
)
@@ -92,9 +91,6 @@ type Agent interface {
9291
}
9392

9493
func New(options Options) Agent {
95-
if options.ReconnectingPTYTimeout == 0 {
96-
options.ReconnectingPTYTimeout = 5 * time.Minute
97-
}
9894
if options.Filesystem == nil {
9995
options.Filesystem = afero.NewOsFs()
10096
}
@@ -1075,8 +1071,8 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
10751071
defer a.connCountReconnectingPTY.Add(-1)
10761072

10771073
connectionID := uuid.NewString()
1078-
logger = logger.With(slog.F("message_id", msg.ID), slog.F("connection_id", connectionID))
1079-
logger.Debug(ctx, "starting handler")
1074+
connLogger := logger.With(slog.F("message_id", msg.ID), slog.F("connection_id", connectionID))
1075+
connLogger.Debug(ctx, "starting handler")
10801076

10811077
defer func() {
10821078
if err := retErr; err != nil {
@@ -1087,22 +1083,22 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
10871083
// If the agent is closed, we don't want to
10881084
// log this as an error since it's expected.
10891085
if closed {
1090-
logger.Debug(ctx, "reconnecting PTY failed with session error (agent closed)", slog.Error(err))
1086+
connLogger.Debug(ctx, "reconnecting pty failed with attach error (agent closed)", slog.Error(err))
10911087
} else {
1092-
logger.Error(ctx, "reconnecting PTY failed with session error", slog.Error(err))
1088+
connLogger.Error(ctx, "reconnecting pty failed with attach error", slog.Error(err))
10931089
}
10941090
}
1095-
logger.Debug(ctx, "session closed")
1091+
connLogger.Debug(ctx, "reconnecting pty connection closed")
10961092
}()
10971093

1098-
var rpty *reconnectingPTY
1099-
sendConnected := make(chan *reconnectingPTY, 1)
1094+
var rpty reconnectingpty.ReconnectingPTY
1095+
sendConnected := make(chan reconnectingpty.ReconnectingPTY, 1)
11001096
// On store, reserve this ID to prevent multiple concurrent new connections.
11011097
waitReady, ok := a.reconnectingPTYs.LoadOrStore(msg.ID, sendConnected)
11021098
if ok {
11031099
close(sendConnected) // Unused.
1104-
logger.Debug(ctx, "connecting to existing session")
1105-
c, ok := waitReady.(chan *reconnectingPTY)
1100+
connLogger.Debug(ctx, "connecting to existing reconnecting pty")
1101+
c, ok := waitReady.(chan reconnectingpty.ReconnectingPTY)
11061102
if !ok {
11071103
return xerrors.Errorf("found invalid type in reconnecting pty map: %T", waitReady)
11081104
}
@@ -1112,7 +1108,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
11121108
}
11131109
c <- rpty // Put it back for the next reconnect.
11141110
} else {
1115-
logger.Debug(ctx, "creating new session")
1111+
connLogger.Debug(ctx, "creating new reconnecting pty")
11161112

11171113
connected := false
11181114
defer func() {
@@ -1128,169 +1124,24 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
11281124
a.metrics.reconnectingPTYErrors.WithLabelValues("create_command").Add(1)
11291125
return xerrors.Errorf("create command: %w", err)
11301126
}
1131-
cmd.Env = append(cmd.Env, "TERM=xterm-256color")
1132-
1133-
// Default to buffer 64KiB.
1134-
circularBuffer, err := circbuf.NewBuffer(64 << 10)
1135-
if err != nil {
1136-
return xerrors.Errorf("create circular buffer: %w", err)
1137-
}
11381127

1139-
ptty, process, err := pty.Start(cmd)
1140-
if err != nil {
1141-
a.metrics.reconnectingPTYErrors.WithLabelValues("start_command").Add(1)
1142-
return xerrors.Errorf("start command: %w", err)
1143-
}
1128+
rpty = reconnectingpty.New(ctx, cmd, &reconnectingpty.Options{
1129+
Timeout: a.reconnectingPTYTimeout,
1130+
Metrics: a.metrics.reconnectingPTYErrors,
1131+
}, logger.With(slog.F("message_id", msg.ID)))
11441132

1145-
ctx, cancel := context.WithCancel(ctx)
1146-
rpty = &reconnectingPTY{
1147-
activeConns: map[string]net.Conn{
1148-
// We have to put the connection in the map instantly otherwise
1149-
// the connection won't be closed if the process instantly dies.
1150-
connectionID: conn,
1151-
},
1152-
ptty: ptty,
1153-
// Timeouts created with an after func can be reset!
1154-
timeout: time.AfterFunc(a.reconnectingPTYTimeout, cancel),
1155-
circularBuffer: circularBuffer,
1156-
}
1157-
// We don't need to separately monitor for the process exiting.
1158-
// When it exits, our ptty.OutputReader() will return EOF after
1159-
// reading all process output.
11601133
if err = a.trackConnGoroutine(func() {
1161-
buffer := make([]byte, 1024)
1162-
for {
1163-
read, err := rpty.ptty.OutputReader().Read(buffer)
1164-
if err != nil {
1165-
// When the PTY is closed, this is triggered.
1166-
// Error is typically a benign EOF, so only log for debugging.
1167-
if errors.Is(err, io.EOF) {
1168-
logger.Debug(ctx, "unable to read pty output, command might have exited", slog.Error(err))
1169-
} else {
1170-
logger.Warn(ctx, "unable to read pty output, command might have exited", slog.Error(err))
1171-
a.metrics.reconnectingPTYErrors.WithLabelValues("output_reader").Add(1)
1172-
}
1173-
break
1174-
}
1175-
part := buffer[:read]
1176-
rpty.circularBufferMutex.Lock()
1177-
_, err = rpty.circularBuffer.Write(part)
1178-
rpty.circularBufferMutex.Unlock()
1179-
if err != nil {
1180-
logger.Error(ctx, "write to circular buffer", slog.Error(err))
1181-
break
1182-
}
1183-
rpty.activeConnsMutex.Lock()
1184-
for cid, conn := range rpty.activeConns {
1185-
_, err = conn.Write(part)
1186-
if err != nil {
1187-
logger.Warn(ctx,
1188-
"error writing to active conn",
1189-
slog.F("other_conn_id", cid),
1190-
slog.Error(err),
1191-
)
1192-
a.metrics.reconnectingPTYErrors.WithLabelValues("write").Add(1)
1193-
}
1194-
}
1195-
rpty.activeConnsMutex.Unlock()
1196-
}
1197-
1198-
// Cleanup the process, PTY, and delete it's
1199-
// ID from memory.
1200-
_ = process.Kill()
1201-
rpty.Close()
1134+
rpty.Wait()
12021135
a.reconnectingPTYs.Delete(msg.ID)
12031136
}); err != nil {
1204-
_ = process.Kill()
1205-
_ = ptty.Close()
1137+
rpty.Close(err.Error())
12061138
return xerrors.Errorf("start routine: %w", err)
12071139
}
1140+
12081141
connected = true
12091142
sendConnected <- rpty
12101143
}
1211-
// Resize the PTY to initial height + width.
1212-
err := rpty.ptty.Resize(msg.Height, msg.Width)
1213-
if err != nil {
1214-
// We can continue after this, it's not fatal!
1215-
logger.Error(ctx, "reconnecting PTY initial resize failed, but will continue", slog.Error(err))
1216-
a.metrics.reconnectingPTYErrors.WithLabelValues("resize").Add(1)
1217-
}
1218-
// Write any previously stored data for the TTY.
1219-
rpty.circularBufferMutex.RLock()
1220-
prevBuf := slices.Clone(rpty.circularBuffer.Bytes())
1221-
rpty.circularBufferMutex.RUnlock()
1222-
// Note that there is a small race here between writing buffered
1223-
// data and storing conn in activeConns. This is likely a very minor
1224-
// edge case, but we should look into ways to avoid it. Holding
1225-
// activeConnsMutex would be one option, but holding this mutex
1226-
// while also holding circularBufferMutex seems dangerous.
1227-
_, err = conn.Write(prevBuf)
1228-
if err != nil {
1229-
a.metrics.reconnectingPTYErrors.WithLabelValues("write").Add(1)
1230-
return xerrors.Errorf("write buffer to conn: %w", err)
1231-
}
1232-
// Multiple connections to the same TTY are permitted.
1233-
// This could easily be used for terminal sharing, but
1234-
// we do it because it's a nice user experience to
1235-
// copy/paste a terminal URL and have it _just work_.
1236-
rpty.activeConnsMutex.Lock()
1237-
rpty.activeConns[connectionID] = conn
1238-
rpty.activeConnsMutex.Unlock()
1239-
// Resetting this timeout prevents the PTY from exiting.
1240-
rpty.timeout.Reset(a.reconnectingPTYTimeout)
1241-
1242-
ctx, cancelFunc := context.WithCancel(ctx)
1243-
defer cancelFunc()
1244-
heartbeat := time.NewTicker(a.reconnectingPTYTimeout / 2)
1245-
defer heartbeat.Stop()
1246-
go func() {
1247-
// Keep updating the activity while this
1248-
// connection is alive!
1249-
for {
1250-
select {
1251-
case <-ctx.Done():
1252-
return
1253-
case <-heartbeat.C:
1254-
}
1255-
rpty.timeout.Reset(a.reconnectingPTYTimeout)
1256-
}
1257-
}()
1258-
defer func() {
1259-
// After this connection ends, remove it from
1260-
// the PTYs active connections. If it isn't
1261-
// removed, all PTY data will be sent to it.
1262-
rpty.activeConnsMutex.Lock()
1263-
delete(rpty.activeConns, connectionID)
1264-
rpty.activeConnsMutex.Unlock()
1265-
}()
1266-
decoder := json.NewDecoder(conn)
1267-
var req codersdk.ReconnectingPTYRequest
1268-
for {
1269-
err = decoder.Decode(&req)
1270-
if xerrors.Is(err, io.EOF) {
1271-
return nil
1272-
}
1273-
if err != nil {
1274-
logger.Warn(ctx, "reconnecting PTY failed with read error", slog.Error(err))
1275-
return nil
1276-
}
1277-
_, err = rpty.ptty.InputWriter().Write([]byte(req.Data))
1278-
if err != nil {
1279-
logger.Warn(ctx, "reconnecting PTY failed with write error", slog.Error(err))
1280-
a.metrics.reconnectingPTYErrors.WithLabelValues("input_writer").Add(1)
1281-
return nil
1282-
}
1283-
// Check if a resize needs to happen!
1284-
if req.Height == 0 || req.Width == 0 {
1285-
continue
1286-
}
1287-
err = rpty.ptty.Resize(req.Height, req.Width)
1288-
if err != nil {
1289-
// We can continue after this, it's not fatal!
1290-
logger.Error(ctx, "reconnecting PTY resize failed, but will continue", slog.Error(err))
1291-
a.metrics.reconnectingPTYErrors.WithLabelValues("resize").Add(1)
1292-
}
1293-
}
1144+
return rpty.Attach(ctx, connectionID, conn, msg.Height, msg.Width, connLogger)
12941145
}
12951146

12961147
// startReportingConnectionStats runs the connection stats reporting goroutine.
@@ -1541,31 +1392,6 @@ lifecycleWaitLoop:
15411392
return nil
15421393
}
15431394

1544-
type reconnectingPTY struct {
1545-
activeConnsMutex sync.Mutex
1546-
activeConns map[string]net.Conn
1547-
1548-
circularBuffer *circbuf.Buffer
1549-
circularBufferMutex sync.RWMutex
1550-
timeout *time.Timer
1551-
ptty pty.PTYCmd
1552-
}
1553-
1554-
// Close ends all connections to the reconnecting
1555-
// PTY and clear the circular buffer.
1556-
func (r *reconnectingPTY) Close() {
1557-
r.activeConnsMutex.Lock()
1558-
defer r.activeConnsMutex.Unlock()
1559-
for _, conn := range r.activeConns {
1560-
_ = conn.Close()
1561-
}
1562-
_ = r.ptty.Close()
1563-
r.circularBufferMutex.Lock()
1564-
r.circularBuffer.Reset()
1565-
r.circularBufferMutex.Unlock()
1566-
r.timeout.Stop()
1567-
}
1568-
15691395
// userHomeDir returns the home directory of the current user, giving
15701396
// priority to the $HOME environment variable.
15711397
func userHomeDir() (string, error) {

0 commit comments

Comments
 (0)