diff --git a/agent/agent.go b/agent/agent.go
index b4ddc6eb221f5..93daba559c49e 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -362,147 +362,210 @@ func (t *trySingleflight) Do(key string, fn func()) {
 }
 
 func (a *agent) reportMetadataLoop(ctx context.Context) {
-	const metadataLimit = 128
+	tickerDone := make(chan struct{})
+	collectDone := make(chan struct{})
+	ctx, cancel := context.WithCancel(ctx)
+	defer func() {
+		cancel()
+		<-collectDone
+		<-tickerDone
+	}()
 
 	var (
-		baseTicker        = time.NewTicker(a.reportMetadataInterval)
-		lastCollectedAtMu sync.RWMutex
-		lastCollectedAts  = make(map[string]time.Time)
-		metadataResults   = make(chan metadataResultAndKey, metadataLimit)
-		logger            = a.logger.Named("metadata")
+		logger          = a.logger.Named("metadata")
+		report          = make(chan struct{}, 1)
+		collect         = make(chan struct{}, 1)
+		metadataResults = make(chan metadataResultAndKey, 1)
 	)
-	defer baseTicker.Stop()
-
-	// We use a custom singleflight that immediately returns if there is already
-	// a goroutine running for a given key. This is to prevent a build-up of
-	// goroutines waiting on Do when the script takes many multiples of
-	// baseInterval to run.
-	flight := trySingleflight{m: map[string]struct{}{}}
-
-	postMetadata := func(mr metadataResultAndKey) {
-		err := a.client.PostMetadata(ctx, agentsdk.PostMetadataRequest{
-			Metadata: []agentsdk.Metadata{
-				{
-					Key:                          mr.key,
-					WorkspaceAgentMetadataResult: *mr.result,
-				},
-			},
-		})
-		if err != nil {
-			a.logger.Error(ctx, "agent failed to report metadata", slog.Error(err))
-		}
-	}
-
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case mr := <-metadataResults:
-			postMetadata(mr)
-			continue
-		case <-baseTicker.C:
+	// Set up collect and report as a single ticker with two channels;
+	// this allows collection and reporting to be triggered
+	// independently of each other.
+	go func() {
+		t := time.NewTicker(a.reportMetadataInterval)
+		defer func() {
+			t.Stop()
+			close(report)
+			close(collect)
+			close(tickerDone)
+		}()
+		wake := func(c chan<- struct{}) {
+			select {
+			case c <- struct{}{}:
+			default:
+			}
 		}
+		wake(collect) // Start immediately.
 
-		if len(metadataResults) > 0 {
-			// The inner collection loop expects the channel is empty before spinning up
-			// all the collection goroutines.
-			logger.Debug(ctx, "metadata collection backpressured",
-				slog.F("queue_len", len(metadataResults)),
-			)
-			continue
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-t.C:
+				wake(report)
+				wake(collect)
+			}
 		}
+	}()
 
-		manifest := a.manifest.Load()
-		if manifest == nil {
-			continue
-		}
+	go func() {
+		defer close(collectDone)
+
+		var (
+			// We use a custom singleflight that immediately returns if there is already
+			// a goroutine running for a given key. This is to prevent a build-up of
+			// goroutines waiting on Do when the script takes many multiples of
+			// baseInterval to run.
+			flight            = trySingleflight{m: map[string]struct{}{}}
+			lastCollectedAtMu sync.RWMutex
+			lastCollectedAts  = make(map[string]time.Time)
+		)
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-collect:
+			}
 
-		if len(manifest.Metadata) > metadataLimit {
-			logger.Error(
-				ctx, "metadata limit exceeded",
-				slog.F("limit", metadataLimit), slog.F("got", len(manifest.Metadata)),
-			)
-			continue
-		}
+			manifest := a.manifest.Load()
+			if manifest == nil {
+				continue
+			}
 
-		// If the manifest changes (e.g. on agent reconnect) we need to
-		// purge old cache values to prevent lastCollectedAt from growing
-		// boundlessly.
-		lastCollectedAtMu.Lock()
-		for key := range lastCollectedAts {
-			if slices.IndexFunc(manifest.Metadata, func(md codersdk.WorkspaceAgentMetadataDescription) bool {
-				return md.Key == key
-			}) < 0 {
-				logger.Debug(ctx, "deleting lastCollected key, missing from manifest",
-					slog.F("key", key),
-				)
-				delete(lastCollectedAts, key)
+			// If the manifest changes (e.g. on agent reconnect) we need to
+			// purge old cache values to prevent lastCollectedAt from growing
+			// boundlessly.
+			lastCollectedAtMu.Lock()
+			for key := range lastCollectedAts {
+				if slices.IndexFunc(manifest.Metadata, func(md codersdk.WorkspaceAgentMetadataDescription) bool {
+					return md.Key == key
+				}) < 0 {
+					logger.Debug(ctx, "deleting lastCollected key, missing from manifest",
+						slog.F("key", key),
+					)
+					delete(lastCollectedAts, key)
+				}
 			}
-		}
-		lastCollectedAtMu.Unlock()
-
-		// Spawn a goroutine for each metadata collection, and use a
-		// channel to synchronize the results and avoid both messy
-		// mutex logic and overloading the API.
-		for _, md := range manifest.Metadata {
-			md := md
-			// We send the result to the channel in the goroutine to avoid
-			// sending the same result multiple times. So, we don't care about
-			// the return values.
-			go flight.Do(md.Key, func() {
-				ctx := slog.With(ctx, slog.F("key", md.Key))
-				lastCollectedAtMu.RLock()
-				collectedAt, ok := lastCollectedAts[md.Key]
-				lastCollectedAtMu.RUnlock()
-				if ok {
-					// If the interval is zero, we assume the user just wants
-					// a single collection at startup, not a spinning loop.
-					if md.Interval == 0 {
-						return
+			lastCollectedAtMu.Unlock()
+
+			// Spawn a goroutine for each metadata collection, and use a
+			// channel to synchronize the results and avoid both messy
+			// mutex logic and overloading the API.
+			for _, md := range manifest.Metadata {
+				md := md
+				// We send the result to the channel in the goroutine to avoid
+				// sending the same result multiple times. So, we don't care about
+				// the return values.
+				go flight.Do(md.Key, func() {
+					ctx := slog.With(ctx, slog.F("key", md.Key))
+					lastCollectedAtMu.RLock()
+					collectedAt, ok := lastCollectedAts[md.Key]
+					lastCollectedAtMu.RUnlock()
+					if ok {
+						// If the interval is zero, we assume the user just wants
+						// a single collection at startup, not a spinning loop.
+						if md.Interval == 0 {
+							return
+						}
+						intervalUnit := time.Second
+						// reportMetadataInterval is only less than a second in tests,
+						// so adjust the interval unit for them.
+						if a.reportMetadataInterval < time.Second {
+							intervalUnit = 100 * time.Millisecond
+						}
+						// The last collected value isn't quite stale yet, so we skip it.
+						if collectedAt.Add(time.Duration(md.Interval) * intervalUnit).After(time.Now()) {
+							return
+						}
 					}
-					intervalUnit := time.Second
-					// reportMetadataInterval is only less than a second in tests,
-					// so adjust the interval unit for them.
-					if a.reportMetadataInterval < time.Second {
-						intervalUnit = 100 * time.Millisecond
+
+					timeout := md.Timeout
+					if timeout == 0 {
+						if md.Interval != 0 {
+							timeout = md.Interval
+						} else if interval := int64(a.reportMetadataInterval.Seconds()); interval != 0 {
+							// Fallback to the report interval
+							timeout = interval * 3
+						} else {
+							// If the interval is still 0 (possible if the interval
+							// is less than a second), default to 5. This was
+							// randomly picked.
+							timeout = 5
+						}
 					}
-					// The last collected value isn't quite stale yet, so we skip it.
-					if collectedAt.Add(time.Duration(md.Interval) * intervalUnit).After(time.Now()) {
-						return
+					ctxTimeout := time.Duration(timeout) * time.Second
+					ctx, cancel := context.WithTimeout(ctx, ctxTimeout)
+					defer cancel()
+
+					now := time.Now()
+					select {
+					case <-ctx.Done():
+						logger.Warn(ctx, "metadata collection timed out", slog.F("timeout", ctxTimeout))
+					case metadataResults <- metadataResultAndKey{
+						key:    md.Key,
+						result: a.collectMetadata(ctx, md, now),
+					}:
+						lastCollectedAtMu.Lock()
+						lastCollectedAts[md.Key] = now
+						lastCollectedAtMu.Unlock()
 					}
-				}
+				})
+			}
+		}
+	}()
 
-				timeout := md.Timeout
-				if timeout == 0 {
-					if md.Interval != 0 {
-						timeout = md.Interval
-					} else if interval := int64(a.reportMetadataInterval.Seconds()); interval != 0 {
-						// Fallback to the report interval
-						timeout = interval * 3
-					} else {
-						// If the interval is still 0 (possible if the interval
-						// is less than a second), default to 5. This was
-						// randomly picked.
-						timeout = 5
-					}
+	// Gather metadata updates and report them once every interval. If a
+	// previous report is in flight, wait for it to complete before
+	// sending a new one. If the network conditions are bad, we won't
+	// benefit from canceling the previous send and starting a new one.
+	var (
+		updatedMetadata = make(map[string]*codersdk.WorkspaceAgentMetadataResult)
+		reportTimeout   = 30 * time.Second
+		reportSemaphore = make(chan struct{}, 1)
+	)
+	reportSemaphore <- struct{}{}
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case mr := <-metadataResults:
+			// This can overwrite unsent values, but that's fine because
+			// we're only interested in up-to-date values.
+			updatedMetadata[mr.key] = mr.result
+			continue
+		case <-report:
+			if len(updatedMetadata) > 0 {
+				metadata := make([]agentsdk.Metadata, 0, len(updatedMetadata))
+				for key, result := range updatedMetadata {
+					metadata = append(metadata, agentsdk.Metadata{
+						Key:                          key,
+						WorkspaceAgentMetadataResult: *result,
+					})
+					delete(updatedMetadata, key)
 				}
-				ctxTimeout := time.Duration(timeout) * time.Second
-				ctx, cancel := context.WithTimeout(ctx, ctxTimeout)
-				defer cancel()
-
-				now := time.Now()
 				select {
-				case <-ctx.Done():
-					logger.Warn(ctx, "metadata collection timed out", slog.F("timeout", ctxTimeout))
-				case metadataResults <- metadataResultAndKey{
-					key:    md.Key,
-					result: a.collectMetadata(ctx, md, now),
-				}:
-					lastCollectedAtMu.Lock()
-					lastCollectedAts[md.Key] = now
-					lastCollectedAtMu.Unlock()
+				case <-reportSemaphore:
+				default:
+					// If there's already a report in flight, don't send
+					// another one; wait for the next tick instead.
+					continue
 				}
-			})
+
+				go func() {
+					ctx, cancel := context.WithTimeout(ctx, reportTimeout)
+					defer func() {
+						cancel()
+						reportSemaphore <- struct{}{}
+					}()
+
+					err := a.client.PostMetadata(ctx, agentsdk.PostMetadataRequest{Metadata: metadata})
+					if err != nil {
+						a.logger.Error(ctx, "agent failed to report metadata", slog.Error(err))
+					}
+				}()
+			}
 		}
 	}
 }
diff --git a/agent/agent_test.go b/agent/agent_test.go
index aca521a82f9d2..bf71c4f1638f9 100644
--- a/agent/agent_test.go
+++ b/agent/agent_test.go
@@ -1066,34 +1066,43 @@ func TestAgent_Metadata(t *testing.T) {
 
 	t.Run("Once", func(t *testing.T) {
 		t.Parallel()
+		//nolint:dogsled
 		_, client, _, _, _ := setupAgent(t, agentsdk.Manifest{
 			Metadata: []codersdk.WorkspaceAgentMetadataDescription{
 				{
-					Key:      "greeting",
+					Key:      "greeting1",
 					Interval: 0,
 					Script:   echoHello,
 				},
+				{
+					Key:      "greeting2",
+					Interval: 1,
+					Script:   echoHello,
+				},
 			},
 		}, 0, func(_ *agenttest.Client, opts *agent.Options) {
-			opts.ReportMetadataInterval = 100 * time.Millisecond
+			opts.ReportMetadataInterval = testutil.IntervalFast
 		})
 
 		var gotMd map[string]agentsdk.Metadata
 		require.Eventually(t, func() bool {
 			gotMd = client.GetMetadata()
-			return len(gotMd) == 1
-		}, testutil.WaitShort, testutil.IntervalMedium)
+			return len(gotMd) == 2
+		}, testutil.WaitShort, testutil.IntervalFast/2)
 
-		collectedAt := gotMd["greeting"].CollectedAt
+		collectedAt1 := gotMd["greeting1"].CollectedAt
+		collectedAt2 := gotMd["greeting2"].CollectedAt
 
-		require.Never(t, func() bool {
+		require.Eventually(t, func() bool {
 			gotMd = client.GetMetadata()
-			if len(gotMd) != 1 {
+			if len(gotMd) != 2 {
 				panic("unexpected number of metadata")
 			}
-			return !gotMd["greeting"].CollectedAt.Equal(collectedAt)
-		}, testutil.WaitShort, testutil.IntervalMedium)
+			return !gotMd["greeting2"].CollectedAt.Equal(collectedAt2)
+		}, testutil.WaitShort, testutil.IntervalFast/2)
+
+		require.Equal(t, gotMd["greeting1"].CollectedAt, collectedAt1, "metadata should not be collected again")
 	})
 
 	t.Run("Many", func(t *testing.T) {
diff --git a/docs/templates/agent-metadata.md b/docs/templates/agent-metadata.md
index 7303e3fa46c89..f36893e918d74 100644
--- a/docs/templates/agent-metadata.md
+++ b/docs/templates/agent-metadata.md
@@ -123,19 +123,23 @@ usr sys idl wai stl| read writ| recv send| in out | int csw
 
 Agent metadata can generate a significant write load and overwhelm your database
 if you're not careful. The approximate writes per second can be calculated using
-the formula:
+the following formula (applied once for each unique metadata interval):
 
 ```text
-(metadata_count * num_running_agents * 2) / metadata_avg_interval
+num_running_agents * write_multiplier / metadata_interval
 ```
 
-For example, let's say you have
+For example, let's say you have:
 
 - 10 running agents
-- each with 6 metadata snippets
-- with an average interval of 4 seconds
+- each with 4 metadata snippets
+- where two have an interval of 4 seconds, and the other two 6 seconds
 
-You can expect `(10 * 6 * 2) / 4` or 30 writes per second.
+You can expect at most `(10 * 2 / 4) + (10 * 2 / 6)` or ~8 writes per second.
+The actual writes per second may be a bit lower due to batching of metadata.
+Adding more metadata with the same interval will not increase writes per second,
+but it may still increase database load slightly.
 
-One of the writes is to the `UNLOGGED` `workspace_agent_metadata` table and the
-other to the `NOTIFY` query that enables live stats streaming in the UI.
+We use a `write_multiplier` of `2` because each metadata update generates two
+database writes. One of the writes is to the `UNLOGGED` `workspace_agent_metadata`
+table and the other to the `NOTIFY` query that enables live stats streaming in the UI.
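
For readers skimming the diff, the new `reportMetadataLoop` boils down to two small concurrency patterns: a capacity-1 channel whose non-blocking send coalesces ticker fires (the `wake` helper), and a one-slot semaphore that skips a report tick while a previous `PostMetadata` call is still in flight. The sketch below is illustrative only and not part of the patch; the names (`report`, `semaphore`) and the timings are invented for the example.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	var (
		report    = make(chan struct{}, 1) // capacity 1: pending wake-ups coalesce
		semaphore = make(chan struct{}, 1) // one slot: at most one report in flight
		done      = make(chan struct{})
	)
	semaphore <- struct{}{} // arm the semaphore

	// wake never blocks: if a wake-up is already pending, the new one is
	// dropped, so two ticks coalesce into a single report.
	wake := func(c chan<- struct{}) {
		select {
		case c <- struct{}{}:
		default:
		}
	}

	go func() {
		defer close(done)
		for range report {
			select {
			case <-semaphore:
			default:
				// A report is still in flight; skip this tick rather
				// than queueing behind the slow send.
				fmt.Println("skipped: report in flight")
				continue
			}
			go func() {
				defer func() { semaphore <- struct{}{} }()
				time.Sleep(30 * time.Millisecond) // stand-in for a slow PostMetadata
				fmt.Println("reported")
			}()
		}
	}()

	for i := 0; i < 5; i++ { // stand-in for the ticker
		wake(report)
		time.Sleep(10 * time.Millisecond)
	}
	close(report)
	<-done
	time.Sleep(40 * time.Millisecond) // let the last in-flight report finish
}
```

Skipping instead of queueing is the design choice the "Gather metadata updates" comment describes: on a slow or flaky network a backlog of stale reports is worthless, so the next tick simply sends the freshest values.

The docs change can be sanity-checked the same way. The snippet below just evaluates the updated formula for the worked example (10 agents, intervals of 4s and 6s, `write_multiplier` of 2); the variable names are made up, and only the arithmetic comes from the docs.

```go
package main

import "fmt"

func main() {
	const (
		numRunningAgents = 10
		writeMultiplier  = 2 // one UNLOGGED table write + one NOTIFY query
	)
	// Unique metadata intervals in seconds. Per the updated docs, the number
	// of metadata items sharing an interval matters much less than before,
	// because results with the same interval are batched into one report.
	intervals := []float64{4, 6}

	var writesPerSec float64
	for _, interval := range intervals {
		writesPerSec += numRunningAgents * writeMultiplier / interval
	}
	fmt.Printf("~%.1f writes/sec\n", writesPerSec) // prints ~8.3
}
```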