From 64a5aaddc37ef26b75b5ba89dc4c6d365382cd40 Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Tue, 21 May 2024 18:36:26 +0000 Subject: [PATCH 1/6] add activity bump to StatsDBReporter.Report --- coderd/agentapi/stats.go | 42 +++++++++++++++--------------- coderd/workspaceapps/stats.go | 49 +++++++++++++++++++++++++++++++++-- 2 files changed, 68 insertions(+), 23 deletions(-) diff --git a/coderd/agentapi/stats.go b/coderd/agentapi/stats.go index e91a3624e915d..06dc5a11a5f3e 100644 --- a/coderd/agentapi/stats.go +++ b/coderd/agentapi/stats.go @@ -70,27 +70,27 @@ func (a *StatsAPI) UpdateStats(ctx context.Context, req *agentproto.UpdateStatsR ) now := a.now() - if req.Stats.ConnectionCount > 0 { - var nextAutostart time.Time - if workspace.AutostartSchedule.String != "" { - templateSchedule, err := (*(a.TemplateScheduleStore.Load())).Get(ctx, a.Database, workspace.TemplateID) - // If the template schedule fails to load, just default to bumping - // without the next transition and log it. - if err != nil { - a.Log.Error(ctx, "failed to load template schedule bumping activity, defaulting to bumping by 60min", - slog.F("workspace_id", workspace.ID), - slog.F("template_id", workspace.TemplateID), - slog.Error(err), - ) - } else { - next, allowed := schedule.NextAutostart(now, workspace.AutostartSchedule.String, templateSchedule) - if allowed { - nextAutostart = next - } - } - } - ActivityBumpWorkspace(ctx, a.Log.Named("activity_bump"), a.Database, workspace.ID, nextAutostart) - } + // if req.Stats.ConnectionCount > 0 { + // var nextAutostart time.Time + // if workspace.AutostartSchedule.String != "" { + // templateSchedule, err := (*(a.TemplateScheduleStore.Load())).Get(ctx, a.Database, workspace.TemplateID) + // // If the template schedule fails to load, just default to bumping + // // without the next transition and log it. + // if err != nil { + // a.Log.Error(ctx, "failed to load template schedule bumping activity, defaulting to bumping by 60min", + // slog.F("workspace_id", workspace.ID), + // slog.F("template_id", workspace.TemplateID), + // slog.Error(err), + // ) + // } else { + // next, allowed := schedule.NextAutostart(now, workspace.AutostartSchedule.String, templateSchedule) + // if allowed { + // nextAutostart = next + // } + // } + // } + // ActivityBumpWorkspace(ctx, a.Log.Named("activity_bump"), a.Database, workspace.ID, nextAutostart) + // } var errGroup errgroup.Group errGroup.Go(func() error { diff --git a/coderd/workspaceapps/stats.go b/coderd/workspaceapps/stats.go index 76a60c6fbb5df..b265824ca7c25 100644 --- a/coderd/workspaceapps/stats.go +++ b/coderd/workspaceapps/stats.go @@ -3,6 +3,7 @@ package workspaceapps import ( "context" "sync" + "sync/atomic" "time" "github.com/google/uuid" @@ -10,9 +11,11 @@ import ( "cdr.dev/slog" + "github.com/coder/coder/v2/coderd/agentapi" "github.com/coder/coder/v2/coderd/database" "github.com/coder/coder/v2/coderd/database/dbauthz" "github.com/coder/coder/v2/coderd/database/dbtime" + "github.com/coder/coder/v2/coderd/schedule" "github.com/coder/coder/v2/coderd/util/slice" ) @@ -59,8 +62,10 @@ var _ StatsReporter = (*StatsDBReporter)(nil) // StatsDBReporter writes workspace app StatsReports to the database. type StatsDBReporter struct { - db database.Store - batchSize int + db database.Store + logger slog.Logger + templateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore] + batchSize int } // NewStatsDBReporter returns a new StatsDBReporter. @@ -139,6 +144,36 @@ func (r *StatsDBReporter) Report(ctx context.Context, stats []StatsReport) error return err } + workspaces, err := tx.GetWorkspaces(ctx, database.GetWorkspacesParams{ + WorkspaceIds: uniqueIDs, + }) + if err != nil { + return xerrors.Errorf("getting workspaces: %w", err) + } + + // TODO: This probably needs batching to handle larger deployments + for _, workspace := range workspaces { + var nextAutostart time.Time + if workspace.AutostartSchedule.String != "" { + templateSchedule, err := (*(r.templateScheduleStore.Load())).Get(ctx, r.db, workspace.TemplateID) + // If the template schedule fails to load, just default to bumping + // without the next transition and log it. + if err != nil { + r.logger.Error(ctx, "failed to load template schedule bumping activity, defaulting to bumping by 60min", + slog.F("workspace_id", workspace.ID), + slog.F("template_id", workspace.TemplateID), + slog.Error(err), + ) + } else { + next, allowed := schedule.NextAutostart(dbtime.Now(), workspace.AutostartSchedule.String, templateSchedule) + if allowed { + nextAutostart = next + } + } + } + agentapi.ActivityBumpWorkspace(ctx, r.logger.Named("activity_bump"), r.db, workspace.ID, nextAutostart) + } + return nil }, nil) if err != nil { @@ -252,6 +287,16 @@ func (sc *StatsCollector) Collect(report StatsReport) { sc.opts.Logger.Debug(sc.ctx, "collected workspace app stats", slog.F("report", report)) } +func (sc *StatsCollector) CollectAndFlush(ctx context.Context, report StatsReport) error { + sc.Collect(report) + err := sc.flush(ctx) + if err != nil { + return xerrors.Errorf("flushing collector: %w", err) + } + + return nil +} + // rollup performs stats rollup for sessions that fall within the // configured rollup window. For sessions longer than the window, // we report them individually. From 636b5b8d69e815fde1dd2ece6b2f7efcb28d45bf Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Thu, 23 May 2024 15:48:59 +0000 Subject: [PATCH 2/6] move more --- coderd/agentapi/stats.go | 52 ++++------------------- coderd/workspaceapps/activity_bump.go | 61 +++++++++++++++++++++++++++ coderd/workspaceapps/stats.go | 12 +++++- 3 files changed, 79 insertions(+), 46 deletions(-) create mode 100644 coderd/workspaceapps/activity_bump.go diff --git a/coderd/agentapi/stats.go b/coderd/agentapi/stats.go index 06dc5a11a5f3e..37d1a14b76db7 100644 --- a/coderd/agentapi/stats.go +++ b/coderd/agentapi/stats.go @@ -18,7 +18,7 @@ import ( "github.com/coder/coder/v2/coderd/database/pubsub" "github.com/coder/coder/v2/coderd/prometheusmetrics" "github.com/coder/coder/v2/coderd/schedule" - "github.com/coder/coder/v2/codersdk" + "github.com/coder/coder/v2/coderd/workspaceapps" ) type StatsBatcher interface { @@ -34,6 +34,7 @@ type StatsAPI struct { TemplateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore] AgentStatsRefreshInterval time.Duration UpdateAgentMetricsFn func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric) + StatsCollector workspaceapps.StatsCollector TimeNowFn func() time.Time // defaults to dbtime.Now() } @@ -70,28 +71,6 @@ func (a *StatsAPI) UpdateStats(ctx context.Context, req *agentproto.UpdateStatsR ) now := a.now() - // if req.Stats.ConnectionCount > 0 { - // var nextAutostart time.Time - // if workspace.AutostartSchedule.String != "" { - // templateSchedule, err := (*(a.TemplateScheduleStore.Load())).Get(ctx, a.Database, workspace.TemplateID) - // // If the template schedule fails to load, just default to bumping - // // without the next transition and log it. - // if err != nil { - // a.Log.Error(ctx, "failed to load template schedule bumping activity, defaulting to bumping by 60min", - // slog.F("workspace_id", workspace.ID), - // slog.F("template_id", workspace.TemplateID), - // slog.Error(err), - // ) - // } else { - // next, allowed := schedule.NextAutostart(now, workspace.AutostartSchedule.String, templateSchedule) - // if allowed { - // nextAutostart = next - // } - // } - // } - // ActivityBumpWorkspace(ctx, a.Log.Named("activity_bump"), a.Database, workspace.ID, nextAutostart) - // } - var errGroup errgroup.Group errGroup.Go(func() error { err := a.StatsBatcher.Add(now, workspaceAgent.ID, workspace.TemplateID, workspace.OwnerID, workspace.ID, req.Stats) @@ -101,17 +80,6 @@ func (a *StatsAPI) UpdateStats(ctx context.Context, req *agentproto.UpdateStatsR } return nil }) - errGroup.Go(func() error { - // nolint:gocritic // (#13146) Will be moved soon as part of refactor. - err := a.Database.UpdateWorkspaceLastUsedAt(ctx, database.UpdateWorkspaceLastUsedAtParams{ - ID: workspace.ID, - LastUsedAt: now, - }) - if err != nil { - return xerrors.Errorf("update workspace LastUsedAt: %w", err) - } - return nil - }) if a.UpdateAgentMetricsFn != nil { errGroup.Go(func() error { user, err := a.Database.GetUserByID(ctx, workspace.OwnerID) @@ -133,16 +101,12 @@ func (a *StatsAPI) UpdateStats(ctx context.Context, req *agentproto.UpdateStatsR return nil, xerrors.Errorf("update stats in database: %w", err) } - // Tell the frontend about the new agent report, now that everything is updated - a.publishWorkspaceAgentStats(ctx, workspace.ID) + // Flushing the stats collector will update last_used_at, + // dealine for the workspace, and will publish a workspace update event. + a.StatsCollector.CollectAndFlush(ctx, workspaceapps.StatsReport{ + WorkspaceID: workspace.ID, + // TODO: fill out + }) return res, nil } - -func (a *StatsAPI) publishWorkspaceAgentStats(ctx context.Context, workspaceID uuid.UUID) { - err := a.Pubsub.Publish(codersdk.WorkspaceNotifyChannel(workspaceID), []byte{}) - if err != nil { - a.Log.Warn(ctx, "failed to publish workspace agent stats", - slog.F("workspace_id", workspaceID), slog.Error(err)) - } -} diff --git a/coderd/workspaceapps/activity_bump.go b/coderd/workspaceapps/activity_bump.go new file mode 100644 index 0000000000000..af6cc081c473a --- /dev/null +++ b/coderd/workspaceapps/activity_bump.go @@ -0,0 +1,61 @@ +package workspaceapps + +import ( + "context" + "time" + + "github.com/google/uuid" + "golang.org/x/xerrors" + + "cdr.dev/slog" + "github.com/coder/coder/v2/coderd/database" +) + +// ActivityBumpWorkspace automatically bumps the workspace's auto-off timer +// if it is set to expire soon. The deadline will be bumped by 1 hour*. +// If the bump crosses over an autostart time, the workspace will be +// bumped by the workspace ttl instead. +// +// If nextAutostart is the zero value or in the past, the workspace +// will be bumped by 1 hour. +// It handles the edge case in the example: +// 1. Autostart is set to 9am. +// 2. User works all day, and leaves a terminal open to the workspace overnight. +// 3. The open terminal continually bumps the workspace deadline. +// 4. 9am the next day, the activity bump pushes to 10am. +// 5. If the user goes inactive for 1 hour during the day, the workspace will +// now stop, because it has been extended by 1 hour durations. Despite the TTL +// being set to 8hrs from the autostart time. +// +// So the issue is that when the workspace is bumped across an autostart +// deadline, we should treat the workspace as being "started" again and +// extend the deadline by the autostart time + workspace ttl instead. +// +// The issue still remains with build_max_deadline. We need to respect the original +// maximum deadline, so that will need to be handled separately. +// A way to avoid this is to configure the max deadline to something that will not +// span more than 1 day. This will force the workspace to restart and reset the deadline +// each morning when it autostarts. +func ActivityBumpWorkspace(ctx context.Context, log slog.Logger, db database.Store, workspaceID uuid.UUID, nextAutostart time.Time) { + // We set a short timeout so if the app is under load, these + // low priority operations fail first. + ctx, cancel := context.WithTimeout(ctx, time.Second*15) + defer cancel() + err := db.ActivityBumpWorkspace(ctx, database.ActivityBumpWorkspaceParams{ + NextAutostart: nextAutostart.UTC(), + WorkspaceID: workspaceID, + }) + if err != nil { + if !xerrors.Is(err, context.Canceled) && !database.IsQueryCanceledError(err) { + // Bump will fail if the context is canceled, but this is ok. + log.Error(ctx, "activity bump failed", slog.Error(err), + slog.F("workspace_id", workspaceID), + ) + } + return + } + + log.Debug(ctx, "bumped deadline from activity", + slog.F("workspace_id", workspaceID), + ) +} diff --git a/coderd/workspaceapps/stats.go b/coderd/workspaceapps/stats.go index b265824ca7c25..f306202798cd7 100644 --- a/coderd/workspaceapps/stats.go +++ b/coderd/workspaceapps/stats.go @@ -11,12 +11,13 @@ import ( "cdr.dev/slog" - "github.com/coder/coder/v2/coderd/agentapi" "github.com/coder/coder/v2/coderd/database" "github.com/coder/coder/v2/coderd/database/dbauthz" "github.com/coder/coder/v2/coderd/database/dbtime" + "github.com/coder/coder/v2/coderd/database/pubsub" "github.com/coder/coder/v2/coderd/schedule" "github.com/coder/coder/v2/coderd/util/slice" + "github.com/coder/coder/v2/codersdk" ) const ( @@ -63,6 +64,7 @@ var _ StatsReporter = (*StatsDBReporter)(nil) // StatsDBReporter writes workspace app StatsReports to the database. type StatsDBReporter struct { db database.Store + pubsub pubsub.Pubsub logger slog.Logger templateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore] batchSize int @@ -171,7 +173,13 @@ func (r *StatsDBReporter) Report(ctx context.Context, stats []StatsReport) error } } } - agentapi.ActivityBumpWorkspace(ctx, r.logger.Named("activity_bump"), r.db, workspace.ID, nextAutostart) + ActivityBumpWorkspace(ctx, r.logger.Named("activity_bump"), r.db, workspace.ID, nextAutostart) + + err := r.pubsub.Publish(codersdk.WorkspaceNotifyChannel(workspace.ID), []byte{}) + if err != nil { + r.logger.Warn(ctx, "failed to publish workspace agent stats", + slog.F("workspace_id", workspace.ID), slog.Error(err)) + } } return nil From 0a8d85c95a249b06cf6682e2ec0a03558717297c Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Thu, 23 May 2024 16:06:01 +0000 Subject: [PATCH 3/6] move activity bump tests --- coderd/agentapi/activitybump.go | 62 ------------------- .../activity_bump_test.go} | 6 +- 2 files changed, 3 insertions(+), 65 deletions(-) delete mode 100644 coderd/agentapi/activitybump.go rename coderd/{agentapi/activitybump_test.go => workspaceapps/activity_bump_test.go} (98%) diff --git a/coderd/agentapi/activitybump.go b/coderd/agentapi/activitybump.go deleted file mode 100644 index a28ba695d018e..0000000000000 --- a/coderd/agentapi/activitybump.go +++ /dev/null @@ -1,62 +0,0 @@ -package agentapi - -import ( - "context" - "time" - - "github.com/google/uuid" - "golang.org/x/xerrors" - - "cdr.dev/slog" - "github.com/coder/coder/v2/coderd/database" -) - -// ActivityBumpWorkspace automatically bumps the workspace's auto-off timer -// if it is set to expire soon. The deadline will be bumped by 1 hour*. -// If the bump crosses over an autostart time, the workspace will be -// bumped by the workspace ttl instead. -// -// If nextAutostart is the zero value or in the past, the workspace -// will be bumped by 1 hour. -// It handles the edge case in the example: -// 1. Autostart is set to 9am. -// 2. User works all day, and leaves a terminal open to the workspace overnight. -// 3. The open terminal continually bumps the workspace deadline. -// 4. 9am the next day, the activity bump pushes to 10am. -// 5. If the user goes inactive for 1 hour during the day, the workspace will -// now stop, because it has been extended by 1 hour durations. Despite the TTL -// being set to 8hrs from the autostart time. -// -// So the issue is that when the workspace is bumped across an autostart -// deadline, we should treat the workspace as being "started" again and -// extend the deadline by the autostart time + workspace ttl instead. -// -// The issue still remains with build_max_deadline. We need to respect the original -// maximum deadline, so that will need to be handled separately. -// A way to avoid this is to configure the max deadline to something that will not -// span more than 1 day. This will force the workspace to restart and reset the deadline -// each morning when it autostarts. -func ActivityBumpWorkspace(ctx context.Context, log slog.Logger, db database.Store, workspaceID uuid.UUID, nextAutostart time.Time) { - // We set a short timeout so if the app is under load, these - // low priority operations fail first. - ctx, cancel := context.WithTimeout(ctx, time.Second*15) - defer cancel() - // nolint:gocritic // (#13146) Will be moved soon as part of refactor. - err := db.ActivityBumpWorkspace(ctx, database.ActivityBumpWorkspaceParams{ - NextAutostart: nextAutostart.UTC(), - WorkspaceID: workspaceID, - }) - if err != nil { - if !xerrors.Is(err, context.Canceled) && !database.IsQueryCanceledError(err) { - // Bump will fail if the context is canceled, but this is ok. - log.Error(ctx, "activity bump failed", slog.Error(err), - slog.F("workspace_id", workspaceID), - ) - } - return - } - - log.Debug(ctx, "bumped deadline from activity", - slog.F("workspace_id", workspaceID), - ) -} diff --git a/coderd/agentapi/activitybump_test.go b/coderd/workspaceapps/activity_bump_test.go similarity index 98% rename from coderd/agentapi/activitybump_test.go rename to coderd/workspaceapps/activity_bump_test.go index 5c82454c97cef..db51730d00519 100644 --- a/coderd/agentapi/activitybump_test.go +++ b/coderd/workspaceapps/activity_bump_test.go @@ -1,4 +1,4 @@ -package agentapi_test +package workspaceapps_test import ( "database/sql" @@ -8,12 +8,12 @@ import ( "github.com/google/uuid" "cdr.dev/slog/sloggers/slogtest" - "github.com/coder/coder/v2/coderd/agentapi" "github.com/coder/coder/v2/coderd/database" "github.com/coder/coder/v2/coderd/database/dbgen" "github.com/coder/coder/v2/coderd/database/dbtestutil" "github.com/coder/coder/v2/coderd/database/dbtime" "github.com/coder/coder/v2/coderd/util/ptr" + "github.com/coder/coder/v2/coderd/workspaceapps" "github.com/coder/coder/v2/testutil" "github.com/stretchr/testify/assert" @@ -272,7 +272,7 @@ func Test_ActivityBumpWorkspace(t *testing.T) { // Bump duration is measured from the time of the bump, so we measure from here. start := dbtime.Now() - agentapi.ActivityBumpWorkspace(ctx, log, db, bld.WorkspaceID, nextAutostart(start)) + workspaceapps.ActivityBumpWorkspace(ctx, log, db, bld.WorkspaceID, nextAutostart(start)) end := dbtime.Now() // Validate our state after bump From dc818d7142f1aa53398ad91d42a9692eab9791a5 Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Thu, 23 May 2024 16:19:55 +0000 Subject: [PATCH 4/6] remove workspace tracker --- coderd/coderd.go | 19 +-- coderd/coderdtest/coderdtest.go | 8 - coderd/workspaceagents.go | 51 +----- coderd/workspaces.go | 6 +- coderd/workspaceusage/tracker.go | 235 -------------------------- coderd/workspaceusage/tracker_test.go | 225 ------------------------ 6 files changed, 17 insertions(+), 527 deletions(-) delete mode 100644 coderd/workspaceusage/tracker.go delete mode 100644 coderd/workspaceusage/tracker_test.go diff --git a/coderd/coderd.go b/coderd/coderd.go index 80f77d92ee672..916e81d1ef193 100644 --- a/coderd/coderd.go +++ b/coderd/coderd.go @@ -68,7 +68,6 @@ import ( "github.com/coder/coder/v2/coderd/updatecheck" "github.com/coder/coder/v2/coderd/util/slice" "github.com/coder/coder/v2/coderd/workspaceapps" - "github.com/coder/coder/v2/coderd/workspaceusage" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/drpc" "github.com/coder/coder/v2/codersdk/healthsdk" @@ -204,8 +203,6 @@ type Options struct { // DatabaseRolluper rolls up template usage stats from raw agent and app // stats. This is used to provide insights in the WebUI. DatabaseRolluper *dbrollup.Rolluper - // WorkspaceUsageTracker tracks workspace usage by the CLI. - WorkspaceUsageTracker *workspaceusage.Tracker } // @title Coder API @@ -382,12 +379,6 @@ func New(options *Options) *API { options.DatabaseRolluper = dbrollup.New(options.Logger.Named("dbrollup"), options.Database) } - if options.WorkspaceUsageTracker == nil { - options.WorkspaceUsageTracker = workspaceusage.New(options.Database, - workspaceusage.WithLogger(options.Logger.Named("workspace_usage_tracker")), - ) - } - ctx, cancel := context.WithCancel(context.Background()) r := chi.NewRouter() @@ -432,8 +423,7 @@ func New(options *Options) *API { options.Database, options.Pubsub, ), - dbRolluper: options.DatabaseRolluper, - workspaceUsageTracker: options.WorkspaceUsageTracker, + dbRolluper: options.DatabaseRolluper, } api.AppearanceFetcher.Store(&appearance.DefaultFetcher) @@ -1275,13 +1265,13 @@ type API struct { healthCheckGroup *singleflight.Group[string, *healthsdk.HealthcheckReport] healthCheckCache atomic.Pointer[healthsdk.HealthcheckReport] - statsBatcher *batchstats.Batcher + statsBatcher *batchstats.Batcher + statsCollector workspaceapps.StatsCollector Acquirer *provisionerdserver.Acquirer // dbRolluper rolls up template usage stats from raw agent and app // stats. This is used to provide insights in the WebUI. - dbRolluper *dbrollup.Rolluper - workspaceUsageTracker *workspaceusage.Tracker + dbRolluper *dbrollup.Rolluper } // Close waits for all WebSocket connections to drain before returning. @@ -1320,7 +1310,6 @@ func (api *API) Close() error { _ = (*coordinator).Close() } _ = api.agentProvider.Close() - api.workspaceUsageTracker.Close() return nil } diff --git a/coderd/coderdtest/coderdtest.go b/coderd/coderdtest/coderdtest.go index 6153f1a68abcb..817490055a53e 100644 --- a/coderd/coderdtest/coderdtest.go +++ b/coderd/coderdtest/coderdtest.go @@ -71,7 +71,6 @@ import ( "github.com/coder/coder/v2/coderd/util/ptr" "github.com/coder/coder/v2/coderd/workspaceapps" "github.com/coder/coder/v2/coderd/workspaceapps/appurl" - "github.com/coder/coder/v2/coderd/workspaceusage" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/agentsdk" "github.com/coder/coder/v2/codersdk/drpc" @@ -335,12 +334,6 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can if options.WorkspaceUsageTrackerTick == nil { options.WorkspaceUsageTrackerTick = make(chan time.Time, 1) // buffering just in case } - // Close is called by API.Close() - wuTracker := workspaceusage.New( - options.Database, - workspaceusage.WithLogger(options.Logger.Named("workspace_usage_tracker")), - workspaceusage.WithTickFlush(options.WorkspaceUsageTrackerTick, options.WorkspaceUsageTrackerFlush), - ) var mutex sync.RWMutex var handler http.Handler @@ -495,7 +488,6 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can AllowWorkspaceRenames: options.AllowWorkspaceRenames, NewTicker: options.NewTicker, DatabaseRolluper: options.DatabaseRolluper, - WorkspaceUsageTracker: wuTracker, } } diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index 9faae72f22ef7..436b3bbc11466 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -36,7 +36,7 @@ import ( "github.com/coder/coder/v2/coderd/httpmw" "github.com/coder/coder/v2/coderd/prometheusmetrics" "github.com/coder/coder/v2/coderd/rbac/policy" - "github.com/coder/coder/v2/coderd/schedule" + "github.com/coder/coder/v2/coderd/workspaceapps" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/agentsdk" "github.com/coder/coder/v2/codersdk/workspacesdk" @@ -1167,35 +1167,6 @@ func (api *API) workspaceAgentReportStats(rw http.ResponseWriter, r *http.Reques slog.F("payload", req), ) - if req.ConnectionCount > 0 { - var nextAutostart time.Time - if workspace.AutostartSchedule.String != "" { - templateSchedule, err := (*(api.TemplateScheduleStore.Load())).Get(ctx, api.Database, workspace.TemplateID) - // If the template schedule fails to load, just default to bumping without the next transition and log it. - if err != nil { - // There's nothing we can do if the query was canceled, the - // client most likely went away so we just return an internal - // server error. - if database.IsQueryCanceledError(err) { - httpapi.InternalServerError(rw, err) - return - } - api.Logger.Error(ctx, "failed to load template schedule bumping activity, defaulting to bumping by 60min", - slog.F("workspace_id", workspace.ID), - slog.F("template_id", workspace.TemplateID), - slog.Error(err), - ) - } else { - next, allowed := schedule.NextAutostart(time.Now(), workspace.AutostartSchedule.String, templateSchedule) - if allowed { - nextAutostart = next - } - } - } - agentapi.ActivityBumpWorkspace(ctx, api.Logger.Named("activity_bump"), api.Database, workspace.ID, nextAutostart) - } - - now := dbtime.Now() protoStats := &agentproto.Stats{ ConnectionsByProto: req.ConnectionsByProto, ConnectionCount: req.ConnectionCount, @@ -1242,19 +1213,6 @@ func (api *API) workspaceAgentReportStats(rw http.ResponseWriter, r *http.Reques } return nil }) - if req.SessionCount() > 0 { - errGroup.Go(func() error { - // nolint:gocritic // (#13146) Will be moved soon as part of refactor. - err := api.Database.UpdateWorkspaceLastUsedAt(ctx, database.UpdateWorkspaceLastUsedAtParams{ - ID: workspace.ID, - LastUsedAt: now, - }) - if err != nil { - return xerrors.Errorf("can't update workspace LastUsedAt: %w", err) - } - return nil - }) - } if api.Options.UpdateAgentMetrics != nil { errGroup.Go(func() error { user, err := api.Database.GetUserByID(ctx, workspace.OwnerID) @@ -1277,6 +1235,13 @@ func (api *API) workspaceAgentReportStats(rw http.ResponseWriter, r *http.Reques return } + // Flushing the stats collector will update last_used_at, + // dealine for the workspace, and will publish a workspace update event. + api.statsCollector.CollectAndFlush(ctx, workspaceapps.StatsReport{ + WorkspaceID: workspace.ID, + // TODO: fill out + }) + httpapi.Write(ctx, rw, http.StatusOK, agentsdk.StatsResponse{ ReportInterval: api.AgentStatsRefreshInterval, }) diff --git a/coderd/workspaces.go b/coderd/workspaces.go index 7d0344be4e321..572a78362319e 100644 --- a/coderd/workspaces.go +++ b/coderd/workspaces.go @@ -29,6 +29,7 @@ import ( "github.com/coder/coder/v2/coderd/searchquery" "github.com/coder/coder/v2/coderd/telemetry" "github.com/coder/coder/v2/coderd/util/ptr" + "github.com/coder/coder/v2/coderd/workspaceapps" "github.com/coder/coder/v2/coderd/wsbuilder" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/agentsdk" @@ -1115,7 +1116,10 @@ func (api *API) postWorkspaceUsage(rw http.ResponseWriter, r *http.Request) { return } - api.workspaceUsageTracker.Add(workspace.ID) + api.statsCollector.CollectAndFlush(r.Context(), workspaceapps.StatsReport{ + WorkspaceID: workspace.ID, + }) + rw.WriteHeader(http.StatusNoContent) } diff --git a/coderd/workspaceusage/tracker.go b/coderd/workspaceusage/tracker.go deleted file mode 100644 index 118b021d71d52..0000000000000 --- a/coderd/workspaceusage/tracker.go +++ /dev/null @@ -1,235 +0,0 @@ -package workspaceusage - -import ( - "bytes" - "context" - "flag" - "os" - "sort" - "sync" - "time" - - "github.com/google/uuid" - - "github.com/coder/coder/v2/coderd/database" - "github.com/coder/coder/v2/coderd/database/dbauthz" - - "cdr.dev/slog" - "cdr.dev/slog/sloggers/sloghuman" -) - -var DefaultFlushInterval = 60 * time.Second - -// Store is a subset of database.Store -type Store interface { - BatchUpdateWorkspaceLastUsedAt(context.Context, database.BatchUpdateWorkspaceLastUsedAtParams) error -} - -// Tracker tracks and de-bounces updates to workspace usage activity. -// It keeps an internal map of workspace IDs that have been used and -// periodically flushes this to its configured Store. -type Tracker struct { - log slog.Logger // you know, for logs - flushLock sync.Mutex // protects m - flushErrors int // tracks the number of consecutive errors flushing - m *uuidSet // stores workspace ids - s Store // for flushing data - tickCh <-chan time.Time // controls flush interval - stopTick func() // stops flushing - stopCh chan struct{} // signals us to stop - stopOnce sync.Once // because you only stop once - doneCh chan struct{} // signifies that we have stopped - flushCh chan int // used for testing. -} - -// New returns a new Tracker. It is the caller's responsibility -// to call Close(). -func New(s Store, opts ...Option) *Tracker { - tr := &Tracker{ - log: slog.Make(sloghuman.Sink(os.Stderr)), - m: &uuidSet{}, - s: s, - tickCh: nil, - stopTick: nil, - stopCh: make(chan struct{}), - doneCh: make(chan struct{}), - flushCh: nil, - } - for _, opt := range opts { - opt(tr) - } - if tr.tickCh == nil && tr.stopTick == nil { - tick := time.NewTicker(DefaultFlushInterval) - tr.tickCh = tick.C - tr.stopTick = tick.Stop - } - go tr.loop() - return tr -} - -type Option func(*Tracker) - -// WithLogger sets the logger to be used by Tracker. -func WithLogger(log slog.Logger) Option { - return func(h *Tracker) { - h.log = log - } -} - -// WithFlushInterval allows configuring the flush interval of Tracker. -func WithFlushInterval(d time.Duration) Option { - return func(h *Tracker) { - ticker := time.NewTicker(d) - h.tickCh = ticker.C - h.stopTick = ticker.Stop - } -} - -// WithTickFlush allows passing two channels: one that reads -// a time.Time, and one that returns the number of marked workspaces -// every time Tracker flushes. -// For testing only and will panic if used outside of tests. -func WithTickFlush(tickCh <-chan time.Time, flushCh chan int) Option { - if flag.Lookup("test.v") == nil { - panic("developer error: WithTickFlush is not to be used outside of tests.") - } - return func(h *Tracker) { - h.tickCh = tickCh - h.stopTick = func() {} - h.flushCh = flushCh - } -} - -// Add marks the workspace with the given ID as having been used recently. -// Tracker will periodically flush this to its configured Store. -func (tr *Tracker) Add(workspaceID uuid.UUID) { - tr.m.Add(workspaceID) -} - -// flush updates last_used_at of all current workspace IDs. -// If this is held while a previous flush is in progress, it will -// deadlock until the previous flush has completed. -func (tr *Tracker) flush(now time.Time) { - // Copy our current set of IDs - ids := tr.m.UniqueAndClear() - count := len(ids) - if tr.flushCh != nil { // only used for testing - defer func() { - tr.flushCh <- count - }() - } - if count == 0 { - tr.log.Debug(context.Background(), "nothing to flush") - return - } - - // Set a short-ish timeout for this. We don't want to hang forever. - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - // nolint: gocritic // system function - authCtx := dbauthz.AsSystemRestricted(ctx) - tr.flushLock.Lock() - defer tr.flushLock.Unlock() - // nolint:gocritic // (#13146) Will be moved soon as part of refactor. - if err := tr.s.BatchUpdateWorkspaceLastUsedAt(authCtx, database.BatchUpdateWorkspaceLastUsedAtParams{ - LastUsedAt: now, - IDs: ids, - }); err != nil { - // A single failure to flush is likely not a huge problem. If the workspace is still connected at - // the next iteration, either another coderd instance will likely have this data or the CLI - // will tell us again that the workspace is in use. - tr.flushErrors++ - if tr.flushErrors > 1 { - tr.log.Error(ctx, "multiple failures updating workspaces last_used_at", slog.F("count", count), slog.F("consecutive_errors", tr.flushErrors), slog.Error(err)) - // TODO: if this keeps failing, it indicates a fundamental problem with the database connection. - // How to surface it correctly to admins besides just screaming into the logs? - } else { - tr.log.Warn(ctx, "failed updating workspaces last_used_at", slog.F("count", count), slog.Error(err)) - } - return - } - tr.flushErrors = 0 - tr.log.Info(ctx, "updated workspaces last_used_at", slog.F("count", count), slog.F("now", now)) -} - -// loop periodically flushes every tick. -// If loop is called after Close, it will exit immediately and log an error. -func (tr *Tracker) loop() { - select { - case <-tr.doneCh: - tr.log.Error(context.Background(), "developer error: Loop called after Close") - return - default: - } - defer func() { - close(tr.doneCh) - tr.log.Debug(context.Background(), "workspace usage tracker loop exited") - }() - for { - select { - case <-tr.stopCh: - return - case now, ok := <-tr.tickCh: - if !ok { - return - } - // NOTE: we do not update last_used_at with the time at which each workspace was added. - // Instead, we update with the time of the flush. If the BatchUpdateWorkspacesLastUsedAt - // query can be rewritten to update each id with a corresponding last_used_at timestamp - // then we could capture the exact usage time of each workspace. For now however, as - // we perform this query at a regular interval, the time of the flush is 'close enough' - // for the purposes of both dormancy (and for autostop, in future). - tr.flush(now.UTC()) - } - } -} - -// Close stops Tracker and returns once Loop has exited. -// After calling Close(), Loop must not be called. -func (tr *Tracker) Close() error { - tr.stopOnce.Do(func() { - tr.stopCh <- struct{}{} - tr.stopTick() - <-tr.doneCh - }) - return nil -} - -// uuidSet is a set of UUIDs. Safe for concurrent usage. -// The zero value can be used. -type uuidSet struct { - l sync.Mutex - m map[uuid.UUID]struct{} -} - -func (s *uuidSet) Add(id uuid.UUID) { - s.l.Lock() - defer s.l.Unlock() - if s.m == nil { - s.m = make(map[uuid.UUID]struct{}) - } - s.m[id] = struct{}{} -} - -// UniqueAndClear returns the unique set of entries in s and -// resets the internal map. -func (s *uuidSet) UniqueAndClear() []uuid.UUID { - s.l.Lock() - defer s.l.Unlock() - if s.m == nil { - s.m = make(map[uuid.UUID]struct{}) - return []uuid.UUID{} - } - l := make([]uuid.UUID, 0) - for k := range s.m { - l = append(l, k) - } - // For ease of testing, sort the IDs lexically - sort.Slice(l, func(i, j int) bool { - // For some unfathomable reason, byte arrays are not comparable? - // See https://github.com/golang/go/issues/61004 - return bytes.Compare(l[i][:], l[j][:]) < 0 - }) - clear(s.m) - return l -} diff --git a/coderd/workspaceusage/tracker_test.go b/coderd/workspaceusage/tracker_test.go deleted file mode 100644 index ae9a9d2162d1c..0000000000000 --- a/coderd/workspaceusage/tracker_test.go +++ /dev/null @@ -1,225 +0,0 @@ -package workspaceusage_test - -import ( - "bytes" - "sort" - "sync" - "testing" - "time" - - "github.com/google/uuid" - "github.com/stretchr/testify/require" - "go.uber.org/goleak" - "go.uber.org/mock/gomock" - - "cdr.dev/slog" - "cdr.dev/slog/sloggers/slogtest" - "github.com/coder/coder/v2/coderd/coderdtest" - "github.com/coder/coder/v2/coderd/database" - "github.com/coder/coder/v2/coderd/database/dbfake" - "github.com/coder/coder/v2/coderd/database/dbmock" - "github.com/coder/coder/v2/coderd/database/dbtestutil" - "github.com/coder/coder/v2/coderd/database/dbtime" - "github.com/coder/coder/v2/coderd/database/pubsub" - "github.com/coder/coder/v2/coderd/workspaceusage" - "github.com/coder/coder/v2/codersdk" - "github.com/coder/coder/v2/testutil" -) - -func TestTracker(t *testing.T) { - t.Parallel() - - ctrl := gomock.NewController(t) - mDB := dbmock.NewMockStore(ctrl) - log := slogtest.Make(t, nil).Leveled(slog.LevelDebug) - - tickCh := make(chan time.Time) - flushCh := make(chan int, 1) - wut := workspaceusage.New(mDB, - workspaceusage.WithLogger(log), - workspaceusage.WithTickFlush(tickCh, flushCh), - ) - defer wut.Close() - - // 1. No marked workspaces should imply no flush. - now := dbtime.Now() - tickCh <- now - count := <-flushCh - require.Equal(t, 0, count, "expected zero flushes") - - // 2. One marked workspace should cause a flush. - ids := []uuid.UUID{uuid.New()} - now = dbtime.Now() - wut.Add(ids[0]) - mDB.EXPECT().BatchUpdateWorkspaceLastUsedAt(gomock.Any(), database.BatchUpdateWorkspaceLastUsedAtParams{ - LastUsedAt: now, - IDs: ids, - }).Times(1) - tickCh <- now - count = <-flushCh - require.Equal(t, 1, count, "expected one flush with one id") - - // 3. Lots of marked workspaces should also cause a flush. - for i := 0; i < 31; i++ { - ids = append(ids, uuid.New()) - } - - // Sort ids so mDB know what to expect. - sort.Slice(ids, func(i, j int) bool { - return bytes.Compare(ids[i][:], ids[j][:]) < 0 - }) - - now = dbtime.Now() - mDB.EXPECT().BatchUpdateWorkspaceLastUsedAt(gomock.Any(), database.BatchUpdateWorkspaceLastUsedAtParams{ - LastUsedAt: now, - IDs: ids, - }) - for _, id := range ids { - wut.Add(id) - } - tickCh <- now - count = <-flushCh - require.Equal(t, len(ids), count, "incorrect number of ids flushed") - - // 4. Try to cause a race condition! - now = dbtime.Now() - // Difficult to know what to EXPECT here, so we won't check strictly here. - mDB.EXPECT().BatchUpdateWorkspaceLastUsedAt(gomock.Any(), gomock.Any()).MinTimes(1).MaxTimes(len(ids)) - // Try to force a race condition. - var wg sync.WaitGroup - count = 0 - for i := 0; i < len(ids); i++ { - wg.Add(1) - go func() { - defer wg.Done() - tickCh <- now - }() - wut.Add(ids[i]) - } - - for i := 0; i < len(ids); i++ { - count += <-flushCh - } - - wg.Wait() - require.Equal(t, len(ids), count, "incorrect number of ids flushed") - - // 5. Closing multiple times should not be a problem. - wut.Close() - wut.Close() -} - -// This test performs a more 'integration-style' test with multiple instances. -func TestTracker_MultipleInstances(t *testing.T) { - t.Parallel() - if !dbtestutil.WillUsePostgres() { - t.Skip("this test only makes sense with postgres") - } - - // Given we have two coderd instances connected to the same database - var ( - ctx = testutil.Context(t, testutil.WaitLong) - db, _ = dbtestutil.NewDB(t) - // real pubsub is not safe for concurrent use, and this test currently - // does not depend on pubsub - ps = pubsub.NewInMemory() - wuTickA = make(chan time.Time) - wuFlushA = make(chan int, 1) - wuTickB = make(chan time.Time) - wuFlushB = make(chan int, 1) - clientA = coderdtest.New(t, &coderdtest.Options{ - WorkspaceUsageTrackerTick: wuTickA, - WorkspaceUsageTrackerFlush: wuFlushA, - Database: db, - Pubsub: ps, - }) - clientB = coderdtest.New(t, &coderdtest.Options{ - WorkspaceUsageTrackerTick: wuTickB, - WorkspaceUsageTrackerFlush: wuFlushB, - Database: db, - Pubsub: ps, - }) - owner = coderdtest.CreateFirstUser(t, clientA) - now = dbtime.Now() - ) - - clientB.SetSessionToken(clientA.SessionToken()) - - // Create a number of workspaces - numWorkspaces := 10 - w := make([]dbfake.WorkspaceResponse, numWorkspaces) - for i := 0; i < numWorkspaces; i++ { - wr := dbfake.WorkspaceBuild(t, db, database.Workspace{ - OwnerID: owner.UserID, - OrganizationID: owner.OrganizationID, - LastUsedAt: now, - }).WithAgent().Do() - w[i] = wr - } - - // Use client A to update LastUsedAt of the first three - require.NoError(t, clientA.PostWorkspaceUsage(ctx, w[0].Workspace.ID)) - require.NoError(t, clientA.PostWorkspaceUsage(ctx, w[1].Workspace.ID)) - require.NoError(t, clientA.PostWorkspaceUsage(ctx, w[2].Workspace.ID)) - // Use client B to update LastUsedAt of the next three - require.NoError(t, clientB.PostWorkspaceUsage(ctx, w[3].Workspace.ID)) - require.NoError(t, clientB.PostWorkspaceUsage(ctx, w[4].Workspace.ID)) - require.NoError(t, clientB.PostWorkspaceUsage(ctx, w[5].Workspace.ID)) - // The next two will have updated from both instances - require.NoError(t, clientA.PostWorkspaceUsage(ctx, w[6].Workspace.ID)) - require.NoError(t, clientB.PostWorkspaceUsage(ctx, w[6].Workspace.ID)) - require.NoError(t, clientA.PostWorkspaceUsage(ctx, w[7].Workspace.ID)) - require.NoError(t, clientB.PostWorkspaceUsage(ctx, w[7].Workspace.ID)) - // The last two will not report any usage. - - // Tick both with different times and wait for both flushes to complete - nowA := now.Add(time.Minute) - nowB := now.Add(2 * time.Minute) - var wg sync.WaitGroup - var flushedA, flushedB int - wg.Add(1) - go func() { - defer wg.Done() - wuTickA <- nowA - flushedA = <-wuFlushA - }() - wg.Add(1) - go func() { - defer wg.Done() - wuTickB <- nowB - flushedB = <-wuFlushB - }() - wg.Wait() - - // We expect 5 flushed IDs each - require.Equal(t, 5, flushedA) - require.Equal(t, 5, flushedB) - - // Fetch updated workspaces - updated := make([]codersdk.Workspace, numWorkspaces) - for i := 0; i < numWorkspaces; i++ { - ws, err := clientA.Workspace(ctx, w[i].Workspace.ID) - require.NoError(t, err) - updated[i] = ws - } - // We expect the first three to have the timestamp of flushA - require.Equal(t, nowA.UTC(), updated[0].LastUsedAt.UTC()) - require.Equal(t, nowA.UTC(), updated[1].LastUsedAt.UTC()) - require.Equal(t, nowA.UTC(), updated[2].LastUsedAt.UTC()) - // We expect the next three to have the timestamp of flushB - require.Equal(t, nowB.UTC(), updated[3].LastUsedAt.UTC()) - require.Equal(t, nowB.UTC(), updated[4].LastUsedAt.UTC()) - require.Equal(t, nowB.UTC(), updated[5].LastUsedAt.UTC()) - // The next two should have the timestamp of flushB as it is newer than flushA - require.Equal(t, nowB.UTC(), updated[6].LastUsedAt.UTC()) - require.Equal(t, nowB.UTC(), updated[7].LastUsedAt.UTC()) - // And the last two should be untouched - require.Equal(t, w[8].Workspace.LastUsedAt.UTC(), updated[8].LastUsedAt.UTC()) - require.Equal(t, w[8].Workspace.LastUsedAt.UTC(), updated[8].LastUsedAt.UTC()) - require.Equal(t, w[9].Workspace.LastUsedAt.UTC(), updated[9].LastUsedAt.UTC()) - require.Equal(t, w[9].Workspace.LastUsedAt.UTC(), updated[9].LastUsedAt.UTC()) -} - -func TestMain(m *testing.M) { - goleak.VerifyTestMain(m) -} From a60de2b8274dbc7279f1ae0dc9c04c35636ad978 Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Fri, 24 May 2024 18:26:47 +0000 Subject: [PATCH 5/6] Revert "remove workspace tracker" This reverts commit dc818d7142f1aa53398ad91d42a9692eab9791a5. --- coderd/coderd.go | 19 ++- coderd/coderdtest/coderdtest.go | 8 + coderd/workspaceagents.go | 51 +++++- coderd/workspaces.go | 6 +- coderd/workspaceusage/tracker.go | 235 ++++++++++++++++++++++++++ coderd/workspaceusage/tracker_test.go | 225 ++++++++++++++++++++++++ 6 files changed, 527 insertions(+), 17 deletions(-) create mode 100644 coderd/workspaceusage/tracker.go create mode 100644 coderd/workspaceusage/tracker_test.go diff --git a/coderd/coderd.go b/coderd/coderd.go index 916e81d1ef193..80f77d92ee672 100644 --- a/coderd/coderd.go +++ b/coderd/coderd.go @@ -68,6 +68,7 @@ import ( "github.com/coder/coder/v2/coderd/updatecheck" "github.com/coder/coder/v2/coderd/util/slice" "github.com/coder/coder/v2/coderd/workspaceapps" + "github.com/coder/coder/v2/coderd/workspaceusage" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/drpc" "github.com/coder/coder/v2/codersdk/healthsdk" @@ -203,6 +204,8 @@ type Options struct { // DatabaseRolluper rolls up template usage stats from raw agent and app // stats. This is used to provide insights in the WebUI. DatabaseRolluper *dbrollup.Rolluper + // WorkspaceUsageTracker tracks workspace usage by the CLI. + WorkspaceUsageTracker *workspaceusage.Tracker } // @title Coder API @@ -379,6 +382,12 @@ func New(options *Options) *API { options.DatabaseRolluper = dbrollup.New(options.Logger.Named("dbrollup"), options.Database) } + if options.WorkspaceUsageTracker == nil { + options.WorkspaceUsageTracker = workspaceusage.New(options.Database, + workspaceusage.WithLogger(options.Logger.Named("workspace_usage_tracker")), + ) + } + ctx, cancel := context.WithCancel(context.Background()) r := chi.NewRouter() @@ -423,7 +432,8 @@ func New(options *Options) *API { options.Database, options.Pubsub, ), - dbRolluper: options.DatabaseRolluper, + dbRolluper: options.DatabaseRolluper, + workspaceUsageTracker: options.WorkspaceUsageTracker, } api.AppearanceFetcher.Store(&appearance.DefaultFetcher) @@ -1265,13 +1275,13 @@ type API struct { healthCheckGroup *singleflight.Group[string, *healthsdk.HealthcheckReport] healthCheckCache atomic.Pointer[healthsdk.HealthcheckReport] - statsBatcher *batchstats.Batcher - statsCollector workspaceapps.StatsCollector + statsBatcher *batchstats.Batcher Acquirer *provisionerdserver.Acquirer // dbRolluper rolls up template usage stats from raw agent and app // stats. This is used to provide insights in the WebUI. - dbRolluper *dbrollup.Rolluper + dbRolluper *dbrollup.Rolluper + workspaceUsageTracker *workspaceusage.Tracker } // Close waits for all WebSocket connections to drain before returning. @@ -1310,6 +1320,7 @@ func (api *API) Close() error { _ = (*coordinator).Close() } _ = api.agentProvider.Close() + api.workspaceUsageTracker.Close() return nil } diff --git a/coderd/coderdtest/coderdtest.go b/coderd/coderdtest/coderdtest.go index 817490055a53e..6153f1a68abcb 100644 --- a/coderd/coderdtest/coderdtest.go +++ b/coderd/coderdtest/coderdtest.go @@ -71,6 +71,7 @@ import ( "github.com/coder/coder/v2/coderd/util/ptr" "github.com/coder/coder/v2/coderd/workspaceapps" "github.com/coder/coder/v2/coderd/workspaceapps/appurl" + "github.com/coder/coder/v2/coderd/workspaceusage" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/agentsdk" "github.com/coder/coder/v2/codersdk/drpc" @@ -334,6 +335,12 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can if options.WorkspaceUsageTrackerTick == nil { options.WorkspaceUsageTrackerTick = make(chan time.Time, 1) // buffering just in case } + // Close is called by API.Close() + wuTracker := workspaceusage.New( + options.Database, + workspaceusage.WithLogger(options.Logger.Named("workspace_usage_tracker")), + workspaceusage.WithTickFlush(options.WorkspaceUsageTrackerTick, options.WorkspaceUsageTrackerFlush), + ) var mutex sync.RWMutex var handler http.Handler @@ -488,6 +495,7 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can AllowWorkspaceRenames: options.AllowWorkspaceRenames, NewTicker: options.NewTicker, DatabaseRolluper: options.DatabaseRolluper, + WorkspaceUsageTracker: wuTracker, } } diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index 436b3bbc11466..9faae72f22ef7 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -36,7 +36,7 @@ import ( "github.com/coder/coder/v2/coderd/httpmw" "github.com/coder/coder/v2/coderd/prometheusmetrics" "github.com/coder/coder/v2/coderd/rbac/policy" - "github.com/coder/coder/v2/coderd/workspaceapps" + "github.com/coder/coder/v2/coderd/schedule" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/agentsdk" "github.com/coder/coder/v2/codersdk/workspacesdk" @@ -1167,6 +1167,35 @@ func (api *API) workspaceAgentReportStats(rw http.ResponseWriter, r *http.Reques slog.F("payload", req), ) + if req.ConnectionCount > 0 { + var nextAutostart time.Time + if workspace.AutostartSchedule.String != "" { + templateSchedule, err := (*(api.TemplateScheduleStore.Load())).Get(ctx, api.Database, workspace.TemplateID) + // If the template schedule fails to load, just default to bumping without the next transition and log it. + if err != nil { + // There's nothing we can do if the query was canceled, the + // client most likely went away so we just return an internal + // server error. + if database.IsQueryCanceledError(err) { + httpapi.InternalServerError(rw, err) + return + } + api.Logger.Error(ctx, "failed to load template schedule bumping activity, defaulting to bumping by 60min", + slog.F("workspace_id", workspace.ID), + slog.F("template_id", workspace.TemplateID), + slog.Error(err), + ) + } else { + next, allowed := schedule.NextAutostart(time.Now(), workspace.AutostartSchedule.String, templateSchedule) + if allowed { + nextAutostart = next + } + } + } + agentapi.ActivityBumpWorkspace(ctx, api.Logger.Named("activity_bump"), api.Database, workspace.ID, nextAutostart) + } + + now := dbtime.Now() protoStats := &agentproto.Stats{ ConnectionsByProto: req.ConnectionsByProto, ConnectionCount: req.ConnectionCount, @@ -1213,6 +1242,19 @@ func (api *API) workspaceAgentReportStats(rw http.ResponseWriter, r *http.Reques } return nil }) + if req.SessionCount() > 0 { + errGroup.Go(func() error { + // nolint:gocritic // (#13146) Will be moved soon as part of refactor. + err := api.Database.UpdateWorkspaceLastUsedAt(ctx, database.UpdateWorkspaceLastUsedAtParams{ + ID: workspace.ID, + LastUsedAt: now, + }) + if err != nil { + return xerrors.Errorf("can't update workspace LastUsedAt: %w", err) + } + return nil + }) + } if api.Options.UpdateAgentMetrics != nil { errGroup.Go(func() error { user, err := api.Database.GetUserByID(ctx, workspace.OwnerID) @@ -1235,13 +1277,6 @@ func (api *API) workspaceAgentReportStats(rw http.ResponseWriter, r *http.Reques return } - // Flushing the stats collector will update last_used_at, - // dealine for the workspace, and will publish a workspace update event. - api.statsCollector.CollectAndFlush(ctx, workspaceapps.StatsReport{ - WorkspaceID: workspace.ID, - // TODO: fill out - }) - httpapi.Write(ctx, rw, http.StatusOK, agentsdk.StatsResponse{ ReportInterval: api.AgentStatsRefreshInterval, }) diff --git a/coderd/workspaces.go b/coderd/workspaces.go index 572a78362319e..7d0344be4e321 100644 --- a/coderd/workspaces.go +++ b/coderd/workspaces.go @@ -29,7 +29,6 @@ import ( "github.com/coder/coder/v2/coderd/searchquery" "github.com/coder/coder/v2/coderd/telemetry" "github.com/coder/coder/v2/coderd/util/ptr" - "github.com/coder/coder/v2/coderd/workspaceapps" "github.com/coder/coder/v2/coderd/wsbuilder" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/agentsdk" @@ -1116,10 +1115,7 @@ func (api *API) postWorkspaceUsage(rw http.ResponseWriter, r *http.Request) { return } - api.statsCollector.CollectAndFlush(r.Context(), workspaceapps.StatsReport{ - WorkspaceID: workspace.ID, - }) - + api.workspaceUsageTracker.Add(workspace.ID) rw.WriteHeader(http.StatusNoContent) } diff --git a/coderd/workspaceusage/tracker.go b/coderd/workspaceusage/tracker.go new file mode 100644 index 0000000000000..118b021d71d52 --- /dev/null +++ b/coderd/workspaceusage/tracker.go @@ -0,0 +1,235 @@ +package workspaceusage + +import ( + "bytes" + "context" + "flag" + "os" + "sort" + "sync" + "time" + + "github.com/google/uuid" + + "github.com/coder/coder/v2/coderd/database" + "github.com/coder/coder/v2/coderd/database/dbauthz" + + "cdr.dev/slog" + "cdr.dev/slog/sloggers/sloghuman" +) + +var DefaultFlushInterval = 60 * time.Second + +// Store is a subset of database.Store +type Store interface { + BatchUpdateWorkspaceLastUsedAt(context.Context, database.BatchUpdateWorkspaceLastUsedAtParams) error +} + +// Tracker tracks and de-bounces updates to workspace usage activity. +// It keeps an internal map of workspace IDs that have been used and +// periodically flushes this to its configured Store. +type Tracker struct { + log slog.Logger // you know, for logs + flushLock sync.Mutex // protects m + flushErrors int // tracks the number of consecutive errors flushing + m *uuidSet // stores workspace ids + s Store // for flushing data + tickCh <-chan time.Time // controls flush interval + stopTick func() // stops flushing + stopCh chan struct{} // signals us to stop + stopOnce sync.Once // because you only stop once + doneCh chan struct{} // signifies that we have stopped + flushCh chan int // used for testing. +} + +// New returns a new Tracker. It is the caller's responsibility +// to call Close(). +func New(s Store, opts ...Option) *Tracker { + tr := &Tracker{ + log: slog.Make(sloghuman.Sink(os.Stderr)), + m: &uuidSet{}, + s: s, + tickCh: nil, + stopTick: nil, + stopCh: make(chan struct{}), + doneCh: make(chan struct{}), + flushCh: nil, + } + for _, opt := range opts { + opt(tr) + } + if tr.tickCh == nil && tr.stopTick == nil { + tick := time.NewTicker(DefaultFlushInterval) + tr.tickCh = tick.C + tr.stopTick = tick.Stop + } + go tr.loop() + return tr +} + +type Option func(*Tracker) + +// WithLogger sets the logger to be used by Tracker. +func WithLogger(log slog.Logger) Option { + return func(h *Tracker) { + h.log = log + } +} + +// WithFlushInterval allows configuring the flush interval of Tracker. +func WithFlushInterval(d time.Duration) Option { + return func(h *Tracker) { + ticker := time.NewTicker(d) + h.tickCh = ticker.C + h.stopTick = ticker.Stop + } +} + +// WithTickFlush allows passing two channels: one that reads +// a time.Time, and one that returns the number of marked workspaces +// every time Tracker flushes. +// For testing only and will panic if used outside of tests. +func WithTickFlush(tickCh <-chan time.Time, flushCh chan int) Option { + if flag.Lookup("test.v") == nil { + panic("developer error: WithTickFlush is not to be used outside of tests.") + } + return func(h *Tracker) { + h.tickCh = tickCh + h.stopTick = func() {} + h.flushCh = flushCh + } +} + +// Add marks the workspace with the given ID as having been used recently. +// Tracker will periodically flush this to its configured Store. +func (tr *Tracker) Add(workspaceID uuid.UUID) { + tr.m.Add(workspaceID) +} + +// flush updates last_used_at of all current workspace IDs. +// If this is held while a previous flush is in progress, it will +// deadlock until the previous flush has completed. +func (tr *Tracker) flush(now time.Time) { + // Copy our current set of IDs + ids := tr.m.UniqueAndClear() + count := len(ids) + if tr.flushCh != nil { // only used for testing + defer func() { + tr.flushCh <- count + }() + } + if count == 0 { + tr.log.Debug(context.Background(), "nothing to flush") + return + } + + // Set a short-ish timeout for this. We don't want to hang forever. + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + // nolint: gocritic // system function + authCtx := dbauthz.AsSystemRestricted(ctx) + tr.flushLock.Lock() + defer tr.flushLock.Unlock() + // nolint:gocritic // (#13146) Will be moved soon as part of refactor. + if err := tr.s.BatchUpdateWorkspaceLastUsedAt(authCtx, database.BatchUpdateWorkspaceLastUsedAtParams{ + LastUsedAt: now, + IDs: ids, + }); err != nil { + // A single failure to flush is likely not a huge problem. If the workspace is still connected at + // the next iteration, either another coderd instance will likely have this data or the CLI + // will tell us again that the workspace is in use. + tr.flushErrors++ + if tr.flushErrors > 1 { + tr.log.Error(ctx, "multiple failures updating workspaces last_used_at", slog.F("count", count), slog.F("consecutive_errors", tr.flushErrors), slog.Error(err)) + // TODO: if this keeps failing, it indicates a fundamental problem with the database connection. + // How to surface it correctly to admins besides just screaming into the logs? + } else { + tr.log.Warn(ctx, "failed updating workspaces last_used_at", slog.F("count", count), slog.Error(err)) + } + return + } + tr.flushErrors = 0 + tr.log.Info(ctx, "updated workspaces last_used_at", slog.F("count", count), slog.F("now", now)) +} + +// loop periodically flushes every tick. +// If loop is called after Close, it will exit immediately and log an error. +func (tr *Tracker) loop() { + select { + case <-tr.doneCh: + tr.log.Error(context.Background(), "developer error: Loop called after Close") + return + default: + } + defer func() { + close(tr.doneCh) + tr.log.Debug(context.Background(), "workspace usage tracker loop exited") + }() + for { + select { + case <-tr.stopCh: + return + case now, ok := <-tr.tickCh: + if !ok { + return + } + // NOTE: we do not update last_used_at with the time at which each workspace was added. + // Instead, we update with the time of the flush. If the BatchUpdateWorkspacesLastUsedAt + // query can be rewritten to update each id with a corresponding last_used_at timestamp + // then we could capture the exact usage time of each workspace. For now however, as + // we perform this query at a regular interval, the time of the flush is 'close enough' + // for the purposes of both dormancy (and for autostop, in future). + tr.flush(now.UTC()) + } + } +} + +// Close stops Tracker and returns once Loop has exited. +// After calling Close(), Loop must not be called. +func (tr *Tracker) Close() error { + tr.stopOnce.Do(func() { + tr.stopCh <- struct{}{} + tr.stopTick() + <-tr.doneCh + }) + return nil +} + +// uuidSet is a set of UUIDs. Safe for concurrent usage. +// The zero value can be used. +type uuidSet struct { + l sync.Mutex + m map[uuid.UUID]struct{} +} + +func (s *uuidSet) Add(id uuid.UUID) { + s.l.Lock() + defer s.l.Unlock() + if s.m == nil { + s.m = make(map[uuid.UUID]struct{}) + } + s.m[id] = struct{}{} +} + +// UniqueAndClear returns the unique set of entries in s and +// resets the internal map. +func (s *uuidSet) UniqueAndClear() []uuid.UUID { + s.l.Lock() + defer s.l.Unlock() + if s.m == nil { + s.m = make(map[uuid.UUID]struct{}) + return []uuid.UUID{} + } + l := make([]uuid.UUID, 0) + for k := range s.m { + l = append(l, k) + } + // For ease of testing, sort the IDs lexically + sort.Slice(l, func(i, j int) bool { + // For some unfathomable reason, byte arrays are not comparable? + // See https://github.com/golang/go/issues/61004 + return bytes.Compare(l[i][:], l[j][:]) < 0 + }) + clear(s.m) + return l +} diff --git a/coderd/workspaceusage/tracker_test.go b/coderd/workspaceusage/tracker_test.go new file mode 100644 index 0000000000000..ae9a9d2162d1c --- /dev/null +++ b/coderd/workspaceusage/tracker_test.go @@ -0,0 +1,225 @@ +package workspaceusage_test + +import ( + "bytes" + "sort" + "sync" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "go.uber.org/goleak" + "go.uber.org/mock/gomock" + + "cdr.dev/slog" + "cdr.dev/slog/sloggers/slogtest" + "github.com/coder/coder/v2/coderd/coderdtest" + "github.com/coder/coder/v2/coderd/database" + "github.com/coder/coder/v2/coderd/database/dbfake" + "github.com/coder/coder/v2/coderd/database/dbmock" + "github.com/coder/coder/v2/coderd/database/dbtestutil" + "github.com/coder/coder/v2/coderd/database/dbtime" + "github.com/coder/coder/v2/coderd/database/pubsub" + "github.com/coder/coder/v2/coderd/workspaceusage" + "github.com/coder/coder/v2/codersdk" + "github.com/coder/coder/v2/testutil" +) + +func TestTracker(t *testing.T) { + t.Parallel() + + ctrl := gomock.NewController(t) + mDB := dbmock.NewMockStore(ctrl) + log := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + + tickCh := make(chan time.Time) + flushCh := make(chan int, 1) + wut := workspaceusage.New(mDB, + workspaceusage.WithLogger(log), + workspaceusage.WithTickFlush(tickCh, flushCh), + ) + defer wut.Close() + + // 1. No marked workspaces should imply no flush. + now := dbtime.Now() + tickCh <- now + count := <-flushCh + require.Equal(t, 0, count, "expected zero flushes") + + // 2. One marked workspace should cause a flush. + ids := []uuid.UUID{uuid.New()} + now = dbtime.Now() + wut.Add(ids[0]) + mDB.EXPECT().BatchUpdateWorkspaceLastUsedAt(gomock.Any(), database.BatchUpdateWorkspaceLastUsedAtParams{ + LastUsedAt: now, + IDs: ids, + }).Times(1) + tickCh <- now + count = <-flushCh + require.Equal(t, 1, count, "expected one flush with one id") + + // 3. Lots of marked workspaces should also cause a flush. + for i := 0; i < 31; i++ { + ids = append(ids, uuid.New()) + } + + // Sort ids so mDB know what to expect. + sort.Slice(ids, func(i, j int) bool { + return bytes.Compare(ids[i][:], ids[j][:]) < 0 + }) + + now = dbtime.Now() + mDB.EXPECT().BatchUpdateWorkspaceLastUsedAt(gomock.Any(), database.BatchUpdateWorkspaceLastUsedAtParams{ + LastUsedAt: now, + IDs: ids, + }) + for _, id := range ids { + wut.Add(id) + } + tickCh <- now + count = <-flushCh + require.Equal(t, len(ids), count, "incorrect number of ids flushed") + + // 4. Try to cause a race condition! + now = dbtime.Now() + // Difficult to know what to EXPECT here, so we won't check strictly here. + mDB.EXPECT().BatchUpdateWorkspaceLastUsedAt(gomock.Any(), gomock.Any()).MinTimes(1).MaxTimes(len(ids)) + // Try to force a race condition. + var wg sync.WaitGroup + count = 0 + for i := 0; i < len(ids); i++ { + wg.Add(1) + go func() { + defer wg.Done() + tickCh <- now + }() + wut.Add(ids[i]) + } + + for i := 0; i < len(ids); i++ { + count += <-flushCh + } + + wg.Wait() + require.Equal(t, len(ids), count, "incorrect number of ids flushed") + + // 5. Closing multiple times should not be a problem. + wut.Close() + wut.Close() +} + +// This test performs a more 'integration-style' test with multiple instances. +func TestTracker_MultipleInstances(t *testing.T) { + t.Parallel() + if !dbtestutil.WillUsePostgres() { + t.Skip("this test only makes sense with postgres") + } + + // Given we have two coderd instances connected to the same database + var ( + ctx = testutil.Context(t, testutil.WaitLong) + db, _ = dbtestutil.NewDB(t) + // real pubsub is not safe for concurrent use, and this test currently + // does not depend on pubsub + ps = pubsub.NewInMemory() + wuTickA = make(chan time.Time) + wuFlushA = make(chan int, 1) + wuTickB = make(chan time.Time) + wuFlushB = make(chan int, 1) + clientA = coderdtest.New(t, &coderdtest.Options{ + WorkspaceUsageTrackerTick: wuTickA, + WorkspaceUsageTrackerFlush: wuFlushA, + Database: db, + Pubsub: ps, + }) + clientB = coderdtest.New(t, &coderdtest.Options{ + WorkspaceUsageTrackerTick: wuTickB, + WorkspaceUsageTrackerFlush: wuFlushB, + Database: db, + Pubsub: ps, + }) + owner = coderdtest.CreateFirstUser(t, clientA) + now = dbtime.Now() + ) + + clientB.SetSessionToken(clientA.SessionToken()) + + // Create a number of workspaces + numWorkspaces := 10 + w := make([]dbfake.WorkspaceResponse, numWorkspaces) + for i := 0; i < numWorkspaces; i++ { + wr := dbfake.WorkspaceBuild(t, db, database.Workspace{ + OwnerID: owner.UserID, + OrganizationID: owner.OrganizationID, + LastUsedAt: now, + }).WithAgent().Do() + w[i] = wr + } + + // Use client A to update LastUsedAt of the first three + require.NoError(t, clientA.PostWorkspaceUsage(ctx, w[0].Workspace.ID)) + require.NoError(t, clientA.PostWorkspaceUsage(ctx, w[1].Workspace.ID)) + require.NoError(t, clientA.PostWorkspaceUsage(ctx, w[2].Workspace.ID)) + // Use client B to update LastUsedAt of the next three + require.NoError(t, clientB.PostWorkspaceUsage(ctx, w[3].Workspace.ID)) + require.NoError(t, clientB.PostWorkspaceUsage(ctx, w[4].Workspace.ID)) + require.NoError(t, clientB.PostWorkspaceUsage(ctx, w[5].Workspace.ID)) + // The next two will have updated from both instances + require.NoError(t, clientA.PostWorkspaceUsage(ctx, w[6].Workspace.ID)) + require.NoError(t, clientB.PostWorkspaceUsage(ctx, w[6].Workspace.ID)) + require.NoError(t, clientA.PostWorkspaceUsage(ctx, w[7].Workspace.ID)) + require.NoError(t, clientB.PostWorkspaceUsage(ctx, w[7].Workspace.ID)) + // The last two will not report any usage. + + // Tick both with different times and wait for both flushes to complete + nowA := now.Add(time.Minute) + nowB := now.Add(2 * time.Minute) + var wg sync.WaitGroup + var flushedA, flushedB int + wg.Add(1) + go func() { + defer wg.Done() + wuTickA <- nowA + flushedA = <-wuFlushA + }() + wg.Add(1) + go func() { + defer wg.Done() + wuTickB <- nowB + flushedB = <-wuFlushB + }() + wg.Wait() + + // We expect 5 flushed IDs each + require.Equal(t, 5, flushedA) + require.Equal(t, 5, flushedB) + + // Fetch updated workspaces + updated := make([]codersdk.Workspace, numWorkspaces) + for i := 0; i < numWorkspaces; i++ { + ws, err := clientA.Workspace(ctx, w[i].Workspace.ID) + require.NoError(t, err) + updated[i] = ws + } + // We expect the first three to have the timestamp of flushA + require.Equal(t, nowA.UTC(), updated[0].LastUsedAt.UTC()) + require.Equal(t, nowA.UTC(), updated[1].LastUsedAt.UTC()) + require.Equal(t, nowA.UTC(), updated[2].LastUsedAt.UTC()) + // We expect the next three to have the timestamp of flushB + require.Equal(t, nowB.UTC(), updated[3].LastUsedAt.UTC()) + require.Equal(t, nowB.UTC(), updated[4].LastUsedAt.UTC()) + require.Equal(t, nowB.UTC(), updated[5].LastUsedAt.UTC()) + // The next two should have the timestamp of flushB as it is newer than flushA + require.Equal(t, nowB.UTC(), updated[6].LastUsedAt.UTC()) + require.Equal(t, nowB.UTC(), updated[7].LastUsedAt.UTC()) + // And the last two should be untouched + require.Equal(t, w[8].Workspace.LastUsedAt.UTC(), updated[8].LastUsedAt.UTC()) + require.Equal(t, w[8].Workspace.LastUsedAt.UTC(), updated[8].LastUsedAt.UTC()) + require.Equal(t, w[9].Workspace.LastUsedAt.UTC(), updated[9].LastUsedAt.UTC()) + require.Equal(t, w[9].Workspace.LastUsedAt.UTC(), updated[9].LastUsedAt.UTC()) +} + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} From 4fc5e470113dffabafc485852b3e5e8fb46c2a7f Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Tue, 28 May 2024 14:25:06 +0000 Subject: [PATCH 6/6] save --- coderd/batchstats/batcher.go | 2 +- coderd/coderd.go | 3 ++- coderd/workspaceagents.go | 49 +++++------------------------------ coderd/workspaceapps/stats.go | 34 ++++++++++++++++++++---- 4 files changed, 38 insertions(+), 50 deletions(-) diff --git a/coderd/batchstats/batcher.go b/coderd/batchstats/batcher.go index bbff38b0413c0..1c19d62c7553d 100644 --- a/coderd/batchstats/batcher.go +++ b/coderd/batchstats/batcher.go @@ -25,7 +25,7 @@ const ( ) // Batcher holds a buffer of agent stats and periodically flushes them to -// its configured store. It also updates the workspace's last used time. +// its configured store. type Batcher struct { store database.Store log slog.Logger diff --git a/coderd/coderd.go b/coderd/coderd.go index 80f77d92ee672..f712ee16f6a77 100644 --- a/coderd/coderd.go +++ b/coderd/coderd.go @@ -1275,7 +1275,8 @@ type API struct { healthCheckGroup *singleflight.Group[string, *healthsdk.HealthcheckReport] healthCheckCache atomic.Pointer[healthsdk.HealthcheckReport] - statsBatcher *batchstats.Batcher + statsBatcher *batchstats.Batcher + statsCollector workspaceapps.StatsCollector Acquirer *provisionerdserver.Acquirer // dbRolluper rolls up template usage stats from raw agent and app diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index 9faae72f22ef7..820733600b2fb 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -36,7 +36,7 @@ import ( "github.com/coder/coder/v2/coderd/httpmw" "github.com/coder/coder/v2/coderd/prometheusmetrics" "github.com/coder/coder/v2/coderd/rbac/policy" - "github.com/coder/coder/v2/coderd/schedule" + "github.com/coder/coder/v2/coderd/workspaceapps" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/agentsdk" "github.com/coder/coder/v2/codersdk/workspacesdk" @@ -1167,35 +1167,6 @@ func (api *API) workspaceAgentReportStats(rw http.ResponseWriter, r *http.Reques slog.F("payload", req), ) - if req.ConnectionCount > 0 { - var nextAutostart time.Time - if workspace.AutostartSchedule.String != "" { - templateSchedule, err := (*(api.TemplateScheduleStore.Load())).Get(ctx, api.Database, workspace.TemplateID) - // If the template schedule fails to load, just default to bumping without the next transition and log it. - if err != nil { - // There's nothing we can do if the query was canceled, the - // client most likely went away so we just return an internal - // server error. - if database.IsQueryCanceledError(err) { - httpapi.InternalServerError(rw, err) - return - } - api.Logger.Error(ctx, "failed to load template schedule bumping activity, defaulting to bumping by 60min", - slog.F("workspace_id", workspace.ID), - slog.F("template_id", workspace.TemplateID), - slog.Error(err), - ) - } else { - next, allowed := schedule.NextAutostart(time.Now(), workspace.AutostartSchedule.String, templateSchedule) - if allowed { - nextAutostart = next - } - } - } - agentapi.ActivityBumpWorkspace(ctx, api.Logger.Named("activity_bump"), api.Database, workspace.ID, nextAutostart) - } - - now := dbtime.Now() protoStats := &agentproto.Stats{ ConnectionsByProto: req.ConnectionsByProto, ConnectionCount: req.ConnectionCount, @@ -1242,19 +1213,6 @@ func (api *API) workspaceAgentReportStats(rw http.ResponseWriter, r *http.Reques } return nil }) - if req.SessionCount() > 0 { - errGroup.Go(func() error { - // nolint:gocritic // (#13146) Will be moved soon as part of refactor. - err := api.Database.UpdateWorkspaceLastUsedAt(ctx, database.UpdateWorkspaceLastUsedAtParams{ - ID: workspace.ID, - LastUsedAt: now, - }) - if err != nil { - return xerrors.Errorf("can't update workspace LastUsedAt: %w", err) - } - return nil - }) - } if api.Options.UpdateAgentMetrics != nil { errGroup.Go(func() error { user, err := api.Database.GetUserByID(ctx, workspace.OwnerID) @@ -1277,6 +1235,11 @@ func (api *API) workspaceAgentReportStats(rw http.ResponseWriter, r *http.Reques return } + api.statsCollector.CollectAndFlush(ctx, workspaceapps.StatsReport{ + WorkspaceID: workspace.ID, + // TODO: fill out + }) + httpapi.Write(ctx, rw, http.StatusOK, agentsdk.StatsResponse{ ReportInterval: api.AgentStatsRefreshInterval, }) diff --git a/coderd/workspaceapps/stats.go b/coderd/workspaceapps/stats.go index f306202798cd7..6ac76fc5254ab 100644 --- a/coderd/workspaceapps/stats.go +++ b/coderd/workspaceapps/stats.go @@ -11,6 +11,7 @@ import ( "cdr.dev/slog" + agentproto "github.com/coder/coder/v2/agent/proto" "github.com/coder/coder/v2/coderd/database" "github.com/coder/coder/v2/coderd/database/dbauthz" "github.com/coder/coder/v2/coderd/database/dbtime" @@ -236,7 +237,10 @@ type StatsCollectorOptions struct { // RollupWindow is the window size for rolling up stats, session shorter // than this will be rolled up and longer than this will be tracked // individually. - RollupWindow time.Duration + RollupWindow time.Duration + DB database.Store + Pubsub pubsub.Pubsub + TemplateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore] // Options for tests. Flush <-chan chan<- struct{} @@ -295,11 +299,31 @@ func (sc *StatsCollector) Collect(report StatsReport) { sc.opts.Logger.Debug(sc.ctx, "collected workspace app stats", slog.F("report", report)) } -func (sc *StatsCollector) CollectAndFlush(ctx context.Context, report StatsReport) error { - sc.Collect(report) - err := sc.flush(ctx) +func (sc *StatsCollector) CollectAgentStat(ctx context.Context, now time.Time, agentID uuid.UUID, workspace database.Workspace, st *agentproto.Stats) error { + var nextAutostart time.Time + if workspace.AutostartSchedule.String != "" { + templateSchedule, err := (*(sc.opts.TemplateScheduleStore.Load())).Get(ctx, sc.opts.DB, workspace.TemplateID) + // If the template schedule fails to load, just default to bumping + // without the next transition and log it. + if err != nil { + sc.opts.Logger.Error(ctx, "failed to load template schedule bumping activity, defaulting to bumping by 60min", + slog.F("workspace_id", workspace.ID), + slog.F("template_id", workspace.TemplateID), + slog.Error(err), + ) + } else { + next, allowed := schedule.NextAutostart(dbtime.Now(), workspace.AutostartSchedule.String, templateSchedule) + if allowed { + nextAutostart = next + } + } + } + ActivityBumpWorkspace(ctx, sc.opts.Logger.Named("activity_bump"), sc.opts.DB, workspace.ID, nextAutostart) + + err := sc.opts.Pubsub.Publish(codersdk.WorkspaceNotifyChannel(workspace.ID), []byte{}) if err != nil { - return xerrors.Errorf("flushing collector: %w", err) + sc.opts.Logger.Warn(ctx, "failed to publish workspace agent stats", + slog.F("workspace_id", workspace.ID), slog.Error(err)) } return nil