@@ -113,11 +113,9 @@ func (api *API) workspaceAgentRPC(rw http.ResponseWriter, r *http.Request) {
113
113
)
114
114
api .Logger .Debug (ctx , "accepting agent details" , slog .F ("agent" , workspaceAgent ))
115
115
116
- defer conn .Close (websocket .StatusNormalClosure , "" )
117
-
118
116
closeCtx , closeCtxCancel := context .WithCancel (ctx )
119
117
defer closeCtxCancel ()
120
- monitor := api .startAgentWebsocketMonitor (closeCtx , workspaceAgent , build , conn )
118
+ monitor := api .startAgentYamuxMonitor (closeCtx , workspaceAgent , build , mux )
121
119
defer monitor .close ()
122
120
123
121
agentAPI := agentapi .New (agentapi.Options {
@@ -214,8 +212,8 @@ func checkBuildIsLatest(ctx context.Context, db database.Store, build database.W
214
212
func (api * API ) startAgentWebsocketMonitor (ctx context.Context ,
215
213
workspaceAgent database.WorkspaceAgent , workspaceBuild database.WorkspaceBuild ,
216
214
conn * websocket.Conn ,
217
- ) * agentWebsocketMonitor {
218
- monitor := & agentWebsocketMonitor {
215
+ ) * agentConnectionMonitor {
216
+ monitor := & agentConnectionMonitor {
219
217
apiCtx : api .ctx ,
220
218
workspaceAgent : workspaceAgent ,
221
219
workspaceBuild : workspaceBuild ,
@@ -236,6 +234,53 @@ func (api *API) startAgentWebsocketMonitor(ctx context.Context,
236
234
return monitor
237
235
}
238
236
237
+ type yamuxPingerCloser struct {
238
+ mux * yamux.Session
239
+ }
240
+
241
+ func (y * yamuxPingerCloser ) Close (websocket.StatusCode , string ) error {
242
+ return y .mux .Close ()
243
+ }
244
+
245
+ func (y * yamuxPingerCloser ) Ping (ctx context.Context ) error {
246
+ errCh := make (chan error , 1 )
247
+ go func () {
248
+ _ , err := y .mux .Ping ()
249
+ errCh <- err
250
+ }()
251
+ select {
252
+ case <- ctx .Done ():
253
+ return ctx .Err ()
254
+ case err := <- errCh :
255
+ return err
256
+ }
257
+ }
258
+
259
+ func (api * API ) startAgentYamuxMonitor (ctx context.Context ,
260
+ workspaceAgent database.WorkspaceAgent , workspaceBuild database.WorkspaceBuild ,
261
+ mux * yamux.Session ,
262
+ ) * agentConnectionMonitor {
263
+ monitor := & agentConnectionMonitor {
264
+ apiCtx : api .ctx ,
265
+ workspaceAgent : workspaceAgent ,
266
+ workspaceBuild : workspaceBuild ,
267
+ conn : & yamuxPingerCloser {mux : mux },
268
+ pingPeriod : api .AgentConnectionUpdateFrequency ,
269
+ db : api .Database ,
270
+ replicaID : api .ID ,
271
+ updater : api ,
272
+ disconnectTimeout : api .AgentInactiveDisconnectTimeout ,
273
+ logger : api .Logger .With (
274
+ slog .F ("workspace_id" , workspaceBuild .WorkspaceID ),
275
+ slog .F ("agent_id" , workspaceAgent .ID ),
276
+ ),
277
+ }
278
+ monitor .init ()
279
+ monitor .start (ctx )
280
+
281
+ return monitor
282
+ }
283
+
239
284
type workspaceUpdater interface {
240
285
publishWorkspaceUpdate (ctx context.Context , workspaceID uuid.UUID )
241
286
}
@@ -245,7 +290,7 @@ type pingerCloser interface {
245
290
Close (code websocket.StatusCode , reason string ) error
246
291
}
247
292
248
- type agentWebsocketMonitor struct {
293
+ type agentConnectionMonitor struct {
249
294
apiCtx context.Context
250
295
cancel context.CancelFunc
251
296
wg sync.WaitGroup
@@ -272,7 +317,7 @@ type agentWebsocketMonitor struct {
272
317
//
273
318
// We use a custom heartbeat routine here instead of `httpapi.Heartbeat`
274
319
// because we want to log the agent's last ping time.
275
- func (m * agentWebsocketMonitor ) sendPings (ctx context.Context ) {
320
+ func (m * agentConnectionMonitor ) sendPings (ctx context.Context ) {
276
321
t := time .NewTicker (m .pingPeriod )
277
322
defer t .Stop ()
278
323
@@ -295,7 +340,7 @@ func (m *agentWebsocketMonitor) sendPings(ctx context.Context) {
295
340
}
296
341
}
297
342
298
- func (m * agentWebsocketMonitor ) updateConnectionTimes (ctx context.Context ) error {
343
+ func (m * agentConnectionMonitor ) updateConnectionTimes (ctx context.Context ) error {
299
344
//nolint:gocritic // We only update the agent we are minding.
300
345
err := m .db .UpdateWorkspaceAgentConnectionByID (dbauthz .AsSystemRestricted (ctx ), database.UpdateWorkspaceAgentConnectionByIDParams {
301
346
ID : m .workspaceAgent .ID ,
@@ -314,7 +359,7 @@ func (m *agentWebsocketMonitor) updateConnectionTimes(ctx context.Context) error
314
359
return nil
315
360
}
316
361
317
- func (m * agentWebsocketMonitor ) init () {
362
+ func (m * agentConnectionMonitor ) init () {
318
363
now := dbtime .Now ()
319
364
m .firstConnectedAt = m .workspaceAgent .FirstConnectedAt
320
365
if ! m .firstConnectedAt .Valid {
@@ -331,7 +376,7 @@ func (m *agentWebsocketMonitor) init() {
331
376
m .lastPing .Store (ptr .Ref (time .Now ())) // Since the agent initiated the request, assume it's alive.
332
377
}
333
378
334
- func (m * agentWebsocketMonitor ) start (ctx context.Context ) {
379
+ func (m * agentConnectionMonitor ) start (ctx context.Context ) {
335
380
ctx , m .cancel = context .WithCancel (ctx )
336
381
m .wg .Add (2 )
337
382
go pprof .Do (ctx , pprof .Labels ("agent" , m .workspaceAgent .ID .String ()),
@@ -346,7 +391,7 @@ func (m *agentWebsocketMonitor) start(ctx context.Context) {
346
391
})
347
392
}
348
393
349
- func (m * agentWebsocketMonitor ) monitor (ctx context.Context ) {
394
+ func (m * agentConnectionMonitor ) monitor (ctx context.Context ) {
350
395
defer func () {
351
396
// If connection closed then context will be canceled, try to
352
397
// ensure our final update is sent. By waiting at most the agent
@@ -384,7 +429,7 @@ func (m *agentWebsocketMonitor) monitor(ctx context.Context) {
384
429
}()
385
430
reason := "disconnect"
386
431
defer func () {
387
- m .logger .Debug (ctx , "agent websocket monitor is closing connection" ,
432
+ m .logger .Debug (ctx , "agent connection monitor is closing connection" ,
388
433
slog .F ("reason" , reason ))
389
434
_ = m .conn .Close (websocket .StatusGoingAway , reason )
390
435
}()
@@ -409,6 +454,7 @@ func (m *agentWebsocketMonitor) monitor(ctx context.Context) {
409
454
lastPing := * m .lastPing .Load ()
410
455
if time .Since (lastPing ) > m .disconnectTimeout {
411
456
reason = "ping timeout"
457
+ m .logger .Warn (ctx , "connection to agent timed out" )
412
458
return
413
459
}
414
460
connectionStatusChanged := m .disconnectedAt .Valid
@@ -421,6 +467,7 @@ func (m *agentWebsocketMonitor) monitor(ctx context.Context) {
421
467
err = m .updateConnectionTimes (ctx )
422
468
if err != nil {
423
469
reason = err .Error ()
470
+ m .logger .Error (ctx , "failed to update agent connection times" , slog .Error (err ))
424
471
return
425
472
}
426
473
if connectionStatusChanged {
@@ -429,12 +476,13 @@ func (m *agentWebsocketMonitor) monitor(ctx context.Context) {
429
476
err = checkBuildIsLatest (ctx , m .db , m .workspaceBuild )
430
477
if err != nil {
431
478
reason = err .Error ()
479
+ m .logger .Info (ctx , "disconnected possibly outdated agent" , slog .Error (err ))
432
480
return
433
481
}
434
482
}
435
483
}
436
484
437
- func (m * agentWebsocketMonitor ) close () {
485
+ func (m * agentConnectionMonitor ) close () {
438
486
m .cancel ()
439
487
m .wg .Wait ()
440
488
}
0 commit comments