Conversation

johnstcn
Member

@johnstcn johnstcn commented Aug 3, 2023

This PR adds support for batching inserts to the workspace_agents_stats table.

Closes #8063

@johnstcn johnstcn self-assigned this Aug 3, 2023
Member Author

@johnstcn johnstcn left a comment

notes:

cli/server.go Outdated
@@ -813,6 +814,21 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
options.SwaggerEndpoint = cfg.Swagger.Enable.Value()
}

batchStatsTicker := time.NewTicker(29 * time.Second) // Hard-coding.
Member Author

review: Would this be better set at a lower interval, or made configurable?

Member

First of all, 29 sec looks magical :) You may want to add a comment explaining why it is exactly this value.

I haven't looked deeper into the review yet, but you may want to introduce two flush triggers: a timeout, and the buffer becoming full.
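The two-trigger design suggested here (flush on a timer, or immediately when the buffer fills) can be sketched roughly as follows. This is a hedged illustration with made-up names, not the PR's actual code: the real batcher buffers `database.InsertWorkspaceAgentStatsParams` columns, not ints.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// batcher sketches the two flush triggers: a periodic tick, plus a
// "buffer full" signal sent on flushLever.
type batcher struct {
	mu         sync.Mutex
	buf        []int
	batchSize  int
	flushLever chan struct{}
}

func (b *batcher) add(v int) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.buf = append(b.buf, v)
	if len(b.buf) >= b.batchSize {
		// Non-blocking signal: if a flush is already pending, skip.
		select {
		case b.flushLever <- struct{}{}:
		default:
		}
	}
}

func (b *batcher) flush() []int {
	b.mu.Lock()
	defer b.mu.Unlock()
	out := b.buf
	b.buf = nil
	return out
}

func (b *batcher) run(tick <-chan time.Time, flushed chan<- []int) {
	for {
		select {
		case <-tick: // timeout trigger
		case <-b.flushLever: // buffer-full trigger
		}
		if out := b.flush(); len(out) > 0 {
			flushed <- out
		}
	}
}

func main() {
	b := &batcher{batchSize: 3, flushLever: make(chan struct{}, 1)}
	flushed := make(chan []int, 1)
	go b.run(time.Tick(time.Second), flushed)

	for i := 0; i < 3; i++ {
		b.add(i)
	}
	fmt.Println(len(<-flushed)) // prints 3: buffer-full trigger fired
}
```

In the real code the flush would run the batch insert; here the flusher just hands batches to a channel so the demo is self-contained.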

Member Author

Made the interval 1 second instead by default.

Comment on lines +2705 to +2709
stat := statByAgent[agentID]
stat.AgentID = agentStat.AgentID
stat.TemplateID = agentStat.TemplateID
stat.UserID = agentStat.UserID
stat.WorkspaceID = agentStat.WorkspaceID
Member Author

review: this was bugged before.

Member

Good finding 👍 Will it be covered with tests now?

Member Author

Yes, the tests added here were the impetus for this change!

unnest(@workspace_id :: uuid[]) AS workspace_id,
unnest(@template_id :: uuid[]) AS template_id,
unnest(@agent_id :: uuid[]) AS agent_id,
jsonb_array_elements(@connections_by_proto :: jsonb) AS connections_by_proto,
Member Author

review: connections_by_proto needs to be handled as jsonb[] and not jsonb[], as that ends up being passed to pq.Array, which turns it into something like {{123,34,106,34,...125}}.

Member

:)

jsonb[] and not jsonb[]

Member Author

🙃

jsonb and not jsonb[]

@johnstcn johnstcn marked this pull request as ready for review August 3, 2023 11:58
@johnstcn johnstcn requested review from mtojek and Emyrk August 3, 2023 13:55

ctx, cancel := context.WithCancel(context.Background())

if options.StatsBatcher == nil {
panic("developer error: options.StatsBatcher is nil")
Member

Is there a reason why New(options) can't define a default instance?

Member Author

No, just keeping to the existing pattern in this file.


const (
// DefaultBatchSize is the default size of the batcher's buffer.
DefaultBatchSize = 1024
Member

In a typical environment, how many items are in the buffer?

Member Author

Assuming 1000 workspaces, the worst-case scenario is that they all attempt to push stats at once. That's extremely unlikely, however.

b.buf.SessionCountSSH = append(b.buf.SessionCountSSH, st.SessionCountSSH)
b.buf.ConnectionMedianLatencyMS = append(b.buf.ConnectionMedianLatencyMS, st.ConnectionMedianLatencyMS)

// If the buffer is full, signal the flusher to flush immediately.
Member

nit: do you think that we should flush before the buffer is full? Let's say 80% of occupancy?

Member Author

What would the advantage of this be? If you're trying to avoid blocking during flush, blocking will happen regardless of when we flush. We could make a temporary copy of the buffer while we flush to avoid that, but batch-inserting 1000 entries happens fairly quickly from what I can tell.

Member

We had a conversation about this.

We came up with passing the existing buffer to the lever and resetting the buffer. So that way the buffer is new for the next Add and the previous data is pending the db query.

Pseudo code:

	if len(b.buf.ID) == cap(b.buf.ID) {
		select {
		case b.flushLever <- b.buf:
			// Hand the full buffer off to the flusher; start a fresh one.
			b.buf = database.InsertWorkspaceAgentStatsParams{}
		default:
			b.log.Error(context.Background(), "this should never happen, dropping agent stats :(")
		}
	}

Some additional nits:

  • We should probably use the constant instead of cap for comparison since cap can grow. len(b.buf.ID) >= b.batchSize
  • We should use a select and log an error if we are backing up. In that case we should drop the stats: if we are backing up, there is nothing we can do except log them anyway.

Member Author

It ended up being simpler to do an early flush. However, I thought about this some more and I think using a buffered channel would also resolve this issue. This is a possible later enhancement.
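The buffered-channel enhancement mentioned here could look roughly like this (illustrative names, not the PR's code): a full buffer is handed to the flusher over a buffered channel and a fresh buffer replaces it, so adds never block on the database write.

```go
package main

import "fmt"

const batchSize = 2

// batcher sketches the buffered-channel variant: full batches queue on
// `pending` for the flusher while Add keeps filling a fresh buffer.
type batcher struct {
	buf     []int
	pending chan []int // buffered: holds full batches awaiting insert
}

func (b *batcher) add(v int) {
	b.buf = append(b.buf, v)
	if len(b.buf) >= batchSize {
		select {
		case b.pending <- b.buf:
			b.buf = make([]int, 0, batchSize)
		default:
			// Flusher is backed up; drop rather than block (as
			// discussed above, there is little to do except log it).
			fmt.Println("dropping batch")
		}
	}
}

func main() {
	b := &batcher{pending: make(chan []int, 4)}
	for i := 0; i < 5; i++ {
		b.add(i)
	}
	fmt.Println(len(b.pending), len(b.buf)) // prints 2 1
}
```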

b.buf.ConnectionsByProto = payload
}

err = b.store.InsertWorkspaceAgentStats(ctx, b.buf)
Member

What happens if one "stat" in the batch is faulty? Is the whole batch rejected?

Bonus points: maybe we need a test for this.

Member Author

Yes, the whole batch will be rejected. That's a good idea for a test.

Member

It would be nice to exclude the faulty item and keep the rest of the batch, but I guess that would be tricky to implement; a simpler option is to fall back to an iterative approach.

Member

@Emyrk Emyrk left a comment

Not saying to solve this, but dang our server.go start function is growing huge. We should really look into a better way to manage it.

@mtojek mtojek self-requested a review August 4, 2023 06:04
Member

@mtojek mtojek left a comment

Looks good 👍

b.flushLever <- struct{}{}
b.flushForced.Store(true)
Member

Does the flushForced just prevent a deadlock by adding to the flushLever channel if it already has something?

Can we just use a select statement to prevent pushing to the flushLever channel if it already has an item?

Member Author

I tried the select but it will still end up doing one extra unwanted flush.

Member

Well, in theory an extra flush is a lesser evil than losing data.

@johnstcn johnstcn merged commit 9fb18f3 into main Aug 4, 2023
@johnstcn johnstcn deleted the cj/batch-agent-stat-insert branch August 4, 2023 16:00
@github-actions github-actions bot locked and limited conversation to collaborators Aug 4, 2023