feat(coderd): batch agent stats inserts #8875
Conversation
notes:
cli/server.go
@@ -813,6 +814,21 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
	options.SwaggerEndpoint = cfg.Swagger.Enable.Value()
}

batchStatsTicker := time.NewTicker(29 * time.Second) // Hard-coding.
review: Would this be better set at a lower interval, or made configurable?
First of all, 29 sec looks magical :) You may need to comment on why it is exactly this value.
I didn't look deeper into the review yet, but you may want to introduce two flush triggers: a timeout and the buffer getting full.
Made the interval 1 second instead by default.
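For illustration, the two triggers discussed above (a periodic tick plus a "buffer full" signal) can be combined in a single flush loop. A minimal sketch; `Batcher`, `flushLever`, and `flush` are assumed names here, not necessarily the PR's exact API:

// Sketch of a flush loop serving both triggers plus a final drain.
type Batcher struct {
	flushLever chan struct{} // signaled by Add when the buffer fills up
}

func (b *Batcher) run(ctx context.Context, tick <-chan time.Time) {
	for {
		select {
		case <-tick:
			b.flush(ctx, "periodic") // time-based trigger
		case <-b.flushLever:
			b.flush(ctx, "buffer full") // size-based trigger from Add
		case <-ctx.Done():
			b.flush(ctx, "shutdown") // final drain before exiting
			return
		}
	}
}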
stat := statByAgent[agentID]
stat.AgentID = agentStat.AgentID
stat.TemplateID = agentStat.TemplateID
stat.UserID = agentStat.UserID
stat.WorkspaceID = agentStat.WorkspaceID
review: this was bugged before.
Good finding 👍 Will it be covered with tests now?
Yes, the tests added here were the impetus for this change!
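For readers wondering what kind of bug a pattern like this invites (a guess on my part; the thread doesn't say), Go's map-value copy semantics silently discard field writes on a value read out of a map:

// Hypothetical illustration, not the PR's code: if the map holds struct
// values, the local variable is a copy and the writes are lost.
statByAgent := map[uuid.UUID]StatRow{}
stat := statByAgent[agentID]
stat.AgentID = agentStat.AgentID // mutates the copy only
statByAgent[agentID] = stat      // write-back required, easy to forget
// Storing pointers (map[uuid.UUID]*StatRow) avoids the write-back entirely.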
unnest(@workspace_id :: uuid[]) AS workspace_id,
unnest(@template_id :: uuid[]) AS template_id,
unnest(@agent_id :: uuid[]) AS agent_id,
jsonb_array_elements(@connections_by_proto :: jsonb) AS connections_by_proto,
review: `connections_by_proto` needs to be handled as `jsonb[]` and not `jsonb[]`, as that ends up being passed to `pq.Array`, which turns it into something like `{{123,34,106,34,...125}}`.
:)

> jsonb[] and not jsonb[]
🙃 `jsonb` and not `jsonb[]`
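The working approach, roughly: marshal the per-row maps into one jsonb array on the Go side and let the query expand it with `jsonb_array_elements`. A sketch, where `params.ConnectionsByProto` is an assumed field name:

// One json.Marshal over the whole slice yields a single jsonb array,
// avoiding pq.Array turning a jsonb[] into raw byte garbage.
byProto := []map[string]int64{
	{"ssh": 2, "reconnecting_pty": 1}, // row 1
	{"ssh": 1},                        // row 2
}
payload, err := json.Marshal(byProto)
if err != nil {
	return fmt.Errorf("marshal connections by proto: %w", err)
}
params.ConnectionsByProto = payload // passed as a single jsonb value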
ctx, cancel := context.WithCancel(context.Background())

if options.StatsBatcher == nil {
	panic("developer error: options.StatsBatcher is nil")
Is there a reason why `New(options)` can't define a default instance?
No, just keeping to the existing pattern in this file.
coderd/batchstats/batcher.go
const (
	// DefaultBatchSize is the default size of the batcher's buffer.
	DefaultBatchSize = 1024
In a typical environment, how many items are in the buffer?
Assuming 1000 workspaces, the worst-case scenario is that they all attempt to push stats at once. That's extremely unlikely, however.
coderd/batchstats/batcher.go
b.buf.SessionCountSSH = append(b.buf.SessionCountSSH, st.SessionCountSSH)
b.buf.ConnectionMedianLatencyMS = append(b.buf.ConnectionMedianLatencyMS, st.ConnectionMedianLatencyMS)

// If the buffer is full, signal the flusher to flush immediately.
nit: do you think that we should flush before the buffer is full? Let's say at 80% occupancy?
What would the advantage of this be? If you're trying to avoid blocking during flush, blocking will happen regardless of when we flush. We could make a temporary copy of the buffer while we flush to avoid that, but batch-inserting 1000 entries happens fairly quickly from what I can tell.
We had a conversation about this.
We came up with passing the existing buffer to the lever and resetting the buffer. That way the buffer is new for the next Add and the previous data is pending the db query.
Pseudo code:
if len(b.buf.ID) == cap(b.buf.ID) {
	select {
	case b.flushLever <- b.buf:
		// Hand the full buffer to the flusher and start a fresh one.
		b.buf = database.InsertWorkspaceAgentStatsParams{}
	default:
		b.log.Error(context.Background(), "this should never happen, dropping agent stats :(")
	}
}
Some additional nits:
- We should probably use the constant instead of `cap` for comparison, since `cap` can grow: `len(b.buf.ID) >= b.batchSize`.
- We should use a select and throw an error log if we are backing up. We should drop the stats; if we are backing up, there is nothing we can do except log it anyway.
It ended up being simpler to do an early flush. However, I thought about this some more and I think using a buffered channel would also resolve this issue. This is a possible later enhancement.
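For the record, the buffered-channel variant mentioned here could look roughly like the sketch below; `agentStat` and `insertBatch` are placeholders, not the PR's API. `Add` hands off a full batch without blocking, and a single flusher goroutine drains the channel:

batches := make(chan []agentStat, 4) // small buffer absorbs insert latency

// Producer side, inside Add, once the current batch is full:
select {
case batches <- batch:
	batch = make([]agentStat, 0, batchSize) // fresh buffer for the next Add
default:
	log.Print("flusher backed up, dropping batch") // never block callers
}

// Consumer side, the flusher goroutine:
go func() {
	for batch := range batches {
		insertBatch(ctx, batch) // one bulk insert per batch received
	}
}()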
coderd/batchstats/batcher.go
b.buf.ConnectionsByProto = payload
}

err = b.store.InsertWorkspaceAgentStats(ctx, b.buf)
What happens if one "stat" in the batch is faulty? Is the whole batch rejected?
Bonus points: maybe we need a test for this.
Yes, the whole batch will be rejected. That's a good idea for a test.
It would be nice to exclude the faulty item and keep the rest of the batch, but I guess that would be tricky to implement; we could instead fall back to an iterative approach.
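The iterative fallback could be as simple as retrying row by row after a failed bulk insert, so only the faulty rows are dropped. A sketch with hypothetical `insertBatch`/`insertOne` helpers:

if err := insertBatch(ctx, batch); err != nil {
	// The bulk insert is all-or-nothing, so retry each row individually
	// and drop only the ones that are actually faulty.
	log.Printf("batch insert failed, retrying per row: %v", err)
	for _, st := range batch {
		if err := insertOne(ctx, st); err != nil {
			log.Printf("dropping faulty stat for agent %s: %v", st.AgentID, err)
		}
	}
}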
Not saying to solve this, but dang, our `server.go` start function is growing huge. We should really look into a better way to manage it.
Looks good 👍
b.flushLever <- struct{}{}
b.flushForced.Store(true)
Does the `flushForced` just prevent a deadlock by adding to the `flushLever` channel if it already has something?
Can we just use a select statement to prevent pushing to the `flushLever` channel if it already has an item?
I tried the select but it will still end up doing one extra unwanted flush.
Well, in theory an extra flush is a lesser evil than losing data.
This PR adds support for batching inserts to the `workspace_agents_stats` table.

Closes #8063
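For context, the batched insert relies on Postgres zipping parallel arrays back into rows. A simplified sketch of the query shape (not the PR's exact SQL, which has more columns and the jsonb handling discussed above):

// Each column arrives as one array parameter; unnest expands them in
// lockstep, and the protocol counts travel as a single jsonb array.
const bulkInsertSketch = `
INSERT INTO workspace_agents_stats (workspace_id, template_id, agent_id, connections_by_proto)
SELECT
	unnest($1 :: uuid[]),
	unnest($2 :: uuid[]),
	unnest($3 :: uuid[]),
	jsonb_array_elements($4 :: jsonb);
`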