fix(agent): send metadata in batches #10225
Changes from all commits
@@ -362,147 +362,210 @@ func (t *trySingleflight) Do(key string, fn func()) {
}

func (a *agent) reportMetadataLoop(ctx context.Context) {
const metadataLimit = 128
tickerDone := make(chan struct{})
collectDone := make(chan struct{})
ctx, cancel := context.WithCancel(ctx)
defer func() {
cancel()
<-collectDone
<-tickerDone
}()

var (
baseTicker = time.NewTicker(a.reportMetadataInterval)
lastCollectedAtMu sync.RWMutex
lastCollectedAts = make(map[string]time.Time)
metadataResults = make(chan metadataResultAndKey, metadataLimit)
logger = a.logger.Named("metadata")
logger = a.logger.Named("metadata")
report = make(chan struct{}, 1)
collect = make(chan struct{}, 1)
metadataResults = make(chan metadataResultAndKey, 1)
)
defer baseTicker.Stop()

// We use a custom singleflight that immediately returns if there is already
// a goroutine running for a given key. This is to prevent a build-up of
// goroutines waiting on Do when the script takes many multiples of
// baseInterval to run.
flight := trySingleflight{m: map[string]struct{}{}}

postMetadata := func(mr metadataResultAndKey) {
err := a.client.PostMetadata(ctx, agentsdk.PostMetadataRequest{
Metadata: []agentsdk.Metadata{
{
Key: mr.key,
WorkspaceAgentMetadataResult: *mr.result,
},
},
})
if err != nil {
a.logger.Error(ctx, "agent failed to report metadata", slog.Error(err))
}
}

for {
select {
case <-ctx.Done():
return
case mr := <-metadataResults:
postMetadata(mr)
continue
case <-baseTicker.C:
// Set up collect and report as a single ticker with two channels,
// this is to allow collection and reporting to be triggered
// independently of each other.
go func() {
t := time.NewTicker(a.reportMetadataInterval)
defer func() {
t.Stop()
close(report)
close(collect)
close(tickerDone)
}()
wake := func(c chan<- struct{}) {
select {
case c <- struct{}{}:
default:
}
}
wake(collect) // Start immediately.

if len(metadataResults) > 0 {
// The inner collection loop expects the channel is empty before spinning up
// all the collection goroutines.
logger.Debug(ctx, "metadata collection backpressured",
slog.F("queue_len", len(metadataResults)),
)
continue
for {
select {
case <-ctx.Done():
return
case <-t.C:
wake(report)
wake(collect)
}
}
}()

manifest := a.manifest.Load()
if manifest == nil {
continue
}
go func() {
defer close(collectDone)

var (
// We use a custom singleflight that immediately returns if there is already
// a goroutine running for a given key. This is to prevent a build-up of
// goroutines waiting on Do when the script takes many multiples of
// baseInterval to run.
flight = trySingleflight{m: map[string]struct{}{}}
lastCollectedAtMu sync.RWMutex
lastCollectedAts = make(map[string]time.Time)
)
for {
select {
case <-ctx.Done():
return
case <-collect:
}

if len(manifest.Metadata) > metadataLimit {
Review comment: We don't need this condition anymore? Just in case...

Reply: Previously there was a buffered channel, but that's no longer required, so this artificial restriction got lifted. If we want to limit this, it should be in the coder tf provider, saying too many metadata items were added.
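As a toy illustration of the reply above (a hedged sketch, not code from this PR): once unsent results are coalesced into a map keyed by metadata key, pending state is bounded by the number of keys themselves, so a hard cap like metadataLimit is no longer needed. The metadataResult type and sample values below are made up for the example.

package main

import "fmt"

// metadataResult is a stand-in for the agent's result-plus-key pair.
type metadataResult struct {
	key   string
	value string
}

func main() {
	// Coalesce unsent results by key: newer values overwrite older unsent
	// ones, so pending state never grows beyond one entry per metadata key,
	// no matter how often collection runs.
	updated := map[string]string{}

	incoming := []metadataResult{
		{"cpu", "10%"}, {"cpu", "12%"}, {"mem", "48%"}, {"cpu", "9%"},
	}
	for _, r := range incoming {
		updated[r.key] = r.value
	}

	fmt.Println(len(updated), "pending entries:", updated) // 2 pending entries
}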
logger.Error(
ctx, "metadata limit exceeded",
slog.F("limit", metadataLimit), slog.F("got", len(manifest.Metadata)),
)
continue
}
manifest := a.manifest.Load()
if manifest == nil {
continue
}

// If the manifest changes (e.g. on agent reconnect) we need to
// purge old cache values to prevent lastCollectedAt from growing
// boundlessly.
lastCollectedAtMu.Lock()
for key := range lastCollectedAts {
if slices.IndexFunc(manifest.Metadata, func(md codersdk.WorkspaceAgentMetadataDescription) bool {
return md.Key == key
}) < 0 {
logger.Debug(ctx, "deleting lastCollected key, missing from manifest",
slog.F("key", key),
)
delete(lastCollectedAts, key)
// If the manifest changes (e.g. on agent reconnect) we need to
// purge old cache values to prevent lastCollectedAt from growing
// boundlessly.
lastCollectedAtMu.Lock()
for key := range lastCollectedAts {
if slices.IndexFunc(manifest.Metadata, func(md codersdk.WorkspaceAgentMetadataDescription) bool {
return md.Key == key
}) < 0 {
logger.Debug(ctx, "deleting lastCollected key, missing from manifest",
slog.F("key", key),
)
delete(lastCollectedAts, key)
}
}
}
lastCollectedAtMu.Unlock()

// Spawn a goroutine for each metadata collection, and use a
// channel to synchronize the results and avoid both messy
// mutex logic and overloading the API.
for _, md := range manifest.Metadata {
md := md
// We send the result to the channel in the goroutine to avoid
// sending the same result multiple times. So, we don't care about
// the return values.
go flight.Do(md.Key, func() {
ctx := slog.With(ctx, slog.F("key", md.Key))
lastCollectedAtMu.RLock()
collectedAt, ok := lastCollectedAts[md.Key]
lastCollectedAtMu.RUnlock()
if ok {
// If the interval is zero, we assume the user just wants
// a single collection at startup, not a spinning loop.
if md.Interval == 0 {
return
lastCollectedAtMu.Unlock()

// Spawn a goroutine for each metadata collection, and use a
// channel to synchronize the results and avoid both messy
// mutex logic and overloading the API.
for _, md := range manifest.Metadata {
md := md
// We send the result to the channel in the goroutine to avoid
// sending the same result multiple times. So, we don't care about
// the return values.
go flight.Do(md.Key, func() {
ctx := slog.With(ctx, slog.F("key", md.Key))
lastCollectedAtMu.RLock()
collectedAt, ok := lastCollectedAts[md.Key]
lastCollectedAtMu.RUnlock()
if ok {
// If the interval is zero, we assume the user just wants
// a single collection at startup, not a spinning loop.
if md.Interval == 0 {
return
}
intervalUnit := time.Second
// reportMetadataInterval is only less than a second in tests,
// so adjust the interval unit for them.
if a.reportMetadataInterval < time.Second {
intervalUnit = 100 * time.Millisecond
}
// The last collected value isn't quite stale yet, so we skip it.
if collectedAt.Add(time.Duration(md.Interval) * intervalUnit).After(time.Now()) {
return
}
}
intervalUnit := time.Second
// reportMetadataInterval is only less than a second in tests,
// so adjust the interval unit for them.
if a.reportMetadataInterval < time.Second {
intervalUnit = 100 * time.Millisecond

timeout := md.Timeout
if timeout == 0 {
if md.Interval != 0 {
timeout = md.Interval
} else if interval := int64(a.reportMetadataInterval.Seconds()); interval != 0 {
// Fallback to the report interval
timeout = interval * 3
} else {
// If the interval is still 0 (possible if the interval
// is less than a second), default to 5. This was
// randomly picked.
timeout = 5
}
}
// The last collected value isn't quite stale yet, so we skip it.
if collectedAt.Add(time.Duration(md.Interval) * intervalUnit).After(time.Now()) {
return
ctxTimeout := time.Duration(timeout) * time.Second
Review comment: I might be lost in the flow here... why is this 1-second timeout needed?

Reply: This also didn't change in this PR. The timeout here can be 1s or it can be higher. I suppose the logic is that if you want metadata to update every 1s, collection should run faster than 1s, but this is likely not ideal. We could change this in the future or if it becomes a problem.
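To make the timeout fallback above easier to follow in isolation, here is a distilled sketch of the same chain; collectionTimeout and its parameters are stand-ins for illustration, not a helper that exists in the codebase.

package main

import (
	"fmt"
	"time"
)

// collectionTimeout mirrors the fallback chain in the diff: prefer the
// metadata item's own timeout, then its interval, then 3x the report
// interval (in whole seconds), and finally a 5-second default.
func collectionTimeout(mdTimeout, mdInterval int64, reportInterval time.Duration) time.Duration {
	timeout := mdTimeout
	if timeout == 0 {
		if mdInterval != 0 {
			timeout = mdInterval
		} else if interval := int64(reportInterval.Seconds()); interval != 0 {
			timeout = interval * 3
		} else {
			timeout = 5
		}
	}
	return time.Duration(timeout) * time.Second
}

func main() {
	// No explicit timeout, 10s interval: time out after 10s.
	fmt.Println(collectionTimeout(0, 10, time.Second)) // 10s
	// No timeout, no interval: fall back to 3x the 1s report interval.
	fmt.Println(collectionTimeout(0, 0, time.Second)) // 3s
	// Sub-second report interval (tests): fall back to the 5s default.
	fmt.Println(collectionTimeout(0, 0, 100*time.Millisecond)) // 5s
}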
ctx, cancel := context.WithTimeout(ctx, ctxTimeout)
defer cancel()

now := time.Now()
select {
case <-ctx.Done():
logger.Warn(ctx, "metadata collection timed out", slog.F("timeout", ctxTimeout))
case metadataResults <- metadataResultAndKey{
key: md.Key,
result: a.collectMetadata(ctx, md, now),
}:
lastCollectedAtMu.Lock()
lastCollectedAts[md.Key] = now
lastCollectedAtMu.Unlock()
}
}
})
}
}
}()

timeout := md.Timeout
if timeout == 0 {
if md.Interval != 0 {
timeout = md.Interval
} else if interval := int64(a.reportMetadataInterval.Seconds()); interval != 0 {
// Fallback to the report interval
timeout = interval * 3
} else {
// If the interval is still 0 (possible if the interval
// is less than a second), default to 5. This was
// randomly picked.
timeout = 5
}
// Gather metadata updates and report them once every interval. If a
// previous report is in flight, wait for it to complete before
// sending a new one. If the network conditions are bad, we won't
// benefit from canceling the previous send and starting a new one.
var (
updatedMetadata = make(map[string]*codersdk.WorkspaceAgentMetadataResult)
reportTimeout = 30 * time.Second
Review comment: There are many magic timeouts hidden in this file. I'm wondering if we shouldn't move them to the top of the file, and make them more dependent. For instance: instead of

Reply: With 5 sec, are you referring to the case where

I wanted to change as little about the metadata collection as possible, but I can go over this and see what can be consolidated. This 30 second timeout was randomly picked by me as a crutch for scenarios where the network is super slow or down. It should be enough for at least something to get through, whilst avoiding sending very stale data.

Review comment: I see. I'm afraid that there are too many timeouts depending on each other in the entire implementation, and it might be hard to debug potential queueing problems.
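As a hedged sketch of the reviewer's suggestion only: the scattered values could be hoisted into named constants derived from one base interval. None of these constants exist in this PR or the codebase, and the real report interval is a configurable field rather than a constant; the names and relationships below are hypothetical.

package agent // hypothetical placement near the top of the file

import "time"

// Hypothetical consolidation of the timeouts discussed above: derive them
// from a single base interval instead of scattering magic numbers.
const (
	// metadataReportInterval would be the base tick for collecting and reporting.
	metadataReportInterval = time.Second
	// metadataCollectTimeout would bound a single collection script when the
	// metadata item defines neither a timeout nor an interval.
	metadataCollectTimeout = 3 * metadataReportInterval
	// metadataReportTimeout would bound one PostMetadata call: generous enough
	// for something to get through on a slow network, short enough to avoid
	// sending very stale data.
	metadataReportTimeout = 30 * metadataReportInterval
)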
reportSemaphore = make(chan struct{}, 1)
)
reportSemaphore <- struct{}{}

for {
select {
case <-ctx.Done():
return
case mr := <-metadataResults:
// This can overwrite unsent values, but that's fine because
// we're only interested about up-to-date values.
updatedMetadata[mr.key] = mr.result
continue
case <-report:
if len(updatedMetadata) > 0 {
Review comment: If I'm understanding this correctly - say you have two agent metadata definitions, one which updates every 1 second, the other which updates every 10 seconds. In the previous request-per-metadata-key approach, this would cause (0.1 + 1 =) 1.1 requests per second, while in this new approach we would end up with a minimum of 1 request per second, as the more frequently updated metadatum would cause a batch request. I think we should update the documentation with this change to reflect the new behaviour. I think it would also make sense to recommend users to keep in mind the minimum metadata refresh interval when writing their templates; any metadatum with a refresh interval of 1 second will cause frequent metadata updates from what I understand here.

Reply: I'm not sure where

In the new implementation, we would:

With 2s and 3s metadata, it would look like this:

Old:

New:

This is an approximation of ideal conditions, though. And perhaps we should separate

With regards to RPS, the new implementation reduces RPS and doesn't necessarily guarantee 1 RPS; it depends on interval and how long commands take to execute.

Are you referring to this https://coder.com/docs/v2/latest/templates/agent-metadata#db-write-load? The write load will be about the same, but the writes will be more performant since they're batched. I suppose we could call one batch one write, even though we're updating multiple rows. I'll amend this part.

Review comment: Wouldn't an agent metadatum with a refresh interval of 10 seconds cause 1 request roughly every 10 seconds (i.e. 0.1 RPS)?

Reply: Yes, this is the case in both implementations.
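To make the request-rate comparison concrete, here is a rough back-of-the-envelope simulation under the same "ideal conditions" caveat: a 1-second report tick and instant command execution. It is an illustrative model written for this discussion, not a measurement of the agent.

package main

import "fmt"

// Rough model of request rates. Old behaviour: one request per metadata item
// per its interval. New behaviour: one batched request on every tick where at
// least one item has a fresh value.
func main() {
	intervals := []int{2, 3} // metadata refresh intervals in seconds
	window := 60             // simulate one minute

	oldRequests, newRequests := 0, 0
	for t := 1; t <= window; t++ {
		updated := false
		for _, iv := range intervals {
			if t%iv == 0 {
				oldRequests++ // old: each fresh value is its own request
				updated = true
			}
		}
		if updated {
			newRequests++ // new: all fresh values share one batch request
		}
	}

	fmt.Printf("old: %d requests (%.2f RPS)\n", oldRequests, float64(oldRequests)/float64(window))
	fmt.Printf("new: %d requests (%.2f RPS)\n", newRequests, float64(newRequests)/float64(window))
	// With 2s and 3s items: old is about 0.83 RPS, new is about 0.67 RPS.
}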
metadata := make([]agentsdk.Metadata, 0, len(updatedMetadata))
for key, result := range updatedMetadata {
metadata = append(metadata, agentsdk.Metadata{
Key: key,
WorkspaceAgentMetadataResult: *result,
})
delete(updatedMetadata, key)
}
ctxTimeout := time.Duration(timeout) * time.Second
ctx, cancel := context.WithTimeout(ctx, ctxTimeout)
defer cancel()

now := time.Now()
select {
case <-ctx.Done():
logger.Warn(ctx, "metadata collection timed out", slog.F("timeout", ctxTimeout))
case metadataResults <- metadataResultAndKey{
key: md.Key,
result: a.collectMetadata(ctx, md, now),
}:
lastCollectedAtMu.Lock()
lastCollectedAts[md.Key] = now
lastCollectedAtMu.Unlock()
case <-reportSemaphore:
default:
// If there's already a report in flight, don't send
// another one, wait for next tick instead.
continue
}
})

go func() {
ctx, cancel := context.WithTimeout(ctx, reportTimeout)
defer func() {
cancel()
reportSemaphore <- struct{}{}
}()

err := a.client.PostMetadata(ctx, agentsdk.PostMetadataRequest{Metadata: metadata})
Review comment: This behavior doesn't change, right: if the agent fails to send the metadata, it will be lost. There is no "retry" mechanism in place now?

Reply: Correct, we don't retry since trying to send stale data wouldn't make much sense; we instead wait until the next update is available and try to send it.

Review comment: Ok! I guess we can debate about the lack of source data for insights, but that's a separate issue.

Reply: Ah, this is not used for insights, so it will not have an effect on that.
if err != nil {
a.logger.Error(ctx, "agent failed to report metadata", slog.Error(err))
}
}()
}
}
}
}
Review comment: Would it change something if we increase buffers here?

Reply: Yes. They are single-buffer to be non-blocking and not cause back-pressure. If we increase channel sizes then we may have back-pressure if an operation is taking longer than expected. Say collect was 10: if one iteration of starting collection took 10s, this channel would fill, and afterwards 10 collections would start immediately after each other, even if they now finish in 0.1s each, possibly leading to 10 collections (and load) within a second (vs once per second).
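The reply's reasoning boils down to the non-blocking wake pattern used in the diff: a channel with capacity 1 coalesces ticks instead of queueing them. This standalone sketch (simplified, not the PR's code verbatim) shows that a busy consumer finds at most one pending wake-up rather than a burst.

package main

import (
	"fmt"
	"time"
)

// wake performs a non-blocking send: if the consumer hasn't drained the
// single buffered slot yet, the tick is coalesced instead of queued.
func wake(c chan<- struct{}) {
	select {
	case c <- struct{}{}:
	default: // slot already full; dropping avoids a backlog of stale ticks
	}
}

func main() {
	collect := make(chan struct{}, 1) // capacity 1: at most one pending wake-up

	// Simulate ten 100ms ticks while the consumer is busy for a full second.
	for i := 0; i < 10; i++ {
		wake(collect)
		time.Sleep(100 * time.Millisecond)
	}

	// The slow consumer finds exactly one pending wake-up, not ten, so it
	// runs one collection instead of a burst of back-to-back collections.
	fmt.Println("pending wake-ups:", len(collect)) // 1
}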