From 1fee5b9ca0d1b607dbe26cd148e524003870d233 Mon Sep 17 00:00:00 2001 From: Colin Adler Date: Mon, 7 Aug 2023 23:12:35 +0000 Subject: [PATCH 1/5] fix batcher maybe --- coderd/batchstats/batcher.go | 18 +++++++++--------- coderd/batchstats/batcher_internal_test.go | 10 +++++----- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/coderd/batchstats/batcher.go b/coderd/batchstats/batcher.go index fc177fd143d6a..fdc7cadd842d0 100644 --- a/coderd/batchstats/batcher.go +++ b/coderd/batchstats/batcher.go @@ -199,15 +199,6 @@ func (b *Batcher) flush(ctx context.Context, forced bool, reason string) { defer func() { b.flushForced.Store(false) b.mu.Unlock() - // Notify that a flush has completed. This only happens in tests. - if b.flushed != nil { - select { - case <-ctx.Done(): - close(b.flushed) - default: - b.flushed <- count - } - } if count > 0 { elapsed := time.Since(start) b.log.Debug(ctx, "flush complete", @@ -217,6 +208,15 @@ func (b *Batcher) flush(ctx context.Context, forced bool, reason string) { slog.F("reason", reason), ) } + // Notify that a flush has completed. This only happens in tests. + if b.flushed != nil { + select { + case <-ctx.Done(): + close(b.flushed) + default: + b.flushed <- count + } + } }() if len(b.buf.ID) == 0 { diff --git a/coderd/batchstats/batcher_internal_test.go b/coderd/batchstats/batcher_internal_test.go index a6e28f1a9f389..b66f4573b7c5f 100644 --- a/coderd/batchstats/batcher_internal_test.go +++ b/coderd/batchstats/batcher_internal_test.go @@ -46,7 +46,7 @@ func TestBatchStats(t *testing.T) { // Given: no data points are added for workspace // When: it becomes time to report stats - t1 := time.Now() + t1 := database.Now() // Signal a tick and wait for a flush to complete. tick <- t1 f := <-flushed @@ -59,7 +59,7 @@ func TestBatchStats(t *testing.T) { require.Empty(t, stats, "should have no stats for workspace") // Given: a single data point is added for workspace - t2 := time.Now() + t2 := database.Now() t.Logf("inserting 1 stat") require.NoError(t, b.Add(deps1.Agent.ID, deps1.User.ID, deps1.Template.ID, deps1.Workspace.ID, randAgentSDKStats(t))) @@ -77,7 +77,7 @@ func TestBatchStats(t *testing.T) { // Given: a lot of data points are added for both workspaces // (equal to batch size) - t3 := time.Now() + t3 := database.Now() done := make(chan struct{}) go func() { @@ -105,7 +105,7 @@ func TestBatchStats(t *testing.T) { require.Len(t, stats, 2, "should have stats for both workspaces") // Ensures that a subsequent flush pushes all the remaining data - t4 := time.Now() + t4 := database.Now() tick <- t4 f2 := <-flushed t.Logf("flush 4 completed") @@ -113,7 +113,7 @@ func TestBatchStats(t *testing.T) { require.Equal(t, expectedCount, f2, "did not flush expected remaining rows") // Ensure that a subsequent flush does not push stale data. - t5 := time.Now() + t5 := database.Now() tick <- t5 f = <-flushed require.Zero(t, f, "expected zero stats to have been flushed") From 3314b7fcae1c0eea5e5104d976767724b36beb40 Mon Sep 17 00:00:00 2001 From: Colin Adler Date: Tue, 8 Aug 2023 00:16:32 +0000 Subject: [PATCH 2/5] try to fix batcher flake again --- coderd/database/dbfake/dbfake.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/coderd/database/dbfake/dbfake.go b/coderd/database/dbfake/dbfake.go index c968a52f8d00d..73a1b74e79fe9 100644 --- a/coderd/database/dbfake/dbfake.go +++ b/coderd/database/dbfake/dbfake.go @@ -2799,14 +2799,14 @@ func (q *FakeQuerier) GetWorkspaceAgentStats(_ context.Context, createdAfter tim agentStatsCreatedAfter := make([]database.WorkspaceAgentStat, 0) for _, agentStat := range q.workspaceAgentStats { - if agentStat.CreatedAt.After(createdAfter) { + if agentStat.CreatedAt.After(createdAfter) || agentStat.CreatedAt.Equal(createdAfter) { agentStatsCreatedAfter = append(agentStatsCreatedAfter, agentStat) } } latestAgentStats := map[uuid.UUID]database.WorkspaceAgentStat{} for _, agentStat := range q.workspaceAgentStats { - if agentStat.CreatedAt.After(createdAfter) { + if agentStat.CreatedAt.After(createdAfter) || agentStat.CreatedAt.Equal(createdAfter) { latestAgentStats[agentStat.AgentID] = agentStat } } From 122291c3a7df02edc85dbc0adefbd6f7bb09de6e Mon Sep 17 00:00:00 2001 From: Colin Adler Date: Tue, 8 Aug 2023 00:24:26 +0000 Subject: [PATCH 3/5] fixup! try to fix batcher flake again --- coderd/batchstats/batcher.go | 3 ++- coderd/batchstats/batcher_internal_test.go | 10 +++++----- coderd/workspaceagents.go | 2 +- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/coderd/batchstats/batcher.go b/coderd/batchstats/batcher.go index fdc7cadd842d0..a5239102f2182 100644 --- a/coderd/batchstats/batcher.go +++ b/coderd/batchstats/batcher.go @@ -126,6 +126,7 @@ func New(ctx context.Context, opts ...Option) (*Batcher, func(), error) { // Add adds a stat to the batcher for the given workspace and agent. func (b *Batcher) Add( + now time.Time, agentID uuid.UUID, templateID uuid.UUID, userID uuid.UUID, @@ -135,7 +136,7 @@ func (b *Batcher) Add( b.mu.Lock() defer b.mu.Unlock() - now := database.Now() + now = database.Time(now) b.buf.ID = append(b.buf.ID, uuid.New()) b.buf.CreatedAt = append(b.buf.CreatedAt, now) diff --git a/coderd/batchstats/batcher_internal_test.go b/coderd/batchstats/batcher_internal_test.go index b66f4573b7c5f..74882cce8e24e 100644 --- a/coderd/batchstats/batcher_internal_test.go +++ b/coderd/batchstats/batcher_internal_test.go @@ -59,9 +59,9 @@ func TestBatchStats(t *testing.T) { require.Empty(t, stats, "should have no stats for workspace") // Given: a single data point is added for workspace - t2 := database.Now() + t2 := t1.Add(time.Millisecond) t.Logf("inserting 1 stat") - require.NoError(t, b.Add(deps1.Agent.ID, deps1.User.ID, deps1.Template.ID, deps1.Workspace.ID, randAgentSDKStats(t))) + require.NoError(t, b.Add(t2, deps1.Agent.ID, deps1.User.ID, deps1.Template.ID, deps1.Workspace.ID, randAgentSDKStats(t))) // When: it becomes time to report stats // Signal a tick and wait for a flush to complete. @@ -77,7 +77,7 @@ func TestBatchStats(t *testing.T) { // Given: a lot of data points are added for both workspaces // (equal to batch size) - t3 := database.Now() + t3 := t2.Add(time.Millisecond) done := make(chan struct{}) go func() { @@ -85,9 +85,9 @@ func TestBatchStats(t *testing.T) { t.Logf("inserting %d stats", defaultBufferSize) for i := 0; i < defaultBufferSize; i++ { if i%2 == 0 { - require.NoError(t, b.Add(deps1.Agent.ID, deps1.User.ID, deps1.Template.ID, deps1.Workspace.ID, randAgentSDKStats(t))) + require.NoError(t, b.Add(t3, deps1.Agent.ID, deps1.User.ID, deps1.Template.ID, deps1.Workspace.ID, randAgentSDKStats(t))) } else { - require.NoError(t, b.Add(deps2.Agent.ID, deps2.User.ID, deps2.Template.ID, deps2.Workspace.ID, randAgentSDKStats(t))) + require.NoError(t, b.Add(t3, deps2.Agent.ID, deps2.User.ID, deps2.Template.ID, deps2.Workspace.ID, randAgentSDKStats(t))) } } }() diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index 0f5607db73436..545f3e7c6ed84 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -1414,7 +1414,7 @@ func (api *API) workspaceAgentReportStats(rw http.ResponseWriter, r *http.Reques var errGroup errgroup.Group errGroup.Go(func() error { - if err := api.statsBatcher.Add(workspaceAgent.ID, workspace.TemplateID, workspace.OwnerID, workspace.ID, req); err != nil { + if err := api.statsBatcher.Add(time.Now(), workspaceAgent.ID, workspace.TemplateID, workspace.OwnerID, workspace.ID, req); err != nil { api.Logger.Error(ctx, "failed to add stats to batcher", slog.Error(err)) return xerrors.Errorf("can't insert workspace agent stat: %w", err) } From 112e20a7e76e79fcedadbde5a0583a04fcd56cdf Mon Sep 17 00:00:00 2001 From: Colin Adler Date: Tue, 8 Aug 2023 00:25:06 +0000 Subject: [PATCH 4/5] fixup! try to fix batcher flake again --- coderd/batchstats/batcher_internal_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/coderd/batchstats/batcher_internal_test.go b/coderd/batchstats/batcher_internal_test.go index 74882cce8e24e..4e886e8f89df4 100644 --- a/coderd/batchstats/batcher_internal_test.go +++ b/coderd/batchstats/batcher_internal_test.go @@ -105,7 +105,7 @@ func TestBatchStats(t *testing.T) { require.Len(t, stats, 2, "should have stats for both workspaces") // Ensures that a subsequent flush pushes all the remaining data - t4 := database.Now() + t4 := t3.Add(time.Millisecond) tick <- t4 f2 := <-flushed t.Logf("flush 4 completed") @@ -113,7 +113,7 @@ func TestBatchStats(t *testing.T) { require.Equal(t, expectedCount, f2, "did not flush expected remaining rows") // Ensure that a subsequent flush does not push stale data. - t5 := database.Now() + t5 := t4.Add(time.Millisecond) tick <- t5 f = <-flushed require.Zero(t, f, "expected zero stats to have been flushed") From 1d2f47e666056d18eaea3404d55767ebc36a165f Mon Sep 17 00:00:00 2001 From: Colin Adler Date: Tue, 8 Aug 2023 02:25:14 +0000 Subject: [PATCH 5/5] fixup! try to fix batcher flake again --- coderd/batchstats/batcher_internal_test.go | 14 +++++++------- coderd/database/dbfake/dbfake.go | 4 ++-- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/coderd/batchstats/batcher_internal_test.go b/coderd/batchstats/batcher_internal_test.go index 4e886e8f89df4..8288442400a3e 100644 --- a/coderd/batchstats/batcher_internal_test.go +++ b/coderd/batchstats/batcher_internal_test.go @@ -59,9 +59,9 @@ func TestBatchStats(t *testing.T) { require.Empty(t, stats, "should have no stats for workspace") // Given: a single data point is added for workspace - t2 := t1.Add(time.Millisecond) + t2 := t1.Add(time.Second) t.Logf("inserting 1 stat") - require.NoError(t, b.Add(t2, deps1.Agent.ID, deps1.User.ID, deps1.Template.ID, deps1.Workspace.ID, randAgentSDKStats(t))) + require.NoError(t, b.Add(t2.Add(time.Millisecond), deps1.Agent.ID, deps1.User.ID, deps1.Template.ID, deps1.Workspace.ID, randAgentSDKStats(t))) // When: it becomes time to report stats // Signal a tick and wait for a flush to complete. @@ -77,7 +77,7 @@ func TestBatchStats(t *testing.T) { // Given: a lot of data points are added for both workspaces // (equal to batch size) - t3 := t2.Add(time.Millisecond) + t3 := t2.Add(time.Second) done := make(chan struct{}) go func() { @@ -85,9 +85,9 @@ func TestBatchStats(t *testing.T) { t.Logf("inserting %d stats", defaultBufferSize) for i := 0; i < defaultBufferSize; i++ { if i%2 == 0 { - require.NoError(t, b.Add(t3, deps1.Agent.ID, deps1.User.ID, deps1.Template.ID, deps1.Workspace.ID, randAgentSDKStats(t))) + require.NoError(t, b.Add(t3.Add(time.Millisecond), deps1.Agent.ID, deps1.User.ID, deps1.Template.ID, deps1.Workspace.ID, randAgentSDKStats(t))) } else { - require.NoError(t, b.Add(t3, deps2.Agent.ID, deps2.User.ID, deps2.Template.ID, deps2.Workspace.ID, randAgentSDKStats(t))) + require.NoError(t, b.Add(t3.Add(time.Millisecond), deps2.Agent.ID, deps2.User.ID, deps2.Template.ID, deps2.Workspace.ID, randAgentSDKStats(t))) } } }() @@ -105,7 +105,7 @@ func TestBatchStats(t *testing.T) { require.Len(t, stats, 2, "should have stats for both workspaces") // Ensures that a subsequent flush pushes all the remaining data - t4 := t3.Add(time.Millisecond) + t4 := t3.Add(time.Second) tick <- t4 f2 := <-flushed t.Logf("flush 4 completed") @@ -113,7 +113,7 @@ func TestBatchStats(t *testing.T) { require.Equal(t, expectedCount, f2, "did not flush expected remaining rows") // Ensure that a subsequent flush does not push stale data. - t5 := t4.Add(time.Millisecond) + t5 := t4.Add(time.Second) tick <- t5 f = <-flushed require.Zero(t, f, "expected zero stats to have been flushed") diff --git a/coderd/database/dbfake/dbfake.go b/coderd/database/dbfake/dbfake.go index 73a1b74e79fe9..c968a52f8d00d 100644 --- a/coderd/database/dbfake/dbfake.go +++ b/coderd/database/dbfake/dbfake.go @@ -2799,14 +2799,14 @@ func (q *FakeQuerier) GetWorkspaceAgentStats(_ context.Context, createdAfter tim agentStatsCreatedAfter := make([]database.WorkspaceAgentStat, 0) for _, agentStat := range q.workspaceAgentStats { - if agentStat.CreatedAt.After(createdAfter) || agentStat.CreatedAt.Equal(createdAfter) { + if agentStat.CreatedAt.After(createdAfter) { agentStatsCreatedAfter = append(agentStatsCreatedAfter, agentStat) } } latestAgentStats := map[uuid.UUID]database.WorkspaceAgentStat{} for _, agentStat := range q.workspaceAgentStats { - if agentStat.CreatedAt.After(createdAfter) || agentStat.CreatedAt.Equal(createdAfter) { + if agentStat.CreatedAt.After(createdAfter) { latestAgentStats[agentStat.AgentID] = agentStat } }