diff --git a/coderd/agentapi/stats.go b/coderd/agentapi/stats.go index e91a3624e915d..37d1a14b76db7 100644 --- a/coderd/agentapi/stats.go +++ b/coderd/agentapi/stats.go @@ -18,7 +18,7 @@ import ( "github.com/coder/coder/v2/coderd/database/pubsub" "github.com/coder/coder/v2/coderd/prometheusmetrics" "github.com/coder/coder/v2/coderd/schedule" - "github.com/coder/coder/v2/codersdk" + "github.com/coder/coder/v2/coderd/workspaceapps" ) type StatsBatcher interface { @@ -34,6 +34,7 @@ type StatsAPI struct { TemplateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore] AgentStatsRefreshInterval time.Duration UpdateAgentMetricsFn func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric) + StatsCollector workspaceapps.StatsCollector TimeNowFn func() time.Time // defaults to dbtime.Now() } @@ -70,28 +71,6 @@ func (a *StatsAPI) UpdateStats(ctx context.Context, req *agentproto.UpdateStatsR ) now := a.now() - if req.Stats.ConnectionCount > 0 { - var nextAutostart time.Time - if workspace.AutostartSchedule.String != "" { - templateSchedule, err := (*(a.TemplateScheduleStore.Load())).Get(ctx, a.Database, workspace.TemplateID) - // If the template schedule fails to load, just default to bumping - // without the next transition and log it. 
- if err != nil { - a.Log.Error(ctx, "failed to load template schedule bumping activity, defaulting to bumping by 60min", - slog.F("workspace_id", workspace.ID), - slog.F("template_id", workspace.TemplateID), - slog.Error(err), - ) - } else { - next, allowed := schedule.NextAutostart(now, workspace.AutostartSchedule.String, templateSchedule) - if allowed { - nextAutostart = next - } - } - } - ActivityBumpWorkspace(ctx, a.Log.Named("activity_bump"), a.Database, workspace.ID, nextAutostart) - } - var errGroup errgroup.Group errGroup.Go(func() error { err := a.StatsBatcher.Add(now, workspaceAgent.ID, workspace.TemplateID, workspace.OwnerID, workspace.ID, req.Stats) @@ -101,17 +80,6 @@ func (a *StatsAPI) UpdateStats(ctx context.Context, req *agentproto.UpdateStatsR } return nil }) - errGroup.Go(func() error { - // nolint:gocritic // (#13146) Will be moved soon as part of refactor. - err := a.Database.UpdateWorkspaceLastUsedAt(ctx, database.UpdateWorkspaceLastUsedAtParams{ - ID: workspace.ID, - LastUsedAt: now, - }) - if err != nil { - return xerrors.Errorf("update workspace LastUsedAt: %w", err) - } - return nil - }) if a.UpdateAgentMetricsFn != nil { errGroup.Go(func() error { user, err := a.Database.GetUserByID(ctx, workspace.OwnerID) @@ -133,16 +101,12 @@ func (a *StatsAPI) UpdateStats(ctx context.Context, req *agentproto.UpdateStatsR return nil, xerrors.Errorf("update stats in database: %w", err) } - // Tell the frontend about the new agent report, now that everything is updated - a.publishWorkspaceAgentStats(ctx, workspace.ID) + // Flushing the stats collector will update last_used_at, + // deadline for the workspace, and will publish a workspace update event. 
+ a.StatsCollector.CollectAndFlush(ctx, workspaceapps.StatsReport{ + WorkspaceID: workspace.ID, + // TODO: fill out + }) return res, nil } - -func (a *StatsAPI) publishWorkspaceAgentStats(ctx context.Context, workspaceID uuid.UUID) { - err := a.Pubsub.Publish(codersdk.WorkspaceNotifyChannel(workspaceID), []byte{}) - if err != nil { - a.Log.Warn(ctx, "failed to publish workspace agent stats", - slog.F("workspace_id", workspaceID), slog.Error(err)) - } -} diff --git a/coderd/batchstats/batcher.go b/coderd/batchstats/batcher.go index bbff38b0413c0..1c19d62c7553d 100644 --- a/coderd/batchstats/batcher.go +++ b/coderd/batchstats/batcher.go @@ -25,7 +25,7 @@ const ( ) // Batcher holds a buffer of agent stats and periodically flushes them to -// its configured store. It also updates the workspace's last used time. +// its configured store. type Batcher struct { store database.Store log slog.Logger diff --git a/coderd/coderd.go b/coderd/coderd.go index 80f77d92ee672..f712ee16f6a77 100644 --- a/coderd/coderd.go +++ b/coderd/coderd.go @@ -1275,7 +1275,8 @@ type API struct { healthCheckGroup *singleflight.Group[string, *healthsdk.HealthcheckReport] healthCheckCache atomic.Pointer[healthsdk.HealthcheckReport] - statsBatcher *batchstats.Batcher + statsBatcher *batchstats.Batcher + statsCollector workspaceapps.StatsCollector Acquirer *provisionerdserver.Acquirer // dbRolluper rolls up template usage stats from raw agent and app diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index 9faae72f22ef7..820733600b2fb 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -36,7 +36,7 @@ import ( "github.com/coder/coder/v2/coderd/httpmw" "github.com/coder/coder/v2/coderd/prometheusmetrics" "github.com/coder/coder/v2/coderd/rbac/policy" - "github.com/coder/coder/v2/coderd/schedule" + "github.com/coder/coder/v2/coderd/workspaceapps" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/agentsdk" 
"github.com/coder/coder/v2/codersdk/workspacesdk" @@ -1167,35 +1167,6 @@ func (api *API) workspaceAgentReportStats(rw http.ResponseWriter, r *http.Reques slog.F("payload", req), ) - if req.ConnectionCount > 0 { - var nextAutostart time.Time - if workspace.AutostartSchedule.String != "" { - templateSchedule, err := (*(api.TemplateScheduleStore.Load())).Get(ctx, api.Database, workspace.TemplateID) - // If the template schedule fails to load, just default to bumping without the next transition and log it. - if err != nil { - // There's nothing we can do if the query was canceled, the - // client most likely went away so we just return an internal - // server error. - if database.IsQueryCanceledError(err) { - httpapi.InternalServerError(rw, err) - return - } - api.Logger.Error(ctx, "failed to load template schedule bumping activity, defaulting to bumping by 60min", - slog.F("workspace_id", workspace.ID), - slog.F("template_id", workspace.TemplateID), - slog.Error(err), - ) - } else { - next, allowed := schedule.NextAutostart(time.Now(), workspace.AutostartSchedule.String, templateSchedule) - if allowed { - nextAutostart = next - } - } - } - agentapi.ActivityBumpWorkspace(ctx, api.Logger.Named("activity_bump"), api.Database, workspace.ID, nextAutostart) - } - - now := dbtime.Now() protoStats := &agentproto.Stats{ ConnectionsByProto: req.ConnectionsByProto, ConnectionCount: req.ConnectionCount, @@ -1242,19 +1213,6 @@ func (api *API) workspaceAgentReportStats(rw http.ResponseWriter, r *http.Reques } return nil }) - if req.SessionCount() > 0 { - errGroup.Go(func() error { - // nolint:gocritic // (#13146) Will be moved soon as part of refactor. 
- err := api.Database.UpdateWorkspaceLastUsedAt(ctx, database.UpdateWorkspaceLastUsedAtParams{ - ID: workspace.ID, - LastUsedAt: now, - }) - if err != nil { - return xerrors.Errorf("can't update workspace LastUsedAt: %w", err) - } - return nil - }) - } if api.Options.UpdateAgentMetrics != nil { errGroup.Go(func() error { user, err := api.Database.GetUserByID(ctx, workspace.OwnerID) @@ -1277,6 +1235,11 @@ func (api *API) workspaceAgentReportStats(rw http.ResponseWriter, r *http.Reques return } + api.statsCollector.CollectAndFlush(ctx, workspaceapps.StatsReport{ + WorkspaceID: workspace.ID, + // TODO: fill out + }) + httpapi.Write(ctx, rw, http.StatusOK, agentsdk.StatsResponse{ ReportInterval: api.AgentStatsRefreshInterval, }) diff --git a/coderd/agentapi/activitybump.go b/coderd/workspaceapps/activity_bump.go similarity index 96% rename from coderd/agentapi/activitybump.go rename to coderd/workspaceapps/activity_bump.go index a28ba695d018e..af6cc081c473a 100644 --- a/coderd/agentapi/activitybump.go +++ b/coderd/workspaceapps/activity_bump.go @@ -1,4 +1,4 @@ -package agentapi +package workspaceapps import ( "context" @@ -41,7 +41,6 @@ func ActivityBumpWorkspace(ctx context.Context, log slog.Logger, db database.Sto // low priority operations fail first. ctx, cancel := context.WithTimeout(ctx, time.Second*15) defer cancel() - // nolint:gocritic // (#13146) Will be moved soon as part of refactor. 
err := db.ActivityBumpWorkspace(ctx, database.ActivityBumpWorkspaceParams{ NextAutostart: nextAutostart.UTC(), WorkspaceID: workspaceID, diff --git a/coderd/agentapi/activitybump_test.go b/coderd/workspaceapps/activity_bump_test.go similarity index 98% rename from coderd/agentapi/activitybump_test.go rename to coderd/workspaceapps/activity_bump_test.go index 5c82454c97cef..db51730d00519 100644 --- a/coderd/agentapi/activitybump_test.go +++ b/coderd/workspaceapps/activity_bump_test.go @@ -1,4 +1,4 @@ -package agentapi_test +package workspaceapps_test import ( "database/sql" @@ -8,12 +8,12 @@ import ( "github.com/google/uuid" "cdr.dev/slog/sloggers/slogtest" - "github.com/coder/coder/v2/coderd/agentapi" "github.com/coder/coder/v2/coderd/database" "github.com/coder/coder/v2/coderd/database/dbgen" "github.com/coder/coder/v2/coderd/database/dbtestutil" "github.com/coder/coder/v2/coderd/database/dbtime" "github.com/coder/coder/v2/coderd/util/ptr" + "github.com/coder/coder/v2/coderd/workspaceapps" "github.com/coder/coder/v2/testutil" "github.com/stretchr/testify/assert" @@ -272,7 +272,7 @@ func Test_ActivityBumpWorkspace(t *testing.T) { // Bump duration is measured from the time of the bump, so we measure from here. 
start := dbtime.Now() - agentapi.ActivityBumpWorkspace(ctx, log, db, bld.WorkspaceID, nextAutostart(start)) + workspaceapps.ActivityBumpWorkspace(ctx, log, db, bld.WorkspaceID, nextAutostart(start)) end := dbtime.Now() // Validate our state after bump diff --git a/coderd/workspaceapps/stats.go b/coderd/workspaceapps/stats.go index 76a60c6fbb5df..6ac76fc5254ab 100644 --- a/coderd/workspaceapps/stats.go +++ b/coderd/workspaceapps/stats.go @@ -3,6 +3,7 @@ package workspaceapps import ( "context" "sync" + "sync/atomic" "time" "github.com/google/uuid" @@ -10,10 +11,14 @@ import ( "cdr.dev/slog" + agentproto "github.com/coder/coder/v2/agent/proto" "github.com/coder/coder/v2/coderd/database" "github.com/coder/coder/v2/coderd/database/dbauthz" "github.com/coder/coder/v2/coderd/database/dbtime" + "github.com/coder/coder/v2/coderd/database/pubsub" + "github.com/coder/coder/v2/coderd/schedule" "github.com/coder/coder/v2/coderd/util/slice" + "github.com/coder/coder/v2/codersdk" ) const ( @@ -59,8 +64,11 @@ var _ StatsReporter = (*StatsDBReporter)(nil) // StatsDBReporter writes workspace app StatsReports to the database. type StatsDBReporter struct { - db database.Store - batchSize int + db database.Store + pubsub pubsub.Pubsub + logger slog.Logger + templateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore] + batchSize int } // NewStatsDBReporter returns a new StatsDBReporter. 
@@ -139,6 +147,42 @@ func (r *StatsDBReporter) Report(ctx context.Context, stats []StatsReport) error return err } + workspaces, err := tx.GetWorkspaces(ctx, database.GetWorkspacesParams{ + WorkspaceIds: uniqueIDs, + }) + if err != nil { + return xerrors.Errorf("getting workspaces: %w", err) + } + + // TODO: This probably needs batching to handle larger deployments + for _, workspace := range workspaces { + var nextAutostart time.Time + if workspace.AutostartSchedule.String != "" { + templateSchedule, err := (*(r.templateScheduleStore.Load())).Get(ctx, r.db, workspace.TemplateID) + // If the template schedule fails to load, just default to bumping + // without the next transition and log it. + if err != nil { + r.logger.Error(ctx, "failed to load template schedule bumping activity, defaulting to bumping by 60min", + slog.F("workspace_id", workspace.ID), + slog.F("template_id", workspace.TemplateID), + slog.Error(err), + ) + } else { + next, allowed := schedule.NextAutostart(dbtime.Now(), workspace.AutostartSchedule.String, templateSchedule) + if allowed { + nextAutostart = next + } + } + } + ActivityBumpWorkspace(ctx, r.logger.Named("activity_bump"), r.db, workspace.ID, nextAutostart) + + err := r.pubsub.Publish(codersdk.WorkspaceNotifyChannel(workspace.ID), []byte{}) + if err != nil { + r.logger.Warn(ctx, "failed to publish workspace agent stats", + slog.F("workspace_id", workspace.ID), slog.Error(err)) + } + } + return nil }, nil) if err != nil { @@ -193,7 +237,10 @@ type StatsCollectorOptions struct { // RollupWindow is the window size for rolling up stats, session shorter // than this will be rolled up and longer than this will be tracked // individually. - RollupWindow time.Duration + RollupWindow time.Duration + DB database.Store + Pubsub pubsub.Pubsub + TemplateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore] // Options for tests. 
Flush <-chan chan<- struct{} @@ -252,6 +299,36 @@ func (sc *StatsCollector) Collect(report StatsReport) { sc.opts.Logger.Debug(sc.ctx, "collected workspace app stats", slog.F("report", report)) } +func (sc *StatsCollector) CollectAgentStat(ctx context.Context, now time.Time, agentID uuid.UUID, workspace database.Workspace, st *agentproto.Stats) error { + var nextAutostart time.Time + if workspace.AutostartSchedule.String != "" { + templateSchedule, err := (*(sc.opts.TemplateScheduleStore.Load())).Get(ctx, sc.opts.DB, workspace.TemplateID) + // If the template schedule fails to load, just default to bumping + // without the next transition and log it. + if err != nil { + sc.opts.Logger.Error(ctx, "failed to load template schedule bumping activity, defaulting to bumping by 60min", + slog.F("workspace_id", workspace.ID), + slog.F("template_id", workspace.TemplateID), + slog.Error(err), + ) + } else { + next, allowed := schedule.NextAutostart(dbtime.Now(), workspace.AutostartSchedule.String, templateSchedule) + if allowed { + nextAutostart = next + } + } + } + ActivityBumpWorkspace(ctx, sc.opts.Logger.Named("activity_bump"), sc.opts.DB, workspace.ID, nextAutostart) + + err := sc.opts.Pubsub.Publish(codersdk.WorkspaceNotifyChannel(workspace.ID), []byte{}) + if err != nil { + sc.opts.Logger.Warn(ctx, "failed to publish workspace agent stats", + slog.F("workspace_id", workspace.ID), slog.Error(err)) + } + + return nil +} + // rollup performs stats rollup for sessions that fall within the // configured rollup window. For sessions longer than the window, // we report them individually.