Skip to content

Commit 4c5b737

Browse files
authored
fix: accumulate agentstats until reported and fix insights DAU offset (coder#15832)
1 parent 77dc510 commit 4c5b737

File tree

4 files changed

+50
-12
lines changed

4 files changed

+50
-12
lines changed

agent/stats.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package agent
22

33
import (
44
"context"
5+
"maps"
56
"sync"
67
"time"
78

@@ -32,7 +33,7 @@ type statsDest interface {
3233
// statsDest (agent API in prod)
3334
type statsReporter struct {
3435
*sync.Cond
35-
networkStats *map[netlogtype.Connection]netlogtype.Counts
36+
networkStats map[netlogtype.Connection]netlogtype.Counts
3637
unreported bool
3738
lastInterval time.Duration
3839

@@ -54,8 +55,15 @@ func (s *statsReporter) callback(_, _ time.Time, virtual, _ map[netlogtype.Conne
5455
s.L.Lock()
5556
defer s.L.Unlock()
5657
s.logger.Debug(context.Background(), "got stats callback")
57-
s.networkStats = &virtual
58-
s.unreported = true
58+
// Accumulate stats until they've been reported.
59+
if s.unreported && len(s.networkStats) > 0 {
60+
for k, v := range virtual {
61+
s.networkStats[k] = s.networkStats[k].Add(v)
62+
}
63+
} else {
64+
s.networkStats = maps.Clone(virtual)
65+
s.unreported = true
66+
}
5967
s.Broadcast()
6068
}
6169

@@ -96,9 +104,8 @@ func (s *statsReporter) reportLoop(ctx context.Context, dest statsDest) error {
96104
if ctxDone {
97105
return nil
98106
}
99-
networkStats := *s.networkStats
100107
s.unreported = false
101-
if err = s.reportLocked(ctx, dest, networkStats); err != nil {
108+
if err = s.reportLocked(ctx, dest, s.networkStats); err != nil {
102109
return xerrors.Errorf("report stats: %w", err)
103110
}
104111
}

agent/stats_internal_test.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func TestStatsReporter(t *testing.T) {
6464
require.Equal(t, netStats, gotNetStats)
6565

6666
// while we are collecting the stats, send in two new netStats to simulate
67-
// what happens if we don't keep up. Only the latest should be kept.
67+
// what happens if we don't keep up. The stats should be accumulated.
6868
netStats0 := map[netlogtype.Connection]netlogtype.Counts{
6969
{
7070
Proto: ipproto.TCP,
@@ -102,9 +102,21 @@ func TestStatsReporter(t *testing.T) {
102102
require.Equal(t, stats, update.Stats)
103103
testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.UpdateStatsResponse{ReportInterval: durationpb.New(interval)})
104104

105-
// second update -- only netStats1 is reported
105+
// second update -- netStats0 and netStats1 are accumulated and reported
106+
wantNetStats := map[netlogtype.Connection]netlogtype.Counts{
107+
{
108+
Proto: ipproto.TCP,
109+
Src: netip.MustParseAddrPort("192.168.1.33:4887"),
110+
Dst: netip.MustParseAddrPort("192.168.2.99:9999"),
111+
}: {
112+
TxPackets: 21,
113+
TxBytes: 21,
114+
RxPackets: 21,
115+
RxBytes: 21,
116+
},
117+
}
106118
gotNetStats = testutil.RequireRecvCtx(ctx, t, fCollector.calls)
107-
require.Equal(t, netStats1, gotNetStats)
119+
require.Equal(t, wantNetStats, gotNetStats)
108120
stats = &proto.Stats{SessionCountJetbrains: 66}
109121
testutil.RequireSendCtx(ctx, t, fCollector.stats, stats)
110122
update = testutil.RequireRecvCtx(ctx, t, fDest.reqs)

coderd/insights.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func (api *API) returnDAUsInternal(rw http.ResponseWriter, r *http.Request, temp
8989
}
9090
for _, row := range rows {
9191
resp.Entries = append(resp.Entries, codersdk.DAUEntry{
92-
Date: row.StartTime.Format(time.DateOnly),
92+
Date: row.StartTime.In(loc).Format(time.DateOnly),
9393
Amount: int(row.ActiveUsers),
9494
})
9595
}

coderd/insights_test.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,27 @@ func TestDeploymentInsights(t *testing.T) {
4848
db, ps := dbtestutil.NewDB(t, dbtestutil.WithDumpOnFailure())
4949
logger := testutil.Logger(t)
5050
rollupEvents := make(chan dbrollup.Event)
51+
statsInterval := 500 * time.Millisecond
52+
// Speed up the test by controlling batch size and interval.
53+
batcher, closeBatcher, err := workspacestats.NewBatcher(context.Background(),
54+
workspacestats.BatcherWithLogger(logger.Named("batcher").Leveled(slog.LevelDebug)),
55+
workspacestats.BatcherWithStore(db),
56+
workspacestats.BatcherWithBatchSize(1),
57+
workspacestats.BatcherWithInterval(statsInterval),
58+
)
59+
require.NoError(t, err)
60+
defer closeBatcher()
5161
client := coderdtest.New(t, &coderdtest.Options{
5262
Database: db,
5363
Pubsub: ps,
5464
Logger: &logger,
5565
IncludeProvisionerDaemon: true,
56-
AgentStatsRefreshInterval: time.Millisecond * 100,
66+
AgentStatsRefreshInterval: statsInterval,
67+
StatsBatcher: batcher,
5768
DatabaseRolluper: dbrollup.New(
5869
logger.Named("dbrollup").Leveled(slog.LevelDebug),
5970
db,
60-
dbrollup.WithInterval(time.Millisecond*100),
71+
dbrollup.WithInterval(statsInterval/2),
6172
dbrollup.WithEventChannel(rollupEvents),
6273
),
6374
})
@@ -76,7 +87,7 @@ func TestDeploymentInsights(t *testing.T) {
7687
workspace := coderdtest.CreateWorkspace(t, client, template.ID)
7788
coderdtest.AwaitWorkspaceBuildJobCompleted(t, client, workspace.LatestBuild.ID)
7889

79-
ctx := testutil.Context(t, testutil.WaitLong)
90+
ctx := testutil.Context(t, testutil.WaitSuperLong)
8091

8192
// Pre-check, no permission issues.
8293
daus, err := client.DeploymentDAUs(ctx, codersdk.TimezoneOffsetHour(clientTz))
@@ -108,6 +119,13 @@ func TestDeploymentInsights(t *testing.T) {
108119
err = sess.Start("cat")
109120
require.NoError(t, err)
110121

122+
select {
123+
case <-ctx.Done():
124+
require.Fail(t, "timed out waiting for initial rollup event", ctx.Err())
125+
case ev := <-rollupEvents:
126+
require.True(t, ev.Init, "want init event")
127+
}
128+
111129
for {
112130
select {
113131
case <-ctx.Done():
@@ -120,6 +138,7 @@ func TestDeploymentInsights(t *testing.T) {
120138
if len(daus.Entries) > 0 && daus.Entries[len(daus.Entries)-1].Amount > 0 {
121139
break
122140
}
141+
t.Logf("waiting for deployment daus to update: %+v", daus)
123142
}
124143
}
125144

0 commit comments

Comments
 (0)