@@ -242,15 +242,15 @@ func (a *agent) runLoop(ctx context.Context) {
242
242
}
243
243
}
244
244
245
- func (a * agent ) collectMetadata (ctx context.Context , md codersdk.WorkspaceAgentMetadataDescription ) * codersdk.WorkspaceAgentMetadataResult {
245
+ func (a * agent ) collectMetadata (ctx context.Context , md codersdk.WorkspaceAgentMetadataDescription , now time. Time ) * codersdk.WorkspaceAgentMetadataResult {
246
246
var out bytes.Buffer
247
247
result := & codersdk.WorkspaceAgentMetadataResult {
248
248
// CollectedAt is set here for testing purposes and overrode by
249
249
// coderd to the time of server receipt to solve clock skew.
250
250
//
251
251
// In the future, the server may accept the timestamp from the agent
252
252
// if it can guarantee the clocks are synchronized.
253
- CollectedAt : time . Now () ,
253
+ CollectedAt : now ,
254
254
}
255
255
cmdPty , err := a .sshServer .CreateCommand (ctx , md .Script , nil )
256
256
if err != nil {
@@ -298,54 +298,64 @@ type metadataResultAndKey struct {
298
298
}
299
299
300
300
// trySingleflight deduplicates concurrent work by key. Unlike
// golang.org/x/sync/singleflight, a caller whose key already has a
// goroutine in flight returns immediately instead of waiting for and
// sharing the in-flight result.
//
// The zero value is ready to use; the map is initialized lazily on
// first Do.
type trySingleflight struct {
	mu sync.Mutex
	m  map[string]struct{}
}

// Do runs fn if and only if no other call with the same key is
// currently in flight. If another goroutine already holds the key, Do
// returns immediately without running fn.
func (t *trySingleflight) Do(key string, fn func()) {
	t.mu.Lock()
	if t.m == nil {
		// Lazy init so the zero value of trySingleflight is usable.
		t.m = make(map[string]struct{})
	}
	if _, ok := t.m[key]; ok {
		// There is already a goroutine running for this key.
		t.mu.Unlock()
		return
	}
	t.m[key] = struct{}{}
	t.mu.Unlock()

	// Release the key once fn finishes, even if it panics, so the key
	// does not get stuck permanently in-flight.
	defer func() {
		t.mu.Lock()
		delete(t.m, key)
		t.mu.Unlock()
	}()

	fn()
}
314
323
315
324
func (a * agent ) reportMetadataLoop (ctx context.Context ) {
316
325
const metadataLimit = 128
317
326
318
327
var (
319
- baseTicker = time .NewTicker (a .reportMetadataInterval )
320
- lastCollectedAts = make (map [string ]time.Time )
321
- metadataResults = make (chan metadataResultAndKey , metadataLimit )
328
+ baseTicker = time .NewTicker (a .reportMetadataInterval )
329
+ lastCollectedAtMu sync.RWMutex
330
+ lastCollectedAts = make (map [string ]time.Time )
331
+ metadataResults = make (chan metadataResultAndKey , metadataLimit )
332
+ logger = a .logger .Named ("metadata" )
322
333
)
323
334
defer baseTicker .Stop ()
324
335
325
336
// We use a custom singleflight that immediately returns if there is already
326
337
// a goroutine running for a given key. This is to prevent a build-up of
327
338
// goroutines waiting on Do when the script takes many multiples of
328
339
// baseInterval to run.
329
- var flight trySingleflight
340
+ flight := trySingleflight { m : map [ string ] struct {}{}}
330
341
331
342
for {
332
343
select {
333
344
case <- ctx .Done ():
334
345
return
335
346
case mr := <- metadataResults :
336
- lastCollectedAts [mr .key ] = mr .result .CollectedAt
337
347
err := a .client .PostMetadata (ctx , mr .key , * mr .result )
338
348
if err != nil {
339
349
a .logger .Error (ctx , "agent failed to report metadata" , slog .Error (err ))
340
350
}
351
+ continue
341
352
case <- baseTicker .C :
342
353
}
343
354
344
355
if len (metadataResults ) > 0 {
345
356
// The inner collection loop expects the channel is empty before spinning up
346
357
// all the collection goroutines.
347
- a .logger .Debug (
348
- ctx , "metadata collection backpressured" ,
358
+ logger .Debug (ctx , "metadata collection backpressured" ,
349
359
slog .F ("queue_len" , len (metadataResults )),
350
360
)
351
361
continue
@@ -357,7 +367,7 @@ func (a *agent) reportMetadataLoop(ctx context.Context) {
357
367
}
358
368
359
369
if len (manifest .Metadata ) > metadataLimit {
360
- a . logger .Error (
370
+ logger .Error (
361
371
ctx , "metadata limit exceeded" ,
362
372
slog .F ("limit" , metadataLimit ), slog .F ("got" , len (manifest .Metadata )),
363
373
)
@@ -367,51 +377,73 @@ func (a *agent) reportMetadataLoop(ctx context.Context) {
367
377
// If the manifest changes (e.g. on agent reconnect) we need to
368
378
// purge old cache values to prevent lastCollectedAt from growing
369
379
// boundlessly.
380
+ lastCollectedAtMu .Lock ()
370
381
for key := range lastCollectedAts {
371
382
if slices .IndexFunc (manifest .Metadata , func (md codersdk.WorkspaceAgentMetadataDescription ) bool {
372
383
return md .Key == key
373
384
}) < 0 {
385
+ logger .Debug (ctx , "deleting lastCollected key, missing from manifest" ,
386
+ slog .F ("key" , key ),
387
+ )
374
388
delete (lastCollectedAts , key )
375
389
}
376
390
}
391
+ lastCollectedAtMu .Unlock ()
377
392
378
393
// Spawn a goroutine for each metadata collection, and use a
379
394
// channel to synchronize the results and avoid both messy
380
395
// mutex logic and overloading the API.
381
396
for _ , md := range manifest .Metadata {
382
- collectedAt , ok := lastCollectedAts [md .Key ]
383
- if ok {
384
- // If the interval is zero, we assume the user just wants
385
- // a single collection at startup, not a spinning loop.
386
- if md .Interval == 0 {
387
- continue
388
- }
389
- // The last collected value isn't quite stale yet, so we skip it.
390
- if collectedAt .Add (a .reportMetadataInterval ).After (time .Now ()) {
391
- continue
392
- }
393
- }
394
-
395
397
md := md
396
398
// We send the result to the channel in the goroutine to avoid
397
399
// sending the same result multiple times. So, we don't care about
398
400
// the return values.
399
401
go flight .Do (md .Key , func () {
402
+ ctx := slog .With (ctx , slog .F ("key" , md .Key ))
403
+ lastCollectedAtMu .RLock ()
404
+ collectedAt , ok := lastCollectedAts [md .Key ]
405
+ lastCollectedAtMu .RUnlock ()
406
+ if ok {
407
+ // If the interval is zero, we assume the user just wants
408
+ // a single collection at startup, not a spinning loop.
409
+ if md .Interval == 0 {
410
+ return
411
+ }
412
+ // The last collected value isn't quite stale yet, so we skip it.
413
+ if collectedAt .Add (a .reportMetadataInterval ).After (time .Now ()) {
414
+ return
415
+ }
416
+ }
417
+
400
418
timeout := md .Timeout
401
419
if timeout == 0 {
402
- timeout = md .Interval
420
+ if md .Interval != 0 {
421
+ timeout = md .Interval
422
+ } else if interval := int64 (a .reportMetadataInterval .Seconds ()); interval != 0 {
423
+ // Fallback to the report interval
424
+ timeout = interval * 3
425
+ } else {
426
+ // If the interval is still 0 (possible if the interval
427
+ // is less than a second), default to 5. This was
428
+ // randomly picked.
429
+ timeout = 5
430
+ }
403
431
}
404
- ctx , cancel := context .WithTimeout (ctx ,
405
- time .Duration (timeout )* time .Second ,
406
- )
432
+ ctxTimeout := time .Duration (timeout ) * time .Second
433
+ ctx , cancel := context .WithTimeout (ctx , ctxTimeout )
407
434
defer cancel ()
408
435
436
+ now := time .Now ()
409
437
select {
410
438
case <- ctx .Done ():
439
+ logger .Warn (ctx , "metadata collection timed out" , slog .F ("timeout" , ctxTimeout ))
411
440
case metadataResults <- metadataResultAndKey {
412
441
key : md .Key ,
413
- result : a .collectMetadata (ctx , md ),
442
+ result : a .collectMetadata (ctx , md , now ),
414
443
}:
444
+ lastCollectedAtMu .Lock ()
445
+ lastCollectedAts [md .Key ] = now
446
+ lastCollectedAtMu .Unlock ()
415
447
}
416
448
})
417
449
}
0 commit comments