Skip to content

chore: fix TestBatchStats flake #8952

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions coderd/batchstats/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func New(ctx context.Context, opts ...Option) (*Batcher, func(), error) {

// Add adds a stat to the batcher for the given workspace and agent.
func (b *Batcher) Add(
now time.Time,
agentID uuid.UUID,
templateID uuid.UUID,
userID uuid.UUID,
Expand All @@ -135,7 +136,7 @@ func (b *Batcher) Add(
b.mu.Lock()
defer b.mu.Unlock()

now := database.Now()
now = database.Time(now)

b.buf.ID = append(b.buf.ID, uuid.New())
b.buf.CreatedAt = append(b.buf.CreatedAt, now)
Expand Down Expand Up @@ -199,15 +200,6 @@ func (b *Batcher) flush(ctx context.Context, forced bool, reason string) {
defer func() {
b.flushForced.Store(false)
b.mu.Unlock()
// Notify that a flush has completed. This only happens in tests.
if b.flushed != nil {
select {
case <-ctx.Done():
close(b.flushed)
default:
b.flushed <- count
}
}
if count > 0 {
elapsed := time.Since(start)
b.log.Debug(ctx, "flush complete",
Expand All @@ -217,6 +209,15 @@ func (b *Batcher) flush(ctx context.Context, forced bool, reason string) {
slog.F("reason", reason),
)
}
// Notify that a flush has completed. This only happens in tests.
if b.flushed != nil {
select {
case <-ctx.Done():
close(b.flushed)
default:
b.flushed <- count
}
}
}()

if len(b.buf.ID) == 0 {
Expand Down
16 changes: 8 additions & 8 deletions coderd/batchstats/batcher_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestBatchStats(t *testing.T) {

// Given: no data points are added for workspace
// When: it becomes time to report stats
t1 := time.Now()
t1 := database.Now()
// Signal a tick and wait for a flush to complete.
tick <- t1
f := <-flushed
Expand All @@ -59,9 +59,9 @@ func TestBatchStats(t *testing.T) {
require.Empty(t, stats, "should have no stats for workspace")

// Given: a single data point is added for workspace
t2 := time.Now()
t2 := t1.Add(time.Second)
t.Logf("inserting 1 stat")
require.NoError(t, b.Add(deps1.Agent.ID, deps1.User.ID, deps1.Template.ID, deps1.Workspace.ID, randAgentSDKStats(t)))
require.NoError(t, b.Add(t2.Add(time.Millisecond), deps1.Agent.ID, deps1.User.ID, deps1.Template.ID, deps1.Workspace.ID, randAgentSDKStats(t)))

// When: it becomes time to report stats
// Signal a tick and wait for a flush to complete.
Expand All @@ -77,17 +77,17 @@ func TestBatchStats(t *testing.T) {

// Given: a lot of data points are added for both workspaces
// (equal to batch size)
t3 := time.Now()
t3 := t2.Add(time.Second)
done := make(chan struct{})

go func() {
defer close(done)
t.Logf("inserting %d stats", defaultBufferSize)
for i := 0; i < defaultBufferSize; i++ {
if i%2 == 0 {
require.NoError(t, b.Add(deps1.Agent.ID, deps1.User.ID, deps1.Template.ID, deps1.Workspace.ID, randAgentSDKStats(t)))
require.NoError(t, b.Add(t3.Add(time.Millisecond), deps1.Agent.ID, deps1.User.ID, deps1.Template.ID, deps1.Workspace.ID, randAgentSDKStats(t)))
} else {
require.NoError(t, b.Add(deps2.Agent.ID, deps2.User.ID, deps2.Template.ID, deps2.Workspace.ID, randAgentSDKStats(t)))
require.NoError(t, b.Add(t3.Add(time.Millisecond), deps2.Agent.ID, deps2.User.ID, deps2.Template.ID, deps2.Workspace.ID, randAgentSDKStats(t)))
}
}
}()
Expand All @@ -105,15 +105,15 @@ func TestBatchStats(t *testing.T) {
require.Len(t, stats, 2, "should have stats for both workspaces")

// Ensures that a subsequent flush pushes all the remaining data
t4 := time.Now()
t4 := t3.Add(time.Second)
tick <- t4
f2 := <-flushed
t.Logf("flush 4 completed")
expectedCount := defaultBufferSize - f
require.Equal(t, expectedCount, f2, "did not flush expected remaining rows")

// Ensure that a subsequent flush does not push stale data.
t5 := time.Now()
t5 := t4.Add(time.Second)
tick <- t5
f = <-flushed
require.Zero(t, f, "expected zero stats to have been flushed")
Expand Down
2 changes: 1 addition & 1 deletion coderd/workspaceagents.go
Original file line number Diff line number Diff line change
Expand Up @@ -1414,7 +1414,7 @@ func (api *API) workspaceAgentReportStats(rw http.ResponseWriter, r *http.Reques

var errGroup errgroup.Group
errGroup.Go(func() error {
if err := api.statsBatcher.Add(workspaceAgent.ID, workspace.TemplateID, workspace.OwnerID, workspace.ID, req); err != nil {
if err := api.statsBatcher.Add(time.Now(), workspaceAgent.ID, workspace.TemplateID, workspace.OwnerID, workspace.ID, req); err != nil {
api.Logger.Error(ctx, "failed to add stats to batcher", slog.Error(err))
return xerrors.Errorf("can't insert workspace agent stat: %w", err)
}
Expand Down