diff --git a/coderd/batchstats/batcher.go b/coderd/batchstats/batcher.go index fc177fd143d6a..a5239102f2182 100644 --- a/coderd/batchstats/batcher.go +++ b/coderd/batchstats/batcher.go @@ -126,6 +126,7 @@ func New(ctx context.Context, opts ...Option) (*Batcher, func(), error) { // Add adds a stat to the batcher for the given workspace and agent. func (b *Batcher) Add( + now time.Time, agentID uuid.UUID, templateID uuid.UUID, userID uuid.UUID, @@ -135,7 +136,7 @@ func (b *Batcher) Add( b.mu.Lock() defer b.mu.Unlock() - now := database.Now() + now = database.Time(now) b.buf.ID = append(b.buf.ID, uuid.New()) b.buf.CreatedAt = append(b.buf.CreatedAt, now) @@ -199,15 +200,6 @@ func (b *Batcher) flush(ctx context.Context, forced bool, reason string) { defer func() { b.flushForced.Store(false) b.mu.Unlock() - // Notify that a flush has completed. This only happens in tests. - if b.flushed != nil { - select { - case <-ctx.Done(): - close(b.flushed) - default: - b.flushed <- count - } - } if count > 0 { elapsed := time.Since(start) b.log.Debug(ctx, "flush complete", @@ -217,6 +209,15 @@ func (b *Batcher) flush(ctx context.Context, forced bool, reason string) { slog.F("reason", reason), ) } + // Notify that a flush has completed. This only happens in tests. + if b.flushed != nil { + select { + case <-ctx.Done(): + close(b.flushed) + default: + b.flushed <- count + } + } }() if len(b.buf.ID) == 0 { diff --git a/coderd/batchstats/batcher_internal_test.go b/coderd/batchstats/batcher_internal_test.go index a6e28f1a9f389..8288442400a3e 100644 --- a/coderd/batchstats/batcher_internal_test.go +++ b/coderd/batchstats/batcher_internal_test.go @@ -46,7 +46,7 @@ func TestBatchStats(t *testing.T) { // Given: no data points are added for workspace // When: it becomes time to report stats - t1 := time.Now() + t1 := database.Now() // Signal a tick and wait for a flush to complete. tick <- t1 f := <-flushed @@ -59,9 +59,9 @@ func TestBatchStats(t *testing.T) { require.Empty(t, stats, "should have no stats for workspace") // Given: a single data point is added for workspace - t2 := time.Now() + t2 := t1.Add(time.Second) t.Logf("inserting 1 stat") - require.NoError(t, b.Add(deps1.Agent.ID, deps1.User.ID, deps1.Template.ID, deps1.Workspace.ID, randAgentSDKStats(t))) + require.NoError(t, b.Add(t2.Add(time.Millisecond), deps1.Agent.ID, deps1.User.ID, deps1.Template.ID, deps1.Workspace.ID, randAgentSDKStats(t))) // When: it becomes time to report stats // Signal a tick and wait for a flush to complete. @@ -77,7 +77,7 @@ func TestBatchStats(t *testing.T) { // Given: a lot of data points are added for both workspaces // (equal to batch size) - t3 := time.Now() + t3 := t2.Add(time.Second) done := make(chan struct{}) go func() { @@ -85,9 +85,9 @@ func TestBatchStats(t *testing.T) { t.Logf("inserting %d stats", defaultBufferSize) for i := 0; i < defaultBufferSize; i++ { if i%2 == 0 { - require.NoError(t, b.Add(deps1.Agent.ID, deps1.User.ID, deps1.Template.ID, deps1.Workspace.ID, randAgentSDKStats(t))) + require.NoError(t, b.Add(t3.Add(time.Millisecond), deps1.Agent.ID, deps1.User.ID, deps1.Template.ID, deps1.Workspace.ID, randAgentSDKStats(t))) } else { - require.NoError(t, b.Add(deps2.Agent.ID, deps2.User.ID, deps2.Template.ID, deps2.Workspace.ID, randAgentSDKStats(t))) + require.NoError(t, b.Add(t3.Add(time.Millisecond), deps2.Agent.ID, deps2.User.ID, deps2.Template.ID, deps2.Workspace.ID, randAgentSDKStats(t))) } } }() @@ -105,7 +105,7 @@ func TestBatchStats(t *testing.T) { require.Len(t, stats, 2, "should have stats for both workspaces") // Ensures that a subsequent flush pushes all the remaining data - t4 := time.Now() + t4 := t3.Add(time.Second) tick <- t4 f2 := <-flushed t.Logf("flush 4 completed") @@ -113,7 +113,7 @@ func TestBatchStats(t *testing.T) { require.Equal(t, expectedCount, f2, "did not flush expected remaining rows") // Ensure that a subsequent flush does not push stale data. - t5 := time.Now() + t5 := t4.Add(time.Second) tick <- t5 f = <-flushed require.Zero(t, f, "expected zero stats to have been flushed") diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index 0f5607db73436..545f3e7c6ed84 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -1414,7 +1414,7 @@ func (api *API) workspaceAgentReportStats(rw http.ResponseWriter, r *http.Reques var errGroup errgroup.Group errGroup.Go(func() error { - if err := api.statsBatcher.Add(workspaceAgent.ID, workspace.TemplateID, workspace.OwnerID, workspace.ID, req); err != nil { + if err := api.statsBatcher.Add(time.Now(), workspaceAgent.ID, workspace.TemplateID, workspace.OwnerID, workspace.ID, req); err != nil { api.Logger.Error(ctx, "failed to add stats to batcher", slog.Error(err)) return xerrors.Errorf("can't insert workspace agent stat: %w", err) }