@@ -90,7 +90,7 @@ type Client interface {
90
90
PostLifecycle (ctx context.Context , state agentsdk.PostLifecycleRequest ) error
91
91
PostAppHealth (ctx context.Context , req agentsdk.PostAppHealthsRequest ) error
92
92
PostStartup (ctx context.Context , req agentsdk.PostStartupRequest ) error
93
- PostMetadata (ctx context.Context , key string , req agentsdk.PostMetadataRequest ) error
93
+ PostMetadata (ctx context.Context , req agentsdk.PostMetadataRequest ) error
94
94
PatchLogs (ctx context.Context , req agentsdk.PatchLogs ) error
95
95
GetServiceBanner (ctx context.Context ) (codersdk.ServiceBannerConfig , error )
96
96
}
@@ -362,140 +362,210 @@ func (t *trySingleflight) Do(key string, fn func()) {
362
362
}
363
363
364
364
func (a * agent ) reportMetadataLoop (ctx context.Context ) {
365
- const metadataLimit = 128
365
+ tickerDone := make (chan struct {})
366
+ collectDone := make (chan struct {})
367
+ ctx , cancel := context .WithCancel (ctx )
368
+ defer func () {
369
+ cancel ()
370
+ <- collectDone
371
+ <- tickerDone
372
+ }()
366
373
367
374
var (
368
- baseTicker = time .NewTicker (a .reportMetadataInterval )
369
- lastCollectedAtMu sync.RWMutex
370
- lastCollectedAts = make (map [string ]time.Time )
371
- metadataResults = make (chan metadataResultAndKey , metadataLimit )
372
- logger = a .logger .Named ("metadata" )
375
+ logger = a .logger .Named ("metadata" )
376
+ report = make (chan struct {}, 1 )
377
+ collect = make (chan struct {}, 1 )
378
+ metadataResults = make (chan metadataResultAndKey , 1 )
373
379
)
374
- defer baseTicker .Stop ()
375
-
376
- // We use a custom singleflight that immediately returns if there is already
377
- // a goroutine running for a given key. This is to prevent a build-up of
378
- // goroutines waiting on Do when the script takes many multiples of
379
- // baseInterval to run.
380
- flight := trySingleflight {m : map [string ]struct {}{}}
381
-
382
- postMetadata := func (mr metadataResultAndKey ) {
383
- err := a .client .PostMetadata (ctx , mr .key , * mr .result )
384
- if err != nil {
385
- a .logger .Error (ctx , "agent failed to report metadata" , slog .Error (err ))
386
- }
387
- }
388
380
389
- for {
390
- select {
391
- case <- ctx .Done ():
392
- return
393
- case mr := <- metadataResults :
394
- postMetadata (mr )
395
- continue
396
- case <- baseTicker .C :
381
+ // Set up collect and report as a single ticker with two channels,
382
+ // this is to allow collection and reporting to be triggered
383
+ // independently of each other.
384
+ go func () {
385
+ t := time .NewTicker (a .reportMetadataInterval )
386
+ defer func () {
387
+ t .Stop ()
388
+ close (report )
389
+ close (collect )
390
+ close (tickerDone )
391
+ }()
392
+ wake := func (c chan <- struct {}) {
393
+ select {
394
+ case c <- struct {}{}:
395
+ default :
396
+ }
397
397
}
398
+ wake (collect ) // Start immediately.
398
399
399
- if len (metadataResults ) > 0 {
400
- // The inner collection loop expects the channel is empty before spinning up
401
- // all the collection goroutines.
402
- logger .Debug (ctx , "metadata collection backpressured" ,
403
- slog .F ("queue_len" , len (metadataResults )),
404
- )
405
- continue
400
+ for {
401
+ select {
402
+ case <- ctx .Done ():
403
+ return
404
+ case <- t .C :
405
+ wake (report )
406
+ wake (collect )
407
+ }
406
408
}
409
+ }()
407
410
408
- manifest := a .manifest .Load ()
409
- if manifest == nil {
410
- continue
411
- }
411
+ go func () {
412
+ defer close (collectDone )
413
+
414
+ var (
415
+ // We use a custom singleflight that immediately returns if there is already
416
+ // a goroutine running for a given key. This is to prevent a build-up of
417
+ // goroutines waiting on Do when the script takes many multiples of
418
+ // baseInterval to run.
419
+ flight = trySingleflight {m : map [string ]struct {}{}}
420
+ lastCollectedAtMu sync.RWMutex
421
+ lastCollectedAts = make (map [string ]time.Time )
422
+ )
423
+ for {
424
+ select {
425
+ case <- ctx .Done ():
426
+ return
427
+ case <- collect :
428
+ }
412
429
413
- if len (manifest .Metadata ) > metadataLimit {
414
- logger .Error (
415
- ctx , "metadata limit exceeded" ,
416
- slog .F ("limit" , metadataLimit ), slog .F ("got" , len (manifest .Metadata )),
417
- )
418
- continue
419
- }
430
+ manifest := a .manifest .Load ()
431
+ if manifest == nil {
432
+ continue
433
+ }
420
434
421
- // If the manifest changes (e.g. on agent reconnect) we need to
422
- // purge old cache values to prevent lastCollectedAt from growing
423
- // boundlessly.
424
- lastCollectedAtMu .Lock ()
425
- for key := range lastCollectedAts {
426
- if slices .IndexFunc (manifest .Metadata , func (md codersdk.WorkspaceAgentMetadataDescription ) bool {
427
- return md .Key == key
428
- }) < 0 {
429
- logger .Debug (ctx , "deleting lastCollected key, missing from manifest" ,
430
- slog .F ("key" , key ),
431
- )
432
- delete (lastCollectedAts , key )
435
+ // If the manifest changes (e.g. on agent reconnect) we need to
436
+ // purge old cache values to prevent lastCollectedAt from growing
437
+ // boundlessly.
438
+ lastCollectedAtMu .Lock ()
439
+ for key := range lastCollectedAts {
440
+ if slices .IndexFunc (manifest .Metadata , func (md codersdk.WorkspaceAgentMetadataDescription ) bool {
441
+ return md .Key == key
442
+ }) < 0 {
443
+ logger .Debug (ctx , "deleting lastCollected key, missing from manifest" ,
444
+ slog .F ("key" , key ),
445
+ )
446
+ delete (lastCollectedAts , key )
447
+ }
433
448
}
434
- }
435
- lastCollectedAtMu .Unlock ()
436
-
437
- // Spawn a goroutine for each metadata collection, and use a
438
- // channel to synchronize the results and avoid both messy
439
- // mutex logic and overloading the API.
440
- for _ , md := range manifest .Metadata {
441
- md := md
442
- // We send the result to the channel in the goroutine to avoid
443
- // sending the same result multiple times. So, we don't care about
444
- // the return values.
445
- go flight .Do (md .Key , func () {
446
- ctx := slog .With (ctx , slog .F ("key" , md .Key ))
447
- lastCollectedAtMu .RLock ()
448
- collectedAt , ok := lastCollectedAts [md .Key ]
449
- lastCollectedAtMu .RUnlock ()
450
- if ok {
451
- // If the interval is zero, we assume the user just wants
452
- // a single collection at startup, not a spinning loop.
453
- if md .Interval == 0 {
454
- return
449
+ lastCollectedAtMu .Unlock ()
450
+
451
+ // Spawn a goroutine for each metadata collection, and use a
452
+ // channel to synchronize the results and avoid both messy
453
+ // mutex logic and overloading the API.
454
+ for _ , md := range manifest .Metadata {
455
+ md := md
456
+ // We send the result to the channel in the goroutine to avoid
457
+ // sending the same result multiple times. So, we don't care about
458
+ // the return values.
459
+ go flight .Do (md .Key , func () {
460
+ ctx := slog .With (ctx , slog .F ("key" , md .Key ))
461
+ lastCollectedAtMu .RLock ()
462
+ collectedAt , ok := lastCollectedAts [md .Key ]
463
+ lastCollectedAtMu .RUnlock ()
464
+ if ok {
465
+ // If the interval is zero, we assume the user just wants
466
+ // a single collection at startup, not a spinning loop.
467
+ if md .Interval == 0 {
468
+ return
469
+ }
470
+ intervalUnit := time .Second
471
+ // reportMetadataInterval is only less than a second in tests,
472
+ // so adjust the interval unit for them.
473
+ if a .reportMetadataInterval < time .Second {
474
+ intervalUnit = 100 * time .Millisecond
475
+ }
476
+ // The last collected value isn't quite stale yet, so we skip it.
477
+ if collectedAt .Add (time .Duration (md .Interval ) * intervalUnit ).After (time .Now ()) {
478
+ return
479
+ }
455
480
}
456
- intervalUnit := time .Second
457
- // reportMetadataInterval is only less than a second in tests,
458
- // so adjust the interval unit for them.
459
- if a .reportMetadataInterval < time .Second {
460
- intervalUnit = 100 * time .Millisecond
481
+
482
+ timeout := md .Timeout
483
+ if timeout == 0 {
484
+ if md .Interval != 0 {
485
+ timeout = md .Interval
486
+ } else if interval := int64 (a .reportMetadataInterval .Seconds ()); interval != 0 {
487
+ // Fallback to the report interval
488
+ timeout = interval * 3
489
+ } else {
490
+ // If the interval is still 0 (possible if the interval
491
+ // is less than a second), default to 5. This was
492
+ // randomly picked.
493
+ timeout = 5
494
+ }
461
495
}
462
- // The last collected value isn't quite stale yet, so we skip it.
463
- if collectedAt .Add (time .Duration (md .Interval ) * intervalUnit ).After (time .Now ()) {
464
- return
496
+ ctxTimeout := time .Duration (timeout ) * time .Second
497
+ ctx , cancel := context .WithTimeout (ctx , ctxTimeout )
498
+ defer cancel ()
499
+
500
+ now := time .Now ()
501
+ select {
502
+ case <- ctx .Done ():
503
+ logger .Warn (ctx , "metadata collection timed out" , slog .F ("timeout" , ctxTimeout ))
504
+ case metadataResults <- metadataResultAndKey {
505
+ key : md .Key ,
506
+ result : a .collectMetadata (ctx , md , now ),
507
+ }:
508
+ lastCollectedAtMu .Lock ()
509
+ lastCollectedAts [md .Key ] = now
510
+ lastCollectedAtMu .Unlock ()
465
511
}
466
- }
512
+ })
513
+ }
514
+ }
515
+ }()
467
516
468
- timeout := md .Timeout
469
- if timeout == 0 {
470
- if md .Interval != 0 {
471
- timeout = md .Interval
472
- } else if interval := int64 (a .reportMetadataInterval .Seconds ()); interval != 0 {
473
- // Fallback to the report interval
474
- timeout = interval * 3
475
- } else {
476
- // If the interval is still 0 (possible if the interval
477
- // is less than a second), default to 5. This was
478
- // randomly picked.
479
- timeout = 5
480
- }
517
+ // Gather metadata updates and report them once every interval. If a
518
+ // previous report is in flight, wait for it to complete before
519
+ // sending a new one. If the network conditions are bad, we won't
520
+ // benefit from canceling the previous send and starting a new one.
521
+ var (
522
+ updatedMetadata = make (map [string ]* codersdk.WorkspaceAgentMetadataResult )
523
+ reportTimeout = 30 * time .Second
524
+ reportSemaphore = make (chan struct {}, 1 )
525
+ )
526
+ reportSemaphore <- struct {}{}
527
+
528
+ for {
529
+ select {
530
+ case <- ctx .Done ():
531
+ return
532
+ case mr := <- metadataResults :
533
+ // This can overwrite unsent values, but that's fine because
534
+ // we're only interested about up-to-date values.
535
+ updatedMetadata [mr .key ] = mr .result
536
+ continue
537
+ case <- report :
538
+ if len (updatedMetadata ) > 0 {
539
+ metadata := make ([]agentsdk.Metadata , 0 , len (updatedMetadata ))
540
+ for key , result := range updatedMetadata {
541
+ metadata = append (metadata , agentsdk.Metadata {
542
+ Key : key ,
543
+ WorkspaceAgentMetadataResult : * result ,
544
+ })
545
+ delete (updatedMetadata , key )
481
546
}
482
- ctxTimeout := time .Duration (timeout ) * time .Second
483
- ctx , cancel := context .WithTimeout (ctx , ctxTimeout )
484
- defer cancel ()
485
547
486
- now := time .Now ()
487
548
select {
488
- case <- ctx .Done ():
489
- logger .Warn (ctx , "metadata collection timed out" , slog .F ("timeout" , ctxTimeout ))
490
- case metadataResults <- metadataResultAndKey {
491
- key : md .Key ,
492
- result : a .collectMetadata (ctx , md , now ),
493
- }:
494
- lastCollectedAtMu .Lock ()
495
- lastCollectedAts [md .Key ] = now
496
- lastCollectedAtMu .Unlock ()
549
+ case <- reportSemaphore :
550
+ default :
551
+ // If there's already a report in flight, don't send
552
+ // another one, wait for next tick instead.
553
+ continue
497
554
}
498
- })
555
+
556
+ go func () {
557
+ ctx , cancel := context .WithTimeout (ctx , reportTimeout )
558
+ defer func () {
559
+ cancel ()
560
+ reportSemaphore <- struct {}{}
561
+ }()
562
+
563
+ err := a .client .PostMetadata (ctx , agentsdk.PostMetadataRequest {Metadata : metadata })
564
+ if err != nil {
565
+ a .logger .Error (ctx , "agent failed to report metadata" , slog .Error (err ))
566
+ }
567
+ }()
568
+ }
499
569
}
500
570
}
501
571
}
0 commit comments