Skip to content

fix(agent): send metadata in batches #10225

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
307 changes: 185 additions & 122 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,147 +362,210 @@ func (t *trySingleflight) Do(key string, fn func()) {
}

func (a *agent) reportMetadataLoop(ctx context.Context) {
const metadataLimit = 128
tickerDone := make(chan struct{})
collectDone := make(chan struct{})
ctx, cancel := context.WithCancel(ctx)
defer func() {
cancel()
<-collectDone
<-tickerDone
}()

var (
baseTicker = time.NewTicker(a.reportMetadataInterval)
lastCollectedAtMu sync.RWMutex
lastCollectedAts = make(map[string]time.Time)
metadataResults = make(chan metadataResultAndKey, metadataLimit)
logger = a.logger.Named("metadata")
logger = a.logger.Named("metadata")
report = make(chan struct{}, 1)
collect = make(chan struct{}, 1)
Comment on lines +376 to +377
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it change something if we increase buffers here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. They are single buffer to be non-blocking and not cause back-pressure. If we increase channel sizes then we may have back-pressure if an operation is taking longer than expected. Say collect was 10, if one iteration of starting collection took 10s then this channel would fill and afterwards 10 collections would start immediately after each other, even if they now finish in 0.1s each, possibly leading to 10 collections (and load) within a second (vs once per second).

metadataResults = make(chan metadataResultAndKey, 1)
)
defer baseTicker.Stop()

// We use a custom singleflight that immediately returns if there is already
// a goroutine running for a given key. This is to prevent a build-up of
// goroutines waiting on Do when the script takes many multiples of
// baseInterval to run.
flight := trySingleflight{m: map[string]struct{}{}}

postMetadata := func(mr metadataResultAndKey) {
err := a.client.PostMetadata(ctx, agentsdk.PostMetadataRequest{
Metadata: []agentsdk.Metadata{
{
Key: mr.key,
WorkspaceAgentMetadataResult: *mr.result,
},
},
})
if err != nil {
a.logger.Error(ctx, "agent failed to report metadata", slog.Error(err))
}
}

for {
select {
case <-ctx.Done():
return
case mr := <-metadataResults:
postMetadata(mr)
continue
case <-baseTicker.C:
// Set up collect and report as a single ticker with two channels,
// this is to allow collection and reporting to be triggered
// independently of each other.
go func() {
t := time.NewTicker(a.reportMetadataInterval)
defer func() {
t.Stop()
close(report)
close(collect)
close(tickerDone)
}()
wake := func(c chan<- struct{}) {
select {
case c <- struct{}{}:
default:
}
}
wake(collect) // Start immediately.

if len(metadataResults) > 0 {
// The inner collection loop expects the channel is empty before spinning up
// all the collection goroutines.
logger.Debug(ctx, "metadata collection backpressured",
slog.F("queue_len", len(metadataResults)),
)
continue
for {
select {
case <-ctx.Done():
return
case <-t.C:
wake(report)
wake(collect)
}
}
}()

manifest := a.manifest.Load()
if manifest == nil {
continue
}
go func() {
defer close(collectDone)

var (
// We use a custom singleflight that immediately returns if there is already
// a goroutine running for a given key. This is to prevent a build-up of
// goroutines waiting on Do when the script takes many multiples of
// baseInterval to run.
flight = trySingleflight{m: map[string]struct{}{}}
lastCollectedAtMu sync.RWMutex
lastCollectedAts = make(map[string]time.Time)
)
for {
select {
case <-ctx.Done():
return
case <-collect:
}

if len(manifest.Metadata) > metadataLimit {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this condition anymore? just in case...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously there was a buffered channel, but that's no longer required so this artificial restriction got lifted. If we want to limit this, it should be in the coder tf provider, saying too many metadata items were added.

logger.Error(
ctx, "metadata limit exceeded",
slog.F("limit", metadataLimit), slog.F("got", len(manifest.Metadata)),
)
continue
}
manifest := a.manifest.Load()
if manifest == nil {
continue
}

// If the manifest changes (e.g. on agent reconnect) we need to
// purge old cache values to prevent lastCollectedAt from growing
// boundlessly.
lastCollectedAtMu.Lock()
for key := range lastCollectedAts {
if slices.IndexFunc(manifest.Metadata, func(md codersdk.WorkspaceAgentMetadataDescription) bool {
return md.Key == key
}) < 0 {
logger.Debug(ctx, "deleting lastCollected key, missing from manifest",
slog.F("key", key),
)
delete(lastCollectedAts, key)
// If the manifest changes (e.g. on agent reconnect) we need to
// purge old cache values to prevent lastCollectedAt from growing
// boundlessly.
lastCollectedAtMu.Lock()
for key := range lastCollectedAts {
if slices.IndexFunc(manifest.Metadata, func(md codersdk.WorkspaceAgentMetadataDescription) bool {
return md.Key == key
}) < 0 {
logger.Debug(ctx, "deleting lastCollected key, missing from manifest",
slog.F("key", key),
)
delete(lastCollectedAts, key)
}
}
}
lastCollectedAtMu.Unlock()

// Spawn a goroutine for each metadata collection, and use a
// channel to synchronize the results and avoid both messy
// mutex logic and overloading the API.
for _, md := range manifest.Metadata {
md := md
// We send the result to the channel in the goroutine to avoid
// sending the same result multiple times. So, we don't care about
// the return values.
go flight.Do(md.Key, func() {
ctx := slog.With(ctx, slog.F("key", md.Key))
lastCollectedAtMu.RLock()
collectedAt, ok := lastCollectedAts[md.Key]
lastCollectedAtMu.RUnlock()
if ok {
// If the interval is zero, we assume the user just wants
// a single collection at startup, not a spinning loop.
if md.Interval == 0 {
return
lastCollectedAtMu.Unlock()

// Spawn a goroutine for each metadata collection, and use a
// channel to synchronize the results and avoid both messy
// mutex logic and overloading the API.
for _, md := range manifest.Metadata {
md := md
// We send the result to the channel in the goroutine to avoid
// sending the same result multiple times. So, we don't care about
// the return values.
go flight.Do(md.Key, func() {
ctx := slog.With(ctx, slog.F("key", md.Key))
lastCollectedAtMu.RLock()
collectedAt, ok := lastCollectedAts[md.Key]
lastCollectedAtMu.RUnlock()
if ok {
// If the interval is zero, we assume the user just wants
// a single collection at startup, not a spinning loop.
if md.Interval == 0 {
return
}
intervalUnit := time.Second
// reportMetadataInterval is only less than a second in tests,
// so adjust the interval unit for them.
if a.reportMetadataInterval < time.Second {
intervalUnit = 100 * time.Millisecond
}
// The last collected value isn't quite stale yet, so we skip it.
if collectedAt.Add(time.Duration(md.Interval) * intervalUnit).After(time.Now()) {
return
}
}
intervalUnit := time.Second
// reportMetadataInterval is only less than a second in tests,
// so adjust the interval unit for them.
if a.reportMetadataInterval < time.Second {
intervalUnit = 100 * time.Millisecond

timeout := md.Timeout
if timeout == 0 {
if md.Interval != 0 {
timeout = md.Interval
} else if interval := int64(a.reportMetadataInterval.Seconds()); interval != 0 {
// Fallback to the report interval
timeout = interval * 3
} else {
// If the interval is still 0 (possible if the interval
// is less than a second), default to 5. This was
// randomly picked.
timeout = 5
}
}
// The last collected value isn't quite stale yet, so we skip it.
if collectedAt.Add(time.Duration(md.Interval) * intervalUnit).After(time.Now()) {
return
ctxTimeout := time.Duration(timeout) * time.Second
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might be lost in the flow here... why is this 1sec timeout needed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also didn't change in this PR.

The timeout here can be 1s or it can be higher. I suppose the logic is that if you want metadata to update every 1s, collection should run faster than 1s, but this is likely not ideal.

We could change this in the future or if it becomes a problem.

ctx, cancel := context.WithTimeout(ctx, ctxTimeout)
defer cancel()

now := time.Now()
select {
case <-ctx.Done():
logger.Warn(ctx, "metadata collection timed out", slog.F("timeout", ctxTimeout))
case metadataResults <- metadataResultAndKey{
key: md.Key,
result: a.collectMetadata(ctx, md, now),
}:
lastCollectedAtMu.Lock()
lastCollectedAts[md.Key] = now
lastCollectedAtMu.Unlock()
}
}
})
}
}
}()

timeout := md.Timeout
if timeout == 0 {
if md.Interval != 0 {
timeout = md.Interval
} else if interval := int64(a.reportMetadataInterval.Seconds()); interval != 0 {
// Fallback to the report interval
timeout = interval * 3
} else {
// If the interval is still 0 (possible if the interval
// is less than a second), default to 5. This was
// randomly picked.
timeout = 5
}
// Gather metadata updates and report them once every interval. If a
// previous report is in flight, wait for it to complete before
// sending a new one. If the network conditions are bad, we won't
// benefit from canceling the previous send and starting a new one.
var (
updatedMetadata = make(map[string]*codersdk.WorkspaceAgentMetadataResult)
reportTimeout = 30 * time.Second
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are many magic timeouts hidden in this file. I'm wondering if we shouldn't move them to the top of file, and make them more dependent. For instance: instead of 5sec -> reportTimeout * 1/6

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With 5 sec, are you referring to the case where md.Timeout is not set (or zero), and we fall back to 5 sec?

I wanted to change as little about the metadata collection as possible, but I can go over this and see what can be consolidated.

This 30 second timeout was randomly picked by me as a crutch for scenarios where the network is super slow or down. It should be enough for at least something to get through, whilst avoiding sending very stale data.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. I'm afraid that there are too many timeouts depending on each other in the entire implementation, and it might be hard to debug potential queueing problems.

reportSemaphore = make(chan struct{}, 1)
)
reportSemaphore <- struct{}{}

for {
select {
case <-ctx.Done():
return
case mr := <-metadataResults:
// This can overwrite unsent values, but that's fine because
// we're only interested about up-to-date values.
updatedMetadata[mr.key] = mr.result
continue
case <-report:
if len(updatedMetadata) > 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm understanding this correctly - say you have two agent metadata definitions, one which updates every 1 second, the other which updates every 10 seconds.

In the previous request-per-metadata-key approach, this would cause (0.1 + 1) 1.1 requests per second, while in this new approach we would end up with a minimum of 1 requests per second, as the more frequently updated metadatum would cause a batch request.

I'm think we should update the documentation with this change to reflect the new behaviour. I think it would also make sense to recommend users to keep in mind the minimum metadata refresh interval when writing their templates; any metadatum with a refresh interval of 1 second will cause frequent metadata updates from what I understand here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the previous request-per-metadata-key approach, this would cause (0.1 + 1) 1.1 requests per second, while in this new approach we would end up with a minimum of 1 requests per second.

I'm not sure where 0.1 comes from in your example, but for simplicity sake, let's say we have 1s and 2s intervals for the metadata, and each take 0ns to execute. In the previous implementation we would approximately do:

14:00:00 POST meta1
14:00:00 POST meta2
14:00:01 POST meta1
14:00:02 POST meta1
14:00:02 POST meta2
14:00:03 POST meta1

In the new implementation, we would:

14:00:00 POST meta1, meta2
14:00:01 POST meta1
14:00:02 POST meta1, meta2
14:00:03 POST meta1

With 2s and 3s metadata, it would look like this:

Old:

14:00:00 POST meta1
14:00:00 POST meta2
14:00:02 POST meta1
14:00:03 POST meta2
14:00:04 POST meta1
14:00:06 POST meta1
14:00:06 POST meta2

New:

14:00:00 POST meta1, meta2
14:00:02 POST meta1
14:00:03 POST meta2
14:00:04 POST meta1
14:00:06 POST meta1, meta2

This is an approximation of ideal conditions, though. And perhaps we should separate collect and send triggers by, say, 500ms. This would increase the likelyhood of ideal batching.

With regards to RPS the new implementation reduces RPS and doesn't necessarily guarantee 1 RPS, it depends on interval and how long commands take to execute.

I'm think we should update the documentation with this change to reflect the new behaviour.

Are you referring to this https://coder.com/docs/v2/latest/templates/agent-metadata#db-write-load?

The write load will be about the same, but the writes will be more performant since they're batched. I suppose we could call one batch one write, even though we're updating multiple rows. I'll amend this part.

Copy link
Member

@johnstcn johnstcn Oct 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't an agent metadatum with a refresh interval of 10 seconds cause 1 request roughly every 10 seconds (i.e. 0.1 RPS)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is the case in both implementations.

metadata := make([]agentsdk.Metadata, 0, len(updatedMetadata))
for key, result := range updatedMetadata {
metadata = append(metadata, agentsdk.Metadata{
Key: key,
WorkspaceAgentMetadataResult: *result,
})
delete(updatedMetadata, key)
}
ctxTimeout := time.Duration(timeout) * time.Second
ctx, cancel := context.WithTimeout(ctx, ctxTimeout)
defer cancel()

now := time.Now()
select {
case <-ctx.Done():
logger.Warn(ctx, "metadata collection timed out", slog.F("timeout", ctxTimeout))
case metadataResults <- metadataResultAndKey{
key: md.Key,
result: a.collectMetadata(ctx, md, now),
}:
lastCollectedAtMu.Lock()
lastCollectedAts[md.Key] = now
lastCollectedAtMu.Unlock()
case <-reportSemaphore:
default:
// If there's already a report in flight, don't send
// another one, wait for next tick instead.
continue
}
})

go func() {
ctx, cancel := context.WithTimeout(ctx, reportTimeout)
defer func() {
cancel()
reportSemaphore <- struct{}{}
}()

err := a.client.PostMetadata(ctx, agentsdk.PostMetadataRequest{Metadata: metadata})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This behavior doesn't change, right:

If the agent fails to send the metadata, it will be lost. There is no "retry" mechanism in place now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, we don't retry since trying to send stale data wouldn't make much sense, we instead wait until the next update is available and try to send it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok! I guess we can debate about the lack of source data for insights, but that's a separate issue.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, this is not used for insights, so it will not have an effect on that.

if err != nil {
a.logger.Error(ctx, "agent failed to report metadata", slog.Error(err))
}
}()
}
}
}
}
Expand Down
27 changes: 18 additions & 9 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1066,34 +1066,43 @@ func TestAgent_Metadata(t *testing.T) {

t.Run("Once", func(t *testing.T) {
t.Parallel()

//nolint:dogsled
_, client, _, _, _ := setupAgent(t, agentsdk.Manifest{
Metadata: []codersdk.WorkspaceAgentMetadataDescription{
{
Key: "greeting",
Key: "greeting1",
Interval: 0,
Script: echoHello,
},
{
Key: "greeting2",
Interval: 1,
Script: echoHello,
},
},
}, 0, func(_ *agenttest.Client, opts *agent.Options) {
opts.ReportMetadataInterval = 100 * time.Millisecond
opts.ReportMetadataInterval = testutil.IntervalFast
})

var gotMd map[string]agentsdk.Metadata
require.Eventually(t, func() bool {
gotMd = client.GetMetadata()
return len(gotMd) == 1
}, testutil.WaitShort, testutil.IntervalMedium)
return len(gotMd) == 2
}, testutil.WaitShort, testutil.IntervalFast/2)

collectedAt := gotMd["greeting"].CollectedAt
collectedAt1 := gotMd["greeting1"].CollectedAt
collectedAt2 := gotMd["greeting2"].CollectedAt

require.Never(t, func() bool {
require.Eventually(t, func() bool {
gotMd = client.GetMetadata()
if len(gotMd) != 1 {
if len(gotMd) != 2 {
panic("unexpected number of metadata")
}
return !gotMd["greeting"].CollectedAt.Equal(collectedAt)
}, testutil.WaitShort, testutil.IntervalMedium)
return !gotMd["greeting2"].CollectedAt.Equal(collectedAt2)
}, testutil.WaitShort, testutil.IntervalFast/2)

require.Equal(t, gotMd["greeting1"].CollectedAt, collectedAt1, "metadata should not be collected again")
})

t.Run("Many", func(t *testing.T) {
Expand Down
Loading