Skip to content

Commit b79785c

Browse files
authored
feat: move agent v2 API connection monitoring to yamux layer (#11910)
Moves monitoring of the agent v2 API connection to the yamux layer. The present behavior monitors this at the websocket layer and closes the websocket on completion. This can cause yamux to hit unexpected errors, since the connection is closed underneath it. This might be the cause of the yamux errors that some customers are seeing: ![image.png](https://graphite-user-uploaded-assets-prod.s3.amazonaws.com/tCz4CxRU9jhAJ7zH8RTi/53b8b5ef-e9e5-44a5-b559-99c37c136071.png). In any case, it's more graceful to close yamux first and let yamux close the underlying websocket. That should limit yamux error logging to truly unexpected/error cases. The only downside is that the yamux `Close()` doesn't accept a reason, so if the agent becomes outdated and we close the API connection, the agent just sees the connection close without a reason. I'm not sure we log this at the agent anyway, but it would be nice to have. I think more accurate logging on coderd is more important. I've also added some logging when the monitor disconnects for reasons other than the context being canceled (e.g. agent outdated, failed pings).
1 parent 13e214f commit b79785c

File tree

2 files changed

+71
-23
lines changed

2 files changed

+71
-23
lines changed

coderd/workspaceagentsrpc.go

+61-13
Original file line numberDiff line numberDiff line change
@@ -113,11 +113,9 @@ func (api *API) workspaceAgentRPC(rw http.ResponseWriter, r *http.Request) {
113113
)
114114
api.Logger.Debug(ctx, "accepting agent details", slog.F("agent", workspaceAgent))
115115

116-
defer conn.Close(websocket.StatusNormalClosure, "")
117-
118116
closeCtx, closeCtxCancel := context.WithCancel(ctx)
119117
defer closeCtxCancel()
120-
monitor := api.startAgentWebsocketMonitor(closeCtx, workspaceAgent, build, conn)
118+
monitor := api.startAgentYamuxMonitor(closeCtx, workspaceAgent, build, mux)
121119
defer monitor.close()
122120

123121
agentAPI := agentapi.New(agentapi.Options{
@@ -214,8 +212,8 @@ func checkBuildIsLatest(ctx context.Context, db database.Store, build database.W
214212
func (api *API) startAgentWebsocketMonitor(ctx context.Context,
215213
workspaceAgent database.WorkspaceAgent, workspaceBuild database.WorkspaceBuild,
216214
conn *websocket.Conn,
217-
) *agentWebsocketMonitor {
218-
monitor := &agentWebsocketMonitor{
215+
) *agentConnectionMonitor {
216+
monitor := &agentConnectionMonitor{
219217
apiCtx: api.ctx,
220218
workspaceAgent: workspaceAgent,
221219
workspaceBuild: workspaceBuild,
@@ -236,6 +234,53 @@ func (api *API) startAgentWebsocketMonitor(ctx context.Context,
236234
return monitor
237235
}
238236

237+
// yamuxPingerCloser wraps a *yamux.Session and gives it Ping and Close
// methods so that it can be stored in agentConnectionMonitor's conn field
// (see startAgentYamuxMonitor).
type yamuxPingerCloser struct {
238+
mux *yamux.Session
239+
}
240+
241+
// Close closes the wrapped yamux session and returns its error.
//
// The websocket status code and reason string parameters are unnamed and
// discarded: y.mux.Close is called with no arguments, so neither value is
// passed on to the session.
func (y *yamuxPingerCloser) Close(websocket.StatusCode, string) error {
242+
return y.mux.Close()
243+
}
244+
245+
// Ping pings the wrapped yamux session, giving up early if ctx is done.
//
// y.mux.Ping takes no context, so it is run in its own goroutine and this
// method waits for whichever happens first: ctx being done (returns
// ctx.Err()) or the ping finishing (returns the ping's error, nil on
// success). The first value returned by y.mux.Ping is discarded.
func (y *yamuxPingerCloser) Ping(ctx context.Context) error {
246+
// Capacity 1 lets the goroutine's send complete even when this method has
// already returned via ctx.Done() and nobody receives from errCh.
errCh := make(chan error, 1)
247+
go func() {
248+
_, err := y.mux.Ping()
249+
errCh <- err
250+
}()
251+
select {
252+
case <-ctx.Done():
253+
return ctx.Err()
254+
case err := <-errCh:
255+
return err
256+
}
257+
}
258+
259+
// startAgentYamuxMonitor builds an agentConnectionMonitor for the given agent
// and build whose conn is the yamux session (wrapped in yamuxPingerCloser),
// then calls init and start(ctx) on it before returning it.
//
// The remaining monitor fields are copied from api: its context, database,
// replica ID, ping period (AgentConnectionUpdateFrequency), disconnect
// timeout (AgentInactiveDisconnectTimeout), and api itself as the updater.
// The logger is api.Logger with workspace_id and agent_id fields attached.
//
// The monitor is returned already started; workspaceAgentRPC defers
// monitor.close() on the value returned here.
func (api *API) startAgentYamuxMonitor(ctx context.Context,
260+
workspaceAgent database.WorkspaceAgent, workspaceBuild database.WorkspaceBuild,
261+
mux *yamux.Session,
262+
) *agentConnectionMonitor {
263+
monitor := &agentConnectionMonitor{
264+
apiCtx: api.ctx,
265+
workspaceAgent: workspaceAgent,
266+
workspaceBuild: workspaceBuild,
267+
conn: &yamuxPingerCloser{mux: mux},
268+
pingPeriod: api.AgentConnectionUpdateFrequency,
269+
db: api.Database,
270+
replicaID: api.ID,
271+
updater: api,
272+
disconnectTimeout: api.AgentInactiveDisconnectTimeout,
273+
logger: api.Logger.With(
274+
slog.F("workspace_id", workspaceBuild.WorkspaceID),
275+
slog.F("agent_id", workspaceAgent.ID),
276+
),
277+
}
278+
monitor.init()
279+
monitor.start(ctx)
280+
281+
return monitor
282+
}
283+
239284
// workspaceUpdater is the single-method interface the connection monitor uses
// to publish a workspace update for a workspace ID. In this file the monitor's
// updater field is populated with the *API itself.
type workspaceUpdater interface {
240285
publishWorkspaceUpdate(ctx context.Context, workspaceID uuid.UUID)
241286
}
@@ -245,7 +290,7 @@ type pingerCloser interface {
245290
Close(code websocket.StatusCode, reason string) error
246291
}
247292

248-
type agentWebsocketMonitor struct {
293+
type agentConnectionMonitor struct {
249294
apiCtx context.Context
250295
cancel context.CancelFunc
251296
wg sync.WaitGroup
@@ -272,7 +317,7 @@ type agentWebsocketMonitor struct {
272317
//
273318
// We use a custom heartbeat routine here instead of `httpapi.Heartbeat`
274319
// because we want to log the agent's last ping time.
275-
func (m *agentWebsocketMonitor) sendPings(ctx context.Context) {
320+
func (m *agentConnectionMonitor) sendPings(ctx context.Context) {
276321
t := time.NewTicker(m.pingPeriod)
277322
defer t.Stop()
278323

@@ -295,7 +340,7 @@ func (m *agentWebsocketMonitor) sendPings(ctx context.Context) {
295340
}
296341
}
297342

298-
func (m *agentWebsocketMonitor) updateConnectionTimes(ctx context.Context) error {
343+
func (m *agentConnectionMonitor) updateConnectionTimes(ctx context.Context) error {
299344
//nolint:gocritic // We only update the agent we are minding.
300345
err := m.db.UpdateWorkspaceAgentConnectionByID(dbauthz.AsSystemRestricted(ctx), database.UpdateWorkspaceAgentConnectionByIDParams{
301346
ID: m.workspaceAgent.ID,
@@ -314,7 +359,7 @@ func (m *agentWebsocketMonitor) updateConnectionTimes(ctx context.Context) error
314359
return nil
315360
}
316361

317-
func (m *agentWebsocketMonitor) init() {
362+
func (m *agentConnectionMonitor) init() {
318363
now := dbtime.Now()
319364
m.firstConnectedAt = m.workspaceAgent.FirstConnectedAt
320365
if !m.firstConnectedAt.Valid {
@@ -331,7 +376,7 @@ func (m *agentWebsocketMonitor) init() {
331376
m.lastPing.Store(ptr.Ref(time.Now())) // Since the agent initiated the request, assume it's alive.
332377
}
333378

334-
func (m *agentWebsocketMonitor) start(ctx context.Context) {
379+
func (m *agentConnectionMonitor) start(ctx context.Context) {
335380
ctx, m.cancel = context.WithCancel(ctx)
336381
m.wg.Add(2)
337382
go pprof.Do(ctx, pprof.Labels("agent", m.workspaceAgent.ID.String()),
@@ -346,7 +391,7 @@ func (m *agentWebsocketMonitor) start(ctx context.Context) {
346391
})
347392
}
348393

349-
func (m *agentWebsocketMonitor) monitor(ctx context.Context) {
394+
func (m *agentConnectionMonitor) monitor(ctx context.Context) {
350395
defer func() {
351396
// If connection closed then context will be canceled, try to
352397
// ensure our final update is sent. By waiting at most the agent
@@ -384,7 +429,7 @@ func (m *agentWebsocketMonitor) monitor(ctx context.Context) {
384429
}()
385430
reason := "disconnect"
386431
defer func() {
387-
m.logger.Debug(ctx, "agent websocket monitor is closing connection",
432+
m.logger.Debug(ctx, "agent connection monitor is closing connection",
388433
slog.F("reason", reason))
389434
_ = m.conn.Close(websocket.StatusGoingAway, reason)
390435
}()
@@ -409,6 +454,7 @@ func (m *agentWebsocketMonitor) monitor(ctx context.Context) {
409454
lastPing := *m.lastPing.Load()
410455
if time.Since(lastPing) > m.disconnectTimeout {
411456
reason = "ping timeout"
457+
m.logger.Warn(ctx, "connection to agent timed out")
412458
return
413459
}
414460
connectionStatusChanged := m.disconnectedAt.Valid
@@ -421,6 +467,7 @@ func (m *agentWebsocketMonitor) monitor(ctx context.Context) {
421467
err = m.updateConnectionTimes(ctx)
422468
if err != nil {
423469
reason = err.Error()
470+
m.logger.Error(ctx, "failed to update agent connection times", slog.Error(err))
424471
return
425472
}
426473
if connectionStatusChanged {
@@ -429,12 +476,13 @@ func (m *agentWebsocketMonitor) monitor(ctx context.Context) {
429476
err = checkBuildIsLatest(ctx, m.db, m.workspaceBuild)
430477
if err != nil {
431478
reason = err.Error()
479+
m.logger.Info(ctx, "disconnected possibly outdated agent", slog.Error(err))
432480
return
433481
}
434482
}
435483
}
436484

437-
func (m *agentWebsocketMonitor) close() {
485+
// close stops the monitor: it calls m.cancel (the cancel function that start
// stores from context.WithCancel) and then blocks on m.wg.Wait until the
// WaitGroup that start incremented by 2 has drained.
// NOTE(review): presumably those two counts are the goroutines start launches;
// confirm against the full body of start.
func (m *agentConnectionMonitor) close() {
438486
m.cancel()
439487
m.wg.Wait()
440488
}

coderd/workspaceagentsrpc_internal_test.go

+10-10
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323
"github.com/coder/coder/v2/testutil"
2424
)
2525

26-
func TestAgentWebsocketMonitor_ContextCancel(t *testing.T) {
26+
func TestAgentConnectionMonitor_ContextCancel(t *testing.T) {
2727
t.Parallel()
2828
ctx := testutil.Context(t, testutil.WaitShort)
2929
now := dbtime.Now()
@@ -45,7 +45,7 @@ func TestAgentWebsocketMonitor_ContextCancel(t *testing.T) {
4545
}
4646
replicaID := uuid.New()
4747

48-
uut := &agentWebsocketMonitor{
48+
uut := &agentConnectionMonitor{
4949
apiCtx: ctx,
5050
workspaceAgent: agent,
5151
workspaceBuild: build,
@@ -97,7 +97,7 @@ func TestAgentWebsocketMonitor_ContextCancel(t *testing.T) {
9797
require.Greater(t, m, n)
9898
}
9999

100-
func TestAgentWebsocketMonitor_PingTimeout(t *testing.T) {
100+
func TestAgentConnectionMonitor_PingTimeout(t *testing.T) {
101101
t.Parallel()
102102
ctx := testutil.Context(t, testutil.WaitShort)
103103
now := dbtime.Now()
@@ -119,7 +119,7 @@ func TestAgentWebsocketMonitor_PingTimeout(t *testing.T) {
119119
}
120120
replicaID := uuid.New()
121121

122-
uut := &agentWebsocketMonitor{
122+
uut := &agentConnectionMonitor{
123123
apiCtx: ctx,
124124
workspaceAgent: agent,
125125
workspaceBuild: build,
@@ -157,7 +157,7 @@ func TestAgentWebsocketMonitor_PingTimeout(t *testing.T) {
157157
fUpdater.requireEventuallySomeUpdates(t, build.WorkspaceID)
158158
}
159159

160-
func TestAgentWebsocketMonitor_BuildOutdated(t *testing.T) {
160+
func TestAgentConnectionMonitor_BuildOutdated(t *testing.T) {
161161
t.Parallel()
162162
ctx := testutil.Context(t, testutil.WaitShort)
163163
now := dbtime.Now()
@@ -179,7 +179,7 @@ func TestAgentWebsocketMonitor_BuildOutdated(t *testing.T) {
179179
}
180180
replicaID := uuid.New()
181181

182-
uut := &agentWebsocketMonitor{
182+
uut := &agentConnectionMonitor{
183183
apiCtx: ctx,
184184
workspaceAgent: agent,
185185
workspaceBuild: build,
@@ -217,12 +217,12 @@ func TestAgentWebsocketMonitor_BuildOutdated(t *testing.T) {
217217
fUpdater.requireEventuallySomeUpdates(t, build.WorkspaceID)
218218
}
219219

220-
func TestAgentWebsocketMonitor_SendPings(t *testing.T) {
220+
func TestAgentConnectionMonitor_SendPings(t *testing.T) {
221221
t.Parallel()
222222
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
223223
t.Cleanup(cancel)
224224
fConn := &fakePingerCloser{}
225-
uut := &agentWebsocketMonitor{
225+
uut := &agentConnectionMonitor{
226226
pingPeriod: testutil.IntervalFast,
227227
conn: fConn,
228228
}
@@ -238,7 +238,7 @@ func TestAgentWebsocketMonitor_SendPings(t *testing.T) {
238238
require.NotNil(t, lastPing)
239239
}
240240

241-
func TestAgentWebsocketMonitor_StartClose(t *testing.T) {
241+
func TestAgentConnectionMonitor_StartClose(t *testing.T) {
242242
t.Parallel()
243243
ctx := testutil.Context(t, testutil.WaitShort)
244244
fConn := &fakePingerCloser{}
@@ -259,7 +259,7 @@ func TestAgentWebsocketMonitor_StartClose(t *testing.T) {
259259
WorkspaceID: uuid.New(),
260260
}
261261
replicaID := uuid.New()
262-
uut := &agentWebsocketMonitor{
262+
uut := &agentConnectionMonitor{
263263
apiCtx: ctx,
264264
workspaceAgent: agent,
265265
workspaceBuild: build,

0 commit comments

Comments
 (0)