Skip to content

Commit 06d26b5

Browse files
committed
improve synchronization in metadata loop
1 parent 64be182 commit 06d26b5

File tree

1 file changed

+22
-9
lines changed

1 file changed

+22
-9
lines changed

agent/agent.go

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ func (a *agent) collectMetadata(ctx context.Context, md codersdk.WorkspaceAgentM
258258
return result
259259
}
260260

261-
func convertInterval(i int64) time.Duration {
261+
func adjustIntervalForTests(i int64) time.Duration {
262262
// In tests we want to set shorter intervals because engineers are
263263
// impatient.
264264
base := time.Second
@@ -274,12 +274,14 @@ type metadataResultAndKey struct {
274274
}
275275

276276
func (a *agent) reportMetadataLoop(ctx context.Context) {
277-
baseInterval := convertInterval(1)
277+
baseInterval := adjustIntervalForTests(1)
278+
279+
const metadataLimit = 128
278280

279281
var (
280282
baseTicker = time.NewTicker(baseInterval)
281283
lastCollectedAts = make(map[string]time.Time)
282-
metadataResults = make(chan metadataResultAndKey, 16)
284+
metadataResults = make(chan metadataResultAndKey, metadataLimit)
283285
)
284286
defer baseTicker.Stop()
285287

@@ -294,15 +296,13 @@ func (a *agent) reportMetadataLoop(ctx context.Context) {
294296
a.logger.Error(ctx, "report metadata", slog.Error(err))
295297
}
296298
case <-baseTicker.C:
297-
break
298299
}
299300

300-
if len(metadataResults) > cap(metadataResults)/2 {
301+
if len(metadataResults) > 0 {
301302
// If we're backpressured on sending back results, we risk
302303
// runaway goroutine growth and/or overloading coderd. So,
303-
// we just skip the collection. Since we never update
304-
// the collections map, we'll retry the collection
305-
// on the next tick.
304+
// we just skip the collection and give the loop another chance to
305+
// post metadata.
306306
a.logger.Debug(
307307
ctx, "metadata collection backpressured",
308308
slog.F("queue_len", len(metadataResults)),
@@ -314,6 +314,15 @@ func (a *agent) reportMetadataLoop(ctx context.Context) {
314314
if manifest == nil {
315315
continue
316316
}
317+
318+
if len(manifest.Metadata) > metadataLimit {
319+
a.logger.Error(
320+
ctx, "metadata limit exceeded",
321+
slog.F("limit", metadataLimit), slog.F("got", len(manifest.Metadata)),
322+
)
323+
continue
324+
}
325+
317326
// If the manifest changes (e.g. on agent reconnect) we need to
318327
// purge old cache values to prevent lastCollectedAt from growing
319328
// boundlessly.
@@ -337,7 +346,7 @@ func (a *agent) reportMetadataLoop(ctx context.Context) {
337346
continue
338347
}
339348
if collectedAt.Add(
340-
convertInterval(md.Interval),
349+
adjustIntervalForTests(md.Interval),
341350
).After(time.Now()) {
342351
continue
343352
}
@@ -351,6 +360,10 @@ func (a *agent) reportMetadataLoop(ctx context.Context) {
351360
key: md.Key,
352361
result: a.collectMetadata(ctx, md),
353362
}:
363+
default:
364+
// This should be impossible because the channel is empty
365+
// before we start spinning up send goroutines.
366+
a.logger.Error(ctx, "metadataResults channel full")
354367
}
355368
}(md)
356369
}

0 commit comments

Comments
 (0)