Skip to content

Commit 80b7ff3

Browse files
committed
Merge branch 'main' into mafredri/feat-add-app-usage-to-template-insights
2 parents ca83430 + 41433cd commit 80b7ff3

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

66 files changed

+18238
-507
lines changed

.github/workflows/ci.yaml

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,10 @@ name: ci
22

33
on:
44
push:
5+
branches:
6+
- main
7+
8+
pull_request:
59
workflow_dispatch:
610

711
permissions:
@@ -20,7 +24,7 @@ permissions:
2024
# additional changes
2125
concurrency:
2226
group: ${{ github.workflow }}-${{ github.ref }}
23-
cancel-in-progress: ${{ github.event_name == 'pull_request' }}
27+
cancel-in-progress: ${{ github.ref != 'refs/heads/main' }}
2428

2529
jobs:
2630
changes:
@@ -43,7 +47,6 @@ jobs:
4347
uses: dorny/paths-filter@v2
4448
id: filter
4549
with:
46-
base: ${{ github.ref }}
4750
filters: |
4851
all:
4952
- "**"

agent/agent.go

Lines changed: 20 additions & 193 deletions
Original file line number | Diff line number | Diff line change
@@ -21,7 +21,6 @@ import (
2121
"sync"
2222
"time"
2323

24-
"github.com/armon/circbuf"
2524
"github.com/go-chi/chi/v5"
2625
"github.com/google/uuid"
2726
"github.com/prometheus/client_golang/prometheus"
@@ -36,12 +35,12 @@ import (
3635

3736
"cdr.dev/slog"
3837
"github.com/coder/coder/agent/agentssh"
38+
"github.com/coder/coder/agent/reconnectingpty"
3939
"github.com/coder/coder/buildinfo"
4040
"github.com/coder/coder/coderd/database"
4141
"github.com/coder/coder/coderd/gitauth"
4242
"github.com/coder/coder/codersdk"
4343
"github.com/coder/coder/codersdk/agentsdk"
44-
"github.com/coder/coder/pty"
4544
"github.com/coder/coder/tailnet"
4645
"github.com/coder/retry"
4746
)
@@ -92,9 +91,6 @@ type Agent interface {
9291
}
9392

9493
func New(options Options) Agent {
95-
if options.ReconnectingPTYTimeout == 0 {
96-
options.ReconnectingPTYTimeout = 5 * time.Minute
97-
}
9894
if options.Filesystem == nil {
9995
options.Filesystem = afero.NewOsFs()
10096
}
@@ -762,6 +758,7 @@ func (a *agent) trackConnGoroutine(fn func()) error {
762758

763759
func (a *agent) createTailnet(ctx context.Context, agentID uuid.UUID, derpMap *tailcfg.DERPMap, disableDirectConnections bool) (_ *tailnet.Conn, err error) {
764760
network, err := tailnet.NewConn(&tailnet.Options{
761+
ID: agentID,
765762
Addresses: a.wireguardAddresses(agentID),
766763
DERPMap: derpMap,
767764
Logger: a.logger.Named("net.tailnet"),
@@ -1075,8 +1072,8 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
10751072
defer a.connCountReconnectingPTY.Add(-1)
10761073

10771074
connectionID := uuid.NewString()
1078-
logger = logger.With(slog.F("message_id", msg.ID), slog.F("connection_id", connectionID))
1079-
logger.Debug(ctx, "starting handler")
1075+
connLogger := logger.With(slog.F("message_id", msg.ID), slog.F("connection_id", connectionID))
1076+
connLogger.Debug(ctx, "starting handler")
10801077

10811078
defer func() {
10821079
if err := retErr; err != nil {
@@ -1087,22 +1084,22 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
10871084
// If the agent is closed, we don't want to
10881085
// log this as an error since it's expected.
10891086
if closed {
1090-
logger.Debug(ctx, "reconnecting PTY failed with session error (agent closed)", slog.Error(err))
1087+
connLogger.Debug(ctx, "reconnecting pty failed with attach error (agent closed)", slog.Error(err))
10911088
} else {
1092-
logger.Error(ctx, "reconnecting PTY failed with session error", slog.Error(err))
1089+
connLogger.Error(ctx, "reconnecting pty failed with attach error", slog.Error(err))
10931090
}
10941091
}
1095-
logger.Debug(ctx, "session closed")
1092+
connLogger.Debug(ctx, "reconnecting pty connection closed")
10961093
}()
10971094

1098-
var rpty *reconnectingPTY
1099-
sendConnected := make(chan *reconnectingPTY, 1)
1095+
var rpty reconnectingpty.ReconnectingPTY
1096+
sendConnected := make(chan reconnectingpty.ReconnectingPTY, 1)
11001097
// On store, reserve this ID to prevent multiple concurrent new connections.
11011098
waitReady, ok := a.reconnectingPTYs.LoadOrStore(msg.ID, sendConnected)
11021099
if ok {
11031100
close(sendConnected) // Unused.
1104-
logger.Debug(ctx, "connecting to existing session")
1105-
c, ok := waitReady.(chan *reconnectingPTY)
1101+
connLogger.Debug(ctx, "connecting to existing reconnecting pty")
1102+
c, ok := waitReady.(chan reconnectingpty.ReconnectingPTY)
11061103
if !ok {
11071104
return xerrors.Errorf("found invalid type in reconnecting pty map: %T", waitReady)
11081105
}
@@ -1112,7 +1109,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
11121109
}
11131110
c <- rpty // Put it back for the next reconnect.
11141111
} else {
1115-
logger.Debug(ctx, "creating new session")
1112+
connLogger.Debug(ctx, "creating new reconnecting pty")
11161113

11171114
connected := false
11181115
defer func() {
@@ -1128,169 +1125,24 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
11281125
a.metrics.reconnectingPTYErrors.WithLabelValues("create_command").Add(1)
11291126
return xerrors.Errorf("create command: %w", err)
11301127
}
1131-
cmd.Env = append(cmd.Env, "TERM=xterm-256color")
1132-
1133-
// Default to buffer 64KiB.
1134-
circularBuffer, err := circbuf.NewBuffer(64 << 10)
1135-
if err != nil {
1136-
return xerrors.Errorf("create circular buffer: %w", err)
1137-
}
11381128

1139-
ptty, process, err := pty.Start(cmd)
1140-
if err != nil {
1141-
a.metrics.reconnectingPTYErrors.WithLabelValues("start_command").Add(1)
1142-
return xerrors.Errorf("start command: %w", err)
1143-
}
1129+
rpty = reconnectingpty.New(ctx, cmd, &reconnectingpty.Options{
1130+
Timeout: a.reconnectingPTYTimeout,
1131+
Metrics: a.metrics.reconnectingPTYErrors,
1132+
}, logger.With(slog.F("message_id", msg.ID)))
11441133

1145-
ctx, cancel := context.WithCancel(ctx)
1146-
rpty = &reconnectingPTY{
1147-
activeConns: map[string]net.Conn{
1148-
// We have to put the connection in the map instantly otherwise
1149-
// the connection won't be closed if the process instantly dies.
1150-
connectionID: conn,
1151-
},
1152-
ptty: ptty,
1153-
// Timeouts created with an after func can be reset!
1154-
timeout: time.AfterFunc(a.reconnectingPTYTimeout, cancel),
1155-
circularBuffer: circularBuffer,
1156-
}
1157-
// We don't need to separately monitor for the process exiting.
1158-
// When it exits, our ptty.OutputReader() will return EOF after
1159-
// reading all process output.
11601134
if err = a.trackConnGoroutine(func() {
1161-
buffer := make([]byte, 1024)
1162-
for {
1163-
read, err := rpty.ptty.OutputReader().Read(buffer)
1164-
if err != nil {
1165-
// When the PTY is closed, this is triggered.
1166-
// Error is typically a benign EOF, so only log for debugging.
1167-
if errors.Is(err, io.EOF) {
1168-
logger.Debug(ctx, "unable to read pty output, command might have exited", slog.Error(err))
1169-
} else {
1170-
logger.Warn(ctx, "unable to read pty output, command might have exited", slog.Error(err))
1171-
a.metrics.reconnectingPTYErrors.WithLabelValues("output_reader").Add(1)
1172-
}
1173-
break
1174-
}
1175-
part := buffer[:read]
1176-
rpty.circularBufferMutex.Lock()
1177-
_, err = rpty.circularBuffer.Write(part)
1178-
rpty.circularBufferMutex.Unlock()
1179-
if err != nil {
1180-
logger.Error(ctx, "write to circular buffer", slog.Error(err))
1181-
break
1182-
}
1183-
rpty.activeConnsMutex.Lock()
1184-
for cid, conn := range rpty.activeConns {
1185-
_, err = conn.Write(part)
1186-
if err != nil {
1187-
logger.Warn(ctx,
1188-
"error writing to active conn",
1189-
slog.F("other_conn_id", cid),
1190-
slog.Error(err),
1191-
)
1192-
a.metrics.reconnectingPTYErrors.WithLabelValues("write").Add(1)
1193-
}
1194-
}
1195-
rpty.activeConnsMutex.Unlock()
1196-
}
1197-
1198-
// Cleanup the process, PTY, and delete it's
1199-
// ID from memory.
1200-
_ = process.Kill()
1201-
rpty.Close()
1135+
rpty.Wait()
12021136
a.reconnectingPTYs.Delete(msg.ID)
12031137
}); err != nil {
1204-
_ = process.Kill()
1205-
_ = ptty.Close()
1138+
rpty.Close(err)
12061139
return xerrors.Errorf("start routine: %w", err)
12071140
}
1141+
12081142
connected = true
12091143
sendConnected <- rpty
12101144
}
1211-
// Resize the PTY to initial height + width.
1212-
err := rpty.ptty.Resize(msg.Height, msg.Width)
1213-
if err != nil {
1214-
// We can continue after this, it's not fatal!
1215-
logger.Error(ctx, "reconnecting PTY initial resize failed, but will continue", slog.Error(err))
1216-
a.metrics.reconnectingPTYErrors.WithLabelValues("resize").Add(1)
1217-
}
1218-
// Write any previously stored data for the TTY.
1219-
rpty.circularBufferMutex.RLock()
1220-
prevBuf := slices.Clone(rpty.circularBuffer.Bytes())
1221-
rpty.circularBufferMutex.RUnlock()
1222-
// Note that there is a small race here between writing buffered
1223-
// data and storing conn in activeConns. This is likely a very minor
1224-
// edge case, but we should look into ways to avoid it. Holding
1225-
// activeConnsMutex would be one option, but holding this mutex
1226-
// while also holding circularBufferMutex seems dangerous.
1227-
_, err = conn.Write(prevBuf)
1228-
if err != nil {
1229-
a.metrics.reconnectingPTYErrors.WithLabelValues("write").Add(1)
1230-
return xerrors.Errorf("write buffer to conn: %w", err)
1231-
}
1232-
// Multiple connections to the same TTY are permitted.
1233-
// This could easily be used for terminal sharing, but
1234-
// we do it because it's a nice user experience to
1235-
// copy/paste a terminal URL and have it _just work_.
1236-
rpty.activeConnsMutex.Lock()
1237-
rpty.activeConns[connectionID] = conn
1238-
rpty.activeConnsMutex.Unlock()
1239-
// Resetting this timeout prevents the PTY from exiting.
1240-
rpty.timeout.Reset(a.reconnectingPTYTimeout)
1241-
1242-
ctx, cancelFunc := context.WithCancel(ctx)
1243-
defer cancelFunc()
1244-
heartbeat := time.NewTicker(a.reconnectingPTYTimeout / 2)
1245-
defer heartbeat.Stop()
1246-
go func() {
1247-
// Keep updating the activity while this
1248-
// connection is alive!
1249-
for {
1250-
select {
1251-
case <-ctx.Done():
1252-
return
1253-
case <-heartbeat.C:
1254-
}
1255-
rpty.timeout.Reset(a.reconnectingPTYTimeout)
1256-
}
1257-
}()
1258-
defer func() {
1259-
// After this connection ends, remove it from
1260-
// the PTYs active connections. If it isn't
1261-
// removed, all PTY data will be sent to it.
1262-
rpty.activeConnsMutex.Lock()
1263-
delete(rpty.activeConns, connectionID)
1264-
rpty.activeConnsMutex.Unlock()
1265-
}()
1266-
decoder := json.NewDecoder(conn)
1267-
var req codersdk.ReconnectingPTYRequest
1268-
for {
1269-
err = decoder.Decode(&req)
1270-
if xerrors.Is(err, io.EOF) {
1271-
return nil
1272-
}
1273-
if err != nil {
1274-
logger.Warn(ctx, "reconnecting PTY failed with read error", slog.Error(err))
1275-
return nil
1276-
}
1277-
_, err = rpty.ptty.InputWriter().Write([]byte(req.Data))
1278-
if err != nil {
1279-
logger.Warn(ctx, "reconnecting PTY failed with write error", slog.Error(err))
1280-
a.metrics.reconnectingPTYErrors.WithLabelValues("input_writer").Add(1)
1281-
return nil
1282-
}
1283-
// Check if a resize needs to happen!
1284-
if req.Height == 0 || req.Width == 0 {
1285-
continue
1286-
}
1287-
err = rpty.ptty.Resize(req.Height, req.Width)
1288-
if err != nil {
1289-
// We can continue after this, it's not fatal!
1290-
logger.Error(ctx, "reconnecting PTY resize failed, but will continue", slog.Error(err))
1291-
a.metrics.reconnectingPTYErrors.WithLabelValues("resize").Add(1)
1292-
}
1293-
}
1145+
return rpty.Attach(ctx, connectionID, conn, msg.Height, msg.Width, connLogger)
12941146
}
12951147

12961148
// startReportingConnectionStats runs the connection stats reporting goroutine.
@@ -1541,31 +1393,6 @@ lifecycleWaitLoop:
15411393
return nil
15421394
}
15431395

1544-
type reconnectingPTY struct {
1545-
activeConnsMutex sync.Mutex
1546-
activeConns map[string]net.Conn
1547-
1548-
circularBuffer *circbuf.Buffer
1549-
circularBufferMutex sync.RWMutex
1550-
timeout *time.Timer
1551-
ptty pty.PTYCmd
1552-
}
1553-
1554-
// Close ends all connections to the reconnecting
1555-
// PTY and clear the circular buffer.
1556-
func (r *reconnectingPTY) Close() {
1557-
r.activeConnsMutex.Lock()
1558-
defer r.activeConnsMutex.Unlock()
1559-
for _, conn := range r.activeConns {
1560-
_ = conn.Close()
1561-
}
1562-
_ = r.ptty.Close()
1563-
r.circularBufferMutex.Lock()
1564-
r.circularBuffer.Reset()
1565-
r.circularBufferMutex.Unlock()
1566-
r.timeout.Stop()
1567-
}
1568-
15691396
// userHomeDir returns the home directory of the current user, giving
15701397
// priority to the $HOME environment variable.
15711398
func userHomeDir() (string, error) {

0 commit comments

Comments
 (0)