Skip to content

feat(coderd/database): add dbrollup service to rollup insights #12665

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions coderd/coderd.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/coder/coder/v2/coderd/batchstats"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbauthz"
"github.com/coder/coder/v2/coderd/database/dbrollup"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/coder/coder/v2/coderd/externalauth"
Expand Down Expand Up @@ -192,6 +193,9 @@ type Options struct {
// NewTicker is used for unit tests to replace "time.NewTicker".
NewTicker func(duration time.Duration) (tick <-chan time.Time, done func())

// DatabaseRolluper rolls up template usage stats from raw agent and app
// stats. This is used to provide insights in the WebUI.
DatabaseRolluper *dbrollup.Rolluper
// WorkspaceUsageTracker tracks workspace usage by the CLI.
WorkspaceUsageTracker *workspaceusage.Tracker
}
Expand Down Expand Up @@ -366,6 +370,10 @@ func New(options *Options) *API {
OIDC: options.OIDCConfig,
}

if options.DatabaseRolluper == nil {
options.DatabaseRolluper = dbrollup.New(options.Logger.Named("dbrollup"), options.Database)
}

if options.WorkspaceUsageTracker == nil {
options.WorkspaceUsageTracker = workspaceusage.New(options.Database,
workspaceusage.WithLogger(options.Logger.Named("workspace_usage_tracker")),
Expand Down Expand Up @@ -414,7 +422,9 @@ func New(options *Options) *API {
ctx,
options.Logger.Named("acquirer"),
options.Database,
options.Pubsub),
options.Pubsub,
),
dbRolluper: options.DatabaseRolluper,
workspaceUsageTracker: options.WorkspaceUsageTracker,
}

Expand Down Expand Up @@ -1197,7 +1207,9 @@ type API struct {
statsBatcher *batchstats.Batcher

Acquirer *provisionerdserver.Acquirer

// dbRolluper rolls up template usage stats from raw agent and app
// stats. This is used to provide insights in the WebUI.
dbRolluper *dbrollup.Rolluper
workspaceUsageTracker *workspaceusage.Tracker
}

Expand All @@ -1212,6 +1224,7 @@ func (api *API) Close() error {
api.WebsocketWaitGroup.Wait()
api.WebsocketWaitMutex.Unlock()

api.dbRolluper.Close()
api.metricsCache.Close()
if api.updateChecker != nil {
api.updateChecker.Close()
Expand Down
3 changes: 3 additions & 0 deletions coderd/coderdtest/coderdtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import (
"github.com/coder/coder/v2/coderd/batchstats"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbauthz"
"github.com/coder/coder/v2/coderd/database/dbrollup"
"github.com/coder/coder/v2/coderd/database/dbtestutil"
"github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/coder/coder/v2/coderd/externalauth"
Expand Down Expand Up @@ -147,6 +148,7 @@ type Options struct {
WorkspaceAppsStatsCollectorOptions workspaceapps.StatsCollectorOptions
AllowWorkspaceRenames bool
NewTicker func(duration time.Duration) (<-chan time.Time, func())
DatabaseRolluper *dbrollup.Rolluper
WorkspaceUsageTrackerFlush chan int
WorkspaceUsageTrackerTick chan time.Time
}
Expand Down Expand Up @@ -491,6 +493,7 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can
WorkspaceAppsStatsCollectorOptions: options.WorkspaceAppsStatsCollectorOptions,
AllowWorkspaceRenames: options.AllowWorkspaceRenames,
NewTicker: options.NewTicker,
DatabaseRolluper: options.DatabaseRolluper,
WorkspaceUsageTracker: wuTracker,
}
}
Expand Down
32 changes: 32 additions & 0 deletions coderd/database/dbgen/dbgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,38 @@ func WorkspaceApp(t testing.TB, db database.Store, orig database.WorkspaceApp) d
return resource
}

func WorkspaceAppStat(t testing.TB, db database.Store, orig database.WorkspaceAppStat) database.WorkspaceAppStat {
// This is not going to be correct, but our query doesn't return the ID.
id, err := cryptorand.Int63()
require.NoError(t, err, "generate id")

scheme := database.WorkspaceAppStat{
ID: takeFirst(orig.ID, id),
UserID: takeFirst(orig.UserID, uuid.New()),
WorkspaceID: takeFirst(orig.WorkspaceID, uuid.New()),
AgentID: takeFirst(orig.AgentID, uuid.New()),
AccessMethod: takeFirst(orig.AccessMethod, ""),
SlugOrPort: takeFirst(orig.SlugOrPort, ""),
SessionID: takeFirst(orig.SessionID, uuid.New()),
SessionStartedAt: takeFirst(orig.SessionStartedAt, dbtime.Now().Add(-time.Minute)),
SessionEndedAt: takeFirst(orig.SessionEndedAt, dbtime.Now()),
Requests: takeFirst(orig.Requests, 1),
}
err = db.InsertWorkspaceAppStats(genCtx, database.InsertWorkspaceAppStatsParams{
UserID: []uuid.UUID{scheme.UserID},
WorkspaceID: []uuid.UUID{scheme.WorkspaceID},
AgentID: []uuid.UUID{scheme.AgentID},
AccessMethod: []string{scheme.AccessMethod},
SlugOrPort: []string{scheme.SlugOrPort},
SessionID: []uuid.UUID{scheme.SessionID},
SessionStartedAt: []time.Time{scheme.SessionStartedAt},
SessionEndedAt: []time.Time{scheme.SessionEndedAt},
Requests: []int32{scheme.Requests},
})
require.NoError(t, err, "insert workspace agent stat")
return scheme
}

func WorkspaceResource(t testing.TB, db database.Store, orig database.WorkspaceResource) database.WorkspaceResource {
resource, err := db.InsertWorkspaceResource(genCtx, database.InsertWorkspaceResourceParams{
ID: takeFirst(orig.ID, uuid.New()),
Expand Down
1 change: 1 addition & 0 deletions coderd/database/dbpurge/dbpurge.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
// This is for cleaning up old, unused resources from the database that take up space.
func New(ctx context.Context, logger slog.Logger, db database.Store) io.Closer {
closed := make(chan struct{})
logger = logger.Named("dbpurge")

ctx, cancelFunc := context.WithCancel(ctx)
//nolint:gocritic // The system purges old db records without user input.
Expand Down
173 changes: 173 additions & 0 deletions coderd/database/dbrollup/dbrollup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package dbrollup
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see a potential abstraction here -- the PR I'm working on in parallel #12659 has a similar logic of "run a query every interval". There are definitely other areas of the codebase that could benefit from a similar framework.


import (
"context"
"flag"
"time"

"golang.org/x/sync/errgroup"

"cdr.dev/slog"

"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbauthz"
)

const (
// DefaultInterval is the default time between rollups.
// Rollups will be synchronized with the clock so that
// they happen 13:00, 13:05, 13:10, etc.
DefaultInterval = 5 * time.Minute
)

type Event struct {
TemplateUsageStats bool
}

type Rolluper struct {
cancel context.CancelFunc
closed chan struct{}
db database.Store
logger slog.Logger
interval time.Duration
event chan<- Event
}

type Option func(*Rolluper)

// WithInterval sets the interval between rollups.
func WithInterval(interval time.Duration) Option {
return func(r *Rolluper) {
r.interval = interval
}
}

// WithEventChannel sets the event channel to use for rollup events.
//
// This is only used for testing.
func WithEventChannel(ch chan<- Event) Option {
if flag.Lookup("test.v") == nil {
panic("developer error: WithEventChannel is not to be used outside of tests")
}
return func(r *Rolluper) {
r.event = ch
}
}

// New creates a new DB rollup service that periodically runs rollup queries.
// It is the caller's responsibility to call Close on the returned instance.
//
// This is for e.g. generating insights data (template_usage_stats) from
// raw data (workspace_agent_stats, workspace_app_stats).
func New(logger slog.Logger, db database.Store, opts ...Option) *Rolluper {
ctx, cancel := context.WithCancel(context.Background())

r := &Rolluper{
cancel: cancel,
closed: make(chan struct{}),
db: db,
logger: logger,
interval: DefaultInterval,
}

for _, opt := range opts {
opt(r)
}

//nolint:gocritic // The system rolls up database tables without user input.
ctx = dbauthz.AsSystemRestricted(ctx)
go r.start(ctx)

return r
}

func (r *Rolluper) start(ctx context.Context) {
defer close(r.closed)

do := func() {
var eg errgroup.Group

r.logger.Debug(ctx, "rolling up data")
now := time.Now()

// Track whether or not we performed a rollup (we got the advisory lock).
var ev Event

eg.Go(func() error {
return r.db.InTx(func(tx database.Store) error {
// Acquire a lock to ensure that only one instance of
// the rollup is running at a time.
ok, err := tx.TryAcquireLock(ctx, database.LockIDDBRollup)
if err != nil {
return err
}
if !ok {
return nil
}

ev.TemplateUsageStats = true
return tx.UpsertTemplateUsageStats(ctx)
}, nil)
})

err := eg.Wait()
if err != nil {
if database.IsQueryCanceledError(err) {
return
}
// Only log if Close hasn't been called.
if ctx.Err() == nil {
r.logger.Error(ctx, "failed to rollup data", slog.Error(err))
}
return
}

r.logger.Debug(ctx,
"rolled up data",
slog.F("took", time.Since(now)),
slog.F("event", ev),
)

// For testing.
if r.event != nil {
select {
case <-ctx.Done():
return
case r.event <- ev:
}
}
}

// Perform do immediately and on every tick of the ticker,
// disregarding the execution time of do. This ensure that
// the rollup is performed every interval assuming do does
// not take longer than the interval to execute.
t := time.NewTicker(time.Microsecond)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
// Ensure we're on the interval.
now := time.Now()
next := now.Add(r.interval).Truncate(r.interval) // Ensure we're on the interval and synced with the clock.
d := next.Sub(now)
// Safety check (shouldn't be possible).
if d <= 0 {
d = r.interval
}
t.Reset(d)

do()

r.logger.Debug(ctx, "next rollup at", slog.F("next", next))
}
}
}

func (r *Rolluper) Close() error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the error return value here to satisfy an interface? Do we need it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I feel the io.Closer interface is so common it's worth using this signature even if the error is not used at the moment or ever in the future.

r.cancel()
<-r.closed
return nil
}
Loading