Skip to content

Commit 12e6fbf

Browse files
authored
feat(coderd/database): add dbrollup service to rollup insights (#12665)
Add `dbrollup` service that runs the `UpsertTemplateUsageStats` query every 5 minutes, on the minute. This allows us to have fairly real-time insights data when viewing "today".
1 parent 04f0510 commit 12e6fbf

File tree

7 files changed

+479
-5
lines changed

7 files changed

+479
-5
lines changed

coderd/coderd.go

+15-2
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import (
4747
"github.com/coder/coder/v2/coderd/batchstats"
4848
"github.com/coder/coder/v2/coderd/database"
4949
"github.com/coder/coder/v2/coderd/database/dbauthz"
50+
"github.com/coder/coder/v2/coderd/database/dbrollup"
5051
"github.com/coder/coder/v2/coderd/database/dbtime"
5152
"github.com/coder/coder/v2/coderd/database/pubsub"
5253
"github.com/coder/coder/v2/coderd/externalauth"
@@ -192,6 +193,9 @@ type Options struct {
192193
// NewTicker is used for unit tests to replace "time.NewTicker".
193194
NewTicker func(duration time.Duration) (tick <-chan time.Time, done func())
194195

196+
// DatabaseRolluper rolls up template usage stats from raw agent and app
197+
// stats. This is used to provide insights in the WebUI.
198+
DatabaseRolluper *dbrollup.Rolluper
195199
// WorkspaceUsageTracker tracks workspace usage by the CLI.
196200
WorkspaceUsageTracker *workspaceusage.Tracker
197201
}
@@ -366,6 +370,10 @@ func New(options *Options) *API {
366370
OIDC: options.OIDCConfig,
367371
}
368372

373+
if options.DatabaseRolluper == nil {
374+
options.DatabaseRolluper = dbrollup.New(options.Logger.Named("dbrollup"), options.Database)
375+
}
376+
369377
if options.WorkspaceUsageTracker == nil {
370378
options.WorkspaceUsageTracker = workspaceusage.New(options.Database,
371379
workspaceusage.WithLogger(options.Logger.Named("workspace_usage_tracker")),
@@ -414,7 +422,9 @@ func New(options *Options) *API {
414422
ctx,
415423
options.Logger.Named("acquirer"),
416424
options.Database,
417-
options.Pubsub),
425+
options.Pubsub,
426+
),
427+
dbRolluper: options.DatabaseRolluper,
418428
workspaceUsageTracker: options.WorkspaceUsageTracker,
419429
}
420430

@@ -1197,7 +1207,9 @@ type API struct {
11971207
statsBatcher *batchstats.Batcher
11981208

11991209
Acquirer *provisionerdserver.Acquirer
1200-
1210+
// dbRolluper rolls up template usage stats from raw agent and app
1211+
// stats. This is used to provide insights in the WebUI.
1212+
dbRolluper *dbrollup.Rolluper
12011213
workspaceUsageTracker *workspaceusage.Tracker
12021214
}
12031215

@@ -1212,6 +1224,7 @@ func (api *API) Close() error {
12121224
api.WebsocketWaitGroup.Wait()
12131225
api.WebsocketWaitMutex.Unlock()
12141226

1227+
api.dbRolluper.Close()
12151228
api.metricsCache.Close()
12161229
if api.updateChecker != nil {
12171230
api.updateChecker.Close()

coderd/coderdtest/coderdtest.go

+3
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ import (
5757
"github.com/coder/coder/v2/coderd/batchstats"
5858
"github.com/coder/coder/v2/coderd/database"
5959
"github.com/coder/coder/v2/coderd/database/dbauthz"
60+
"github.com/coder/coder/v2/coderd/database/dbrollup"
6061
"github.com/coder/coder/v2/coderd/database/dbtestutil"
6162
"github.com/coder/coder/v2/coderd/database/pubsub"
6263
"github.com/coder/coder/v2/coderd/externalauth"
@@ -147,6 +148,7 @@ type Options struct {
147148
WorkspaceAppsStatsCollectorOptions workspaceapps.StatsCollectorOptions
148149
AllowWorkspaceRenames bool
149150
NewTicker func(duration time.Duration) (<-chan time.Time, func())
151+
DatabaseRolluper *dbrollup.Rolluper
150152
WorkspaceUsageTrackerFlush chan int
151153
WorkspaceUsageTrackerTick chan time.Time
152154
}
@@ -491,6 +493,7 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can
491493
WorkspaceAppsStatsCollectorOptions: options.WorkspaceAppsStatsCollectorOptions,
492494
AllowWorkspaceRenames: options.AllowWorkspaceRenames,
493495
NewTicker: options.NewTicker,
496+
DatabaseRolluper: options.DatabaseRolluper,
494497
WorkspaceUsageTracker: wuTracker,
495498
}
496499
}

coderd/database/dbgen/dbgen.go

+32
Original file line numberDiff line numberDiff line change
@@ -489,6 +489,38 @@ func WorkspaceApp(t testing.TB, db database.Store, orig database.WorkspaceApp) d
489489
return resource
490490
}
491491

492+
func WorkspaceAppStat(t testing.TB, db database.Store, orig database.WorkspaceAppStat) database.WorkspaceAppStat {
493+
// This is not going to be correct, but our query doesn't return the ID.
494+
id, err := cryptorand.Int63()
495+
require.NoError(t, err, "generate id")
496+
497+
scheme := database.WorkspaceAppStat{
498+
ID: takeFirst(orig.ID, id),
499+
UserID: takeFirst(orig.UserID, uuid.New()),
500+
WorkspaceID: takeFirst(orig.WorkspaceID, uuid.New()),
501+
AgentID: takeFirst(orig.AgentID, uuid.New()),
502+
AccessMethod: takeFirst(orig.AccessMethod, ""),
503+
SlugOrPort: takeFirst(orig.SlugOrPort, ""),
504+
SessionID: takeFirst(orig.SessionID, uuid.New()),
505+
SessionStartedAt: takeFirst(orig.SessionStartedAt, dbtime.Now().Add(-time.Minute)),
506+
SessionEndedAt: takeFirst(orig.SessionEndedAt, dbtime.Now()),
507+
Requests: takeFirst(orig.Requests, 1),
508+
}
509+
err = db.InsertWorkspaceAppStats(genCtx, database.InsertWorkspaceAppStatsParams{
510+
UserID: []uuid.UUID{scheme.UserID},
511+
WorkspaceID: []uuid.UUID{scheme.WorkspaceID},
512+
AgentID: []uuid.UUID{scheme.AgentID},
513+
AccessMethod: []string{scheme.AccessMethod},
514+
SlugOrPort: []string{scheme.SlugOrPort},
515+
SessionID: []uuid.UUID{scheme.SessionID},
516+
SessionStartedAt: []time.Time{scheme.SessionStartedAt},
517+
SessionEndedAt: []time.Time{scheme.SessionEndedAt},
518+
Requests: []int32{scheme.Requests},
519+
})
520+
require.NoError(t, err, "insert workspace agent stat")
521+
return scheme
522+
}
523+
492524
func WorkspaceResource(t testing.TB, db database.Store, orig database.WorkspaceResource) database.WorkspaceResource {
493525
resource, err := db.InsertWorkspaceResource(genCtx, database.InsertWorkspaceResourceParams{
494526
ID: takeFirst(orig.ID, uuid.New()),

coderd/database/dbpurge/dbpurge.go

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ const (
2424
// This is for cleaning up old, unused resources from the database that take up space.
2525
func New(ctx context.Context, logger slog.Logger, db database.Store) io.Closer {
2626
closed := make(chan struct{})
27+
logger = logger.Named("dbpurge")
2728

2829
ctx, cancelFunc := context.WithCancel(ctx)
2930
//nolint:gocritic // The system purges old db records without user input.

coderd/database/dbrollup/dbrollup.go

+173
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
package dbrollup
2+
3+
import (
4+
"context"
5+
"flag"
6+
"time"
7+
8+
"golang.org/x/sync/errgroup"
9+
10+
"cdr.dev/slog"
11+
12+
"github.com/coder/coder/v2/coderd/database"
13+
"github.com/coder/coder/v2/coderd/database/dbauthz"
14+
)
15+
16+
// DefaultInterval is how long the rolluper waits between rollups unless
// overridden with WithInterval. Runs are aligned to the wall clock, so with
// the default they land on 13:00, 13:05, 13:10, and so on.
const DefaultInterval = 5 * time.Minute
22+
23+
type Event struct {
24+
TemplateUsageStats bool
25+
}
26+
27+
type Rolluper struct {
28+
cancel context.CancelFunc
29+
closed chan struct{}
30+
db database.Store
31+
logger slog.Logger
32+
interval time.Duration
33+
event chan<- Event
34+
}
35+
36+
type Option func(*Rolluper)
37+
38+
// WithInterval sets the interval between rollups.
39+
func WithInterval(interval time.Duration) Option {
40+
return func(r *Rolluper) {
41+
r.interval = interval
42+
}
43+
}
44+
45+
// WithEventChannel sets the event channel to use for rollup events.
46+
//
47+
// This is only used for testing.
48+
func WithEventChannel(ch chan<- Event) Option {
49+
if flag.Lookup("test.v") == nil {
50+
panic("developer error: WithEventChannel is not to be used outside of tests")
51+
}
52+
return func(r *Rolluper) {
53+
r.event = ch
54+
}
55+
}
56+
57+
// New creates a new DB rollup service that periodically runs rollup queries.
58+
// It is the caller's responsibility to call Close on the returned instance.
59+
//
60+
// This is for e.g. generating insights data (template_usage_stats) from
61+
// raw data (workspace_agent_stats, workspace_app_stats).
62+
func New(logger slog.Logger, db database.Store, opts ...Option) *Rolluper {
63+
ctx, cancel := context.WithCancel(context.Background())
64+
65+
r := &Rolluper{
66+
cancel: cancel,
67+
closed: make(chan struct{}),
68+
db: db,
69+
logger: logger,
70+
interval: DefaultInterval,
71+
}
72+
73+
for _, opt := range opts {
74+
opt(r)
75+
}
76+
77+
//nolint:gocritic // The system rolls up database tables without user input.
78+
ctx = dbauthz.AsSystemRestricted(ctx)
79+
go r.start(ctx)
80+
81+
return r
82+
}
83+
84+
func (r *Rolluper) start(ctx context.Context) {
85+
defer close(r.closed)
86+
87+
do := func() {
88+
var eg errgroup.Group
89+
90+
r.logger.Debug(ctx, "rolling up data")
91+
now := time.Now()
92+
93+
// Track whether or not we performed a rollup (we got the advisory lock).
94+
var ev Event
95+
96+
eg.Go(func() error {
97+
return r.db.InTx(func(tx database.Store) error {
98+
// Acquire a lock to ensure that only one instance of
99+
// the rollup is running at a time.
100+
ok, err := tx.TryAcquireLock(ctx, database.LockIDDBRollup)
101+
if err != nil {
102+
return err
103+
}
104+
if !ok {
105+
return nil
106+
}
107+
108+
ev.TemplateUsageStats = true
109+
return tx.UpsertTemplateUsageStats(ctx)
110+
}, nil)
111+
})
112+
113+
err := eg.Wait()
114+
if err != nil {
115+
if database.IsQueryCanceledError(err) {
116+
return
117+
}
118+
// Only log if Close hasn't been called.
119+
if ctx.Err() == nil {
120+
r.logger.Error(ctx, "failed to rollup data", slog.Error(err))
121+
}
122+
return
123+
}
124+
125+
r.logger.Debug(ctx,
126+
"rolled up data",
127+
slog.F("took", time.Since(now)),
128+
slog.F("event", ev),
129+
)
130+
131+
// For testing.
132+
if r.event != nil {
133+
select {
134+
case <-ctx.Done():
135+
return
136+
case r.event <- ev:
137+
}
138+
}
139+
}
140+
141+
// Perform do immediately and on every tick of the ticker,
142+
// disregarding the execution time of do. This ensure that
143+
// the rollup is performed every interval assuming do does
144+
// not take longer than the interval to execute.
145+
t := time.NewTicker(time.Microsecond)
146+
defer t.Stop()
147+
for {
148+
select {
149+
case <-ctx.Done():
150+
return
151+
case <-t.C:
152+
// Ensure we're on the interval.
153+
now := time.Now()
154+
next := now.Add(r.interval).Truncate(r.interval) // Ensure we're on the interval and synced with the clock.
155+
d := next.Sub(now)
156+
// Safety check (shouldn't be possible).
157+
if d <= 0 {
158+
d = r.interval
159+
}
160+
t.Reset(d)
161+
162+
do()
163+
164+
r.logger.Debug(ctx, "next rollup at", slog.F("next", next))
165+
}
166+
}
167+
}
168+
169+
func (r *Rolluper) Close() error {
170+
r.cancel()
171+
<-r.closed
172+
return nil
173+
}

0 commit comments

Comments
 (0)