@@ -1094,6 +1094,7 @@ func convertScripts(dbScripts []database.WorkspaceAgentScript) []codersdk.Worksp
1094
1094
// @Param workspaceagent path string true "Workspace agent ID" format(uuid)
1095
1095
// @Router /workspaceagents/{workspaceagent}/watch-metadata [get]
1096
1096
// @x-apidocgen {"skip": true}
1097
+ // @Deprecated Use /workspaceagents/{workspaceagent}/watch-metadata-ws instead
1097
1098
func (api * API ) watchWorkspaceAgentMetadata (rw http.ResponseWriter , r * http.Request ) {
1098
1099
// Allow us to interrupt watch via cancel.
1099
1100
ctx , cancel := context .WithCancel (r .Context ())
@@ -1273,6 +1274,193 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
1273
1274
}
1274
1275
}
1275
1276
1277
+ // @Summary Watch for workspace agent metadata updates
1278
+ // @ID watch-for-workspace-agent-metadata-updates
1279
+ // @Security CoderSessionToken
1280
+ // @Tags Agents
1281
+ // @Success 200 "Success"
1282
+ // @Param workspaceagent path string true "Workspace agent ID" format(uuid)
1283
+ // @Router /workspaceagents/{workspaceagent}/watch-metadata [get]
1284
+ // @x-apidocgen {"skip": true}
1285
+ func (api * API ) watchWorkspaceAgentMetadataWs (rw http.ResponseWriter , r * http.Request ) {
1286
+ // Allow us to interrupt watch via cancel.
1287
+ ctx , cancel := context .WithCancel (r .Context ())
1288
+ defer cancel ()
1289
+ r = r .WithContext (ctx ) // Rewire context for SSE cancellation.
1290
+
1291
+ workspaceAgent := httpmw .WorkspaceAgentParam (r )
1292
+ log := api .Logger .Named ("workspace_metadata_watcher" ).With (
1293
+ slog .F ("workspace_agent_id" , workspaceAgent .ID ),
1294
+ )
1295
+
1296
+ // Send metadata on updates, we must ensure subscription before sending
1297
+ // initial metadata to guarantee that events in-between are not missed.
1298
+ update := make (chan agentapi.WorkspaceAgentMetadataChannelPayload , 1 )
1299
+ cancelSub , err := api .Pubsub .Subscribe (agentapi .WatchWorkspaceAgentMetadataChannel (workspaceAgent .ID ), func (_ context.Context , byt []byte ) {
1300
+ if ctx .Err () != nil {
1301
+ return
1302
+ }
1303
+
1304
+ var payload agentapi.WorkspaceAgentMetadataChannelPayload
1305
+ err := json .Unmarshal (byt , & payload )
1306
+ if err != nil {
1307
+ log .Error (ctx , "failed to unmarshal pubsub message" , slog .Error (err ))
1308
+ return
1309
+ }
1310
+
1311
+ log .Debug (ctx , "received metadata update" , "payload" , payload )
1312
+
1313
+ select {
1314
+ case prev := <- update :
1315
+ payload .Keys = appendUnique (prev .Keys , payload .Keys )
1316
+ default :
1317
+ }
1318
+ // This can never block since we pop and merge beforehand.
1319
+ update <- payload
1320
+ })
1321
+ if err != nil {
1322
+ httpapi .InternalServerError (rw , err )
1323
+ return
1324
+ }
1325
+ defer cancelSub ()
1326
+
1327
+ // We always use the original Request context because it contains
1328
+ // the RBAC actor.
1329
+ initialMD , err := api .Database .GetWorkspaceAgentMetadata (ctx , database.GetWorkspaceAgentMetadataParams {
1330
+ WorkspaceAgentID : workspaceAgent .ID ,
1331
+ Keys : nil ,
1332
+ })
1333
+ if err != nil {
1334
+ // If we can't successfully pull the initial metadata, pubsub
1335
+ // updates will be no-op so we may as well terminate the
1336
+ // connection early.
1337
+ httpapi .InternalServerError (rw , err )
1338
+ return
1339
+ }
1340
+
1341
+ log .Debug (ctx , "got initial metadata" , "num" , len (initialMD ))
1342
+
1343
+ metadataMap := make (map [string ]database.WorkspaceAgentMetadatum , len (initialMD ))
1344
+ for _ , datum := range initialMD {
1345
+ metadataMap [datum .Key ] = datum
1346
+ }
1347
+ //nolint:ineffassign // Release memory.
1348
+ initialMD = nil
1349
+
1350
+ send , closed , err := httpapi .OneWayWebSocket [codersdk.ServerSentEvent ](rw , r )
1351
+ if err != nil {
1352
+ httpapi .Write (ctx , rw , http .StatusInternalServerError , codersdk.Response {
1353
+ Message : "Internal error setting up server-sent events." ,
1354
+ Detail : err .Error (),
1355
+ })
1356
+ return
1357
+ }
1358
+ // Prevent handler from returning until the sender is closed.
1359
+ defer func () {
1360
+ cancel ()
1361
+ <- closed
1362
+ }()
1363
+ // Synchronize cancellation from SSE -> context, this lets us simplify the
1364
+ // cancellation logic.
1365
+ go func () {
1366
+ select {
1367
+ case <- ctx .Done ():
1368
+ case <- closed :
1369
+ cancel ()
1370
+ }
1371
+ }()
1372
+
1373
+ var lastSend time.Time
1374
+ sendMetadata := func () {
1375
+ lastSend = time .Now ()
1376
+ values := maps .Values (metadataMap )
1377
+
1378
+ log .Debug (ctx , "sending metadata" , "num" , len (values ))
1379
+
1380
+ _ = send (codersdk.ServerSentEvent {
1381
+ Type : codersdk .ServerSentEventTypeData ,
1382
+ Data : convertWorkspaceAgentMetadata (values ),
1383
+ })
1384
+ }
1385
+
1386
+ // We send updates exactly every second.
1387
+ const sendInterval = time .Second * 1
1388
+ sendTicker := time .NewTicker (sendInterval )
1389
+ defer sendTicker .Stop ()
1390
+
1391
+ // Send initial metadata.
1392
+ sendMetadata ()
1393
+
1394
+ // Fetch updated metadata keys as they come in.
1395
+ fetchedMetadata := make (chan []database.WorkspaceAgentMetadatum )
1396
+ go func () {
1397
+ defer close (fetchedMetadata )
1398
+ defer cancel ()
1399
+
1400
+ for {
1401
+ select {
1402
+ case <- ctx .Done ():
1403
+ return
1404
+ case payload := <- update :
1405
+ md , err := api .Database .GetWorkspaceAgentMetadata (ctx , database.GetWorkspaceAgentMetadataParams {
1406
+ WorkspaceAgentID : workspaceAgent .ID ,
1407
+ Keys : payload .Keys ,
1408
+ })
1409
+ if err != nil {
1410
+ if ! database .IsQueryCanceledError (err ) {
1411
+ log .Error (ctx , "failed to get metadata" , slog .Error (err ))
1412
+ _ = send (codersdk.ServerSentEvent {
1413
+ Type : codersdk .ServerSentEventTypeError ,
1414
+ Data : codersdk.Response {
1415
+ Message : "Failed to get metadata." ,
1416
+ Detail : err .Error (),
1417
+ },
1418
+ })
1419
+ }
1420
+ return
1421
+ }
1422
+ select {
1423
+ case <- ctx .Done ():
1424
+ return
1425
+ // We want to block here to avoid constantly pinging the
1426
+ // database when the metadata isn't being processed.
1427
+ case fetchedMetadata <- md :
1428
+ log .Debug (ctx , "fetched metadata update for keys" , "keys" , payload .Keys , "num" , len (md ))
1429
+ }
1430
+ }
1431
+ }
1432
+ }()
1433
+ defer func () {
1434
+ <- fetchedMetadata
1435
+ }()
1436
+
1437
+ pendingChanges := true
1438
+ for {
1439
+ select {
1440
+ case <- ctx .Done ():
1441
+ return
1442
+ case md , ok := <- fetchedMetadata :
1443
+ if ! ok {
1444
+ return
1445
+ }
1446
+ for _ , datum := range md {
1447
+ metadataMap [datum .Key ] = datum
1448
+ }
1449
+ pendingChanges = true
1450
+ continue
1451
+ case <- sendTicker .C :
1452
+ // We send an update even if there's no change every 5 seconds
1453
+ // to ensure that the frontend always has an accurate "Result.Age".
1454
+ if ! pendingChanges && time .Since (lastSend ) < 5 * time .Second {
1455
+ continue
1456
+ }
1457
+ pendingChanges = false
1458
+ }
1459
+
1460
+ sendMetadata ()
1461
+ }
1462
+ }
1463
+
1276
1464
// appendUnique is like append and adds elements from src to dst,
1277
1465
// skipping any elements that already exist in dst.
1278
1466
func appendUnique [T comparable ](dst , src []T ) []T {
0 commit comments