Skip to content

Commit 7dc4ff2

Browse files
committed
add backlog for re-reporting in case of failure
1 parent e8969d3 commit 7dc4ff2

File tree

4 files changed

+114
-7
lines changed

4 files changed

+114
-7
lines changed

coderd/database/queries.sql.go

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries/workspaceappstats.sql

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,8 @@ DO
3030
WHERE
3131
workspace_app_stats.user_id = EXCLUDED.user_id
3232
AND workspace_app_stats.agent_id = EXCLUDED.agent_id
33-
AND workspace_app_stats.session_id = EXCLUDED.session_id;
33+
AND workspace_app_stats.session_id = EXCLUDED.session_id
34+
-- Since stats are updated in place as time progresses, we only
35+
-- want to update this row if it's fresh.
36+
AND workspace_app_stats.session_ended_at <= EXCLUDED.session_ended_at
37+
AND workspace_app_stats.requests <= EXCLUDED.requests;

coderd/workspaceapps/stats.go

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ type StatsCollector struct {
142142
mu sync.Mutex // Protects following.
143143
statsBySessionID map[uuid.UUID]*StatsReport // Track unique sessions.
144144
groupedStats map[statsGroupKey][]*StatsReport // Rolled up stats for sessions in close proximity.
145+
backlog []StatsReport // Stats that have not been reported yet (due to error).
145146
}
146147

147148
type StatsCollectorOptions struct {
@@ -242,7 +243,7 @@ func (sc *StatsCollector) rollup() []StatsReport {
242243
rolledUp: true,
243244
}
244245
}
245-
246+
rollupChanged := false
246247
newGroup := []*StatsReport{rolledUp} // Must be first in slice for future iterations (see group[0] above).
247248
for _, stat := range group {
248249
if !stat.SessionEndedAt.IsZero() && stat.SessionEndedAt.Sub(stat.SessionStartedAt) <= sc.opts.RollupWindow {
@@ -251,6 +252,7 @@ func (sc *StatsCollector) rollup() []StatsReport {
251252
rolledUp.SessionID = stat.SessionID // Borrow the first session ID, useful in tests.
252253
}
253254
rolledUp.Requests += stat.Requests
255+
rollupChanged = true
254256
continue
255257
}
256258
if stat.SessionEndedAt.IsZero() && sc.opts.Now().Sub(stat.SessionStartedAt) <= sc.opts.RollupWindow {
@@ -273,7 +275,7 @@ func (sc *StatsCollector) rollup() []StatsReport {
273275
newGroup = append(newGroup, stat) // Keep it for future updates.
274276
}
275277
}
276-
if rolledUp.Requests > 0 {
278+
if rollupChanged {
277279
report = append(report, *rolledUp)
278280
}
279281

@@ -300,12 +302,33 @@ func (sc *StatsCollector) flush(ctx context.Context) (err error) {
300302
}
301303
}()
302304

305+
// We keep the backlog as a simple slice so that we don't need to
306+
// attempt to merge it with the stats we're about to report. This
307+
// is because the rollup is a one-way operation and the backlog may
308+
// contain stats that are still in the statsBySessionID map and will
309+
// be reported again in the future. It is possible to merge the
310+
// backlog and the stats we're about to report, but it's not worth
311+
// the complexity.
312+
if len(sc.backlog) > 0 {
313+
err = sc.opts.Reporter.Report(ctx, sc.backlog)
314+
if err != nil {
315+
return xerrors.Errorf("report workspace app stats from backlog failed: %w", err)
316+
}
317+
sc.backlog = nil
318+
}
319+
303320
stats := sc.rollup()
304321
if len(stats) == 0 {
305322
return nil
306323
}
307324

308-
return sc.opts.Reporter.Report(ctx, stats)
325+
err = sc.opts.Reporter.Report(ctx, stats)
326+
if err != nil {
327+
sc.backlog = stats
328+
return xerrors.Errorf("report workspace app stats failed: %w", err)
329+
}
330+
331+
return nil
309332
}
310333

311334
func (sc *StatsCollector) Close() error {

coderd/workspaceapps/stats_test.go

Lines changed: 79 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,18 @@ import (
1111
"github.com/stretchr/testify/assert"
1212
"github.com/stretchr/testify/require"
1313
"golang.org/x/exp/slices"
14+
"golang.org/x/xerrors"
1415

1516
"github.com/coder/coder/coderd/database"
1617
"github.com/coder/coder/coderd/workspaceapps"
1718
"github.com/coder/coder/testutil"
1819
)
1920

2021
type fakeReporter struct {
21-
mu sync.Mutex
22-
s []workspaceapps.StatsReport
22+
mu sync.Mutex
23+
s []workspaceapps.StatsReport
24+
err error
25+
errN int
2326
}
2427

2528
func (r *fakeReporter) stats() []workspaceapps.StatsReport {
@@ -28,8 +31,25 @@ func (r *fakeReporter) stats() []workspaceapps.StatsReport {
2831
return r.s
2932
}
3033

34+
func (r *fakeReporter) errors() int {
35+
r.mu.Lock()
36+
defer r.mu.Unlock()
37+
return r.errN
38+
}
39+
40+
func (r *fakeReporter) setError(err error) {
41+
r.mu.Lock()
42+
defer r.mu.Unlock()
43+
r.err = err
44+
}
45+
3146
func (r *fakeReporter) Report(_ context.Context, stats []workspaceapps.StatsReport) error {
3247
r.mu.Lock()
48+
if r.err != nil {
49+
r.errN++
50+
r.mu.Unlock()
51+
return r.err
52+
}
3353
r.s = append(r.s, stats...)
3454
r.mu.Unlock()
3555
return nil
@@ -296,7 +316,6 @@ func TestStatsCollector(t *testing.T) {
296316
var gotStats []workspaceapps.StatsReport
297317
require.Eventually(t, func() bool {
298318
gotStats = reporter.stats()
299-
t.Logf("len(reporter.stats()) = %d, len(tt.want) = %d", len(gotStats), len(tt.want))
300319
return len(gotStats) == len(tt.want)
301320
}, testutil.WaitMedium, testutil.IntervalFast)
302321

@@ -329,6 +348,63 @@ func TestStatsCollector(t *testing.T) {
329348
}
330349
}
331350

351+
func TestStatsCollector_backlog(t *testing.T) {
352+
t.Parallel()
353+
354+
rollupWindow := time.Minute
355+
flush := make(chan chan<- struct{}, 1)
356+
357+
start := database.Now().Truncate(time.Minute).UTC()
358+
var now atomic.Pointer[time.Time]
359+
now.Store(&start)
360+
361+
reporter := &fakeReporter{}
362+
collector := workspaceapps.NewStatsCollector(workspaceapps.StatsCollectorOptions{
363+
Reporter: reporter,
364+
ReportInterval: time.Hour,
365+
RollupWindow: rollupWindow,
366+
367+
Flush: flush,
368+
Now: func() time.Time { return *now.Load() },
369+
})
370+
371+
reporter.setError(xerrors.New("some error"))
372+
373+
// The first collected stat is "rolled up" and moved into the
374+
// backlog during the first flush. On the second flush nothing is
375+
// rolled up due to being unable to report the backlog.
376+
for i := 0; i < 2; i++ {
377+
collector.Collect(workspaceapps.StatsReport{
378+
SessionID: uuid.New(),
379+
SessionStartedAt: start,
380+
SessionEndedAt: start.Add(10 * time.Second),
381+
Requests: 1,
382+
})
383+
start = start.Add(time.Minute)
384+
now.Store(&start)
385+
386+
flushDone := make(chan struct{}, 1)
387+
flush <- flushDone
388+
<-flushDone
389+
}
390+
391+
// Flush was performed 2 times, 2 reports should have failed.
392+
wantErrors := 2
393+
assert.Equal(t, wantErrors, reporter.errors())
394+
assert.Empty(t, reporter.stats())
395+
396+
reporter.setError(nil)
397+
398+
// Flush again, this time the backlog should be reported in addition
399+
// to the second collected stat being rolled up and reported.
400+
flushDone := make(chan struct{}, 1)
401+
flush <- flushDone
402+
<-flushDone
403+
404+
assert.Equal(t, wantErrors, reporter.errors())
405+
assert.Len(t, reporter.stats(), 2)
406+
}
407+
332408
func TestStatsCollector_Close(t *testing.T) {
333409
t.Parallel()
334410

0 commit comments

Comments
 (0)