diff --git a/coderd/apidoc/docs.go b/coderd/apidoc/docs.go index 7a4ae642630e0..00f1b57ef7f8c 100644 --- a/coderd/apidoc/docs.go +++ b/coderd/apidoc/docs.go @@ -5491,6 +5491,42 @@ const docTemplate = `{ } } }, + "/workspaceproxies/me/app-stats": { + "post": { + "security": [ + { + "CoderSessionToken": [] + } + ], + "consumes": [ + "application/json" + ], + "tags": [ + "Enterprise" + ], + "summary": "Report workspace app stats", + "operationId": "report-workspace-app-stats", + "parameters": [ + { + "description": "Report app stats request", + "name": "request", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/wsproxysdk.ReportAppStatsRequest" + } + } + ], + "responses": { + "204": { + "description": "No Content" + } + }, + "x-apidocgen": { + "skip": true + } + } + }, "/workspaceproxies/me/coordinate": { "get": { "security": [ @@ -11798,6 +11834,39 @@ const docTemplate = `{ } } }, + "workspaceapps.StatsReport": { + "type": "object", + "properties": { + "access_method": { + "$ref": "#/definitions/workspaceapps.AccessMethod" + }, + "agent_id": { + "type": "string" + }, + "requests": { + "type": "integer" + }, + "session_ended_at": { + "description": "Updated periodically while app is in use active and when the last connection is closed.", + "type": "string" + }, + "session_id": { + "type": "string" + }, + "session_started_at": { + "type": "string" + }, + "slug_or_port": { + "type": "string" + }, + "user_id": { + "type": "string" + }, + "workspace_id": { + "type": "string" + } + } + }, "wsproxysdk.AgentIsLegacyResponse": { "type": "object", "properties": { @@ -11888,6 +11957,17 @@ const docTemplate = `{ } } } + }, + "wsproxysdk.ReportAppStatsRequest": { + "type": "object", + "properties": { + "stats": { + "type": "array", + "items": { + "$ref": "#/definitions/workspaceapps.StatsReport" + } + } + } } }, "securityDefinitions": { diff --git a/coderd/apidoc/swagger.json b/coderd/apidoc/swagger.json index a5e27fa426fff..eef5f6d0dae2a 100644 --- a/coderd/apidoc/swagger.json +++ b/coderd/apidoc/swagger.json @@ -4841,6 +4841,38 @@ } } }, + "/workspaceproxies/me/app-stats": { + "post": { + "security": [ + { + "CoderSessionToken": [] + } + ], + "consumes": ["application/json"], + "tags": ["Enterprise"], + "summary": "Report workspace app stats", + "operationId": "report-workspace-app-stats", + "parameters": [ + { + "description": "Report app stats request", + "name": "request", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/wsproxysdk.ReportAppStatsRequest" + } + } + ], + "responses": { + "204": { + "description": "No Content" + } + }, + "x-apidocgen": { + "skip": true + } + } + }, "/workspaceproxies/me/coordinate": { "get": { "security": [ @@ -10769,6 +10801,39 @@ } } }, + "workspaceapps.StatsReport": { + "type": "object", + "properties": { + "access_method": { + "$ref": "#/definitions/workspaceapps.AccessMethod" + }, + "agent_id": { + "type": "string" + }, + "requests": { + "type": "integer" + }, + "session_ended_at": { + "description": "Updated periodically while app is in use active and when the last connection is closed.", + "type": "string" + }, + "session_id": { + "type": "string" + }, + "session_started_at": { + "type": "string" + }, + "slug_or_port": { + "type": "string" + }, + "user_id": { + "type": "string" + }, + "workspace_id": { + "type": "string" + } + } + }, "wsproxysdk.AgentIsLegacyResponse": { "type": "object", "properties": { @@ -10859,6 +10924,17 @@ } } } + }, + "wsproxysdk.ReportAppStatsRequest": { + "type": "object", + "properties": { + "stats": { + "type": "array", + "items": { + "$ref": "#/definitions/workspaceapps.StatsReport" + } + } + } } }, "securityDefinitions": { diff --git a/coderd/coderd.go b/coderd/coderd.go index f7feca38bd549..6ffe623699aa4 100644 --- a/coderd/coderd.go +++ b/coderd/coderd.go @@ -162,6 +162,8 @@ type Options struct { UpdateAgentMetrics func(ctx context.Context, username, workspaceName, agentName string, metrics []agentsdk.AgentMetric) StatsBatcher *batchstats.Batcher + + WorkspaceAppsStatsCollectorOptions workspaceapps.StatsCollectorOptions } // @title Coder API @@ -418,8 +420,17 @@ func New(options *Options) *API { } } + workspaceAppsLogger := options.Logger.Named("workspaceapps") + if options.WorkspaceAppsStatsCollectorOptions.Logger == nil { + named := workspaceAppsLogger.Named("stats_collector") + options.WorkspaceAppsStatsCollectorOptions.Logger = &named + } + if options.WorkspaceAppsStatsCollectorOptions.Reporter == nil { + options.WorkspaceAppsStatsCollectorOptions.Reporter = workspaceapps.NewStatsDBReporter(options.Database, workspaceapps.DefaultStatsDBReporterBatchSize) + } + api.workspaceAppServer = &workspaceapps.Server{ - Logger: options.Logger.Named("workspaceapps"), + Logger: workspaceAppsLogger, DashboardURL: api.AccessURL, AccessURL: api.AccessURL, @@ -430,6 +441,7 @@ func New(options *Options) *API { SignedTokenProvider: api.WorkspaceAppsProvider, AgentProvider: api.agentProvider, AppSecurityKey: options.AppSecurityKey, + StatsCollector: workspaceapps.NewStatsCollector(options.WorkspaceAppsStatsCollectorOptions), DisablePathApps: options.DeploymentValues.DisablePathApps.Value(), SecureAuthCookie: options.DeploymentValues.SecureAuthCookie.Value(), @@ -1020,6 +1032,7 @@ func (api *API) Close() error { if api.updateChecker != nil { api.updateChecker.Close() } + _ = api.workspaceAppServer.Close() coordinator := api.TailnetCoordinator.Load() if coordinator != nil { _ = (*coordinator).Close() diff --git a/coderd/coderdtest/coderdtest.go b/coderd/coderdtest/coderdtest.go index 04470509682e2..a53fc75353cae 100644 --- a/coderd/coderdtest/coderdtest.go +++ b/coderd/coderdtest/coderdtest.go @@ -144,6 +144,8 @@ type Options struct { // as part of your test. Logger *slog.Logger StatsBatcher *batchstats.Batcher + + WorkspaceAppsStatsCollectorOptions workspaceapps.StatsCollectorOptions } // New constructs a codersdk client connected to an in-memory API instance. @@ -394,37 +396,38 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can Pubsub: options.Pubsub, GitAuthConfigs: options.GitAuthConfigs, - Auditor: options.Auditor, - AWSCertificates: options.AWSCertificates, - AzureCertificates: options.AzureCertificates, - GithubOAuth2Config: options.GithubOAuth2Config, - RealIPConfig: options.RealIPConfig, - OIDCConfig: options.OIDCConfig, - GoogleTokenValidator: options.GoogleTokenValidator, - SSHKeygenAlgorithm: options.SSHKeygenAlgorithm, - DERPServer: derpServer, - APIRateLimit: options.APIRateLimit, - LoginRateLimit: options.LoginRateLimit, - FilesRateLimit: options.FilesRateLimit, - Authorizer: options.Authorizer, - Telemetry: telemetry.NewNoop(), - TemplateScheduleStore: &templateScheduleStore, - TLSCertificates: options.TLSCertificates, - TrialGenerator: options.TrialGenerator, - TailnetCoordinator: options.Coordinator, - BaseDERPMap: derpMap, - DERPMapUpdateFrequency: 150 * time.Millisecond, - MetricsCacheRefreshInterval: options.MetricsCacheRefreshInterval, - AgentStatsRefreshInterval: options.AgentStatsRefreshInterval, - DeploymentValues: options.DeploymentValues, - UpdateCheckOptions: options.UpdateCheckOptions, - SwaggerEndpoint: options.SwaggerEndpoint, - AppSecurityKey: AppSecurityKey, - SSHConfig: options.ConfigSSH, - HealthcheckFunc: options.HealthcheckFunc, - HealthcheckTimeout: options.HealthcheckTimeout, - HealthcheckRefresh: options.HealthcheckRefresh, - StatsBatcher: options.StatsBatcher, + Auditor: options.Auditor, + AWSCertificates: options.AWSCertificates, + AzureCertificates: options.AzureCertificates, + GithubOAuth2Config: options.GithubOAuth2Config, + RealIPConfig: options.RealIPConfig, + OIDCConfig: options.OIDCConfig, + GoogleTokenValidator: options.GoogleTokenValidator, + SSHKeygenAlgorithm: options.SSHKeygenAlgorithm, + DERPServer: derpServer, + APIRateLimit: options.APIRateLimit, + LoginRateLimit: options.LoginRateLimit, + FilesRateLimit: options.FilesRateLimit, + Authorizer: options.Authorizer, + Telemetry: telemetry.NewNoop(), + TemplateScheduleStore: &templateScheduleStore, + TLSCertificates: options.TLSCertificates, + TrialGenerator: options.TrialGenerator, + TailnetCoordinator: options.Coordinator, + BaseDERPMap: derpMap, + DERPMapUpdateFrequency: 150 * time.Millisecond, + MetricsCacheRefreshInterval: options.MetricsCacheRefreshInterval, + AgentStatsRefreshInterval: options.AgentStatsRefreshInterval, + DeploymentValues: options.DeploymentValues, + UpdateCheckOptions: options.UpdateCheckOptions, + SwaggerEndpoint: options.SwaggerEndpoint, + AppSecurityKey: AppSecurityKey, + SSHConfig: options.ConfigSSH, + HealthcheckFunc: options.HealthcheckFunc, + HealthcheckTimeout: options.HealthcheckTimeout, + HealthcheckRefresh: options.HealthcheckRefresh, + StatsBatcher: options.StatsBatcher, + WorkspaceAppsStatsCollectorOptions: options.WorkspaceAppsStatsCollectorOptions, } } diff --git a/coderd/database/dbauthz/dbauthz.go b/coderd/database/dbauthz/dbauthz.go index 76d998a83dffe..896040efc368f 100644 --- a/coderd/database/dbauthz/dbauthz.go +++ b/coderd/database/dbauthz/dbauthz.go @@ -2046,6 +2046,13 @@ func (q *querier) InsertWorkspaceApp(ctx context.Context, arg database.InsertWor return q.db.InsertWorkspaceApp(ctx, arg) } +func (q *querier) InsertWorkspaceAppStats(ctx context.Context, arg database.InsertWorkspaceAppStatsParams) error { + if err := q.authorizeContext(ctx, rbac.ActionCreate, rbac.ResourceSystem); err != nil { + return err + } + return q.db.InsertWorkspaceAppStats(ctx, arg) +} + func (q *querier) InsertWorkspaceBuild(ctx context.Context, arg database.InsertWorkspaceBuildParams) error { w, err := q.db.GetWorkspaceByID(ctx, arg.WorkspaceID) if err != nil { diff --git a/coderd/database/dbfake/dbfake.go b/coderd/database/dbfake/dbfake.go index 07ba63caa17f1..8de3ee49be5c1 100644 --- a/coderd/database/dbfake/dbfake.go +++ b/coderd/database/dbfake/dbfake.go @@ -114,33 +114,35 @@ type data struct { userLinks []database.UserLink // New tables - workspaceAgentStats []database.WorkspaceAgentStat - auditLogs []database.AuditLog - files []database.File - gitAuthLinks []database.GitAuthLink - gitSSHKey []database.GitSSHKey - groupMembers []database.GroupMember - groups []database.Group - licenses []database.License - parameterSchemas []database.ParameterSchema - provisionerDaemons []database.ProvisionerDaemon - provisionerJobLogs []database.ProvisionerJobLog - provisionerJobs []database.ProvisionerJob - replicas []database.Replica - templateVersions []database.TemplateVersionTable - templateVersionParameters []database.TemplateVersionParameter - templateVersionVariables []database.TemplateVersionVariable - templates []database.TemplateTable - workspaceAgents []database.WorkspaceAgent - workspaceAgentMetadata []database.WorkspaceAgentMetadatum - workspaceAgentLogs []database.WorkspaceAgentLog - workspaceApps []database.WorkspaceApp - workspaceBuilds []database.WorkspaceBuildTable - workspaceBuildParameters []database.WorkspaceBuildParameter - workspaceResourceMetadata []database.WorkspaceResourceMetadatum - workspaceResources []database.WorkspaceResource - workspaces []database.Workspace - workspaceProxies []database.WorkspaceProxy + workspaceAgentStats []database.WorkspaceAgentStat + auditLogs []database.AuditLog + files []database.File + gitAuthLinks []database.GitAuthLink + gitSSHKey []database.GitSSHKey + groupMembers []database.GroupMember + groups []database.Group + licenses []database.License + parameterSchemas []database.ParameterSchema + provisionerDaemons []database.ProvisionerDaemon + provisionerJobLogs []database.ProvisionerJobLog + provisionerJobs []database.ProvisionerJob + replicas []database.Replica + templateVersions []database.TemplateVersionTable + templateVersionParameters []database.TemplateVersionParameter + templateVersionVariables []database.TemplateVersionVariable + templates []database.TemplateTable + workspaceAgents []database.WorkspaceAgent + workspaceAgentMetadata []database.WorkspaceAgentMetadatum + workspaceAgentLogs []database.WorkspaceAgentLog + workspaceApps []database.WorkspaceApp + workspaceAppStatsLastInsertID int64 + workspaceAppStats []database.WorkspaceAppStat + workspaceBuilds []database.WorkspaceBuildTable + workspaceBuildParameters []database.WorkspaceBuildParameter + workspaceResourceMetadata []database.WorkspaceResourceMetadatum + workspaceResources []database.WorkspaceResource + workspaces []database.Workspace + workspaceProxies []database.WorkspaceProxy // Locks is a map of lock names. Any keys within the map are currently // locked. locks map[int64]struct{} @@ -4336,6 +4338,44 @@ func (q *FakeQuerier) InsertWorkspaceApp(_ context.Context, arg database.InsertW return workspaceApp, nil } +func (q *FakeQuerier) InsertWorkspaceAppStats(_ context.Context, arg database.InsertWorkspaceAppStatsParams) error { + err := validateDatabaseType(arg) + if err != nil { + return err + } + + q.mutex.Lock() + defer q.mutex.Unlock() + +InsertWorkspaceAppStatsLoop: + for i := 0; i < len(arg.UserID); i++ { + stat := database.WorkspaceAppStat{ + ID: q.workspaceAppStatsLastInsertID + 1, + UserID: arg.UserID[i], + WorkspaceID: arg.WorkspaceID[i], + AgentID: arg.AgentID[i], + AccessMethod: arg.AccessMethod[i], + SlugOrPort: arg.SlugOrPort[i], + SessionID: arg.SessionID[i], + SessionStartedAt: arg.SessionStartedAt[i], + SessionEndedAt: arg.SessionEndedAt[i], + Requests: arg.Requests[i], + } + for j, s := range q.workspaceAppStats { + // Check unique constraint for upsert. + if s.UserID == stat.UserID && s.AgentID == stat.AgentID && s.SessionID == stat.SessionID { + q.workspaceAppStats[j].SessionEndedAt = stat.SessionEndedAt + q.workspaceAppStats[j].Requests = stat.Requests + continue InsertWorkspaceAppStatsLoop + } + } + q.workspaceAppStats = append(q.workspaceAppStats, stat) + q.workspaceAppStatsLastInsertID++ + } + + return nil +} + func (q *FakeQuerier) InsertWorkspaceBuild(_ context.Context, arg database.InsertWorkspaceBuildParams) error { if err := validateDatabaseType(arg); err != nil { return err diff --git a/coderd/database/dbmetrics/dbmetrics.go b/coderd/database/dbmetrics/dbmetrics.go index 613ac28f68c59..f2c98f6594c43 100644 --- a/coderd/database/dbmetrics/dbmetrics.go +++ b/coderd/database/dbmetrics/dbmetrics.go @@ -1264,6 +1264,13 @@ func (m metricsStore) InsertWorkspaceApp(ctx context.Context, arg database.Inser return app, err } +func (m metricsStore) InsertWorkspaceAppStats(ctx context.Context, arg database.InsertWorkspaceAppStatsParams) error { + start := time.Now() + r0 := m.s.InsertWorkspaceAppStats(ctx, arg) + m.queryLatencies.WithLabelValues("InsertWorkspaceAppStats").Observe(time.Since(start).Seconds()) + return r0 +} + func (m metricsStore) InsertWorkspaceBuild(ctx context.Context, arg database.InsertWorkspaceBuildParams) error { start := time.Now() err := m.s.InsertWorkspaceBuild(ctx, arg) diff --git a/coderd/database/dbmock/dbmock.go b/coderd/database/dbmock/dbmock.go index 49e80eb168bef..45db2e6bd4f58 100644 --- a/coderd/database/dbmock/dbmock.go +++ b/coderd/database/dbmock/dbmock.go @@ -2657,6 +2657,20 @@ func (mr *MockStoreMockRecorder) InsertWorkspaceApp(arg0, arg1 interface{}) *gom return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertWorkspaceApp", reflect.TypeOf((*MockStore)(nil).InsertWorkspaceApp), arg0, arg1) } +// InsertWorkspaceAppStats mocks base method. +func (m *MockStore) InsertWorkspaceAppStats(arg0 context.Context, arg1 database.InsertWorkspaceAppStatsParams) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InsertWorkspaceAppStats", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// InsertWorkspaceAppStats indicates an expected call of InsertWorkspaceAppStats. +func (mr *MockStoreMockRecorder) InsertWorkspaceAppStats(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertWorkspaceAppStats", reflect.TypeOf((*MockStore)(nil).InsertWorkspaceAppStats), arg0, arg1) +} + // InsertWorkspaceBuild mocks base method. func (m *MockStore) InsertWorkspaceBuild(arg0 context.Context, arg1 database.InsertWorkspaceBuildParams) error { m.ctrl.T.Helper() diff --git a/coderd/database/dump.sql b/coderd/database/dump.sql index 95a1e3534ae8b..71106e5da771d 100644 --- a/coderd/database/dump.sql +++ b/coderd/database/dump.sql @@ -812,6 +812,50 @@ COMMENT ON COLUMN workspace_agents.started_at IS 'The time the agent entered the COMMENT ON COLUMN workspace_agents.ready_at IS 'The time the agent entered the ready or start_error lifecycle state'; +CREATE TABLE workspace_app_stats ( + id bigint NOT NULL, + user_id uuid NOT NULL, + workspace_id uuid NOT NULL, + agent_id uuid NOT NULL, + access_method text NOT NULL, + slug_or_port text NOT NULL, + session_id uuid NOT NULL, + session_started_at timestamp with time zone NOT NULL, + session_ended_at timestamp with time zone NOT NULL, + requests integer NOT NULL +); + +COMMENT ON TABLE workspace_app_stats IS 'A record of workspace app usage statistics'; + +COMMENT ON COLUMN workspace_app_stats.id IS 'The ID of the record'; + +COMMENT ON COLUMN workspace_app_stats.user_id IS 'The user who used the workspace app'; + +COMMENT ON COLUMN workspace_app_stats.workspace_id IS 'The workspace that the workspace app was used in'; + +COMMENT ON COLUMN workspace_app_stats.agent_id IS 'The workspace agent that was used'; + +COMMENT ON COLUMN workspace_app_stats.access_method IS 'The method used to access the workspace app'; + +COMMENT ON COLUMN workspace_app_stats.slug_or_port IS 'The slug or port used to to identify the app'; + +COMMENT ON COLUMN workspace_app_stats.session_id IS 'The unique identifier for the session'; + +COMMENT ON COLUMN workspace_app_stats.session_started_at IS 'The time the session started'; + +COMMENT ON COLUMN workspace_app_stats.session_ended_at IS 'The time the session ended'; + +COMMENT ON COLUMN workspace_app_stats.requests IS 'The number of requests made during the session, a number larger than 1 indicates that multiple sessions were rolled up into one'; + +CREATE SEQUENCE workspace_app_stats_id_seq + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + +ALTER SEQUENCE workspace_app_stats_id_seq OWNED BY workspace_app_stats.id; + CREATE TABLE workspace_apps ( id uuid NOT NULL, created_at timestamp with time zone NOT NULL, @@ -969,6 +1013,8 @@ ALTER TABLE ONLY provisioner_job_logs ALTER COLUMN id SET DEFAULT nextval('provi ALTER TABLE ONLY workspace_agent_logs ALTER COLUMN id SET DEFAULT nextval('workspace_agent_startup_logs_id_seq'::regclass); +ALTER TABLE ONLY workspace_app_stats ALTER COLUMN id SET DEFAULT nextval('workspace_app_stats_id_seq'::regclass); + ALTER TABLE ONLY workspace_proxies ALTER COLUMN region_id SET DEFAULT nextval('workspace_proxies_region_id_seq'::regclass); ALTER TABLE ONLY workspace_resource_metadata ALTER COLUMN id SET DEFAULT nextval('workspace_resource_metadata_id_seq'::regclass); @@ -1081,6 +1127,12 @@ ALTER TABLE ONLY workspace_agent_logs ALTER TABLE ONLY workspace_agents ADD CONSTRAINT workspace_agents_pkey PRIMARY KEY (id); +ALTER TABLE ONLY workspace_app_stats + ADD CONSTRAINT workspace_app_stats_pkey PRIMARY KEY (id); + +ALTER TABLE ONLY workspace_app_stats + ADD CONSTRAINT workspace_app_stats_user_id_agent_id_session_id_key UNIQUE (user_id, agent_id, session_id); + ALTER TABLE ONLY workspace_apps ADD CONSTRAINT workspace_apps_agent_id_slug_idx UNIQUE (agent_id, slug); @@ -1167,6 +1219,8 @@ CREATE INDEX workspace_agents_auth_token_idx ON workspace_agents USING btree (au CREATE INDEX workspace_agents_resource_id_idx ON workspace_agents USING btree (resource_id); +CREATE INDEX workspace_app_stats_workspace_id_idx ON workspace_app_stats USING btree (workspace_id); + CREATE UNIQUE INDEX workspace_proxies_lower_name_idx ON workspace_proxies USING btree (lower(name)) WHERE (deleted = false); CREATE INDEX workspace_resources_job_id_idx ON workspace_resources USING btree (job_id); @@ -1252,6 +1306,15 @@ ALTER TABLE ONLY workspace_agent_logs ALTER TABLE ONLY workspace_agents ADD CONSTRAINT workspace_agents_resource_id_fkey FOREIGN KEY (resource_id) REFERENCES workspace_resources(id) ON DELETE CASCADE; +ALTER TABLE ONLY workspace_app_stats + ADD CONSTRAINT workspace_app_stats_agent_id_fkey FOREIGN KEY (agent_id) REFERENCES workspace_agents(id); + +ALTER TABLE ONLY workspace_app_stats + ADD CONSTRAINT workspace_app_stats_user_id_fkey FOREIGN KEY (user_id) REFERENCES users(id); + +ALTER TABLE ONLY workspace_app_stats + ADD CONSTRAINT workspace_app_stats_workspace_id_fkey FOREIGN KEY (workspace_id) REFERENCES workspaces(id); + ALTER TABLE ONLY workspace_apps ADD CONSTRAINT workspace_apps_agent_id_fkey FOREIGN KEY (agent_id) REFERENCES workspace_agents(id) ON DELETE CASCADE; diff --git a/coderd/database/migrations/000150_workspace_app_stats.down.sql b/coderd/database/migrations/000150_workspace_app_stats.down.sql new file mode 100644 index 0000000000000..983a13c180edc --- /dev/null +++ b/coderd/database/migrations/000150_workspace_app_stats.down.sql @@ -0,0 +1 @@ +DROP TABLE workspace_app_stats; diff --git a/coderd/database/migrations/000150_workspace_app_stats.up.sql b/coderd/database/migrations/000150_workspace_app_stats.up.sql new file mode 100644 index 0000000000000..ace09e52760f6 --- /dev/null +++ b/coderd/database/migrations/000150_workspace_app_stats.up.sql @@ -0,0 +1,32 @@ +CREATE TABLE workspace_app_stats ( + id BIGSERIAL PRIMARY KEY, + user_id uuid NOT NULL REFERENCES users (id), + workspace_id uuid NOT NULL REFERENCES workspaces (id), + agent_id uuid NOT NULL REFERENCES workspace_agents (id), + access_method text NOT NULL, + slug_or_port text NOT NULL, + session_id uuid NOT NULL, + session_started_at timestamptz NOT NULL, + session_ended_at timestamptz NOT NULL, + requests integer NOT NULL, + + -- Set a unique constraint to allow upserting the session_ended_at + -- and requests fields without risk of collisions. + UNIQUE(user_id, agent_id, session_id) +); + +COMMENT ON TABLE workspace_app_stats IS 'A record of workspace app usage statistics'; + +COMMENT ON COLUMN workspace_app_stats.id IS 'The ID of the record'; +COMMENT ON COLUMN workspace_app_stats.user_id IS 'The user who used the workspace app'; +COMMENT ON COLUMN workspace_app_stats.workspace_id IS 'The workspace that the workspace app was used in'; +COMMENT ON COLUMN workspace_app_stats.agent_id IS 'The workspace agent that was used'; +COMMENT ON COLUMN workspace_app_stats.access_method IS 'The method used to access the workspace app'; +COMMENT ON COLUMN workspace_app_stats.slug_or_port IS 'The slug or port used to to identify the app'; +COMMENT ON COLUMN workspace_app_stats.session_id IS 'The unique identifier for the session'; +COMMENT ON COLUMN workspace_app_stats.session_started_at IS 'The time the session started'; +COMMENT ON COLUMN workspace_app_stats.session_ended_at IS 'The time the session ended'; +COMMENT ON COLUMN workspace_app_stats.requests IS 'The number of requests made during the session, a number larger than 1 indicates that multiple sessions were rolled up into one'; + +-- Create index on workspace_id for joining/filtering by templates. +CREATE INDEX workspace_app_stats_workspace_id_idx ON workspace_app_stats (workspace_id); diff --git a/coderd/database/migrations/testdata/fixtures/000150_workspace_app_usage_stats.up.sql b/coderd/database/migrations/testdata/fixtures/000150_workspace_app_usage_stats.up.sql new file mode 100644 index 0000000000000..9a9a8f0fa72dc --- /dev/null +++ b/coderd/database/migrations/testdata/fixtures/000150_workspace_app_usage_stats.up.sql @@ -0,0 +1,133 @@ +INSERT INTO public.workspace_app_stats ( + id, + user_id, + workspace_id, + agent_id, + access_method, + slug_or_port, + session_id, + session_started_at, + session_ended_at, + requests +) +VALUES + ( + 1498, + '30095c71-380b-457a-8995-97b8ee6e5307', + '3a9a1feb-e89d-457c-9d53-ac751b198ebe', + '7a1ce5f8-8d00-431c-ad1b-97a846512804', + 'path', + 'code-server', + '562cbfb8-3d9a-4018-9c04-e8159d5aa43e', + '2023-08-14 20:15:00+00', + '2023-08-14 20:16:00+00', + 1 + ), + ( + 59, + '30095c71-380b-457a-8995-97b8ee6e5307', + '3a9a1feb-e89d-457c-9d53-ac751b198ebe', + '7a1ce5f8-8d00-431c-ad1b-97a846512804', + 'terminal', + '', + '281919d0-5d99-48fb-8a93-2c3019010387', + '2023-08-14 14:15:40.085827+00', + '2023-08-14 14:17:41.295989+00', + 1 + ), + ( + 58, + '30095c71-380b-457a-8995-97b8ee6e5307', + '3a9a1feb-e89d-457c-9d53-ac751b198ebe', + '7a1ce5f8-8d00-431c-ad1b-97a846512804', + 'path', + 'code-server', + '5b7c9d43-19e6-4401-997b-c26de2c86c55', + '2023-08-14 14:15:34.620496+00', + '2023-08-14 23:58:37.158964+00', + 1 + ), + ( + 57, + '30095c71-380b-457a-8995-97b8ee6e5307', + '3a9a1feb-e89d-457c-9d53-ac751b198ebe', + '7a1ce5f8-8d00-431c-ad1b-97a846512804', + 'path', + 'code-server', + 'fe546a68-0921-4a2b-bced-5dc5c5635576', + '2023-08-14 14:15:34.129002+00', + '2023-08-14 23:58:37.158901+00', + 1 + ), + ( + 56, + '30095c71-380b-457a-8995-97b8ee6e5307', + '3a9a1feb-e89d-457c-9d53-ac751b198ebe', + '7a1ce5f8-8d00-431c-ad1b-97a846512804', + 'path', + 'code-server', + '96e4e857-598c-4881-bc40-e13008b48bb0', + '2023-08-14 14:15:00+00', + '2023-08-14 14:16:00+00', + 36 + ), + ( + 7, + '30095c71-380b-457a-8995-97b8ee6e5307', + '3a9a1feb-e89d-457c-9d53-ac751b198ebe', + '7a1ce5f8-8d00-431c-ad1b-97a846512804', + 'terminal', + '', + '95d22d41-0fde-447b-9743-0b8583edb60a', + '2023-08-14 13:00:28.732837+00', + '2023-08-14 13:09:23.990797+00', + 1 + ), + ( + 4, + '30095c71-380b-457a-8995-97b8ee6e5307', + '3a9a1feb-e89d-457c-9d53-ac751b198ebe', + '7a1ce5f8-8d00-431c-ad1b-97a846512804', + 'path', + 'code-server', + '442688ce-f9e7-46df-ba3d-623ef9a1d30d', + '2023-08-14 13:00:12.843977+00', + '2023-08-14 13:09:26.276696+00', + 1 + ), + ( + 3, + '30095c71-380b-457a-8995-97b8ee6e5307', + '3a9a1feb-e89d-457c-9d53-ac751b198ebe', + '7a1ce5f8-8d00-431c-ad1b-97a846512804', + 'path', + 'code-server', + 'f963c4f0-55b7-4813-8b61-ea58536754db', + '2023-08-14 13:00:12.323196+00', + '2023-08-14 13:09:26.277073+00', + 1 + ), + ( + 2, + '30095c71-380b-457a-8995-97b8ee6e5307', + '3a9a1feb-e89d-457c-9d53-ac751b198ebe', + '7a1ce5f8-8d00-431c-ad1b-97a846512804', + 'terminal', + '', + '5a034459-73e4-4642-91b8-80b0f718f29e', + '2023-08-14 13:00:00+00', + '2023-08-14 13:01:00+00', + 4 + ), + ( + 1, + '30095c71-380b-457a-8995-97b8ee6e5307', + '3a9a1feb-e89d-457c-9d53-ac751b198ebe', + '7a1ce5f8-8d00-431c-ad1b-97a846512804', + 'path', + 'code-server', + 'd7a0d8e1-069e-421d-b876-b5d0ddbcaf6d', + '2023-08-14 13:00:00+00', + '2023-08-14 13:01:00+00', + 36 + ); diff --git a/coderd/database/models.go b/coderd/database/models.go index b51671afaa5da..e795049c16413 100644 --- a/coderd/database/models.go +++ b/coderd/database/models.go @@ -2016,6 +2016,30 @@ type WorkspaceApp struct { External bool `db:"external" json:"external"` } +// A record of workspace app usage statistics +type WorkspaceAppStat struct { + // The ID of the record + ID int64 `db:"id" json:"id"` + // The user who used the workspace app + UserID uuid.UUID `db:"user_id" json:"user_id"` + // The workspace that the workspace app was used in + WorkspaceID uuid.UUID `db:"workspace_id" json:"workspace_id"` + // The workspace agent that was used + AgentID uuid.UUID `db:"agent_id" json:"agent_id"` + // The method used to access the workspace app + AccessMethod string `db:"access_method" json:"access_method"` + // The slug or port used to to identify the app + SlugOrPort string `db:"slug_or_port" json:"slug_or_port"` + // The unique identifier for the session + SessionID uuid.UUID `db:"session_id" json:"session_id"` + // The time the session started + SessionStartedAt time.Time `db:"session_started_at" json:"session_started_at"` + // The time the session ended + SessionEndedAt time.Time `db:"session_ended_at" json:"session_ended_at"` + // The number of requests made during the session, a number larger than 1 indicates that multiple sessions were rolled up into one + Requests int32 `db:"requests" json:"requests"` +} + // Joins in the username + avatar url of the initiated by user. type WorkspaceBuild struct { ID uuid.UUID `db:"id" json:"id"` diff --git a/coderd/database/querier.go b/coderd/database/querier.go index 7280617ef878f..d9f0d31df5232 100644 --- a/coderd/database/querier.go +++ b/coderd/database/querier.go @@ -233,6 +233,7 @@ type sqlcQuerier interface { InsertWorkspaceAgentStat(ctx context.Context, arg InsertWorkspaceAgentStatParams) (WorkspaceAgentStat, error) InsertWorkspaceAgentStats(ctx context.Context, arg InsertWorkspaceAgentStatsParams) error InsertWorkspaceApp(ctx context.Context, arg InsertWorkspaceAppParams) (WorkspaceApp, error) + InsertWorkspaceAppStats(ctx context.Context, arg InsertWorkspaceAppStatsParams) error InsertWorkspaceBuild(ctx context.Context, arg InsertWorkspaceBuildParams) error InsertWorkspaceBuildParameters(ctx context.Context, arg InsertWorkspaceBuildParametersParams) error InsertWorkspaceProxy(ctx context.Context, arg InsertWorkspaceProxyParams) (WorkspaceProxy, error) diff --git a/coderd/database/queries.sql.go b/coderd/database/queries.sql.go index a7cf539b3bc6a..a5f48eb9006d1 100644 --- a/coderd/database/queries.sql.go +++ b/coderd/database/queries.sql.go @@ -7831,6 +7831,72 @@ func (q *sqlQuerier) UpdateWorkspaceAppHealthByID(ctx context.Context, arg Updat return err } +const insertWorkspaceAppStats = `-- name: InsertWorkspaceAppStats :exec +INSERT INTO + workspace_app_stats ( + user_id, + workspace_id, + agent_id, + access_method, + slug_or_port, + session_id, + session_started_at, + session_ended_at, + requests + ) +SELECT + unnest($1::uuid[]) AS user_id, + unnest($2::uuid[]) AS workspace_id, + unnest($3::uuid[]) AS agent_id, + unnest($4::text[]) AS access_method, + unnest($5::text[]) AS slug_or_port, + unnest($6::uuid[]) AS session_id, + unnest($7::timestamptz[]) AS session_started_at, + unnest($8::timestamptz[]) AS session_ended_at, + unnest($9::int[]) AS requests +ON CONFLICT + (user_id, agent_id, session_id) +DO + UPDATE SET + session_ended_at = EXCLUDED.session_ended_at, + requests = EXCLUDED.requests + WHERE + workspace_app_stats.user_id = EXCLUDED.user_id + AND workspace_app_stats.agent_id = EXCLUDED.agent_id + AND workspace_app_stats.session_id = EXCLUDED.session_id + -- Since stats are updated in place as time progresses, we only + -- want to update this row if it's fresh. + AND workspace_app_stats.session_ended_at <= EXCLUDED.session_ended_at + AND workspace_app_stats.requests <= EXCLUDED.requests +` + +type InsertWorkspaceAppStatsParams struct { + UserID []uuid.UUID `db:"user_id" json:"user_id"` + WorkspaceID []uuid.UUID `db:"workspace_id" json:"workspace_id"` + AgentID []uuid.UUID `db:"agent_id" json:"agent_id"` + AccessMethod []string `db:"access_method" json:"access_method"` + SlugOrPort []string `db:"slug_or_port" json:"slug_or_port"` + SessionID []uuid.UUID `db:"session_id" json:"session_id"` + SessionStartedAt []time.Time `db:"session_started_at" json:"session_started_at"` + SessionEndedAt []time.Time `db:"session_ended_at" json:"session_ended_at"` + Requests []int32 `db:"requests" json:"requests"` +} + +func (q *sqlQuerier) InsertWorkspaceAppStats(ctx context.Context, arg InsertWorkspaceAppStatsParams) error { + _, err := q.db.ExecContext(ctx, insertWorkspaceAppStats, + pq.Array(arg.UserID), + pq.Array(arg.WorkspaceID), + pq.Array(arg.AgentID), + pq.Array(arg.AccessMethod), + pq.Array(arg.SlugOrPort), + pq.Array(arg.SessionID), + pq.Array(arg.SessionStartedAt), + pq.Array(arg.SessionEndedAt), + pq.Array(arg.Requests), + ) + return err +} + const getWorkspaceBuildParameters = `-- name: GetWorkspaceBuildParameters :many SELECT workspace_build_id, name, value diff --git a/coderd/database/queries/workspaceappstats.sql b/coderd/database/queries/workspaceappstats.sql new file mode 100644 index 0000000000000..98da75e6972c7 --- /dev/null +++ b/coderd/database/queries/workspaceappstats.sql @@ -0,0 +1,37 @@ +-- name: InsertWorkspaceAppStats :exec +INSERT INTO + workspace_app_stats ( + user_id, + workspace_id, + agent_id, + access_method, + slug_or_port, + session_id, + session_started_at, + session_ended_at, + requests + ) +SELECT + unnest(@user_id::uuid[]) AS user_id, + unnest(@workspace_id::uuid[]) AS workspace_id, + unnest(@agent_id::uuid[]) AS agent_id, + unnest(@access_method::text[]) AS access_method, + unnest(@slug_or_port::text[]) AS slug_or_port, + unnest(@session_id::uuid[]) AS session_id, + unnest(@session_started_at::timestamptz[]) AS session_started_at, + unnest(@session_ended_at::timestamptz[]) AS session_ended_at, + unnest(@requests::int[]) AS requests +ON CONFLICT + (user_id, agent_id, session_id) +DO + UPDATE SET + session_ended_at = EXCLUDED.session_ended_at, + requests = EXCLUDED.requests + WHERE + workspace_app_stats.user_id = EXCLUDED.user_id + AND workspace_app_stats.agent_id = EXCLUDED.agent_id + AND workspace_app_stats.session_id = EXCLUDED.session_id + -- Since stats are updated in place as time progresses, we only + -- want to update this row if it's fresh. + AND workspace_app_stats.session_ended_at <= EXCLUDED.session_ended_at + AND workspace_app_stats.requests <= EXCLUDED.requests; diff --git a/coderd/database/unique_constraint.go b/coderd/database/unique_constraint.go index c8dbc831e8651..294b4b12d51af 100644 --- a/coderd/database/unique_constraint.go +++ b/coderd/database/unique_constraint.go @@ -18,6 +18,7 @@ const ( UniqueTemplateVersionParametersTemplateVersionIDNameKey UniqueConstraint = "template_version_parameters_template_version_id_name_key" // ALTER TABLE ONLY template_version_parameters ADD CONSTRAINT template_version_parameters_template_version_id_name_key UNIQUE (template_version_id, name); UniqueTemplateVersionVariablesTemplateVersionIDNameKey UniqueConstraint = "template_version_variables_template_version_id_name_key" // ALTER TABLE ONLY template_version_variables ADD CONSTRAINT template_version_variables_template_version_id_name_key UNIQUE (template_version_id, name); UniqueTemplateVersionsTemplateIDNameKey UniqueConstraint = "template_versions_template_id_name_key" // ALTER TABLE ONLY template_versions ADD CONSTRAINT template_versions_template_id_name_key UNIQUE (template_id, name); + UniqueWorkspaceAppStatsUserIDAgentIDSessionIDKey UniqueConstraint = "workspace_app_stats_user_id_agent_id_session_id_key" // ALTER TABLE ONLY workspace_app_stats ADD CONSTRAINT workspace_app_stats_user_id_agent_id_session_id_key UNIQUE (user_id, agent_id, session_id); UniqueWorkspaceAppsAgentIDSlugIndex UniqueConstraint = "workspace_apps_agent_id_slug_idx" // ALTER TABLE ONLY workspace_apps ADD CONSTRAINT workspace_apps_agent_id_slug_idx UNIQUE (agent_id, slug); UniqueWorkspaceBuildParametersWorkspaceBuildIDNameKey UniqueConstraint = "workspace_build_parameters_workspace_build_id_name_key" // ALTER TABLE ONLY workspace_build_parameters ADD CONSTRAINT workspace_build_parameters_workspace_build_id_name_key UNIQUE (workspace_build_id, name); UniqueWorkspaceBuildsJobIDKey UniqueConstraint = "workspace_builds_job_id_key" // ALTER TABLE ONLY workspace_builds ADD CONSTRAINT workspace_builds_job_id_key UNIQUE (job_id); diff --git a/coderd/workspaceapps/apptest/apptest.go b/coderd/workspaceapps/apptest/apptest.go index 1c20fd19f5b28..228ebceee90fb 100644 --- a/coderd/workspaceapps/apptest/apptest.go +++ b/coderd/workspaceapps/apptest/apptest.go @@ -16,6 +16,7 @@ import ( "runtime" "strconv" "strings" + "sync" "testing" "time" @@ -1342,6 +1343,67 @@ func Run(t *testing.T, appHostIsPrimary bool, factory DeploymentFactory) { require.Equal(t, []string{"Origin", "X-Foobar"}, deduped) require.Equal(t, []string{"baz"}, resp.Header.Values("X-Foobar")) }) + + t.Run("ReportStats", func(t *testing.T) { + t.Parallel() + + flush := make(chan chan<- struct{}, 1) + + reporter := &fakeStatsReporter{} + appDetails := setupProxyTest(t, &DeploymentOptions{ + StatsCollectorOptions: workspaceapps.StatsCollectorOptions{ + Reporter: reporter, + ReportInterval: time.Hour, + RollupWindow: time.Minute, + + Flush: flush, + }, + }) + + ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong) + defer cancel() + + u := appDetails.PathAppURL(appDetails.Apps.Owner) + resp, err := requestWithRetries(ctx, t, appDetails.AppClient(t), http.MethodGet, u.String(), nil) + require.NoError(t, err) + defer resp.Body.Close() + _, err = io.Copy(io.Discard, resp.Body) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + + var stats []workspaceapps.StatsReport + require.Eventually(t, func() bool { + // Keep flushing until we get a non-empty stats report. + flushDone := make(chan struct{}, 1) + flush <- flushDone + <-flushDone + + stats = reporter.stats() + return len(stats) > 0 + }, testutil.WaitLong, testutil.IntervalFast, "stats not reported") + + assert.Equal(t, workspaceapps.AccessMethodPath, stats[0].AccessMethod) + assert.Equal(t, "test-app-owner", stats[0].SlugOrPort) + assert.Equal(t, 1, stats[0].Requests) + }) +} + +type fakeStatsReporter struct { + mu sync.Mutex + s []workspaceapps.StatsReport +} + +func (r *fakeStatsReporter) stats() []workspaceapps.StatsReport { + r.mu.Lock() + defer r.mu.Unlock() + return r.s +} + +func (r *fakeStatsReporter) Report(_ context.Context, stats []workspaceapps.StatsReport) error { + r.mu.Lock() + r.s = append(r.s, stats...) + r.mu.Unlock() + return nil } func testReconnectingPTY(ctx context.Context, t *testing.T, client *codersdk.Client, opts codersdk.WorkspaceAgentReconnectingPTYOpts) { diff --git a/coderd/workspaceapps/apptest/setup.go b/coderd/workspaceapps/apptest/setup.go index 4245204268391..dc3b0e0e1c40f 100644 --- a/coderd/workspaceapps/apptest/setup.go +++ b/coderd/workspaceapps/apptest/setup.go @@ -21,6 +21,7 @@ import ( "cdr.dev/slog/sloggers/slogtest" "github.com/coder/coder/agent" "github.com/coder/coder/coderd/coderdtest" + "github.com/coder/coder/coderd/workspaceapps" "github.com/coder/coder/codersdk" "github.com/coder/coder/codersdk/agentsdk" "github.com/coder/coder/provisioner/echo" @@ -51,6 +52,8 @@ type DeploymentOptions struct { DangerousAllowPathAppSiteOwnerAccess bool ServeHTTPS bool + StatsCollectorOptions workspaceapps.StatsCollectorOptions + // The following fields are only used by setupProxyTestWithFactory. noWorkspace bool port uint16 diff --git a/coderd/workspaceapps/proxy.go b/coderd/workspaceapps/proxy.go index d75970bcfa8d9..da2f5e6a098f1 100644 --- a/coderd/workspaceapps/proxy.go +++ b/coderd/workspaceapps/proxy.go @@ -19,6 +19,7 @@ import ( "cdr.dev/slog" "github.com/coder/coder/agent/agentssh" + "github.com/coder/coder/coderd/database" "github.com/coder/coder/coderd/httpapi" "github.com/coder/coder/coderd/httpmw" "github.com/coder/coder/coderd/tracing" @@ -109,7 +110,8 @@ type Server struct { DisablePathApps bool SecureAuthCookie bool - AgentProvider AgentProvider + AgentProvider AgentProvider + StatsCollector *StatsCollector websocketWaitMutex sync.Mutex websocketWaitGroup sync.WaitGroup @@ -122,6 +124,10 @@ func (s *Server) Close() error { s.websocketWaitGroup.Wait() s.websocketWaitMutex.Unlock() + if s.StatsCollector != nil { + _ = s.StatsCollector.Close() + } + // The caller must close the SignedTokenProvider and the AgentProvider (if // necessary). @@ -586,6 +592,14 @@ func (s *Server) proxyWorkspaceApp(rw http.ResponseWriter, r *http.Request, appT // end span so we don't get long lived trace data tracing.EndHTTPSpan(r, http.StatusOK, trace.SpanFromContext(ctx)) + report := newStatsReportFromSignedToken(appToken) + s.collectStats(report) + defer func() { + // We must use defer here because ServeHTTP may panic. + report.SessionEndedAt = database.Now() + s.collectStats(report) + }() + proxy.ServeHTTP(rw, r) } @@ -678,10 +692,24 @@ func (s *Server) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) { } defer ptNetConn.Close() log.Debug(ctx, "obtained PTY") + + report := newStatsReportFromSignedToken(*appToken) + s.collectStats(report) + defer func() { + report.SessionEndedAt = database.Now() + s.collectStats(report) + }() + agentssh.Bicopy(ctx, wsNetConn, ptNetConn) log.Debug(ctx, "pty Bicopy finished") } +func (s *Server) collectStats(stats StatsReport) { + if s.StatsCollector != nil { + s.StatsCollector.Collect(stats) + } +} + // wsNetConn wraps net.Conn created by websocket.NetConn(). Cancel func // is called if a read or write error is encountered. type wsNetConn struct { diff --git a/coderd/workspaceapps/stats.go b/coderd/workspaceapps/stats.go new file mode 100644 index 0000000000000..72a8154d5e89c --- /dev/null +++ b/coderd/workspaceapps/stats.go @@ -0,0 +1,403 @@ +package workspaceapps + +import ( + "context" + "sync" + "time" + + "github.com/google/uuid" + "golang.org/x/xerrors" + + "cdr.dev/slog" + + "github.com/coder/coder/coderd/database" + "github.com/coder/coder/coderd/database/dbauthz" +) + +const ( + DefaultStatsCollectorReportInterval = 30 * time.Second + DefaultStatsCollectorRollupWindow = 1 * time.Minute + DefaultStatsDBReporterBatchSize = 1024 +) + +// StatsReport is a report of a workspace app session. +type StatsReport struct { + UserID uuid.UUID `json:"user_id"` + WorkspaceID uuid.UUID `json:"workspace_id"` + AgentID uuid.UUID `json:"agent_id"` + AccessMethod AccessMethod `json:"access_method"` + SlugOrPort string `json:"slug_or_port"` + SessionID uuid.UUID `json:"session_id"` + SessionStartedAt time.Time `json:"session_started_at"` + SessionEndedAt time.Time `json:"session_ended_at"` // Updated periodically while app is in use active and when the last connection is closed. + Requests int `json:"requests"` + + rolledUp bool // Indicates if this report has been rolled up. +} + +func newStatsReportFromSignedToken(token SignedToken) StatsReport { + return StatsReport{ + UserID: token.UserID, + WorkspaceID: token.WorkspaceID, + AgentID: token.AgentID, + AccessMethod: token.AccessMethod, + SlugOrPort: token.AppSlugOrPort, + SessionID: uuid.New(), + SessionStartedAt: database.Now(), + Requests: 1, + } +} + +// StatsReporter reports workspace app StatsReports. +type StatsReporter interface { + Report(context.Context, []StatsReport) error +} + +var _ StatsReporter = (*StatsDBReporter)(nil) + +// StatsDBReporter writes workspace app StatsReports to the database. +type StatsDBReporter struct { + db database.Store + batchSize int +} + +// NewStatsDBReporter returns a new StatsDBReporter. +func NewStatsDBReporter(db database.Store, batchSize int) *StatsDBReporter { + return &StatsDBReporter{ + db: db, + batchSize: batchSize, + } +} + +// Report writes the given StatsReports to the database. +func (r *StatsDBReporter) Report(ctx context.Context, stats []StatsReport) error { + err := r.db.InTx(func(tx database.Store) error { + maxBatchSize := r.batchSize + if len(stats) < maxBatchSize { + maxBatchSize = len(stats) + } + batch := database.InsertWorkspaceAppStatsParams{ + UserID: make([]uuid.UUID, 0, maxBatchSize), + WorkspaceID: make([]uuid.UUID, 0, maxBatchSize), + AgentID: make([]uuid.UUID, 0, maxBatchSize), + AccessMethod: make([]string, 0, maxBatchSize), + SlugOrPort: make([]string, 0, maxBatchSize), + SessionID: make([]uuid.UUID, 0, maxBatchSize), + SessionStartedAt: make([]time.Time, 0, maxBatchSize), + SessionEndedAt: make([]time.Time, 0, maxBatchSize), + Requests: make([]int32, 0, maxBatchSize), + } + for _, stat := range stats { + batch.UserID = append(batch.UserID, stat.UserID) + batch.WorkspaceID = append(batch.WorkspaceID, stat.WorkspaceID) + batch.AgentID = append(batch.AgentID, stat.AgentID) + batch.AccessMethod = append(batch.AccessMethod, string(stat.AccessMethod)) + batch.SlugOrPort = append(batch.SlugOrPort, stat.SlugOrPort) + batch.SessionID = append(batch.SessionID, stat.SessionID) + batch.SessionStartedAt = append(batch.SessionStartedAt, stat.SessionStartedAt) + batch.SessionEndedAt = append(batch.SessionEndedAt, stat.SessionEndedAt) + batch.Requests = append(batch.Requests, int32(stat.Requests)) + + if len(batch.UserID) >= r.batchSize { + err := tx.InsertWorkspaceAppStats(ctx, batch) + if err != nil { + return err + } + + // Reset batch. + batch.UserID = batch.UserID[:0] + batch.WorkspaceID = batch.WorkspaceID[:0] + batch.AgentID = batch.AgentID[:0] + batch.AccessMethod = batch.AccessMethod[:0] + batch.SlugOrPort = batch.SlugOrPort[:0] + batch.SessionID = batch.SessionID[:0] + batch.SessionStartedAt = batch.SessionStartedAt[:0] + batch.SessionEndedAt = batch.SessionEndedAt[:0] + batch.Requests = batch.Requests[:0] + } + } + if len(batch.UserID) > 0 { + err := tx.InsertWorkspaceAppStats(ctx, batch) + if err != nil { + return err + } + } + + return nil + }, nil) + if err != nil { + return xerrors.Errorf("insert workspace app stats failed: %w", err) + } + + return nil +} + +// This should match the database unique constraint. +type statsGroupKey struct { + StartTimeTrunc time.Time + UserID uuid.UUID + WorkspaceID uuid.UUID + AgentID uuid.UUID + AccessMethod AccessMethod + SlugOrPort string +} + +func (s StatsReport) groupKey(windowSize time.Duration) statsGroupKey { + return statsGroupKey{ + StartTimeTrunc: s.SessionStartedAt.Truncate(windowSize), + UserID: s.UserID, + WorkspaceID: s.WorkspaceID, + AgentID: s.AgentID, + AccessMethod: s.AccessMethod, + SlugOrPort: s.SlugOrPort, + } +} + +// StatsCollector collects workspace app StatsReports and reports them +// in batches, stats compaction is performed for short-lived sessions. +type StatsCollector struct { + opts StatsCollectorOptions + + ctx context.Context + cancel context.CancelFunc + done chan struct{} + + mu sync.Mutex // Protects following. + statsBySessionID map[uuid.UUID]*StatsReport // Track unique sessions. + groupedStats map[statsGroupKey][]*StatsReport // Rolled up stats for sessions in close proximity. + backlog []StatsReport // Stats that have not been reported yet (due to error). +} + +type StatsCollectorOptions struct { + Logger *slog.Logger + Reporter StatsReporter + // ReportInterval is the interval at which stats are reported, both partial + // and fully formed stats. + ReportInterval time.Duration + // RollupWindow is the window size for rolling up stats, session shorter + // than this will be rolled up and longer than this will be tracked + // individually. + RollupWindow time.Duration + + // Options for tests. + Flush <-chan chan<- struct{} + Now func() time.Time +} + +func NewStatsCollector(opts StatsCollectorOptions) *StatsCollector { + if opts.Logger == nil { + opts.Logger = &slog.Logger{} + } + if opts.ReportInterval == 0 { + opts.ReportInterval = DefaultStatsCollectorReportInterval + } + if opts.RollupWindow == 0 { + opts.RollupWindow = DefaultStatsCollectorRollupWindow + } + if opts.Now == nil { + opts.Now = time.Now + } + + ctx, cancel := context.WithCancel(context.Background()) + sc := &StatsCollector{ + ctx: ctx, + cancel: cancel, + done: make(chan struct{}), + opts: opts, + + statsBySessionID: make(map[uuid.UUID]*StatsReport), + groupedStats: make(map[statsGroupKey][]*StatsReport), + } + + go sc.start() + return sc +} + +// Collect the given StatsReport for later reporting (non-blocking). +func (sc *StatsCollector) Collect(report StatsReport) { + sc.mu.Lock() + defer sc.mu.Unlock() + + r := &report + if _, ok := sc.statsBySessionID[report.SessionID]; !ok { + groupKey := r.groupKey(sc.opts.RollupWindow) + sc.groupedStats[groupKey] = append(sc.groupedStats[groupKey], r) + } + + if r.SessionEndedAt.IsZero() { + sc.statsBySessionID[report.SessionID] = r + } else { + if stat, ok := sc.statsBySessionID[report.SessionID]; ok { + // Update in-place. + *stat = *r + } + delete(sc.statsBySessionID, report.SessionID) + } +} + +// rollup performs stats rollup for sessions that fall within the +// configured rollup window. For sessions longer than the window, +// we report them individually. +func (sc *StatsCollector) rollup(now time.Time) []StatsReport { + sc.mu.Lock() + defer sc.mu.Unlock() + + var report []StatsReport + + for g, group := range sc.groupedStats { + if len(group) == 0 { + // Safety check, this should not happen. + sc.opts.Logger.Error(sc.ctx, "empty stats group", "group", g) + delete(sc.groupedStats, g) + continue + } + + var rolledUp *StatsReport + if group[0].rolledUp { + rolledUp = group[0] + group = group[1:] + } else { + rolledUp = &StatsReport{ + UserID: g.UserID, + WorkspaceID: g.WorkspaceID, + AgentID: g.AgentID, + AccessMethod: g.AccessMethod, + SlugOrPort: g.SlugOrPort, + SessionStartedAt: g.StartTimeTrunc, + SessionEndedAt: g.StartTimeTrunc.Add(sc.opts.RollupWindow), + Requests: 0, + rolledUp: true, + } + } + rollupChanged := false + newGroup := []*StatsReport{rolledUp} // Must be first in slice for future iterations (see group[0] above). + for _, stat := range group { + if !stat.SessionEndedAt.IsZero() && stat.SessionEndedAt.Sub(stat.SessionStartedAt) <= sc.opts.RollupWindow { + // This is a short-lived session, roll it up. + if rolledUp.SessionID == uuid.Nil { + rolledUp.SessionID = stat.SessionID // Borrow the first session ID, useful in tests. + } + rolledUp.Requests += stat.Requests + rollupChanged = true + continue + } + if stat.SessionEndedAt.IsZero() && now.Sub(stat.SessionStartedAt) <= sc.opts.RollupWindow { + // This is an incomplete session, wait and see if it'll be rolled up or not. + newGroup = append(newGroup, stat) + continue + } + + // This is a long-lived session, report it individually. + // Make a copy of stat for reporting. + r := *stat + if r.SessionEndedAt.IsZero() { + // Report an end time for incomplete sessions, it will + // be updated later. This ensures that data in the DB + // will have an end time even if the service is stopped. + r.SessionEndedAt = now.UTC() // Use UTC like database.Now(). + } + report = append(report, r) // Report it (ended or incomplete). + if stat.SessionEndedAt.IsZero() { + newGroup = append(newGroup, stat) // Keep it for future updates. + } + } + if rollupChanged { + report = append(report, *rolledUp) + } + + // Future rollups should only consider the compacted group. + sc.groupedStats[g] = newGroup + + // Keep the group around until the next rollup window has passed + // in case data was collected late. + if len(newGroup) == 1 && rolledUp.SessionEndedAt.Add(sc.opts.RollupWindow).Before(now) { + delete(sc.groupedStats, g) + } + } + + return report +} + +func (sc *StatsCollector) flush(ctx context.Context) (err error) { + sc.opts.Logger.Debug(ctx, "flushing workspace app stats") + defer func() { + if err != nil { + sc.opts.Logger.Error(ctx, "failed to flush workspace app stats", "error", err) + } else { + sc.opts.Logger.Debug(ctx, "flushed workspace app stats") + } + }() + + // We keep the backlog as a simple slice so that we don't need to + // attempt to merge it with the stats we're about to report. This + // is because the rollup is a one-way operation and the backlog may + // contain stats that are still in the statsBySessionID map and will + // be reported again in the future. It is possible to merge the + // backlog and the stats we're about to report, but it's not worth + // the complexity. + if len(sc.backlog) > 0 { + err = sc.opts.Reporter.Report(ctx, sc.backlog) + if err != nil { + return xerrors.Errorf("report workspace app stats from backlog failed: %w", err) + } + sc.backlog = nil + } + + now := sc.opts.Now() + stats := sc.rollup(now) + if len(stats) == 0 { + return nil + } + + err = sc.opts.Reporter.Report(ctx, stats) + if err != nil { + sc.backlog = stats + return xerrors.Errorf("report workspace app stats failed: %w", err) + } + + return nil +} + +func (sc *StatsCollector) Close() error { + sc.cancel() + <-sc.done + return nil +} + +func (sc *StatsCollector) start() { + defer func() { + close(sc.done) + sc.opts.Logger.Debug(sc.ctx, "workspace app stats collector stopped") + }() + sc.opts.Logger.Debug(sc.ctx, "workspace app stats collector started") + + t := time.NewTimer(sc.opts.ReportInterval) + defer t.Stop() + + var reportFlushDone chan<- struct{} + done := false + for !done { + select { + case <-sc.ctx.Done(): + t.Stop() + done = true + case <-t.C: + case reportFlushDone = <-sc.opts.Flush: + } + + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + //nolint:gocritic // Inserting app stats is a system function. + _ = sc.flush(dbauthz.AsSystemRestricted(ctx)) + cancel() + + if !done { + t.Reset(sc.opts.ReportInterval) + } + + // For tests. + if reportFlushDone != nil { + reportFlushDone <- struct{}{} + reportFlushDone = nil + } + } +} diff --git a/coderd/workspaceapps/stats_test.go b/coderd/workspaceapps/stats_test.go new file mode 100644 index 0000000000000..2ad0c5556c52c --- /dev/null +++ b/coderd/workspaceapps/stats_test.go @@ -0,0 +1,426 @@ +package workspaceapps_test + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/exp/slices" + "golang.org/x/xerrors" + + "github.com/coder/coder/coderd/database" + "github.com/coder/coder/coderd/workspaceapps" + "github.com/coder/coder/testutil" +) + +type fakeReporter struct { + mu sync.Mutex + s []workspaceapps.StatsReport + err error + errN int +} + +func (r *fakeReporter) stats() []workspaceapps.StatsReport { + r.mu.Lock() + defer r.mu.Unlock() + return r.s +} + +func (r *fakeReporter) errors() int { + r.mu.Lock() + defer r.mu.Unlock() + return r.errN +} + +func (r *fakeReporter) setError(err error) { + r.mu.Lock() + defer r.mu.Unlock() + r.err = err +} + +func (r *fakeReporter) Report(_ context.Context, stats []workspaceapps.StatsReport) error { + r.mu.Lock() + if r.err != nil { + r.errN++ + r.mu.Unlock() + return r.err + } + r.s = append(r.s, stats...) + r.mu.Unlock() + return nil +} + +func TestStatsCollector(t *testing.T) { + t.Parallel() + + rollupUUID := uuid.New() + rollupUUID2 := uuid.New() + someUUID := uuid.New() + + rollupWindow := time.Minute + start := database.Now().Truncate(time.Minute).UTC() + end := start.Add(10 * time.Second) + + tests := []struct { + name string + flushIncrement time.Duration + flushCount int + stats []workspaceapps.StatsReport + want []workspaceapps.StatsReport + }{ + { + name: "Single stat rolled up and reported once", + flushIncrement: 2*rollupWindow + time.Second, + flushCount: 10, // Only reported once. + stats: []workspaceapps.StatsReport{ + { + SessionID: rollupUUID, + SessionStartedAt: start, + SessionEndedAt: end, + Requests: 1, + }, + }, + want: []workspaceapps.StatsReport{ + { + SessionID: rollupUUID, + SessionStartedAt: start, + SessionEndedAt: start.Add(rollupWindow), + Requests: 1, + }, + }, + }, + { + name: "Two unique stat rolled up", + flushIncrement: 2*rollupWindow + time.Second, + flushCount: 10, // Only reported once. + stats: []workspaceapps.StatsReport{ + { + AccessMethod: workspaceapps.AccessMethodPath, + SlugOrPort: "code-server", + SessionID: rollupUUID, + SessionStartedAt: start, + SessionEndedAt: end, + Requests: 1, + }, + { + AccessMethod: workspaceapps.AccessMethodTerminal, + SessionID: rollupUUID2, + SessionStartedAt: start, + SessionEndedAt: end, + Requests: 1, + }, + }, + want: []workspaceapps.StatsReport{ + { + AccessMethod: workspaceapps.AccessMethodPath, + SlugOrPort: "code-server", + SessionID: rollupUUID, + SessionStartedAt: start, + SessionEndedAt: start.Add(rollupWindow), + Requests: 1, + }, + { + AccessMethod: workspaceapps.AccessMethodTerminal, + SessionID: rollupUUID2, + SessionStartedAt: start, + SessionEndedAt: start.Add(rollupWindow), + Requests: 1, + }, + }, + }, + { + name: "Multiple stats rolled up", + flushIncrement: 2*rollupWindow + time.Second, + flushCount: 2, + stats: []workspaceapps.StatsReport{ + { + SessionID: rollupUUID, + SessionStartedAt: start, + SessionEndedAt: end, + Requests: 1, + }, + { + SessionID: uuid.New(), + SessionStartedAt: start, + SessionEndedAt: end, + Requests: 1, + }, + }, + want: []workspaceapps.StatsReport{ + { + SessionID: rollupUUID, + SessionStartedAt: start, + SessionEndedAt: start.Add(rollupWindow), + Requests: 2, + }, + }, + }, + { + name: "Long sessions not rolled up but reported multiple times", + flushIncrement: rollupWindow + time.Second, + flushCount: 4, + stats: []workspaceapps.StatsReport{ + { + SessionID: rollupUUID, + SessionStartedAt: start, + Requests: 1, + }, + }, + want: []workspaceapps.StatsReport{ + { + SessionID: rollupUUID, + SessionStartedAt: start, + SessionEndedAt: start.Add(rollupWindow + time.Second), + Requests: 1, + }, + { + SessionID: rollupUUID, + SessionStartedAt: start, + SessionEndedAt: start.Add(2 * (rollupWindow + time.Second)), + Requests: 1, + }, + { + SessionID: rollupUUID, + SessionStartedAt: start, + SessionEndedAt: start.Add(3 * (rollupWindow + time.Second)), + Requests: 1, + }, + { + SessionID: rollupUUID, + SessionStartedAt: start, + SessionEndedAt: start.Add(4 * (rollupWindow + time.Second)), + Requests: 1, + }, + }, + }, + { + name: "Incomplete stats not reported until it exceeds rollup window", + flushIncrement: rollupWindow / 4, + flushCount: 6, + stats: []workspaceapps.StatsReport{ + { + SessionID: someUUID, + SessionStartedAt: start, + Requests: 1, + }, + }, + want: []workspaceapps.StatsReport{ + { + SessionID: someUUID, + SessionStartedAt: start, + SessionEndedAt: start.Add(rollupWindow / 4 * 5), + Requests: 1, + }, + { + SessionID: someUUID, + SessionStartedAt: start, + SessionEndedAt: start.Add(rollupWindow / 4 * 6), + Requests: 1, + }, + }, + }, + { + name: "Same stat reported without and with end time and rolled up", + flushIncrement: rollupWindow + time.Second, + flushCount: 1, + stats: []workspaceapps.StatsReport{ + { + SessionID: someUUID, + SessionStartedAt: start, + Requests: 1, + }, + { + SessionID: someUUID, + SessionStartedAt: start, + SessionEndedAt: start.Add(10 * time.Second), + Requests: 1, + }, + }, + want: []workspaceapps.StatsReport{ + { + SessionID: someUUID, + SessionStartedAt: start, + SessionEndedAt: start.Add(rollupWindow), + Requests: 1, + }, + }, + }, + { + name: "Same non-rolled up stat reported without and with end time", + flushIncrement: rollupWindow * 2, + flushCount: 1, + stats: []workspaceapps.StatsReport{ + { + SessionID: someUUID, + SessionStartedAt: start, + Requests: 1, + }, + { + SessionID: someUUID, + SessionStartedAt: start, + SessionEndedAt: start.Add(rollupWindow * 2), + Requests: 1, + }, + }, + want: []workspaceapps.StatsReport{ + { + SessionID: someUUID, + SessionStartedAt: start, + SessionEndedAt: start.Add(rollupWindow * 2), + Requests: 1, + }, + }, + }, + } + + // Run tests. + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + flush := make(chan chan<- struct{}, 1) + var now atomic.Pointer[time.Time] + now.Store(&start) + + reporter := &fakeReporter{} + collector := workspaceapps.NewStatsCollector(workspaceapps.StatsCollectorOptions{ + Reporter: reporter, + ReportInterval: time.Hour, + RollupWindow: rollupWindow, + + Flush: flush, + Now: func() time.Time { return *now.Load() }, + }) + + // Collect reports. + for _, report := range tt.stats { + collector.Collect(report) + } + + // Advance time. + flushTime := start.Add(tt.flushIncrement) + for i := 0; i < tt.flushCount; i++ { + now.Store(&flushTime) + flushDone := make(chan struct{}, 1) + flush <- flushDone + <-flushDone + flushTime = flushTime.Add(tt.flushIncrement) + } + + var gotStats []workspaceapps.StatsReport + require.Eventually(t, func() bool { + gotStats = reporter.stats() + return len(gotStats) == len(tt.want) + }, testutil.WaitMedium, testutil.IntervalFast) + + // Order is not guaranteed. + sortBySessionID := func(a, b workspaceapps.StatsReport) int { + if a.SessionID == b.SessionID { + return int(a.SessionEndedAt.Sub(b.SessionEndedAt)) + } + if a.SessionID.String() < b.SessionID.String() { + return -1 + } + return 1 + } + slices.SortFunc(tt.want, sortBySessionID) + slices.SortFunc(gotStats, sortBySessionID) + + // Verify reported stats. + for i, got := range gotStats { + want := tt.want[i] + + assert.Equal(t, want.SessionID, got.SessionID, "session ID; i = %d", i) + assert.Equal(t, want.SessionStartedAt, got.SessionStartedAt, "session started at; i = %d", i) + assert.Equal(t, want.SessionEndedAt, got.SessionEndedAt, "session ended at; i = %d", i) + assert.Equal(t, want.Requests, got.Requests, "requests; i = %d", i) + } + }) + } +} + +func TestStatsCollector_backlog(t *testing.T) { + t.Parallel() + + rollupWindow := time.Minute + flush := make(chan chan<- struct{}, 1) + + start := database.Now().Truncate(time.Minute).UTC() + var now atomic.Pointer[time.Time] + now.Store(&start) + + reporter := &fakeReporter{} + collector := workspaceapps.NewStatsCollector(workspaceapps.StatsCollectorOptions{ + Reporter: reporter, + ReportInterval: time.Hour, + RollupWindow: rollupWindow, + + Flush: flush, + Now: func() time.Time { return *now.Load() }, + }) + + reporter.setError(xerrors.New("some error")) + + // The first collected stat is "rolled up" and moved into the + // backlog during the first flush. On the second flush nothing is + // rolled up due to being unable to report the backlog. + for i := 0; i < 2; i++ { + collector.Collect(workspaceapps.StatsReport{ + SessionID: uuid.New(), + SessionStartedAt: start, + SessionEndedAt: start.Add(10 * time.Second), + Requests: 1, + }) + start = start.Add(time.Minute) + now.Store(&start) + + flushDone := make(chan struct{}, 1) + flush <- flushDone + <-flushDone + } + + // Flush was performed 2 times, 2 reports should have failed. + wantErrors := 2 + assert.Equal(t, wantErrors, reporter.errors()) + assert.Empty(t, reporter.stats()) + + reporter.setError(nil) + + // Flush again, this time the backlog should be reported in addition + // to the second collected stat being rolled up and reported. + flushDone := make(chan struct{}, 1) + flush <- flushDone + <-flushDone + + assert.Equal(t, wantErrors, reporter.errors()) + assert.Len(t, reporter.stats(), 2) +} + +func TestStatsCollector_Close(t *testing.T) { + t.Parallel() + + reporter := &fakeReporter{} + collector := workspaceapps.NewStatsCollector(workspaceapps.StatsCollectorOptions{ + Reporter: reporter, + ReportInterval: time.Hour, + RollupWindow: time.Minute, + }) + + collector.Collect(workspaceapps.StatsReport{ + SessionID: uuid.New(), + SessionStartedAt: database.Now(), + SessionEndedAt: database.Now(), + Requests: 1, + }) + + collector.Close() + + // Verify that stats are reported after close. + assert.NotEmpty(t, reporter.stats()) +} diff --git a/coderd/workspaceapps_test.go b/coderd/workspaceapps_test.go index 8f31bed4d27d2..1312a9d1e3198 100644 --- a/coderd/workspaceapps_test.go +++ b/coderd/workspaceapps_test.go @@ -275,6 +275,7 @@ func TestWorkspaceApps(t *testing.T) { "CF-Connecting-IP", }, }, + WorkspaceAppsStatsCollectorOptions: opts.StatsCollectorOptions, }) user := coderdtest.CreateFirstUser(t, client) diff --git a/docs/api/schemas.md b/docs/api/schemas.md index 0c066ca60391e..523a0e0c3c649 100644 --- a/docs/api/schemas.md +++ b/docs/api/schemas.md @@ -7610,6 +7610,36 @@ _None_ | `username_or_id` | string | false | | For the following fields, if the AccessMethod is AccessMethodTerminal, then only AgentNameOrID may be set and it must be a UUID. The other fields must be left blank. | | `workspace_name_or_id` | string | false | | | +## workspaceapps.StatsReport + +```json +{ + "access_method": "path", + "agent_id": "string", + "requests": 0, + "session_ended_at": "string", + "session_id": "string", + "session_started_at": "string", + "slug_or_port": "string", + "user_id": "string", + "workspace_id": "string" +} +``` + +### Properties + +| Name | Type | Required | Restrictions | Description | +| -------------------- | -------------------------------------------------------- | -------- | ------------ | --------------------------------------------------------------------------------------- | +| `access_method` | [workspaceapps.AccessMethod](#workspaceappsaccessmethod) | false | | | +| `agent_id` | string | false | | | +| `requests` | integer | false | | | +| `session_ended_at` | string | false | | Updated periodically while app is in use active and when the last connection is closed. | +| `session_id` | string | false | | | +| `session_started_at` | string | false | | | +| `slug_or_port` | string | false | | | +| `user_id` | string | false | | | +| `workspace_id` | string | false | | | + ## wsproxysdk.AgentIsLegacyResponse ```json @@ -7714,3 +7744,29 @@ _None_ | `derp_mesh_key` | string | false | | | | `derp_region_id` | integer | false | | | | `sibling_replicas` | array of [codersdk.Replica](#codersdkreplica) | false | | Sibling replicas is a list of all other replicas of the proxy that have not timed out. | + +## wsproxysdk.ReportAppStatsRequest + +```json +{ + "stats": [ + { + "access_method": "path", + "agent_id": "string", + "requests": 0, + "session_ended_at": "string", + "session_id": "string", + "session_started_at": "string", + "slug_or_port": "string", + "user_id": "string", + "workspace_id": "string" + } + ] +} +``` + +### Properties + +| Name | Type | Required | Restrictions | Description | +| ------- | --------------------------------------------------------------- | -------- | ------------ | ----------- | +| `stats` | array of [workspaceapps.StatsReport](#workspaceappsstatsreport) | false | | | diff --git a/enterprise/coderd/coderd.go b/enterprise/coderd/coderd.go index 8567b88aa7efe..ea587d7393075 100644 --- a/enterprise/coderd/coderd.go +++ b/enterprise/coderd/coderd.go @@ -167,6 +167,7 @@ func New(ctx context.Context, options *Options) (_ *API, err error) { ) r.Get("/coordinate", api.workspaceProxyCoordinate) r.Post("/issue-signed-app-token", api.workspaceProxyIssueSignedAppToken) + r.Post("/app-stats", api.workspaceProxyReportAppStats) r.Post("/register", api.workspaceProxyRegister) r.Post("/deregister", api.workspaceProxyDeregister) }) diff --git a/enterprise/coderd/coderdenttest/proxytest.go b/enterprise/coderd/coderdenttest/proxytest.go index c544e14b44a46..a9031e90a5f32 100644 --- a/enterprise/coderd/coderdenttest/proxytest.go +++ b/enterprise/coderd/coderdenttest/proxytest.go @@ -110,6 +110,10 @@ func NewWorkspaceProxy(t *testing.T, coderdAPI *coderd.API, owner *codersdk.Clie }) require.NoError(t, err, "failed to create workspace proxy") + // Inherit collector options from coderd, but keep the wsproxy reporter. + statsCollectorOptions := coderdAPI.Options.WorkspaceAppsStatsCollectorOptions + statsCollectorOptions.Reporter = nil + wssrv, err := wsproxy.New(ctx, &wsproxy.Options{ Logger: slogtest.Make(t, nil).Leveled(slog.LevelDebug), Experiments: options.Experiments, @@ -129,6 +133,7 @@ func NewWorkspaceProxy(t *testing.T, coderdAPI *coderd.API, owner *codersdk.Clie DERPEnabled: !options.DerpDisabled, DERPOnly: options.DerpOnly, DERPServerRelayAddress: accessURL.String(), + StatsCollectorOptions: statsCollectorOptions, }) require.NoError(t, err) t.Cleanup(func() { diff --git a/enterprise/coderd/workspaceproxy.go b/enterprise/coderd/workspaceproxy.go index 54279953920de..87a4f8898872c 100644 --- a/enterprise/coderd/workspaceproxy.go +++ b/enterprise/coderd/workspaceproxy.go @@ -497,6 +497,36 @@ func (api *API) workspaceProxyIssueSignedAppToken(rw http.ResponseWriter, r *htt }) } +// @Summary Report workspace app stats +// @ID report-workspace-app-stats +// @Security CoderSessionToken +// @Accept json +// @Tags Enterprise +// @Param request body wsproxysdk.ReportAppStatsRequest true "Report app stats request" +// @Success 204 +// @Router /workspaceproxies/me/app-stats [post] +// @x-apidocgen {"skip": true} +func (api *API) workspaceProxyReportAppStats(rw http.ResponseWriter, r *http.Request) { + ctx := r.Context() + _ = httpmw.WorkspaceProxy(r) // Ensure the proxy is authenticated. + + var req wsproxysdk.ReportAppStatsRequest + if !httpapi.Read(ctx, rw, r, &req) { + return + } + + api.Logger.Debug(ctx, "report app stats", slog.F("stats", req.Stats)) + + reporter := api.WorkspaceAppsStatsCollectorOptions.Reporter + if err := reporter.Report(ctx, req.Stats); err != nil { + api.Logger.Error(ctx, "report app stats failed", slog.Error(err)) + httpapi.InternalServerError(rw, err) + return + } + + httpapi.Write(ctx, rw, http.StatusNoContent, nil) +} + // workspaceProxyRegister is used to register a new workspace proxy. When a proxy // comes online, it will announce itself to this endpoint. This updates its values // in the database and returns a signed token that can be used to authenticate diff --git a/enterprise/wsproxy/appstatsreporter.go b/enterprise/wsproxy/appstatsreporter.go new file mode 100644 index 0000000000000..ba3cb92df93cc --- /dev/null +++ b/enterprise/wsproxy/appstatsreporter.go @@ -0,0 +1,21 @@ +package wsproxy + +import ( + "context" + + "github.com/coder/coder/coderd/workspaceapps" + "github.com/coder/coder/enterprise/wsproxy/wsproxysdk" +) + +var _ workspaceapps.StatsReporter = (*appStatsReporter)(nil) + +type appStatsReporter struct { + Client *wsproxysdk.Client +} + +func (r *appStatsReporter) Report(ctx context.Context, stats []workspaceapps.StatsReport) error { + err := r.Client.ReportAppStats(ctx, wsproxysdk.ReportAppStatsRequest{ + Stats: stats, + }) + return err +} diff --git a/enterprise/wsproxy/wsproxy.go b/enterprise/wsproxy/wsproxy.go index 5879e687fe485..80414e40ceae6 100644 --- a/enterprise/wsproxy/wsproxy.go +++ b/enterprise/wsproxy/wsproxy.go @@ -79,6 +79,8 @@ type Options struct { // By default, CORs is set to accept external requests // from the dashboardURL. This should only be used in development. AllowAllCors bool + + StatsCollectorOptions workspaceapps.StatsCollectorOptions } func (o *Options) Validate() error { @@ -262,8 +264,17 @@ func New(ctx context.Context, opts *Options) (*Server, error) { } } + workspaceAppsLogger := opts.Logger.Named("workspaceapps") + if opts.StatsCollectorOptions.Logger == nil { + named := workspaceAppsLogger.Named("stats_collector") + opts.StatsCollectorOptions.Logger = &named + } + if opts.StatsCollectorOptions.Reporter == nil { + opts.StatsCollectorOptions.Reporter = &appStatsReporter{Client: client} + } + s.AppServer = &workspaceapps.Server{ - Logger: opts.Logger.Named("workspaceapps"), + Logger: workspaceAppsLogger, DashboardURL: opts.DashboardURL, AccessURL: opts.AccessURL, Hostname: opts.AppHostname, @@ -279,9 +290,11 @@ func New(ctx context.Context, opts *Options) (*Server, error) { }, AppSecurityKey: secKey, - AgentProvider: agentProvider, DisablePathApps: opts.DisablePathApps, SecureAuthCookie: opts.SecureAuthCookie, + + AgentProvider: agentProvider, + StatsCollector: workspaceapps.NewStatsCollector(opts.StatsCollectorOptions), } derpHandler := derphttp.Handler(derpServer) diff --git a/enterprise/wsproxy/wsproxy_test.go b/enterprise/wsproxy/wsproxy_test.go index 3ba8463331d6a..64fe414fe5fac 100644 --- a/enterprise/wsproxy/wsproxy_test.go +++ b/enterprise/wsproxy/wsproxy_test.go @@ -478,6 +478,7 @@ func TestWorkspaceProxyWorkspaceApps_Wsconncache(t *testing.T) { "CF-Connecting-IP", }, }, + WorkspaceAppsStatsCollectorOptions: opts.StatsCollectorOptions, }, LicenseOptions: &coderdenttest.LicenseOptions{ Features: license.Features{ @@ -536,6 +537,7 @@ func TestWorkspaceProxyWorkspaceApps_SingleTailnet(t *testing.T) { "CF-Connecting-IP", }, }, + WorkspaceAppsStatsCollectorOptions: opts.StatsCollectorOptions, }, LicenseOptions: &coderdenttest.LicenseOptions{ Features: license.Features{ diff --git a/enterprise/wsproxy/wsproxysdk/wsproxysdk.go b/enterprise/wsproxy/wsproxysdk/wsproxysdk.go index 6d1a4b1227b5d..d9b60d311eb0b 100644 --- a/enterprise/wsproxy/wsproxysdk/wsproxysdk.go +++ b/enterprise/wsproxy/wsproxysdk/wsproxysdk.go @@ -152,6 +152,25 @@ func (c *Client) IssueSignedAppTokenHTML(ctx context.Context, rw http.ResponseWr return res, true } +type ReportAppStatsRequest struct { + Stats []workspaceapps.StatsReport `json:"stats"` +} + +// ReportAppStats reports the given app stats to the primary coder server. +func (c *Client) ReportAppStats(ctx context.Context, req ReportAppStatsRequest) error { + resp, err := c.Request(ctx, http.MethodPost, "/api/v2/workspaceproxies/me/app-stats", req) + if err != nil { + return xerrors.Errorf("make request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusNoContent { + return codersdk.ReadBodyAsError(resp) + } + + return nil +} + type RegisterWorkspaceProxyRequest struct { // AccessURL that hits the workspace proxy api. AccessURL string `json:"access_url"`