diff --git a/coderd/agentapi/api.go b/coderd/agentapi/api.go index fa8563a141a45..b8b07672d6aa2 100644 --- a/coderd/agentapi/api.go +++ b/coderd/agentapi/api.go @@ -24,6 +24,7 @@ import ( "github.com/coder/coder/v2/coderd/prometheusmetrics" "github.com/coder/coder/v2/coderd/schedule" "github.com/coder/coder/v2/coderd/tracing" + "github.com/coder/coder/v2/coderd/workspacestats" "github.com/coder/coder/v2/codersdk/agentsdk" "github.com/coder/coder/v2/tailnet" tailnetproto "github.com/coder/coder/v2/tailnet/proto" @@ -59,7 +60,7 @@ type Options struct { DerpMapFn func() *tailcfg.DERPMap TailnetCoordinator *atomic.Pointer[tailnet.Coordinator] TemplateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore] - StatsBatcher StatsBatcher + StatsReporter *workspacestats.Reporter AppearanceFetcher *atomic.Pointer[appearance.Fetcher] PublishWorkspaceUpdateFn func(ctx context.Context, workspaceID uuid.UUID) PublishWorkspaceAgentLogsUpdateFn func(ctx context.Context, workspaceAgentID uuid.UUID, msg agentsdk.LogsNotifyMessage) @@ -114,12 +115,9 @@ func New(opts Options) *API { api.StatsAPI = &StatsAPI{ AgentFn: api.agent, Database: opts.Database, - Pubsub: opts.Pubsub, Log: opts.Log, - StatsBatcher: opts.StatsBatcher, - TemplateScheduleStore: opts.TemplateScheduleStore, + StatsReporter: opts.StatsReporter, AgentStatsRefreshInterval: opts.AgentStatsRefreshInterval, - UpdateAgentMetricsFn: opts.UpdateAgentMetricsFn, } api.LifecycleAPI = &LifecycleAPI{ diff --git a/coderd/agentapi/stats.go b/coderd/agentapi/stats.go index e91a3624e915d..ee17897572f3d 100644 --- a/coderd/agentapi/stats.go +++ b/coderd/agentapi/stats.go @@ -2,10 +2,8 @@ package agentapi import ( "context" - "sync/atomic" "time" - "golang.org/x/sync/errgroup" "golang.org/x/xerrors" "google.golang.org/protobuf/types/known/durationpb" @@ -15,10 +13,7 @@ import ( agentproto "github.com/coder/coder/v2/agent/proto" "github.com/coder/coder/v2/coderd/database" "github.com/coder/coder/v2/coderd/database/dbtime" - "github.com/coder/coder/v2/coderd/database/pubsub" - "github.com/coder/coder/v2/coderd/prometheusmetrics" - "github.com/coder/coder/v2/coderd/schedule" - "github.com/coder/coder/v2/codersdk" + "github.com/coder/coder/v2/coderd/workspacestats" ) type StatsBatcher interface { @@ -28,12 +23,9 @@ type StatsBatcher interface { type StatsAPI struct { AgentFn func(context.Context) (database.WorkspaceAgent, error) Database database.Store - Pubsub pubsub.Pubsub Log slog.Logger - StatsBatcher StatsBatcher - TemplateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore] + StatsReporter *workspacestats.Reporter AgentStatsRefreshInterval time.Duration - UpdateAgentMetricsFn func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric) TimeNowFn func() time.Time // defaults to dbtime.Now() } @@ -69,80 +61,17 @@ func (a *StatsAPI) UpdateStats(ctx context.Context, req *agentproto.UpdateStatsR slog.F("payload", req), ) - now := a.now() - if req.Stats.ConnectionCount > 0 { - var nextAutostart time.Time - if workspace.AutostartSchedule.String != "" { - templateSchedule, err := (*(a.TemplateScheduleStore.Load())).Get(ctx, a.Database, workspace.TemplateID) - // If the template schedule fails to load, just default to bumping - // without the next transition and log it. - if err != nil { - a.Log.Error(ctx, "failed to load template schedule bumping activity, defaulting to bumping by 60min", - slog.F("workspace_id", workspace.ID), - slog.F("template_id", workspace.TemplateID), - slog.Error(err), - ) - } else { - next, allowed := schedule.NextAutostart(now, workspace.AutostartSchedule.String, templateSchedule) - if allowed { - nextAutostart = next - } - } - } - ActivityBumpWorkspace(ctx, a.Log.Named("activity_bump"), a.Database, workspace.ID, nextAutostart) - } - - var errGroup errgroup.Group - errGroup.Go(func() error { - err := a.StatsBatcher.Add(now, workspaceAgent.ID, workspace.TemplateID, workspace.OwnerID, workspace.ID, req.Stats) - if err != nil { - a.Log.Error(ctx, "add agent stats to batcher", slog.Error(err)) - return xerrors.Errorf("insert workspace agent stats batch: %w", err) - } - return nil - }) - errGroup.Go(func() error { - // nolint:gocritic // (#13146) Will be moved soon as part of refactor. - err := a.Database.UpdateWorkspaceLastUsedAt(ctx, database.UpdateWorkspaceLastUsedAtParams{ - ID: workspace.ID, - LastUsedAt: now, - }) - if err != nil { - return xerrors.Errorf("update workspace LastUsedAt: %w", err) - } - return nil - }) - if a.UpdateAgentMetricsFn != nil { - errGroup.Go(func() error { - user, err := a.Database.GetUserByID(ctx, workspace.OwnerID) - if err != nil { - return xerrors.Errorf("get user: %w", err) - } - - a.UpdateAgentMetricsFn(ctx, prometheusmetrics.AgentMetricLabels{ - Username: user.Username, - WorkspaceName: workspace.Name, - AgentName: workspaceAgent.Name, - TemplateName: getWorkspaceAgentByIDRow.TemplateName, - }, req.Stats.Metrics) - return nil - }) - } - err = errGroup.Wait() + err = a.StatsReporter.ReportAgentStats( + ctx, + a.now(), + workspace, + workspaceAgent, + getWorkspaceAgentByIDRow.TemplateName, + req.Stats, + ) if err != nil { - return nil, xerrors.Errorf("update stats in database: %w", err) + return nil, xerrors.Errorf("report agent stats: %w", err) } - // Tell the frontend about the new agent report, now that everything is updated - a.publishWorkspaceAgentStats(ctx, workspace.ID) - return res, nil } - -func (a *StatsAPI) publishWorkspaceAgentStats(ctx context.Context, workspaceID uuid.UUID) { - err := a.Pubsub.Publish(codersdk.WorkspaceNotifyChannel(workspaceID), []byte{}) - if err != nil { - a.Log.Warn(ctx, "failed to publish workspace agent stats", - slog.F("workspace_id", workspaceID), slog.Error(err)) - } -} diff --git a/coderd/agentapi/stats_test.go b/coderd/agentapi/stats_test.go index 943c8e7ac0e17..c304dea93ecc9 100644 --- a/coderd/agentapi/stats_test.go +++ b/coderd/agentapi/stats_test.go @@ -22,6 +22,7 @@ import ( "github.com/coder/coder/v2/coderd/database/pubsub" "github.com/coder/coder/v2/coderd/prometheusmetrics" "github.com/coder/coder/v2/coderd/schedule" + "github.com/coder/coder/v2/coderd/workspacestats" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/testutil" ) @@ -129,21 +130,24 @@ func TestUpdateStates(t *testing.T) { AgentFn: func(context.Context) (database.WorkspaceAgent, error) { return agent, nil }, - Database: dbM, - Pubsub: ps, - StatsBatcher: batcher, - TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore), + Database: dbM, + StatsReporter: workspacestats.NewReporter(workspacestats.ReporterOptions{ + Database: dbM, + Pubsub: ps, + StatsBatcher: batcher, + TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore), + UpdateAgentMetricsFn: func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric) { + updateAgentMetricsFnCalled = true + assert.Equal(t, prometheusmetrics.AgentMetricLabels{ + Username: user.Username, + WorkspaceName: workspace.Name, + AgentName: agent.Name, + TemplateName: template.Name, + }, labels) + assert.Equal(t, req.Stats.Metrics, metrics) + }, + }), AgentStatsRefreshInterval: 10 * time.Second, - UpdateAgentMetricsFn: func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric) { - updateAgentMetricsFnCalled = true - assert.Equal(t, prometheusmetrics.AgentMetricLabels{ - Username: user.Username, - WorkspaceName: workspace.Name, - AgentName: agent.Name, - TemplateName: template.Name, - }, labels) - assert.Equal(t, req.Stats.Metrics, metrics) - }, TimeNowFn: func() time.Time { return now }, @@ -232,13 +236,16 @@ func TestUpdateStates(t *testing.T) { AgentFn: func(context.Context) (database.WorkspaceAgent, error) { return agent, nil }, - Database: dbM, - Pubsub: ps, - StatsBatcher: batcher, - TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore), + Database: dbM, + StatsReporter: workspacestats.NewReporter(workspacestats.ReporterOptions{ + Database: dbM, + Pubsub: ps, + StatsBatcher: batcher, + TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore), + // Ignored when nil. + UpdateAgentMetricsFn: nil, + }), AgentStatsRefreshInterval: 10 * time.Second, - // Ignored when nil. - UpdateAgentMetricsFn: nil, TimeNowFn: func() time.Time { return now }, @@ -274,12 +281,15 @@ func TestUpdateStates(t *testing.T) { AgentFn: func(context.Context) (database.WorkspaceAgent, error) { return agent, nil }, - Database: dbM, - Pubsub: ps, - StatsBatcher: nil, // should not be called - TemplateScheduleStore: nil, // should not be called + Database: dbM, + StatsReporter: workspacestats.NewReporter(workspacestats.ReporterOptions{ + Database: dbM, + Pubsub: ps, + StatsBatcher: nil, // should not be called + TemplateScheduleStore: nil, // should not be called + UpdateAgentMetricsFn: nil, // should not be called + }), AgentStatsRefreshInterval: 10 * time.Second, - UpdateAgentMetricsFn: nil, // should not be called TimeNowFn: func() time.Time { panic("should not be called") }, @@ -343,21 +353,24 @@ func TestUpdateStates(t *testing.T) { AgentFn: func(context.Context) (database.WorkspaceAgent, error) { return agent, nil }, - Database: dbM, - Pubsub: ps, - StatsBatcher: batcher, - TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore), + Database: dbM, + StatsReporter: workspacestats.NewReporter(workspacestats.ReporterOptions{ + Database: dbM, + Pubsub: ps, + StatsBatcher: batcher, + TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore), + UpdateAgentMetricsFn: func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric) { + updateAgentMetricsFnCalled = true + assert.Equal(t, prometheusmetrics.AgentMetricLabels{ + Username: user.Username, + WorkspaceName: workspace.Name, + AgentName: agent.Name, + TemplateName: template.Name, + }, labels) + assert.Equal(t, req.Stats.Metrics, metrics) + }, + }), AgentStatsRefreshInterval: 15 * time.Second, - UpdateAgentMetricsFn: func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric) { - updateAgentMetricsFnCalled = true - assert.Equal(t, prometheusmetrics.AgentMetricLabels{ - Username: user.Username, - WorkspaceName: workspace.Name, - AgentName: agent.Name, - TemplateName: template.Name, - }, labels) - assert.Equal(t, req.Stats.Metrics, metrics) - }, TimeNowFn: func() time.Time { return now }, diff --git a/coderd/coderd.go b/coderd/coderd.go index 9ee21a23cf79f..1ded4f49a1ab1 100644 --- a/coderd/coderd.go +++ b/coderd/coderd.go @@ -68,6 +68,7 @@ import ( "github.com/coder/coder/v2/coderd/updatecheck" "github.com/coder/coder/v2/coderd/util/slice" "github.com/coder/coder/v2/coderd/workspaceapps" + "github.com/coder/coder/v2/coderd/workspacestats" "github.com/coder/coder/v2/coderd/workspaceusage" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/drpc" @@ -547,13 +548,22 @@ func New(options *Options) *API { api.Logger.Fatal(api.ctx, "failed to initialize tailnet client service", slog.Error(err)) } + api.statsReporter = workspacestats.NewReporter(workspacestats.ReporterOptions{ + Database: options.Database, + Logger: options.Logger.Named("workspacestats"), + Pubsub: options.Pubsub, + TemplateScheduleStore: options.TemplateScheduleStore, + StatsBatcher: options.StatsBatcher, + UpdateAgentMetricsFn: options.UpdateAgentMetrics, + AppStatBatchSize: workspaceapps.DefaultStatsDBReporterBatchSize, + }) workspaceAppsLogger := options.Logger.Named("workspaceapps") if options.WorkspaceAppsStatsCollectorOptions.Logger == nil { named := workspaceAppsLogger.Named("stats_collector") options.WorkspaceAppsStatsCollectorOptions.Logger = &named } if options.WorkspaceAppsStatsCollectorOptions.Reporter == nil { - options.WorkspaceAppsStatsCollectorOptions.Reporter = workspaceapps.NewStatsDBReporter(options.Database, workspaceapps.DefaultStatsDBReporterBatchSize) + options.WorkspaceAppsStatsCollectorOptions.Reporter = api.statsReporter } api.workspaceAppServer = &workspaceapps.Server{ @@ -623,8 +633,6 @@ func New(options *Options) *API { cors := httpmw.Cors(options.DeploymentValues.Dangerous.AllowAllCors.Value()) prometheusMW := httpmw.Prometheus(options.PrometheusRegistry) - api.statsBatcher = options.StatsBatcher - r.Use( httpmw.Recover(api.Logger), tracing.StatusWriterMiddleware, @@ -1277,7 +1285,7 @@ type API struct { healthCheckGroup *singleflight.Group[string, *healthsdk.HealthcheckReport] healthCheckCache atomic.Pointer[healthsdk.HealthcheckReport] - statsBatcher *batchstats.Batcher + statsReporter *workspacestats.Reporter Acquirer *provisionerdserver.Acquirer // dbRolluper rolls up template usage stats from raw agent and app diff --git a/coderd/database/dbauthz/setup_test.go b/coderd/database/dbauthz/setup_test.go index 3385ca3f3240c..95d8b70a42b40 100644 --- a/coderd/database/dbauthz/setup_test.go +++ b/coderd/database/dbauthz/setup_test.go @@ -264,7 +264,7 @@ func (s *MethodTestSuite) NotAuthorizedErrorTest(ctx context.Context, az *coderd // any case where the error is nil and the response is an empty slice. if err != nil || !hasEmptySliceResponse(resp) { s.Errorf(err, "method should an error with cancellation") - s.ErrorIsf(err, context.Canceled, "error should match context.Cancelled") + s.ErrorIsf(err, context.Canceled, "error should match context.Canceled") } }) } diff --git a/coderd/insights_test.go b/coderd/insights_test.go index b6a28f7b0c59b..22e7ed6947bac 100644 --- a/coderd/insights_test.go +++ b/coderd/insights_test.go @@ -31,6 +31,7 @@ import ( "github.com/coder/coder/v2/coderd/rbac" "github.com/coder/coder/v2/coderd/rbac/policy" "github.com/coder/coder/v2/coderd/workspaceapps" + "github.com/coder/coder/v2/coderd/workspacestats" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/agentsdk" "github.com/coder/coder/v2/codersdk/workspacesdk" @@ -736,9 +737,12 @@ func TestTemplateInsights_Golden(t *testing.T) { }) } } - reporter := workspaceapps.NewStatsDBReporter(db, workspaceapps.DefaultStatsDBReporterBatchSize) + reporter := workspacestats.NewReporter(workspacestats.ReporterOptions{ + Database: db, + AppStatBatchSize: workspaceapps.DefaultStatsDBReporterBatchSize, + }) //nolint:gocritic // This is a test. - err = reporter.Report(dbauthz.AsSystemRestricted(ctx), stats) + err = reporter.ReportAppStats(dbauthz.AsSystemRestricted(ctx), stats) require.NoError(t, err, "want no error inserting app stats") return client, events @@ -1632,9 +1636,12 @@ func TestUserActivityInsights_Golden(t *testing.T) { }) } } - reporter := workspaceapps.NewStatsDBReporter(db, workspaceapps.DefaultStatsDBReporterBatchSize) + reporter := workspacestats.NewReporter(workspacestats.ReporterOptions{ + Database: db, + AppStatBatchSize: workspaceapps.DefaultStatsDBReporterBatchSize, + }) //nolint:gocritic // This is a test. - err = reporter.Report(dbauthz.AsSystemRestricted(ctx), stats) + err = reporter.ReportAppStats(dbauthz.AsSystemRestricted(ctx), stats) require.NoError(t, err, "want no error inserting app stats") return client, events diff --git a/coderd/prometheusmetrics/insights/metricscollector_test.go b/coderd/prometheusmetrics/insights/metricscollector_test.go index 598c154db08d8..91ef3c7ee88fa 100644 --- a/coderd/prometheusmetrics/insights/metricscollector_test.go +++ b/coderd/prometheusmetrics/insights/metricscollector_test.go @@ -25,6 +25,7 @@ import ( "github.com/coder/coder/v2/coderd/database/dbtestutil" "github.com/coder/coder/v2/coderd/prometheusmetrics/insights" "github.com/coder/coder/v2/coderd/workspaceapps" + "github.com/coder/coder/v2/coderd/workspacestats" "github.com/coder/coder/v2/codersdk/agentsdk" "github.com/coder/coder/v2/testutil" ) @@ -109,10 +110,13 @@ func TestCollectInsights(t *testing.T) { require.NoError(t, err, "unable to post fake stats") // Fake app usage - reporter := workspaceapps.NewStatsDBReporter(db, workspaceapps.DefaultStatsDBReporterBatchSize) + reporter := workspacestats.NewReporter(workspacestats.ReporterOptions{ + Database: db, + AppStatBatchSize: workspaceapps.DefaultStatsDBReporterBatchSize, + }) refTime := time.Now().Add(-3 * time.Minute).Truncate(time.Minute) //nolint:gocritic // This is a test. - err = reporter.Report(dbauthz.AsSystemRestricted(context.Background()), []workspaceapps.StatsReport{ + err = reporter.ReportAppStats(dbauthz.AsSystemRestricted(context.Background()), []workspaceapps.StatsReport{ { UserID: user.ID, WorkspaceID: workspace1.ID, diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index 9faae72f22ef7..1821948572e29 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -34,9 +34,7 @@ import ( "github.com/coder/coder/v2/coderd/externalauth" "github.com/coder/coder/v2/coderd/httpapi" "github.com/coder/coder/v2/coderd/httpmw" - "github.com/coder/coder/v2/coderd/prometheusmetrics" "github.com/coder/coder/v2/coderd/rbac/policy" - "github.com/coder/coder/v2/coderd/schedule" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/agentsdk" "github.com/coder/coder/v2/codersdk/workspacesdk" @@ -1167,35 +1165,6 @@ func (api *API) workspaceAgentReportStats(rw http.ResponseWriter, r *http.Reques slog.F("payload", req), ) - if req.ConnectionCount > 0 { - var nextAutostart time.Time - if workspace.AutostartSchedule.String != "" { - templateSchedule, err := (*(api.TemplateScheduleStore.Load())).Get(ctx, api.Database, workspace.TemplateID) - // If the template schedule fails to load, just default to bumping without the next transition and log it. - if err != nil { - // There's nothing we can do if the query was canceled, the - // client most likely went away so we just return an internal - // server error. - if database.IsQueryCanceledError(err) { - httpapi.InternalServerError(rw, err) - return - } - api.Logger.Error(ctx, "failed to load template schedule bumping activity, defaulting to bumping by 60min", - slog.F("workspace_id", workspace.ID), - slog.F("template_id", workspace.TemplateID), - slog.Error(err), - ) - } else { - next, allowed := schedule.NextAutostart(time.Now(), workspace.AutostartSchedule.String, templateSchedule) - if allowed { - nextAutostart = next - } - } - } - agentapi.ActivityBumpWorkspace(ctx, api.Logger.Named("activity_bump"), api.Database, workspace.ID, nextAutostart) - } - - now := dbtime.Now() protoStats := &agentproto.Stats{ ConnectionsByProto: req.ConnectionsByProto, ConnectionCount: req.ConnectionCount, @@ -1232,46 +1201,14 @@ func (api *API) workspaceAgentReportStats(rw http.ResponseWriter, r *http.Reques } } } - - var errGroup errgroup.Group - errGroup.Go(func() error { - err := api.statsBatcher.Add(time.Now(), workspaceAgent.ID, workspace.TemplateID, workspace.OwnerID, workspace.ID, protoStats) - if err != nil { - api.Logger.Error(ctx, "failed to add stats to batcher", slog.Error(err)) - return xerrors.Errorf("can't insert workspace agent stat: %w", err) - } - return nil - }) - if req.SessionCount() > 0 { - errGroup.Go(func() error { - // nolint:gocritic // (#13146) Will be moved soon as part of refactor. - err := api.Database.UpdateWorkspaceLastUsedAt(ctx, database.UpdateWorkspaceLastUsedAtParams{ - ID: workspace.ID, - LastUsedAt: now, - }) - if err != nil { - return xerrors.Errorf("can't update workspace LastUsedAt: %w", err) - } - return nil - }) - } - if api.Options.UpdateAgentMetrics != nil { - errGroup.Go(func() error { - user, err := api.Database.GetUserByID(ctx, workspace.OwnerID) - if err != nil { - return xerrors.Errorf("can't get user: %w", err) - } - - api.Options.UpdateAgentMetrics(ctx, prometheusmetrics.AgentMetricLabels{ - Username: user.Username, - WorkspaceName: workspace.Name, - AgentName: workspaceAgent.Name, - TemplateName: row.TemplateName, - }, protoStats.Metrics) - return nil - }) - } - err = errGroup.Wait() + err = api.statsReporter.ReportAgentStats( + ctx, + dbtime.Now(), + workspace, + workspaceAgent, + row.TemplateName, + protoStats, + ) if err != nil { httpapi.InternalServerError(rw, err) return diff --git a/coderd/workspaceagents_test.go b/coderd/workspaceagents_test.go index 73f0c74cd765a..e99b6a297c103 100644 --- a/coderd/workspaceagents_test.go +++ b/coderd/workspaceagents_test.go @@ -939,32 +939,6 @@ func TestWorkspaceAgentReportStats(t *testing.T) { agentClient.SetSessionToken(r.AgentToken) _, err := agentClient.PostStats(context.Background(), &agentsdk.Stats{ - ConnectionsByProto: map[string]int64{"TCP": 1}, - // Set connection count to 1 but all session counts to zero to - // assert we aren't updating last_used_at for a connections that may - // be spawned passively by the dashboard. - ConnectionCount: 1, - RxPackets: 1, - RxBytes: 1, - TxPackets: 1, - TxBytes: 1, - SessionCountVSCode: 0, - SessionCountJetBrains: 0, - SessionCountReconnectingPTY: 0, - SessionCountSSH: 0, - ConnectionMedianLatencyMS: 10, - }) - require.NoError(t, err) - - newWorkspace, err := client.Workspace(context.Background(), r.Workspace.ID) - require.NoError(t, err) - - assert.True(t, - newWorkspace.LastUsedAt.Equal(r.Workspace.LastUsedAt), - "%s and %s should not differ", newWorkspace.LastUsedAt, r.Workspace.LastUsedAt, - ) - - _, err = agentClient.PostStats(context.Background(), &agentsdk.Stats{ ConnectionsByProto: map[string]int64{"TCP": 1}, ConnectionCount: 1, RxPackets: 1, @@ -979,7 +953,7 @@ func TestWorkspaceAgentReportStats(t *testing.T) { }) require.NoError(t, err) - newWorkspace, err = client.Workspace(context.Background(), r.Workspace.ID) + newWorkspace, err := client.Workspace(context.Background(), r.Workspace.ID) require.NoError(t, err) assert.True(t, diff --git a/coderd/workspaceagentsrpc.go b/coderd/workspaceagentsrpc.go index a0cd4c1032e97..24b6088ddd8f2 100644 --- a/coderd/workspaceagentsrpc.go +++ b/coderd/workspaceagentsrpc.go @@ -132,7 +132,7 @@ func (api *API) workspaceAgentRPC(rw http.ResponseWriter, r *http.Request) { TailnetCoordinator: &api.TailnetCoordinator, TemplateScheduleStore: api.TemplateScheduleStore, AppearanceFetcher: &api.AppearanceFetcher, - StatsBatcher: api.statsBatcher, + StatsReporter: api.statsReporter, PublishWorkspaceUpdateFn: api.publishWorkspaceUpdate, PublishWorkspaceAgentLogsUpdateFn: api.publishWorkspaceAgentLogsUpdate, diff --git a/coderd/workspaceapps/apptest/apptest.go b/coderd/workspaceapps/apptest/apptest.go index 851d8ff144eb0..3cd5e5a2f9935 100644 --- a/coderd/workspaceapps/apptest/apptest.go +++ b/coderd/workspaceapps/apptest/apptest.go @@ -1688,7 +1688,7 @@ func (r *fakeStatsReporter) stats() []workspaceapps.StatsReport { return r.s } -func (r *fakeStatsReporter) Report(_ context.Context, stats []workspaceapps.StatsReport) error { +func (r *fakeStatsReporter) ReportAppStats(_ context.Context, stats []workspaceapps.StatsReport) error { r.mu.Lock() r.s = append(r.s, stats...) r.mu.Unlock() diff --git a/coderd/workspaceapps/stats.go b/coderd/workspaceapps/stats.go index 76a60c6fbb5df..53f9109c254b7 100644 --- a/coderd/workspaceapps/stats.go +++ b/coderd/workspaceapps/stats.go @@ -10,10 +10,8 @@ import ( "cdr.dev/slog" - "github.com/coder/coder/v2/coderd/database" "github.com/coder/coder/v2/coderd/database/dbauthz" "github.com/coder/coder/v2/coderd/database/dbtime" - "github.com/coder/coder/v2/coderd/util/slice" ) const ( @@ -52,100 +50,7 @@ func newStatsReportFromSignedToken(token SignedToken) StatsReport { // StatsReporter reports workspace app StatsReports. type StatsReporter interface { - Report(context.Context, []StatsReport) error -} - -var _ StatsReporter = (*StatsDBReporter)(nil) - -// StatsDBReporter writes workspace app StatsReports to the database. -type StatsDBReporter struct { - db database.Store - batchSize int -} - -// NewStatsDBReporter returns a new StatsDBReporter. -func NewStatsDBReporter(db database.Store, batchSize int) *StatsDBReporter { - return &StatsDBReporter{ - db: db, - batchSize: batchSize, - } -} - -// Report writes the given StatsReports to the database. -func (r *StatsDBReporter) Report(ctx context.Context, stats []StatsReport) error { - err := r.db.InTx(func(tx database.Store) error { - maxBatchSize := r.batchSize - if len(stats) < maxBatchSize { - maxBatchSize = len(stats) - } - batch := database.InsertWorkspaceAppStatsParams{ - UserID: make([]uuid.UUID, 0, maxBatchSize), - WorkspaceID: make([]uuid.UUID, 0, maxBatchSize), - AgentID: make([]uuid.UUID, 0, maxBatchSize), - AccessMethod: make([]string, 0, maxBatchSize), - SlugOrPort: make([]string, 0, maxBatchSize), - SessionID: make([]uuid.UUID, 0, maxBatchSize), - SessionStartedAt: make([]time.Time, 0, maxBatchSize), - SessionEndedAt: make([]time.Time, 0, maxBatchSize), - Requests: make([]int32, 0, maxBatchSize), - } - for _, stat := range stats { - batch.UserID = append(batch.UserID, stat.UserID) - batch.WorkspaceID = append(batch.WorkspaceID, stat.WorkspaceID) - batch.AgentID = append(batch.AgentID, stat.AgentID) - batch.AccessMethod = append(batch.AccessMethod, string(stat.AccessMethod)) - batch.SlugOrPort = append(batch.SlugOrPort, stat.SlugOrPort) - batch.SessionID = append(batch.SessionID, stat.SessionID) - batch.SessionStartedAt = append(batch.SessionStartedAt, stat.SessionStartedAt) - batch.SessionEndedAt = append(batch.SessionEndedAt, stat.SessionEndedAt) - batch.Requests = append(batch.Requests, int32(stat.Requests)) - - if len(batch.UserID) >= r.batchSize { - err := tx.InsertWorkspaceAppStats(ctx, batch) - if err != nil { - return err - } - - // Reset batch. - batch.UserID = batch.UserID[:0] - batch.WorkspaceID = batch.WorkspaceID[:0] - batch.AgentID = batch.AgentID[:0] - batch.AccessMethod = batch.AccessMethod[:0] - batch.SlugOrPort = batch.SlugOrPort[:0] - batch.SessionID = batch.SessionID[:0] - batch.SessionStartedAt = batch.SessionStartedAt[:0] - batch.SessionEndedAt = batch.SessionEndedAt[:0] - batch.Requests = batch.Requests[:0] - } - } - if len(batch.UserID) == 0 { - return nil - } - - if err := tx.InsertWorkspaceAppStats(ctx, batch); err != nil { - return err - } - - // TODO: We currently measure workspace usage based on when we get stats from it. - // There are currently two paths for this: - // 1) From SSH -> workspace agent stats POSTed from agent - // 2) From workspace apps / rpty -> workspace app stats (from coderd / wsproxy) - // Ideally we would have a single code path for this. - uniqueIDs := slice.Unique(batch.WorkspaceID) - if err := tx.BatchUpdateWorkspaceLastUsedAt(ctx, database.BatchUpdateWorkspaceLastUsedAtParams{ - IDs: uniqueIDs, - LastUsedAt: dbtime.Now(), // This isn't 100% accurate, but it's good enough. - }); err != nil { - return err - } - - return nil - }, nil) - if err != nil { - return xerrors.Errorf("insert workspace app stats failed: %w", err) - } - - return nil + ReportAppStats(context.Context, []StatsReport) error } // This should match the database unique constraint. @@ -353,7 +258,7 @@ func (sc *StatsCollector) flush(ctx context.Context) (err error) { // backlog and the stats we're about to report, but it's not worth // the complexity. if len(sc.backlog) > 0 { - err = sc.opts.Reporter.Report(ctx, sc.backlog) + err = sc.opts.Reporter.ReportAppStats(ctx, sc.backlog) if err != nil { return xerrors.Errorf("report workspace app stats from backlog failed: %w", err) } @@ -366,7 +271,7 @@ func (sc *StatsCollector) flush(ctx context.Context) (err error) { return nil } - err = sc.opts.Reporter.Report(ctx, stats) + err = sc.opts.Reporter.ReportAppStats(ctx, stats) if err != nil { sc.backlog = stats return xerrors.Errorf("report workspace app stats failed: %w", err) diff --git a/coderd/workspaceapps/stats_test.go b/coderd/workspaceapps/stats_test.go index b1c4686197743..c2c722929ea83 100644 --- a/coderd/workspaceapps/stats_test.go +++ b/coderd/workspaceapps/stats_test.go @@ -43,7 +43,7 @@ func (r *fakeReporter) setError(err error) { r.err = err } -func (r *fakeReporter) Report(_ context.Context, stats []workspaceapps.StatsReport) error { +func (r *fakeReporter) ReportAppStats(_ context.Context, stats []workspaceapps.StatsReport) error { r.mu.Lock() if r.err != nil { r.errN++ diff --git a/coderd/agentapi/activitybump.go b/coderd/workspacestats/activitybump.go similarity index 96% rename from coderd/agentapi/activitybump.go rename to coderd/workspacestats/activitybump.go index a28ba695d018e..29c7dc3686dfe 100644 --- a/coderd/agentapi/activitybump.go +++ b/coderd/workspacestats/activitybump.go @@ -1,4 +1,4 @@ -package agentapi +package workspacestats import ( "context" @@ -41,7 +41,6 @@ func ActivityBumpWorkspace(ctx context.Context, log slog.Logger, db database.Sto // low priority operations fail first. ctx, cancel := context.WithTimeout(ctx, time.Second*15) defer cancel() - // nolint:gocritic // (#13146) Will be moved soon as part of refactor. err := db.ActivityBumpWorkspace(ctx, database.ActivityBumpWorkspaceParams{ NextAutostart: nextAutostart.UTC(), WorkspaceID: workspaceID, diff --git a/coderd/agentapi/activitybump_test.go b/coderd/workspacestats/activitybump_test.go similarity index 98% rename from coderd/agentapi/activitybump_test.go rename to coderd/workspacestats/activitybump_test.go index 5c82454c97cef..3abb46b7ab343 100644 --- a/coderd/agentapi/activitybump_test.go +++ b/coderd/workspacestats/activitybump_test.go @@ -1,4 +1,4 @@ -package agentapi_test +package workspacestats_test import ( "database/sql" @@ -8,12 +8,12 @@ import ( "github.com/google/uuid" "cdr.dev/slog/sloggers/slogtest" - "github.com/coder/coder/v2/coderd/agentapi" "github.com/coder/coder/v2/coderd/database" "github.com/coder/coder/v2/coderd/database/dbgen" "github.com/coder/coder/v2/coderd/database/dbtestutil" "github.com/coder/coder/v2/coderd/database/dbtime" "github.com/coder/coder/v2/coderd/util/ptr" + "github.com/coder/coder/v2/coderd/workspacestats" "github.com/coder/coder/v2/testutil" "github.com/stretchr/testify/assert" @@ -272,7 +272,7 @@ func Test_ActivityBumpWorkspace(t *testing.T) { // Bump duration is measured from the time of the bump, so we measure from here. start := dbtime.Now() - agentapi.ActivityBumpWorkspace(ctx, log, db, bld.WorkspaceID, nextAutostart(start)) + workspacestats.ActivityBumpWorkspace(ctx, log, db, bld.WorkspaceID, nextAutostart(start)) end := dbtime.Now() // Validate our state after bump diff --git a/coderd/workspacestats/reporter.go b/coderd/workspacestats/reporter.go new file mode 100644 index 0000000000000..ec2c6a44fcb24 --- /dev/null +++ b/coderd/workspacestats/reporter.go @@ -0,0 +1,194 @@ +package workspacestats + +import ( + "context" + "sync/atomic" + "time" + + "github.com/google/uuid" + "golang.org/x/sync/errgroup" + "golang.org/x/xerrors" + + "cdr.dev/slog" + + agentproto "github.com/coder/coder/v2/agent/proto" + "github.com/coder/coder/v2/coderd/database" + "github.com/coder/coder/v2/coderd/database/dbtime" + "github.com/coder/coder/v2/coderd/database/pubsub" + "github.com/coder/coder/v2/coderd/prometheusmetrics" + "github.com/coder/coder/v2/coderd/schedule" + "github.com/coder/coder/v2/coderd/util/slice" + "github.com/coder/coder/v2/coderd/workspaceapps" + "github.com/coder/coder/v2/codersdk" +) + +type StatsBatcher interface { + Add(now time.Time, agentID uuid.UUID, templateID uuid.UUID, userID uuid.UUID, workspaceID uuid.UUID, st *agentproto.Stats) error +} + +type ReporterOptions struct { + Database database.Store + Logger slog.Logger + Pubsub pubsub.Pubsub + TemplateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore] + StatsBatcher StatsBatcher + UpdateAgentMetricsFn func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric) + + AppStatBatchSize int +} + +type Reporter struct { + opts ReporterOptions +} + +func NewReporter(opts ReporterOptions) *Reporter { + return &Reporter{opts: opts} +} + +func (r *Reporter) ReportAppStats(ctx context.Context, stats []workspaceapps.StatsReport) error { + err := r.opts.Database.InTx(func(tx database.Store) error { + maxBatchSize := r.opts.AppStatBatchSize + if len(stats) < maxBatchSize { + maxBatchSize = len(stats) + } + batch := database.InsertWorkspaceAppStatsParams{ + UserID: make([]uuid.UUID, 0, maxBatchSize), + WorkspaceID: make([]uuid.UUID, 0, maxBatchSize), + AgentID: make([]uuid.UUID, 0, maxBatchSize), + AccessMethod: make([]string, 0, maxBatchSize), + SlugOrPort: make([]string, 0, maxBatchSize), + SessionID: make([]uuid.UUID, 0, maxBatchSize), + SessionStartedAt: make([]time.Time, 0, maxBatchSize), + SessionEndedAt: make([]time.Time, 0, maxBatchSize), + Requests: make([]int32, 0, maxBatchSize), + } + for _, stat := range stats { + batch.UserID = append(batch.UserID, stat.UserID) + batch.WorkspaceID = append(batch.WorkspaceID, stat.WorkspaceID) + batch.AgentID = append(batch.AgentID, stat.AgentID) + batch.AccessMethod = append(batch.AccessMethod, string(stat.AccessMethod)) + batch.SlugOrPort = append(batch.SlugOrPort, stat.SlugOrPort) + batch.SessionID = append(batch.SessionID, stat.SessionID) + batch.SessionStartedAt = append(batch.SessionStartedAt, stat.SessionStartedAt) + batch.SessionEndedAt = append(batch.SessionEndedAt, stat.SessionEndedAt) + batch.Requests = append(batch.Requests, int32(stat.Requests)) + + if len(batch.UserID) >= r.opts.AppStatBatchSize { + err := tx.InsertWorkspaceAppStats(ctx, batch) + if err != nil { + return err + } + + // Reset batch. + batch.UserID = batch.UserID[:0] + batch.WorkspaceID = batch.WorkspaceID[:0] + batch.AgentID = batch.AgentID[:0] + batch.AccessMethod = batch.AccessMethod[:0] + batch.SlugOrPort = batch.SlugOrPort[:0] + batch.SessionID = batch.SessionID[:0] + batch.SessionStartedAt = batch.SessionStartedAt[:0] + batch.SessionEndedAt = batch.SessionEndedAt[:0] + batch.Requests = batch.Requests[:0] + } + } + if len(batch.UserID) == 0 { + return nil + } + + if err := tx.InsertWorkspaceAppStats(ctx, batch); err != nil { + return err + } + + // TODO: We currently measure workspace usage based on when we get stats from it. + // There are currently two paths for this: + // 1) From SSH -> workspace agent stats POSTed from agent + // 2) From workspace apps / rpty -> workspace app stats (from coderd / wsproxy) + // Ideally we would have a single code path for this. + uniqueIDs := slice.Unique(batch.WorkspaceID) + if err := tx.BatchUpdateWorkspaceLastUsedAt(ctx, database.BatchUpdateWorkspaceLastUsedAtParams{ + IDs: uniqueIDs, + LastUsedAt: dbtime.Now(), // This isn't 100% accurate, but it's good enough. + }); err != nil { + return err + } + + return nil + }, nil) + if err != nil { + return xerrors.Errorf("insert workspace app stats failed: %w", err) + } + + return nil +} + +func (r *Reporter) ReportAgentStats(ctx context.Context, now time.Time, workspace database.Workspace, workspaceAgent database.WorkspaceAgent, templateName string, stats *agentproto.Stats) error { + if stats.ConnectionCount > 0 { + var nextAutostart time.Time + if workspace.AutostartSchedule.String != "" { + templateSchedule, err := (*(r.opts.TemplateScheduleStore.Load())).Get(ctx, r.opts.Database, workspace.TemplateID) + // If the template schedule fails to load, just default to bumping + // without the next transition and log it. + if err != nil { + r.opts.Logger.Error(ctx, "failed to load template schedule bumping activity, defaulting to bumping by 60min", + slog.F("workspace_id", workspace.ID), + slog.F("template_id", workspace.TemplateID), + slog.Error(err), + ) + } else { + next, allowed := schedule.NextAutostart(now, workspace.AutostartSchedule.String, templateSchedule) + if allowed { + nextAutostart = next + } + } + } + ActivityBumpWorkspace(ctx, r.opts.Logger.Named("activity_bump"), r.opts.Database, workspace.ID, nextAutostart) + } + + var errGroup errgroup.Group + errGroup.Go(func() error { + err := r.opts.StatsBatcher.Add(now, workspaceAgent.ID, workspace.TemplateID, workspace.OwnerID, workspace.ID, stats) + if err != nil { + r.opts.Logger.Error(ctx, "add agent stats to batcher", slog.Error(err)) + return xerrors.Errorf("insert workspace agent stats batch: %w", err) + } + return nil + }) + errGroup.Go(func() error { + err := r.opts.Database.UpdateWorkspaceLastUsedAt(ctx, database.UpdateWorkspaceLastUsedAtParams{ + ID: workspace.ID, + LastUsedAt: now, + }) + if err != nil { + return xerrors.Errorf("update workspace LastUsedAt: %w", err) + } + return nil + }) + if r.opts.UpdateAgentMetricsFn != nil { + errGroup.Go(func() error { + user, err := r.opts.Database.GetUserByID(ctx, workspace.OwnerID) + if err != nil { + return xerrors.Errorf("get user: %w", err) + } + + r.opts.UpdateAgentMetricsFn(ctx, prometheusmetrics.AgentMetricLabels{ + Username: user.Username, + WorkspaceName: workspace.Name, + AgentName: workspaceAgent.Name, + TemplateName: templateName, + }, stats.Metrics) + return nil + }) + } + err := errGroup.Wait() + if err != nil { + return xerrors.Errorf("update stats in database: %w", err) + } + + err = r.opts.Pubsub.Publish(codersdk.WorkspaceNotifyChannel(workspace.ID), []byte{}) + if err != nil { + r.opts.Logger.Warn(ctx, "failed to publish workspace agent stats", + slog.F("workspace_id", workspace.ID), slog.Error(err)) + } + + return nil +} diff --git a/enterprise/coderd/workspaceproxy.go b/enterprise/coderd/workspaceproxy.go index 22fe1bc747cbe..22bd360ccbffe 100644 --- a/enterprise/coderd/workspaceproxy.go +++ b/enterprise/coderd/workspaceproxy.go @@ -518,7 +518,7 @@ func (api *API) workspaceProxyReportAppStats(rw http.ResponseWriter, r *http.Req api.Logger.Debug(ctx, "report app stats", slog.F("stats", req.Stats)) reporter := api.WorkspaceAppsStatsCollectorOptions.Reporter - if err := reporter.Report(ctx, req.Stats); err != nil { + if err := reporter.ReportAppStats(ctx, req.Stats); err != nil { api.Logger.Error(ctx, "report app stats failed", slog.Error(err)) httpapi.InternalServerError(rw, err) return diff --git a/enterprise/wsproxy/appstatsreporter.go b/enterprise/wsproxy/appstatsreporter.go index 44ffe87e1a5e3..a4e420ddceea1 100644 --- a/enterprise/wsproxy/appstatsreporter.go +++ b/enterprise/wsproxy/appstatsreporter.go @@ -13,7 +13,7 @@ type appStatsReporter struct { Client *wsproxysdk.Client } -func (r *appStatsReporter) Report(ctx context.Context, stats []workspaceapps.StatsReport) error { +func (r *appStatsReporter) ReportAppStats(ctx context.Context, stats []workspaceapps.StatsReport) error { err := r.Client.ReportAppStats(ctx, wsproxysdk.ReportAppStatsRequest{ Stats: stats, }) diff --git a/scripts/rules.go b/scripts/rules.go index 2ff2a503b8503..46aebabab4a1a 100644 --- a/scripts/rules.go +++ b/scripts/rules.go @@ -468,7 +468,7 @@ func withTimezoneUTC(m dsl.Matcher) { At(m["tz"]) } -// workspaceActivity ensures that updating workspace activity is only done in the workspaceapps package. +// workspaceActivity ensures that updating workspace activity is only done in the workspacestats package. // //nolint:unused,deadcode,varnamelen func workspaceActivity(m dsl.Matcher) { @@ -481,9 +481,9 @@ func workspaceActivity(m dsl.Matcher) { `$_.InsertWorkspaceAgentStats($_, $_)`, `$_.InsertWorkspaceAppStats($_, $_)`, ).Where( - !m.File().PkgPath.Matches(`workspaceapps`) && + !m.File().PkgPath.Matches(`workspacestats`) && !m.File().PkgPath.Matches(`dbauthz$`) && !m.File().PkgPath.Matches(`dbgen$`) && !m.File().Name.Matches(`_test\.go$`), - ).Report("Updating workspace activity should always be done in the workspaceapps package.") + ).Report("Updating workspace activity should always be done in the workspacestats package.") }