From e97f3a963529e2783333ce4f7460a95aafff6f61 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Wed, 11 Dec 2024 17:31:23 +0200 Subject: [PATCH 1/8] fix: accumulate agentstats until reported and fix insights DAU offset Fixes #15824 --- agent/stats.go | 20 +++++++++++++++----- coderd/insights.go | 2 +- coderd/insights_test.go | 24 +++++++++++++++++++++--- 3 files changed, 37 insertions(+), 9 deletions(-) diff --git a/agent/stats.go b/agent/stats.go index 2615ab339637b..68f5568f0622f 100644 --- a/agent/stats.go +++ b/agent/stats.go @@ -2,6 +2,7 @@ package agent import ( "context" + "maps" "sync" "time" @@ -32,7 +33,7 @@ type statsDest interface { // statsDest (agent API in prod) type statsReporter struct { *sync.Cond - networkStats *map[netlogtype.Connection]netlogtype.Counts + networkStats map[netlogtype.Connection]netlogtype.Counts unreported bool lastInterval time.Duration @@ -54,8 +55,18 @@ func (s *statsReporter) callback(_, _ time.Time, virtual, _ map[netlogtype.Conne s.L.Lock() defer s.L.Unlock() s.logger.Debug(context.Background(), "got stats callback") - s.networkStats = &virtual - s.unreported = true + // Accumulate stats until they've been reported. 
+ if s.unreported { + if s.networkStats == nil && virtual != nil { + s.networkStats = make(map[netlogtype.Connection]netlogtype.Counts) + } + for k, v := range virtual { + s.networkStats[k] = s.networkStats[k].Add(v) + } + } else { + s.networkStats = maps.Clone(virtual) + s.unreported = true + } s.Broadcast() } @@ -96,9 +107,8 @@ func (s *statsReporter) reportLoop(ctx context.Context, dest statsDest) error { if ctxDone { return nil } - networkStats := *s.networkStats s.unreported = false - if err = s.reportLocked(ctx, dest, networkStats); err != nil { + if err = s.reportLocked(ctx, dest, s.networkStats); err != nil { return xerrors.Errorf("report stats: %w", err) } } diff --git a/coderd/insights.go b/coderd/insights.go index 7234a88d44fe9..d5faacee90bd5 100644 --- a/coderd/insights.go +++ b/coderd/insights.go @@ -89,7 +89,7 @@ func (api *API) returnDAUsInternal(rw http.ResponseWriter, r *http.Request, temp } for _, row := range rows { resp.Entries = append(resp.Entries, codersdk.DAUEntry{ - Date: row.StartTime.Format(time.DateOnly), + Date: row.StartTime.In(loc).Format(time.DateOnly), Amount: int(row.ActiveUsers), }) } diff --git a/coderd/insights_test.go b/coderd/insights_test.go index b47bc8ada534b..1468e1fdda7cf 100644 --- a/coderd/insights_test.go +++ b/coderd/insights_test.go @@ -48,16 +48,26 @@ func TestDeploymentInsights(t *testing.T) { db, ps := dbtestutil.NewDB(t, dbtestutil.WithDumpOnFailure()) logger := testutil.Logger(t) rollupEvents := make(chan dbrollup.Event) + statsInterval := 500 * time.Millisecond + batcher, closeBatcher, err := workspacestats.NewBatcher(context.Background(), + workspacestats.BatcherWithLogger(logger.Named("batcher").Leveled(slog.LevelDebug)), + workspacestats.BatcherWithStore(db), + workspacestats.BatcherWithBatchSize(1), + workspacestats.BatcherWithInterval(statsInterval), + ) + require.NoError(t, err) + defer closeBatcher() client := coderdtest.New(t, &coderdtest.Options{ Database: db, Pubsub: ps, Logger: &logger, 
IncludeProvisionerDaemon: true, - AgentStatsRefreshInterval: time.Millisecond * 100, + AgentStatsRefreshInterval: statsInterval, + StatsBatcher: batcher, DatabaseRolluper: dbrollup.New( logger.Named("dbrollup").Leveled(slog.LevelDebug), db, - dbrollup.WithInterval(time.Millisecond*100), + dbrollup.WithInterval(statsInterval/2), dbrollup.WithEventChannel(rollupEvents), ), }) @@ -76,7 +86,7 @@ func TestDeploymentInsights(t *testing.T) { workspace := coderdtest.CreateWorkspace(t, client, template.ID) coderdtest.AwaitWorkspaceBuildJobCompleted(t, client, workspace.LatestBuild.ID) - ctx := testutil.Context(t, testutil.WaitLong) + ctx := testutil.Context(t, testutil.WaitSuperLong) // Pre-check, no permission issues. daus, err := client.DeploymentDAUs(ctx, codersdk.TimezoneOffsetHour(clientTz)) @@ -108,6 +118,13 @@ func TestDeploymentInsights(t *testing.T) { err = sess.Start("cat") require.NoError(t, err) + select { + case <-ctx.Done(): + require.Fail(t, "timed out waiting for initial rollup event") + case ev := <-rollupEvents: + require.True(t, ev.Init, "want init event") + } + for { select { case <-ctx.Done(): @@ -120,6 +137,7 @@ func TestDeploymentInsights(t *testing.T) { if len(daus.Entries) > 0 && daus.Entries[len(daus.Entries)-1].Amount > 0 { break } + t.Logf("waiting for deployment daus to update: %+v", daus) } } From bd4ae11eba268361d2571c98be876530dbaac3f9 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Wed, 11 Dec 2024 17:45:29 +0200 Subject: [PATCH 2/8] fix test --- agent/stats_internal_test.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/agent/stats_internal_test.go b/agent/stats_internal_test.go index e032910ee36aa..69203b97980fb 100644 --- a/agent/stats_internal_test.go +++ b/agent/stats_internal_test.go @@ -64,7 +64,7 @@ func TestStatsReporter(t *testing.T) { require.Equal(t, netStats, gotNetStats) // while we are collecting the stats, send in two new netStats to simulate - // what happens if we don't keep up. 
Only the latest should be kept. + // what happens if we don't keep up. The stats should be accumulated. netStats0 := map[netlogtype.Connection]netlogtype.Counts{ { Proto: ipproto.TCP, @@ -103,8 +103,20 @@ func TestStatsReporter(t *testing.T) { testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.UpdateStatsResponse{ReportInterval: durationpb.New(interval)}) // second update -- only netStats1 is reported + wantNetStats := map[netlogtype.Connection]netlogtype.Counts{ + { + Proto: ipproto.TCP, + Src: netip.MustParseAddrPort("192.168.1.33:4887"), + Dst: netip.MustParseAddrPort("192.168.2.99:9999"), + }: { + TxPackets: 21, + TxBytes: 21, + RxPackets: 21, + RxBytes: 21, + }, + } gotNetStats = testutil.RequireRecvCtx(ctx, t, fCollector.calls) - require.Equal(t, netStats1, gotNetStats) + require.Equal(t, wantNetStats, gotNetStats) stats = &proto.Stats{SessionCountJetbrains: 66} testutil.RequireSendCtx(ctx, t, fCollector.stats, stats) update = testutil.RequireRecvCtx(ctx, t, fDest.reqs) From 53f327581e20187a0c46a4ae1ab633b3435e90b6 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Wed, 11 Dec 2024 18:09:18 +0200 Subject: [PATCH 3/8] fix comment --- agent/stats_internal_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent/stats_internal_test.go b/agent/stats_internal_test.go index 69203b97980fb..9fd6aa102a5aa 100644 --- a/agent/stats_internal_test.go +++ b/agent/stats_internal_test.go @@ -102,7 +102,7 @@ func TestStatsReporter(t *testing.T) { require.Equal(t, stats, update.Stats) testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.UpdateStatsResponse{ReportInterval: durationpb.New(interval)}) - // second update -- only netStats1 is reported + // second update -- netStats0 and netStats1 are accumulated and reported wantNetStats := map[netlogtype.Connection]netlogtype.Counts{ { Proto: ipproto.TCP, From 57fde091866ee76eb9657055267ade3b3f99ec7e Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Wed, 11 Dec 2024 19:07:38 +0200 Subject: [PATCH
4/8] add comment from PR review --- coderd/insights_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/coderd/insights_test.go b/coderd/insights_test.go index 1468e1fdda7cf..f466db49487b6 100644 --- a/coderd/insights_test.go +++ b/coderd/insights_test.go @@ -49,6 +49,7 @@ func TestDeploymentInsights(t *testing.T) { logger := testutil.Logger(t) rollupEvents := make(chan dbrollup.Event) statsInterval := 500 * time.Millisecond + // Speed up the test by controlling batch size and interval. batcher, closeBatcher, err := workspacestats.NewBatcher(context.Background(), workspacestats.BatcherWithLogger(logger.Named("batcher").Leveled(slog.LevelDebug)), workspacestats.BatcherWithStore(db), From 7f7df65be0645bd8efd9320c7a619474d590b021 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Wed, 11 Dec 2024 19:20:26 +0200 Subject: [PATCH 5/8] refactor stats accumulation --- agent/stats.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/agent/stats.go b/agent/stats.go index 68f5568f0622f..898d7117c6d9f 100644 --- a/agent/stats.go +++ b/agent/stats.go @@ -56,10 +56,7 @@ func (s *statsReporter) callback(_, _ time.Time, virtual, _ map[netlogtype.Conne defer s.L.Unlock() s.logger.Debug(context.Background(), "got stats callback") // Accumulate stats until they've been reported. 
- if s.unreported { - if s.networkStats == nil && virtual != nil { - s.networkStats = make(map[netlogtype.Connection]netlogtype.Counts) - } + if s.unreported && len(s.networkStats) > 0 { for k, v := range virtual { s.networkStats[k] = s.networkStats[k].Add(v) } From ff236f0aed775136553948a953d4c89166fdcfeb Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Wed, 11 Dec 2024 19:23:16 +0200 Subject: [PATCH 6/8] Update coderd/insights_test.go Co-authored-by: Danny Kopping --- coderd/insights_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/coderd/insights_test.go b/coderd/insights_test.go index f466db49487b6..0d8bd8bbd32a0 100644 --- a/coderd/insights_test.go +++ b/coderd/insights_test.go @@ -121,7 +121,7 @@ func TestDeploymentInsights(t *testing.T) { select { case <-ctx.Done(): - require.Fail(t, "timed out waiting for initial rollup event") + require.Failf(t, "timed out waiting for initial rollup event: %s", ctx.Err()) case ev := <-rollupEvents: require.True(t, ev.Init, "want init event") } From 08dea241044082a4e43b9e45bc27839f3341be29 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Wed, 11 Dec 2024 19:59:12 +0200 Subject: [PATCH 7/8] fix format --- coderd/insights_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/coderd/insights_test.go b/coderd/insights_test.go index 0d8bd8bbd32a0..2b7475f1bd9dc 100644 --- a/coderd/insights_test.go +++ b/coderd/insights_test.go @@ -121,7 +121,7 @@ func TestDeploymentInsights(t *testing.T) { select { case <-ctx.Done(): - require.Failf(t, "timed out waiting for initial rollup event: %s", ctx.Err()) + require.Failf(t, "timed out waiting for initial rollup event: %v", ctx.Err()) case ev := <-rollupEvents: require.True(t, ev.Init, "want init event") } From 3fc95da268793fc6eb1e7ff06851206f067ee28e Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Wed, 11 Dec 2024 20:43:15 +0200 Subject: [PATCH 8/8] fix failf --- coderd/insights_test.go | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/coderd/insights_test.go b/coderd/insights_test.go index 2b7475f1bd9dc..43ef04435c218 100644 --- a/coderd/insights_test.go +++ b/coderd/insights_test.go @@ -121,7 +121,7 @@ func TestDeploymentInsights(t *testing.T) { select { case <-ctx.Done(): - require.Failf(t, "timed out waiting for initial rollup event: %v", ctx.Err()) + require.Fail(t, "timed out waiting for initial rollup event", ctx.Err()) case ev := <-rollupEvents: require.True(t, ev.Init, "want init event") }