diff --git a/CODEOWNERS b/CODEOWNERS index e571a160b12b7..c94a6bd3f34af 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -7,7 +7,6 @@ tailnet/proto/ @spikecurtis @johnstcn vpn/vpn.proto @spikecurtis @johnstcn vpn/version.go @spikecurtis @johnstcn - # This caching code is particularly tricky, and one must be very careful when # altering it. coderd/files/ @aslilac @@ -29,3 +28,8 @@ site/src/api/countriesGenerated.ts site/src/api/rbacresourcesGenerated.ts site/src/api/typesGenerated.ts site/CLAUDE.md + +# Usage tracking code requires intimate knowledge of Tallyman and Metronome, as +# well as guidance from revenue. +coderd/usage/ @deansheather +enterprise/coderd/usage/ @deansheather diff --git a/coderd/database/dbauthz/dbauthz.go b/coderd/database/dbauthz/dbauthz.go index 257cbc6e6b142..69ce7de485dd4 100644 --- a/coderd/database/dbauthz/dbauthz.go +++ b/coderd/database/dbauthz/dbauthz.go @@ -3913,6 +3913,13 @@ func (q *querier) InsertTemplateVersionWorkspaceTag(ctx context.Context, arg dat return q.db.InsertTemplateVersionWorkspaceTag(ctx, arg) } +func (q *querier) InsertUsageEvent(ctx context.Context, arg database.InsertUsageEventParams) error { + if err := q.authorizeContext(ctx, policy.ActionCreate, rbac.ResourceSystem); err != nil { + return err + } + return q.db.InsertUsageEvent(ctx, arg) +} + func (q *querier) InsertUser(ctx context.Context, arg database.InsertUserParams) (database.User, error) { // Always check if the assigned roles can actually be assigned by this actor. impliedRoles := append([]rbac.RoleIdentifier{rbac.RoleMember()}, q.convertToDeploymentRoles(arg.RBACRoles)...) @@ -4260,6 +4267,14 @@ func (q *querier) RevokeDBCryptKey(ctx context.Context, activeKeyDigest string) return q.db.RevokeDBCryptKey(ctx, activeKeyDigest) } +func (q *querier) SelectUsageEventsForPublishing(ctx context.Context, arg time.Time) ([]database.UsageEvent, error) { + // ActionUpdate because we're updating the publish_started_at column. + if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceSystem); err != nil { + return nil, err + } + return q.db.SelectUsageEventsForPublishing(ctx, arg) +} + func (q *querier) TryAcquireLock(ctx context.Context, id int64) (bool, error) { return q.db.TryAcquireLock(ctx, id) } @@ -4725,6 +4740,13 @@ func (q *querier) UpdateTemplateWorkspacesLastUsedAt(ctx context.Context, arg da return fetchAndExec(q.log, q.auth, policy.ActionUpdate, fetch, q.db.UpdateTemplateWorkspacesLastUsedAt)(ctx, arg) } +func (q *querier) UpdateUsageEventsPostPublish(ctx context.Context, arg database.UpdateUsageEventsPostPublishParams) error { + if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceSystem); err != nil { + return err + } + return q.db.UpdateUsageEventsPostPublish(ctx, arg) +} + func (q *querier) UpdateUserDeletedByID(ctx context.Context, id uuid.UUID) error { return deleteQ(q.log, q.auth, q.db.GetUserByID, q.db.UpdateUserDeletedByID)(ctx, id) } diff --git a/coderd/database/dbauthz/dbauthz_test.go b/coderd/database/dbauthz/dbauthz_test.go index bcf0caa95c365..ece99b30d9b4a 100644 --- a/coderd/database/dbauthz/dbauthz_test.go +++ b/coderd/database/dbauthz/dbauthz_test.go @@ -5845,3 +5845,27 @@ func (s *MethodTestSuite) TestAuthorizePrebuiltWorkspace() { }).Asserts(w, policy.ActionUpdate, w.AsPrebuild(), policy.ActionUpdate) })) } + +func (s *MethodTestSuite) TestUsageEvents() { + s.Run("InsertUsageEvent", s.Subtest(func(db database.Store, check *expects) { + check.Args(database.InsertUsageEventParams{ + ID: "1", + EventType: database.UsageEventTypeDcManagedAgentsV1, + EventData: []byte("{}"), + CreatedAt: dbtime.Now(), + }).Asserts(rbac.ResourceSystem, policy.ActionCreate) + })) + + s.Run("SelectUsageEventsForPublishing", s.Subtest(func(db database.Store, check *expects) { + check.Args(dbtime.Now()).Asserts(rbac.ResourceSystem, policy.ActionUpdate) + })) + + s.Run("UpdateUsageEventsPostPublish", s.Subtest(func(db database.Store, check *expects) { + check.Args(database.UpdateUsageEventsPostPublishParams{ + Now: dbtime.Now(), + IDs: []string{"1", "2"}, + FailureMessages: []string{"error", "error"}, + SetPublishedAts: []bool{false, false}, + }).Asserts(rbac.ResourceSystem, policy.ActionUpdate) + })) +} diff --git a/coderd/database/dbmetrics/querymetrics.go b/coderd/database/dbmetrics/querymetrics.go index 811d945ac7da9..c126f4c23814f 100644 --- a/coderd/database/dbmetrics/querymetrics.go +++ b/coderd/database/dbmetrics/querymetrics.go @@ -2371,6 +2371,13 @@ func (m queryMetricsStore) InsertTemplateVersionWorkspaceTag(ctx context.Context return r0, r1 } +func (m queryMetricsStore) InsertUsageEvent(ctx context.Context, arg database.InsertUsageEventParams) error { + start := time.Now() + r0 := m.s.InsertUsageEvent(ctx, arg) + m.queryLatencies.WithLabelValues("InsertUsageEvent").Observe(time.Since(start).Seconds()) + return r0 +} + func (m queryMetricsStore) InsertUser(ctx context.Context, arg database.InsertUserParams) (database.User, error) { start := time.Now() user, err := m.s.InsertUser(ctx, arg) @@ -2623,6 +2630,13 @@ func (m queryMetricsStore) RevokeDBCryptKey(ctx context.Context, activeKeyDigest return r0 } +func (m queryMetricsStore) SelectUsageEventsForPublishing(ctx context.Context, arg time.Time) ([]database.UsageEvent, error) { + start := time.Now() + r0, r1 := m.s.SelectUsageEventsForPublishing(ctx, arg) + m.queryLatencies.WithLabelValues("SelectUsageEventsForPublishing").Observe(time.Since(start).Seconds()) + return r0, r1 +} + func (m queryMetricsStore) TryAcquireLock(ctx context.Context, pgTryAdvisoryXactLock int64) (bool, error) { start := time.Now() ok, err := m.s.TryAcquireLock(ctx, pgTryAdvisoryXactLock) @@ -2896,6 +2910,13 @@ func (m queryMetricsStore) UpdateTemplateWorkspacesLastUsedAt(ctx context.Contex return r0 } +func (m queryMetricsStore) UpdateUsageEventsPostPublish(ctx context.Context, arg database.UpdateUsageEventsPostPublishParams) error { + start := time.Now() + r0 := m.s.UpdateUsageEventsPostPublish(ctx, arg) + m.queryLatencies.WithLabelValues("UpdateUsageEventsPostPublish").Observe(time.Since(start).Seconds()) + return r0 +} + func (m queryMetricsStore) UpdateUserDeletedByID(ctx context.Context, id uuid.UUID) error { start := time.Now() r0 := m.s.UpdateUserDeletedByID(ctx, id) diff --git a/coderd/database/dbmock/dbmock.go b/coderd/database/dbmock/dbmock.go index b20c3d06209b5..949061b178b7f 100644 --- a/coderd/database/dbmock/dbmock.go +++ b/coderd/database/dbmock/dbmock.go @@ -5063,6 +5063,20 @@ func (mr *MockStoreMockRecorder) InsertTemplateVersionWorkspaceTag(ctx, arg any) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertTemplateVersionWorkspaceTag", reflect.TypeOf((*MockStore)(nil).InsertTemplateVersionWorkspaceTag), ctx, arg) } +// InsertUsageEvent mocks base method. +func (m *MockStore) InsertUsageEvent(ctx context.Context, arg database.InsertUsageEventParams) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InsertUsageEvent", ctx, arg) + ret0, _ := ret[0].(error) + return ret0 +} + +// InsertUsageEvent indicates an expected call of InsertUsageEvent. +func (mr *MockStoreMockRecorder) InsertUsageEvent(ctx, arg any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertUsageEvent", reflect.TypeOf((*MockStore)(nil).InsertUsageEvent), ctx, arg) +} + // InsertUser mocks base method. func (m *MockStore) InsertUser(ctx context.Context, arg database.InsertUserParams) (database.User, error) { m.ctrl.T.Helper() @@ -5623,6 +5637,21 @@ func (mr *MockStoreMockRecorder) RevokeDBCryptKey(ctx, activeKeyDigest any) *gom return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RevokeDBCryptKey", reflect.TypeOf((*MockStore)(nil).RevokeDBCryptKey), ctx, activeKeyDigest) } +// SelectUsageEventsForPublishing mocks base method. +func (m *MockStore) SelectUsageEventsForPublishing(ctx context.Context, now time.Time) ([]database.UsageEvent, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SelectUsageEventsForPublishing", ctx, now) + ret0, _ := ret[0].([]database.UsageEvent) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SelectUsageEventsForPublishing indicates an expected call of SelectUsageEventsForPublishing. +func (mr *MockStoreMockRecorder) SelectUsageEventsForPublishing(ctx, now any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SelectUsageEventsForPublishing", reflect.TypeOf((*MockStore)(nil).SelectUsageEventsForPublishing), ctx, now) +} + // TryAcquireLock mocks base method. func (m *MockStore) TryAcquireLock(ctx context.Context, pgTryAdvisoryXactLock int64) (bool, error) { m.ctrl.T.Helper() @@ -6183,6 +6212,20 @@ func (mr *MockStoreMockRecorder) UpdateTemplateWorkspacesLastUsedAt(ctx, arg any return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTemplateWorkspacesLastUsedAt", reflect.TypeOf((*MockStore)(nil).UpdateTemplateWorkspacesLastUsedAt), ctx, arg) } +// UpdateUsageEventsPostPublish mocks base method. +func (m *MockStore) UpdateUsageEventsPostPublish(ctx context.Context, arg database.UpdateUsageEventsPostPublishParams) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateUsageEventsPostPublish", ctx, arg) + ret0, _ := ret[0].(error) + return ret0 +} + +// UpdateUsageEventsPostPublish indicates an expected call of UpdateUsageEventsPostPublish. +func (mr *MockStoreMockRecorder) UpdateUsageEventsPostPublish(ctx, arg any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateUsageEventsPostPublish", reflect.TypeOf((*MockStore)(nil).UpdateUsageEventsPostPublish), ctx, arg) +} + // UpdateUserDeletedByID mocks base method. func (m *MockStore) UpdateUserDeletedByID(ctx context.Context, id uuid.UUID) error { m.ctrl.T.Helper() diff --git a/coderd/database/dump.sql b/coderd/database/dump.sql index 67d58ad05c802..3a1fa23c6595b 100644 --- a/coderd/database/dump.sql +++ b/coderd/database/dump.sql @@ -281,6 +281,12 @@ CREATE TYPE tailnet_status AS ENUM ( 'lost' ); +CREATE TYPE usage_event_type AS ENUM ( + 'dc_managed_agents_v1' +); + +COMMENT ON TYPE usage_event_type IS 'The usage event type with version. "dc" means "discrete" (e.g. a single event, for counters), "hb" means "heartbeat" (e.g. a recurring event that contains a total count of usage generated from the database, for gauges).'; + CREATE TYPE user_status AS ENUM ( 'active', 'suspended', @@ -1815,6 +1821,28 @@ CREATE VIEW template_with_names AS COMMENT ON VIEW template_with_names IS 'Joins in the display name information such as username, avatar, and organization name.'; +CREATE TABLE usage_events ( + id text NOT NULL, + event_type usage_event_type NOT NULL, + event_data jsonb NOT NULL, + created_at timestamp with time zone NOT NULL, + publish_started_at timestamp with time zone, + published_at timestamp with time zone, + failure_message text +); + +COMMENT ON TABLE usage_events IS 'usage_events contains usage data that is collected from the product and potentially shipped to the usage collector service.'; + +COMMENT ON COLUMN usage_events.id IS 'For "discrete" event types, this is a random UUID. For "heartbeat" event types, this is a combination of the event type and a truncated timestamp.'; + +COMMENT ON COLUMN usage_events.event_data IS 'Event payload. Determined by the matching usage struct for this event type.'; + +COMMENT ON COLUMN usage_events.publish_started_at IS 'Set to a timestamp while the event is being published by a Coder replica to the usage collector service. Used to avoid duplicate publishes by multiple replicas. Timestamps older than 1 hour are considered expired.'; + +COMMENT ON COLUMN usage_events.published_at IS 'Set to a timestamp when the event is successfully (or permanently unsuccessfully) published to the usage collector service. If set, the event should never be attempted to be published again.'; + +COMMENT ON COLUMN usage_events.failure_message IS 'Set to an error message when the event is temporarily or permanently unsuccessfully published to the usage collector service.'; + CREATE TABLE user_configs ( user_id uuid NOT NULL, key character varying(256) NOT NULL, @@ -2647,6 +2675,9 @@ ALTER TABLE ONLY template_versions ALTER TABLE ONLY templates ADD CONSTRAINT templates_pkey PRIMARY KEY (id); +ALTER TABLE ONLY usage_events + ADD CONSTRAINT usage_events_pkey PRIMARY KEY (id); + ALTER TABLE ONLY user_configs ADD CONSTRAINT user_configs_pkey PRIMARY KEY (user_id, key); @@ -2812,6 +2843,12 @@ CREATE INDEX idx_template_versions_has_ai_task ON template_versions USING btree CREATE UNIQUE INDEX idx_unique_preset_name ON template_version_presets USING btree (name, template_version_id); +CREATE INDEX idx_usage_events_created_at ON usage_events USING btree (created_at); + +CREATE INDEX idx_usage_events_publish_started_at ON usage_events USING btree (publish_started_at); + +CREATE INDEX idx_usage_events_published_at ON usage_events USING btree (published_at); + CREATE INDEX idx_user_deleted_deleted_at ON user_deleted USING btree (deleted_at); CREATE INDEX idx_user_status_changes_changed_at ON user_status_changes USING btree (changed_at); diff --git a/coderd/database/migrations/000353_create_usage_events_table.down.sql b/coderd/database/migrations/000353_create_usage_events_table.down.sql new file mode 100644 index 0000000000000..eacce867e0acc --- /dev/null +++ b/coderd/database/migrations/000353_create_usage_events_table.down.sql @@ -0,0 +1,2 @@ +DROP TABLE usage_events; +DROP TYPE usage_event_type; diff --git a/coderd/database/migrations/000353_create_usage_events_table.up.sql b/coderd/database/migrations/000353_create_usage_events_table.up.sql new file mode 100644 index 0000000000000..d15dcbaaad050 --- /dev/null +++ b/coderd/database/migrations/000353_create_usage_events_table.up.sql @@ -0,0 +1,26 @@ +CREATE TYPE usage_event_type AS ENUM ( + 'dc_managed_agents_v1' +); + +COMMENT ON TYPE usage_event_type IS 'The usage event type with version. "dc" means "discrete" (e.g. a single event, for counters), "hb" means "heartbeat" (e.g. a recurring event that contains a total count of usage generated from the database, for gauges).'; + +CREATE TABLE usage_events ( + id TEXT PRIMARY KEY, + event_type usage_event_type NOT NULL, + event_data JSONB NOT NULL, + created_at TIMESTAMP WITH TIME ZONE NOT NULL, + publish_started_at TIMESTAMP WITH TIME ZONE DEFAULT NULL, + published_at TIMESTAMP WITH TIME ZONE DEFAULT NULL, + failure_message TEXT DEFAULT NULL +); + +COMMENT ON TABLE usage_events IS 'usage_events contains usage data that is collected from the product and potentially shipped to the usage collector service.'; +COMMENT ON COLUMN usage_events.id IS 'For "discrete" event types, this is a random UUID. For "heartbeat" event types, this is a combination of the event type and a truncated timestamp.'; +COMMENT ON COLUMN usage_events.event_data IS 'Event payload. Determined by the matching usage struct for this event type.'; +COMMENT ON COLUMN usage_events.publish_started_at IS 'Set to a timestamp while the event is being published by a Coder replica to the usage collector service. Used to avoid duplicate publishes by multiple replicas. Timestamps older than 1 hour are considered expired.'; +COMMENT ON COLUMN usage_events.published_at IS 'Set to a timestamp when the event is successfully (or permanently unsuccessfully) published to the usage collector service. If set, the event should never be attempted to be published again.'; +COMMENT ON COLUMN usage_events.failure_message IS 'Set to an error message when the event is temporarily or permanently unsuccessfully published to the usage collector service.'; + +CREATE INDEX idx_usage_events_created_at ON usage_events (created_at); +CREATE INDEX idx_usage_events_publish_started_at ON usage_events (publish_started_at); +CREATE INDEX idx_usage_events_published_at ON usage_events (published_at); diff --git a/coderd/database/migrations/testdata/fixtures/000353_create_usage_events_table.up.sql b/coderd/database/migrations/testdata/fixtures/000353_create_usage_events_table.up.sql new file mode 100644 index 0000000000000..aa7c53f5eb94c --- /dev/null +++ b/coderd/database/migrations/testdata/fixtures/000353_create_usage_events_table.up.sql @@ -0,0 +1,60 @@ +INSERT INTO usage_events ( + id, + event_type, + event_data, + created_at, + publish_started_at, + published_at, + failure_message +) +VALUES +-- Unpublished dc_managed_agents_v1 event. +( + 'event1', + 'dc_managed_agents_v1', + '{"count":1}', + '2023-01-01 00:00:00+00', + NULL, + NULL, + NULL +), +-- Successfully published dc_managed_agents_v1 event. +( + 'event2', + 'dc_managed_agents_v1', + '{"count":2}', + '2023-01-01 00:00:00+00', + NULL, + '2023-01-01 00:00:02+00', + NULL +), +-- Publish in progress dc_managed_agents_v1 event. +( + 'event3', + 'dc_managed_agents_v1', + '{"count":3}', + '2023-01-01 00:00:00+00', + '2023-01-01 00:00:01+00', + NULL, + NULL +), +-- Temporarily failed to publish dc_managed_agents_v1 event. +( + 'event4', + 'dc_managed_agents_v1', + '{"count":4}', + '2023-01-01 00:00:00+00', + NULL, + NULL, + 'publish failed temporarily' +), +-- Permanently failed to publish dc_managed_agents_v1 event. +( + 'event5', + 'dc_managed_agents_v1', + '{"count":5}', + '2023-01-01 00:00:00+00', + NULL, + '2023-01-01 00:00:02+00', + 'publish failed permanently' +) diff --git a/coderd/database/modelmethods.go b/coderd/database/modelmethods.go index b49fa113d4b12..b326890d0f184 100644 --- a/coderd/database/modelmethods.go +++ b/coderd/database/modelmethods.go @@ -4,6 +4,7 @@ import ( "encoding/hex" "sort" "strconv" + "strings" "time" "github.com/google/uuid" @@ -628,3 +629,11 @@ func (m WorkspaceAgentVolumeResourceMonitor) Debounce( return m.DebouncedUntil, false } + +func (e UsageEventType) IsDiscrete() bool { + return e.Valid() && strings.HasPrefix(string(e), "dc_") +} + +func (e UsageEventType) IsHeartbeat() bool { + return e.Valid() && strings.HasPrefix(string(e), "hb_") +} diff --git a/coderd/database/models.go b/coderd/database/models.go index 094bc98c68373..f7c0d27199f9e 100644 --- a/coderd/database/models.go +++ b/coderd/database/models.go @@ -2254,6 +2254,62 @@ func AllTailnetStatusValues() []TailnetStatus { } } +// The usage event type with version. "dc" means "discrete" (e.g. a single event, for counters), "hb" means "heartbeat" (e.g. a recurring event that contains a total count of usage generated from the database, for gauges). +type UsageEventType string + +const ( + UsageEventTypeDcManagedAgentsV1 UsageEventType = "dc_managed_agents_v1" +) + +func (e *UsageEventType) Scan(src interface{}) error { + switch s := src.(type) { + case []byte: + *e = UsageEventType(s) + case string: + *e = UsageEventType(s) + default: + return fmt.Errorf("unsupported scan type for UsageEventType: %T", src) + } + return nil +} + +type NullUsageEventType struct { + UsageEventType UsageEventType `json:"usage_event_type"` + Valid bool `json:"valid"` // Valid is true if UsageEventType is not NULL +} + +// Scan implements the Scanner interface. +func (ns *NullUsageEventType) Scan(value interface{}) error { + if value == nil { + ns.UsageEventType, ns.Valid = "", false + return nil + } + ns.Valid = true + return ns.UsageEventType.Scan(value) +} + +// Value implements the driver Valuer interface. +func (ns NullUsageEventType) Value() (driver.Value, error) { + if !ns.Valid { + return nil, nil + } + return string(ns.UsageEventType), nil +} + +func (e UsageEventType) Valid() bool { + switch e { + case UsageEventTypeDcManagedAgentsV1: + return true + } + return false +} + +func AllUsageEventTypeValues() []UsageEventType { + return []UsageEventType{ + UsageEventTypeDcManagedAgentsV1, + } +} + // Defines the users status: active, dormant, or suspended. type UserStatus string @@ -3693,6 +3749,22 @@ type TemplateVersionWorkspaceTag struct { Value string `db:"value" json:"value"` } +// usage_events contains usage data that is collected from the product and potentially shipped to the usage collector service. +type UsageEvent struct { + // For "discrete" event types, this is a random UUID. For "heartbeat" event types, this is a combination of the event type and a truncated timestamp. + ID string `db:"id" json:"id"` + EventType UsageEventType `db:"event_type" json:"event_type"` + // Event payload. Determined by the matching usage struct for this event type. + EventData json.RawMessage `db:"event_data" json:"event_data"` + CreatedAt time.Time `db:"created_at" json:"created_at"` + // Set to a timestamp while the event is being published by a Coder replica to the usage collector service. Used to avoid duplicate publishes by multiple replicas. Timestamps older than 1 hour are considered expired. + PublishStartedAt sql.NullTime `db:"publish_started_at" json:"publish_started_at"` + // Set to a timestamp when the event is successfully (or permanently unsuccessfully) published to the usage collector service. If set, the event should never be attempted to be published again. + PublishedAt sql.NullTime `db:"published_at" json:"published_at"` + // Set to an error message when the event is temporarily or permanently unsuccessfully published to the usage collector service. + FailureMessage sql.NullString `db:"failure_message" json:"failure_message"` +} + type User struct { ID uuid.UUID `db:"id" json:"id"` Email string `db:"email" json:"email"` diff --git a/coderd/database/querier.go b/coderd/database/querier.go index baa5d8590b1d7..3585088055e6e 100644 --- a/coderd/database/querier.go +++ b/coderd/database/querier.go @@ -520,6 +520,9 @@ type sqlcQuerier interface { InsertTemplateVersionTerraformValuesByJobID(ctx context.Context, arg InsertTemplateVersionTerraformValuesByJobIDParams) error InsertTemplateVersionVariable(ctx context.Context, arg InsertTemplateVersionVariableParams) (TemplateVersionVariable, error) InsertTemplateVersionWorkspaceTag(ctx context.Context, arg InsertTemplateVersionWorkspaceTagParams) (TemplateVersionWorkspaceTag, error) + // Duplicate events are ignored intentionally to allow for multiple replicas to + // publish heartbeat events. + InsertUsageEvent(ctx context.Context, arg InsertUsageEventParams) error InsertUser(ctx context.Context, arg InsertUserParams) (User, error) // InsertUserGroupsByID adds a user to all provided groups, if they exist. // If there is a conflict, the user is already a member @@ -565,6 +568,11 @@ type sqlcQuerier interface { RemoveUserFromAllGroups(ctx context.Context, userID uuid.UUID) error RemoveUserFromGroups(ctx context.Context, arg RemoveUserFromGroupsParams) ([]uuid.UUID, error) RevokeDBCryptKey(ctx context.Context, activeKeyDigest string) error + // Note that this selects from the CTE, not the original table. The CTE is named + // the same as the original table to trick sqlc into reusing the existing struct + // for the table. + // The CTE and the reorder is required because UPDATE doesn't guarantee order. + SelectUsageEventsForPublishing(ctx context.Context, now time.Time) ([]UsageEvent, error) // Non blocking lock. Returns true if the lock was acquired, false otherwise. // // This must be called from within a transaction. The lock will be automatically @@ -609,6 +617,7 @@ type sqlcQuerier interface { UpdateTemplateVersionDescriptionByJobID(ctx context.Context, arg UpdateTemplateVersionDescriptionByJobIDParams) error UpdateTemplateVersionExternalAuthProvidersByJobID(ctx context.Context, arg UpdateTemplateVersionExternalAuthProvidersByJobIDParams) error UpdateTemplateWorkspacesLastUsedAt(ctx context.Context, arg UpdateTemplateWorkspacesLastUsedAtParams) error + UpdateUsageEventsPostPublish(ctx context.Context, arg UpdateUsageEventsPostPublishParams) error UpdateUserDeletedByID(ctx context.Context, id uuid.UUID) error UpdateUserGithubComUserID(ctx context.Context, arg UpdateUserGithubComUserIDParams) error UpdateUserHashedOneTimePasscode(ctx context.Context, arg UpdateUserHashedOneTimePasscodeParams) error diff --git a/coderd/database/queries.sql.go b/coderd/database/queries.sql.go index 80357b3731874..a6a7cefc59f6b 100644 --- a/coderd/database/queries.sql.go +++ b/coderd/database/queries.sql.go @@ -13361,6 +13361,151 @@ func (q *sqlQuerier) DisableForeignKeysAndTriggers(ctx context.Context) error { return err } +const insertUsageEvent = `-- name: InsertUsageEvent :exec +INSERT INTO + usage_events ( + id, + event_type, + event_data, + created_at, + publish_started_at, + published_at, + failure_message + ) +VALUES + ($1, $2::usage_event_type, $3, $4, NULL, NULL, NULL) +ON CONFLICT (id) DO NOTHING +` + +type InsertUsageEventParams struct { + ID string `db:"id" json:"id"` + EventType UsageEventType `db:"event_type" json:"event_type"` + EventData json.RawMessage `db:"event_data" json:"event_data"` + CreatedAt time.Time `db:"created_at" json:"created_at"` +} + +// Duplicate events are ignored intentionally to allow for multiple replicas to +// publish heartbeat events. +func (q *sqlQuerier) InsertUsageEvent(ctx context.Context, arg InsertUsageEventParams) error { + _, err := q.db.ExecContext(ctx, insertUsageEvent, + arg.ID, + arg.EventType, + arg.EventData, + arg.CreatedAt, + ) + return err +} + +const selectUsageEventsForPublishing = `-- name: SelectUsageEventsForPublishing :many +WITH usage_events AS ( + UPDATE + usage_events + SET + publish_started_at = $1::timestamptz + WHERE + id IN ( + SELECT + potential_event.id + FROM + usage_events potential_event + WHERE + -- We do not publish events older than 30 days. Tallyman will + -- always permanently reject these events anyways. + -- The parenthesis around @now::timestamptz are necessary to + -- avoid sqlc from generating an extra argument. + potential_event.created_at > ($1::timestamptz) - INTERVAL '30 days' + AND potential_event.published_at IS NULL + AND ( + potential_event.publish_started_at IS NULL + -- If the event has publish_started_at set, it must be older + -- than an hour ago. This is so we can retry publishing + -- events where the replica exited or couldn't update the + -- row. + -- Also, same parenthesis thing here: + OR potential_event.publish_started_at < ($1::timestamptz) - INTERVAL '1 hour' + ) + ORDER BY potential_event.created_at ASC + FOR UPDATE SKIP LOCKED + LIMIT 100 + ) + RETURNING id, event_type, event_data, created_at, publish_started_at, published_at, failure_message +) +SELECT id, event_type, event_data, created_at, publish_started_at, published_at, failure_message +FROM usage_events +ORDER BY created_at ASC +` + +// Note that this selects from the CTE, not the original table. The CTE is named +// the same as the original table to trick sqlc into reusing the existing struct +// for the table. +// The CTE and the reorder is required because UPDATE doesn't guarantee order. +func (q *sqlQuerier) SelectUsageEventsForPublishing(ctx context.Context, now time.Time) ([]UsageEvent, error) { + rows, err := q.db.QueryContext(ctx, selectUsageEventsForPublishing, now) + if err != nil { + return nil, err + } + defer rows.Close() + var items []UsageEvent + for rows.Next() { + var i UsageEvent + if err := rows.Scan( + &i.ID, + &i.EventType, + &i.EventData, + &i.CreatedAt, + &i.PublishStartedAt, + &i.PublishedAt, + &i.FailureMessage, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const updateUsageEventsPostPublish = `-- name: UpdateUsageEventsPostPublish :exec +UPDATE + usage_events +SET + publish_started_at = NULL, + published_at = CASE WHEN input.set_published_at THEN $1::timestamptz ELSE NULL END, + failure_message = NULLIF(input.failure_message, '') +FROM ( + SELECT + UNNEST($2::text[]) AS id, + UNNEST($3::text[]) AS failure_message, + UNNEST($4::boolean[]) AS set_published_at +) input +WHERE + input.id = usage_events.id + AND cardinality($2::text[]) = cardinality($3::text[]) + AND cardinality($2::text[]) = cardinality($4::boolean[]) +` + +type UpdateUsageEventsPostPublishParams struct { + Now time.Time `db:"now" json:"now"` + IDs []string `db:"ids" json:"ids"` + FailureMessages []string `db:"failure_messages" json:"failure_messages"` + SetPublishedAts []bool `db:"set_published_ats" json:"set_published_ats"` +} + +func (q *sqlQuerier) UpdateUsageEventsPostPublish(ctx context.Context, arg UpdateUsageEventsPostPublishParams) error { + _, err := q.db.ExecContext(ctx, updateUsageEventsPostPublish, + arg.Now, + pq.Array(arg.IDs), + pq.Array(arg.FailureMessages), + pq.Array(arg.SetPublishedAts), + ) + return err +} + const getUserLinkByLinkedID = `-- name: GetUserLinkByLinkedID :one SELECT user_links.user_id, user_links.login_type, user_links.linked_id, user_links.oauth_access_token, user_links.oauth_refresh_token, user_links.oauth_expiry, user_links.oauth_access_token_key_id, user_links.oauth_refresh_token_key_id, user_links.claims diff --git a/coderd/database/queries/usageevents.sql b/coderd/database/queries/usageevents.sql new file mode 100644 index 0000000000000..2ec4ec419968e --- /dev/null +++ b/coderd/database/queries/usageevents.sql @@ -0,0 +1,76 @@ +-- name: InsertUsageEvent :exec +-- Duplicate events are ignored intentionally to allow for multiple replicas to +-- publish heartbeat events. +INSERT INTO + usage_events ( + id, + event_type, + event_data, + created_at, + publish_started_at, + published_at, + failure_message + ) +VALUES + (@id, @event_type::usage_event_type, @event_data, @created_at, NULL, NULL, NULL) +ON CONFLICT (id) DO NOTHING; + +-- name: SelectUsageEventsForPublishing :many +WITH usage_events AS ( + UPDATE + usage_events + SET + publish_started_at = @now::timestamptz + WHERE + id IN ( + SELECT + potential_event.id + FROM + usage_events potential_event + WHERE + -- We do not publish events older than 30 days. Tallyman will + -- always permanently reject these events anyways. + -- The parenthesis around @now::timestamptz are necessary to + -- avoid sqlc from generating an extra argument. + potential_event.created_at > (@now::timestamptz) - INTERVAL '30 days' + AND potential_event.published_at IS NULL + AND ( + potential_event.publish_started_at IS NULL + -- If the event has publish_started_at set, it must be older + -- than an hour ago. This is so we can retry publishing + -- events where the replica exited or couldn't update the + -- row. + -- Also, same parenthesis thing here: + OR potential_event.publish_started_at < (@now::timestamptz) - INTERVAL '1 hour' + ) + ORDER BY potential_event.created_at ASC + FOR UPDATE SKIP LOCKED + LIMIT 100 + ) + RETURNING * +) +SELECT * +-- Note that this selects from the CTE, not the original table. The CTE is named +-- the same as the original table to trick sqlc into reusing the existing struct +-- for the table. +FROM usage_events +-- The CTE and the reorder is required because UPDATE doesn't guarantee order. +ORDER BY created_at ASC; + +-- name: UpdateUsageEventsPostPublish :exec +UPDATE + usage_events +SET + publish_started_at = NULL, + published_at = CASE WHEN input.set_published_at THEN @now::timestamptz ELSE NULL END, + failure_message = NULLIF(input.failure_message, '') +FROM ( + SELECT + UNNEST(@ids::text[]) AS id, + UNNEST(@failure_messages::text[]) AS failure_message, + UNNEST(@set_published_ats::boolean[]) AS set_published_at +) input +WHERE + input.id = usage_events.id + AND cardinality(@ids::text[]) = cardinality(@failure_messages::text[]) + AND cardinality(@ids::text[]) = cardinality(@set_published_ats::boolean[]); diff --git a/coderd/database/unique_constraint.go b/coderd/database/unique_constraint.go index 38c95e67410c9..45e7a5acf3980 100644 --- a/coderd/database/unique_constraint.go +++ b/coderd/database/unique_constraint.go @@ -67,6 +67,7 @@ const ( UniqueTemplateVersionsPkey UniqueConstraint = "template_versions_pkey" // ALTER TABLE ONLY template_versions ADD CONSTRAINT template_versions_pkey PRIMARY KEY (id); UniqueTemplateVersionsTemplateIDNameKey UniqueConstraint = "template_versions_template_id_name_key" // ALTER TABLE ONLY template_versions ADD CONSTRAINT template_versions_template_id_name_key UNIQUE (template_id, name); UniqueTemplatesPkey UniqueConstraint = "templates_pkey" // ALTER TABLE ONLY templates ADD CONSTRAINT templates_pkey PRIMARY KEY (id); + UniqueUsageEventsPkey UniqueConstraint = "usage_events_pkey" // ALTER TABLE ONLY usage_events ADD CONSTRAINT usage_events_pkey PRIMARY KEY (id); UniqueUserConfigsPkey UniqueConstraint = "user_configs_pkey" // ALTER TABLE ONLY user_configs ADD CONSTRAINT user_configs_pkey PRIMARY KEY (user_id, key); UniqueUserDeletedPkey UniqueConstraint = "user_deleted_pkey" // ALTER TABLE ONLY user_deleted ADD CONSTRAINT user_deleted_pkey PRIMARY KEY (id); UniqueUserLinksPkey UniqueConstraint = "user_links_pkey" // ALTER TABLE ONLY user_links ADD CONSTRAINT user_links_pkey PRIMARY KEY (user_id, login_type); diff --git a/coderd/usage/collector.go b/coderd/usage/collector.go new file mode 100644 index 0000000000000..1a2e16ea43f01 --- /dev/null +++ b/coderd/usage/collector.go @@ -0,0 +1,29 @@ +package usage + +import ( + "context" + + "github.com/coder/coder/v2/coderd/database" +) + +// Collector is a sink for usage events generated by the product. +type Collector interface { + // CollectDiscreteUsageEvent writes a discrete usage event to the database + // with the given database or transaction. + CollectDiscreteUsageEvent(ctx context.Context, db database.Store, event DiscreteEvent) error +} + +// AGPLCollector is a no-op implementation of Collector. +type AGPLCollector struct{} + +var _ Collector = AGPLCollector{} + +func NewAGPLCollector() Collector { + return AGPLCollector{} +} + +// CollectDiscreteUsageEvent is a no-op implementation of +// CollectDiscreteUsageEvent. +func (AGPLCollector) CollectDiscreteUsageEvent(_ context.Context, _ database.Store, _ DiscreteEvent) error { + return nil +} diff --git a/coderd/usage/events.go b/coderd/usage/events.go new file mode 100644 index 0000000000000..705435a96a244 --- /dev/null +++ b/coderd/usage/events.go @@ -0,0 +1,47 @@ +package usage + +import ( + "golang.org/x/xerrors" + + "github.com/coder/coder/v2/coderd/database" +) + +// Event is a usage event that can be collected by the usage collector. +// +// Note that the following event types should not be updated once they are +// merged into the product. Please consult Dean before making any changes. +type Event interface { + usageEvent() // to prevent external types from implementing this interface + EventType() database.UsageEventType + Valid() error +} + +// DiscreteEvent is a usage event that is collected as a discrete event. +type DiscreteEvent interface { + Event + discreteUsageEvent() // marker method, also prevents external types from implementing this interface +} + +// DCManagedAgentsV1 is a discrete usage event for the number of managed agents. +// This event is sent in the following situations: +// - Once on first startup after usage tracking is added to the product with +// the count of all existing managed agents (count=N) +// - A new managed agent is created (count=1) +type DCManagedAgentsV1 struct { + Count uint64 `json:"count"` +} + +var _ DiscreteEvent = DCManagedAgentsV1{} + +func (DCManagedAgentsV1) usageEvent() {} +func (DCManagedAgentsV1) discreteUsageEvent() {} +func (DCManagedAgentsV1) EventType() database.UsageEventType { + return database.UsageEventTypeDcManagedAgentsV1 +} + +func (e DCManagedAgentsV1) Valid() error { + if e.Count == 0 { + return xerrors.New("count must be greater than 0") + } + return nil +} diff --git a/enterprise/coderd/coderdenttest/coderdenttest.go b/enterprise/coderd/coderdenttest/coderdenttest.go index 47d248335dda1..d8afa47fbb82a 100644 --- a/enterprise/coderd/coderdenttest/coderdenttest.go +++ b/enterprise/coderd/coderdenttest/coderdenttest.go @@ -161,12 +161,13 @@ func NewWithAPI(t *testing.T, options *Options) ( // LicenseOptions is used to generate a license for testing. // It supports the builder pattern for easy customization. type LicenseOptions struct { - AccountType string - AccountID string - DeploymentIDs []string - Trial bool - FeatureSet codersdk.FeatureSet - AllFeatures bool + AccountType string + AccountID string + DeploymentIDs []string + Trial bool + FeatureSet codersdk.FeatureSet + AllFeatures bool + PublishUsageData bool // GraceAt is the time at which the license will enter the grace period. GraceAt time.Time // ExpiresAt is the time at which the license will hard expire. @@ -279,15 +280,16 @@ func GenerateLicense(t *testing.T, options LicenseOptions) string { NotBefore: jwt.NewNumericDate(options.NotBefore), IssuedAt: jwt.NewNumericDate(issuedAt), }, - LicenseExpires: jwt.NewNumericDate(options.GraceAt), - AccountType: options.AccountType, - AccountID: options.AccountID, - DeploymentIDs: options.DeploymentIDs, - Trial: options.Trial, - Version: license.CurrentVersion, - AllFeatures: options.AllFeatures, - FeatureSet: options.FeatureSet, - Features: options.Features, + LicenseExpires: jwt.NewNumericDate(options.GraceAt), + AccountType: options.AccountType, + AccountID: options.AccountID, + DeploymentIDs: options.DeploymentIDs, + Trial: options.Trial, + Version: license.CurrentVersion, + AllFeatures: options.AllFeatures, + FeatureSet: options.FeatureSet, + Features: options.Features, + PublishUsageData: options.PublishUsageData, } return GenerateLicenseRaw(t, c) } diff --git a/enterprise/coderd/license/license.go b/enterprise/coderd/license/license.go index bc5c174d9fc3a..687a4aaf66746 100644 --- a/enterprise/coderd/license/license.go +++ b/enterprise/coderd/license/license.go @@ -584,6 +584,7 @@ type Claims struct { Version uint64 `json:"version"` Features Features `json:"features"` RequireTelemetry bool `json:"require_telemetry,omitempty"` + PublishUsageData bool `json:"publish_usage_data,omitempty"` } var _ jwt.Claims = &Claims{} diff --git a/enterprise/coderd/usage/collector.go b/enterprise/coderd/usage/collector.go new file mode 100644 index 0000000000000..99dcef6d10217 --- /dev/null +++ b/enterprise/coderd/usage/collector.go @@ -0,0 +1,67 @@ +package usage + +import ( + "context" + "encoding/json" + + "github.com/google/uuid" + "golang.org/x/xerrors" + + "github.com/coder/coder/v2/coderd/database" + "github.com/coder/coder/v2/coderd/database/dbtime" + agplusage "github.com/coder/coder/v2/coderd/usage" + "github.com/coder/quartz" +) + +// Collector collects usage events and stores them in the database for +// publishing. +type Collector struct { + clock quartz.Clock +} + +var _ agplusage.Collector = &Collector{} + +// NewCollector creates a new database-backed usage event collector. +func NewCollector(opts ...CollectorOption) *Collector { + c := &Collector{ + clock: quartz.NewReal(), + } + for _, opt := range opts { + opt(c) + } + return c +} + +type CollectorOption func(*Collector) + +// CollectorWithClock sets the quartz clock to use for the collector. +func CollectorWithClock(clock quartz.Clock) CollectorOption { + return func(c *Collector) { + c.clock = clock + } +} + +// CollectDiscreteUsageEvent implements agplusage.Collector. +func (c *Collector) CollectDiscreteUsageEvent(ctx context.Context, db database.Store, event agplusage.DiscreteEvent) error { + if !event.EventType().IsDiscrete() { + return xerrors.Errorf("event type %q is not a discrete event", event.EventType()) + } + if err := event.Valid(); err != nil { + return xerrors.Errorf("invalid %q event: %w", event.EventType(), err) + } + + jsonData, err := json.Marshal(event) + if err != nil { + return xerrors.Errorf("marshal event as JSON: %w", err) + } + + // Duplicate events are ignored by the query, so we don't need to check the + // error. + return db.InsertUsageEvent(ctx, database.InsertUsageEventParams{ + // Always generate a new UUID for discrete events. + ID: uuid.New().String(), + EventType: event.EventType(), + EventData: jsonData, + CreatedAt: dbtime.Time(c.clock.Now()), + }) +} diff --git a/enterprise/coderd/usage/collector_test.go b/enterprise/coderd/usage/collector_test.go new file mode 100644 index 0000000000000..6aa7bc605788b --- /dev/null +++ b/enterprise/coderd/usage/collector_test.go @@ -0,0 +1,85 @@ +package usage_test + +import ( + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + + "github.com/coder/coder/v2/coderd/database" + "github.com/coder/coder/v2/coderd/database/dbmock" + "github.com/coder/coder/v2/coderd/database/dbtime" + agplusage "github.com/coder/coder/v2/coderd/usage" + "github.com/coder/coder/v2/enterprise/coderd/usage" + "github.com/coder/coder/v2/testutil" + "github.com/coder/quartz" +) + +func TestCollector(t *testing.T) { + t.Parallel() + + t.Run("OK", func(t *testing.T) { + t.Parallel() + + ctx := testutil.Context(t, testutil.WaitLong) + ctrl := gomock.NewController(t) + db := dbmock.NewMockStore(ctrl) + clock := quartz.NewMock(t) + collector := usage.NewCollector(usage.CollectorWithClock(clock)) + + now := dbtime.Now() + events := []struct { + time time.Time + event agplusage.DiscreteEvent + }{ + { + time: now, + event: agplusage.DCManagedAgentsV1{ + Count: 1, + }, + }, + { + time: now.Add(1 * time.Minute), + event: agplusage.DCManagedAgentsV1{ + Count: 2, + }, + }, + } + + for _, event := range events { + eventJSON := jsoninate(t, event.event) + db.EXPECT().InsertUsageEvent(ctx, gomock.Any()).DoAndReturn( + func(ctx interface{}, params database.InsertUsageEventParams) error { + _, err := uuid.Parse(params.ID) + assert.NoError(t, err) + assert.Equal(t, event.event.EventType(), params.EventType) + assert.JSONEq(t, eventJSON, string(params.EventData)) + assert.Equal(t, event.time, params.CreatedAt) + return nil + }, + ).Times(1) + + clock.Set(event.time) + err := collector.CollectDiscreteUsageEvent(ctx, db, event.event) + require.NoError(t, err) + } + }) + + t.Run("InvalidEvent", func(t *testing.T) { + t.Parallel() + + ctx := testutil.Context(t, testutil.WaitLong) + ctrl := gomock.NewController(t) + db := dbmock.NewMockStore(ctrl) + + // We should get an error if the event is invalid. + collector := usage.NewCollector() + err := collector.CollectDiscreteUsageEvent(ctx, db, agplusage.DCManagedAgentsV1{ + Count: 0, // invalid + }) + assert.ErrorContains(t, err, `invalid "dc_managed_agents_v1" event: count must be greater than 0`) + }) +} diff --git a/enterprise/coderd/usage/publisher.go b/enterprise/coderd/usage/publisher.go new file mode 100644 index 0000000000000..290691e44c4ed --- /dev/null +++ b/enterprise/coderd/usage/publisher.go @@ -0,0 +1,427 @@ +package usage + +import ( + "bytes" + "context" + "crypto/ed25519" + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "github.com/google/uuid" + "golang.org/x/xerrors" + + "cdr.dev/slog" + "github.com/coder/coder/v2/coderd/database" + "github.com/coder/coder/v2/coderd/database/dbtime" + "github.com/coder/coder/v2/cryptorand" + "github.com/coder/coder/v2/enterprise/coderd" + "github.com/coder/coder/v2/enterprise/coderd/license" + "github.com/coder/quartz" +) + +const ( + CoderLicenseJWTHeader = "Coder-License-JWT" + + tallymanURL = "https://tallyman-ingress.coder.com" + tallymanIngestURLV1 = tallymanURL + "/api/v1/ingest" + + tallymanPublishInitialMinimumDelay = 5 * time.Minute + // Chosen to be a prime number and not a multiple of 5 like many other + // recurring tasks. + tallymanPublishInterval = 17 * time.Minute + tallymanPublishTimeout = 30 * time.Second + tallymanPublishBatchSize = 100 +) + +var errUsagePublishingDisabled = xerrors.New("usage publishing is not enabled by any license") + +// Publisher publishes usage events ***somewhere***. +type Publisher interface { + // Close closes the publisher and waits for it to finish. + io.Closer + // Start starts the publisher. It must only be called once. + Start() error +} + +type tallymanPublisher struct { + ctx context.Context + ctxCancel context.CancelFunc + log slog.Logger + db database.Store + done chan struct{} + + // Configured with options: + ingestURL string + httpClient *http.Client + clock quartz.Clock + licenseKeys map[string]ed25519.PublicKey + initialDelay time.Duration +} + +var _ Publisher = &tallymanPublisher{} + +// NewTallymanPublisher creates a Publisher that publishes usage events to +// Coder's Tallyman service. +func NewTallymanPublisher(ctx context.Context, log slog.Logger, db database.Store, opts ...TallymanPublisherOption) Publisher { + ctx, cancel := context.WithCancel(ctx) + publisher := &tallymanPublisher{ + ctx: ctx, + ctxCancel: cancel, + log: log, + db: db, + done: make(chan struct{}), + + ingestURL: tallymanIngestURLV1, + httpClient: http.DefaultClient, + clock: quartz.NewReal(), + licenseKeys: coderd.Keys, + } + for _, opt := range opts { + opt(publisher) + } + return publisher +} + +type TallymanPublisherOption func(*tallymanPublisher) + +// PublisherWithHTTPClient sets the HTTP client to use for publishing usage events. +func PublisherWithHTTPClient(httpClient *http.Client) TallymanPublisherOption { + return func(p *tallymanPublisher) { + p.httpClient = httpClient + } +} + +// PublisherWithClock sets the clock to use for publishing usage events. +func PublisherWithClock(clock quartz.Clock) TallymanPublisherOption { + return func(p *tallymanPublisher) { + p.clock = clock + } +} + +// PublisherWithLicenseKeys sets the license public keys to use for license +// validation. +func PublisherWithLicenseKeys(keys map[string]ed25519.PublicKey) TallymanPublisherOption { + return func(p *tallymanPublisher) { + p.licenseKeys = keys + } +} + +// PublisherWithIngestURL sets the ingest URL to use for publishing usage +// events. +func PublisherWithIngestURL(ingestURL string) TallymanPublisherOption { + return func(p *tallymanPublisher) { + p.ingestURL = ingestURL + } +} + +// PublisherWithInitialDelay sets the initial delay for the publisher. +func PublisherWithInitialDelay(initialDelay time.Duration) TallymanPublisherOption { + return func(p *tallymanPublisher) { + p.initialDelay = initialDelay + } +} + +// Start implements Publisher. +func (p *tallymanPublisher) Start() error { + deploymentID, err := p.db.GetDeploymentID(p.ctx) + if err != nil { + return xerrors.Errorf("get deployment ID: %w", err) + } + deploymentUUID, err := uuid.Parse(deploymentID) + if err != nil { + return xerrors.Errorf("parse deployment ID %q: %w", deploymentID, err) + } + + if p.initialDelay <= 0 { + // Pick a random time between tallymanPublishInitialMinimumDelay and + // tallymanPublishInterval. + maxPlusDelay := int(tallymanPublishInterval - tallymanPublishInitialMinimumDelay) + plusDelay, err := cryptorand.Intn(maxPlusDelay) + if err != nil { + return xerrors.Errorf("could not generate random start delay: %w", err) + } + p.initialDelay = tallymanPublishInitialMinimumDelay + time.Duration(plusDelay) + } + + go p.publishLoop(p.ctx, deploymentUUID) + return nil +} + +func (p *tallymanPublisher) publishLoop(ctx context.Context, deploymentID uuid.UUID) { + defer close(p.done) + + // Start the ticker with the initial delay. We will reset it to the interval + // after the first tick. + ticker := p.clock.NewTicker(p.initialDelay) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + } + + err := p.publish(ctx, deploymentID) + if err != nil { + p.log.Warn(ctx, "publish usage events to tallyman", slog.Error(err)) + } + ticker.Reset(tallymanPublishInterval) + } +} + +// publish publishes usage events to Tallyman in a loop until there is an error +// (or any rejection) or there are no more events to publish. +func (p *tallymanPublisher) publish(ctx context.Context, deploymentID uuid.UUID) error { + for { + publishCtx, publishCtxCancel := context.WithTimeout(ctx, tallymanPublishTimeout) + accepted, err := p.publishOnce(publishCtx, deploymentID) + publishCtxCancel() + if err != nil { + return xerrors.Errorf("publish usage events to tallyman: %w", err) + } + if accepted < tallymanPublishBatchSize { + // We published less than the batch size, so we're done. + return nil + } + } +} + +// publishOnce publishes up to tallymanPublishBatchSize usage events to +// tallyman. It returns the number of successfully published events. +func (p *tallymanPublisher) publishOnce(ctx context.Context, deploymentID uuid.UUID) (int, error) { + licenseJwt, err := p.getBestLicenseJWT(ctx) + if xerrors.Is(err, errUsagePublishingDisabled) { + return 0, nil + } else if err != nil { + return 0, xerrors.Errorf("find usage publishing license: %w", err) + } + + events, err := p.db.SelectUsageEventsForPublishing(ctx, dbtime.Time(p.clock.Now())) + if err != nil { + return 0, xerrors.Errorf("select usage events for publishing: %w", err) + } + if len(events) == 0 { + // No events to publish. + return 0, nil + } + + var ( + eventIDs = make(map[string]struct{}) + tallymanReq = TallymanIngestRequestV1{ + DeploymentID: deploymentID, + Events: make([]TallymanIngestEventV1, 0, len(events)), + } + ) + for _, event := range events { + eventIDs[event.ID] = struct{}{} + tallymanReq.Events = append(tallymanReq.Events, TallymanIngestEventV1{ + ID: event.ID, + EventType: event.EventType, + EventData: event.EventData, + CreatedAt: event.CreatedAt, + }) + } + if len(eventIDs) != len(events) { + // This should never happen due to the unique constraint in the + // database. + return 0, xerrors.Errorf("duplicate event IDs found in events for publishing") + } + + resp, err := p.sendPublishRequest(ctx, licenseJwt, tallymanReq) + allFailed := err != nil + if err != nil { + p.log.Warn(ctx, "failed to send publish request to tallyman", slog.F("count", len(events)), slog.Error(err)) + // Fake a response with all events temporarily rejected. + resp = TallymanIngestResponseV1{ + AcceptedEvents: []TallymanIngestAcceptedEventV1{}, + RejectedEvents: make([]TallymanIngestRejectedEventV1, len(events)), + } + for i, event := range events { + resp.RejectedEvents[i] = TallymanIngestRejectedEventV1{ + ID: event.ID, + Message: fmt.Sprintf("failed to publish to tallyman: %v", err), + Permanent: false, + } + } + } else { + p.log.Debug(ctx, "published usage events to tallyman", slog.F("accepted", len(resp.AcceptedEvents)), slog.F("rejected", len(resp.RejectedEvents))) + } + + if len(resp.AcceptedEvents)+len(resp.RejectedEvents) != len(events) { + p.log.Warn(ctx, "tallyman returned a different number of events than we sent", slog.F("sent", len(events)), slog.F("accepted", len(resp.AcceptedEvents)), slog.F("rejected", len(resp.RejectedEvents))) + } + + var ( + acceptedEvents = make(map[string]*TallymanIngestAcceptedEventV1) + rejectedEvents = make(map[string]*TallymanIngestRejectedEventV1) + ) + for _, event := range resp.AcceptedEvents { + acceptedEvents[event.ID] = &event + } + for _, event := range resp.RejectedEvents { + rejectedEvents[event.ID] = &event + } + + dbUpdate := database.UpdateUsageEventsPostPublishParams{ + Now: dbtime.Time(p.clock.Now()), + IDs: make([]string, len(events)), + FailureMessages: make([]string, len(events)), + SetPublishedAts: make([]bool, len(events)), + } + for i, event := range events { + dbUpdate.IDs[i] = event.ID + if _, ok := acceptedEvents[event.ID]; ok { + dbUpdate.FailureMessages[i] = "" + dbUpdate.SetPublishedAts[i] = true + } else if rejectedEvent, ok := rejectedEvents[event.ID]; ok { + if !allFailed { + // These are all going to have the same message, so don't log + // them. We already logged the overall error above. + p.log.Warn(ctx, "tallyman rejected usage event", slog.F("id", event.ID), slog.F("message", rejectedEvent.Message), slog.F("permanent", rejectedEvent.Permanent)) + } + dbUpdate.FailureMessages[i] = rejectedEvent.Message + dbUpdate.SetPublishedAts[i] = rejectedEvent.Permanent + } else { + // It's not good if this path gets hit, but we'll handle it as if it + // was a temporary rejection. + p.log.Warn(ctx, "tallyman did not include a usage event in the response, considering it temporarily rejected", slog.F("id", event.ID)) + dbUpdate.FailureMessages[i] = "tallyman did not include the event in the response" + dbUpdate.SetPublishedAts[i] = false + } + } + + err = p.db.UpdateUsageEventsPostPublish(ctx, dbUpdate) + if err != nil { + return 0, xerrors.Errorf("update usage events post publish: %w", err) + } + + var returnErr error + if len(resp.RejectedEvents) > 0 { + returnErr = xerrors.New("some events were rejected by tallyman") + } + return len(resp.AcceptedEvents), returnErr +} + +// getBestLicenseJWT returns the best license JWT to use for the request. The +// criteria is as follows: +// - The license must be valid and active (after nbf, before exp) +// - The license must have usage publishing enabled +// The most recently issued (iat) license is chosen. +// +// If no licenses are found or none have usage publishing enabled, +// errUsagePublishingDisabled is returned. +func (p *tallymanPublisher) getBestLicenseJWT(ctx context.Context) (string, error) { + licenses, err := p.db.GetUnexpiredLicenses(ctx) + if err != nil { + return "", xerrors.Errorf("get unexpired licenses: %w", err) + } + if len(licenses) == 0 { + return "", errUsagePublishingDisabled + } + + type licenseJWTWithClaims struct { + Claims *license.Claims + Raw string + } + + var bestLicense licenseJWTWithClaims + for _, dbLicense := range licenses { + claims, err := license.ParseClaims(dbLicense.JWT, p.licenseKeys) + if err != nil { + p.log.Warn(ctx, "failed to parse license claims", slog.F("license_id", dbLicense.ID), slog.Error(err)) + continue + } + + // IssuedAt is verified to be non-nil in license.ParseClaims. + if claims.PublishUsageData && (bestLicense.Claims == nil || claims.IssuedAt.Time.After(bestLicense.Claims.IssuedAt.Time)) { + bestLicense = licenseJWTWithClaims{ + Claims: claims, + Raw: dbLicense.JWT, + } + } + } + + if bestLicense.Raw == "" { + return "", errUsagePublishingDisabled + } + + return bestLicense.Raw, nil +} + +func (p *tallymanPublisher) sendPublishRequest(ctx context.Context, licenseJwt string, req TallymanIngestRequestV1) (TallymanIngestResponseV1, error) { + body, err := json.Marshal(req) + if err != nil { + return TallymanIngestResponseV1{}, err + } + + r, err := http.NewRequestWithContext(ctx, http.MethodPost, p.ingestURL, bytes.NewReader(body)) + if err != nil { + return TallymanIngestResponseV1{}, err + } + r.Header.Set(CoderLicenseJWTHeader, licenseJwt) + + resp, err := p.httpClient.Do(r) + if err != nil { + return TallymanIngestResponseV1{}, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + var errBody TallymanErrorV1 + if err := json.NewDecoder(resp.Body).Decode(&errBody); err != nil { + errBody = TallymanErrorV1{ + Message: fmt.Sprintf("could not decode error response body: %v", err), + } + } + return TallymanIngestResponseV1{}, xerrors.Errorf("unexpected status code %v, error: %s", resp.StatusCode, errBody.Message) + } + + var respBody TallymanIngestResponseV1 + if err := json.NewDecoder(resp.Body).Decode(&respBody); err != nil { + return TallymanIngestResponseV1{}, xerrors.Errorf("decode response body: %w", err) + } + + return respBody, nil +} + +// Close implements Publisher. +func (p *tallymanPublisher) Close() error { + p.ctxCancel() + <-p.done + return nil +} + +type TallymanErrorV1 struct { + Message string `json:"message"` +} + +type TallymanIngestRequestV1 struct { + DeploymentID uuid.UUID `json:"deployment_id"` + Events []TallymanIngestEventV1 `json:"events"` +} + +type TallymanIngestEventV1 struct { + ID string `json:"id"` + EventType database.UsageEventType `json:"event_type"` + EventData json.RawMessage `json:"event_data"` + CreatedAt time.Time `json:"created_at"` +} + +type TallymanIngestResponseV1 struct { + AcceptedEvents []TallymanIngestAcceptedEventV1 `json:"accepted_events"` + RejectedEvents []TallymanIngestRejectedEventV1 `json:"rejected_events"` +} + +type TallymanIngestAcceptedEventV1 struct { + ID string `json:"id"` +} + +type TallymanIngestRejectedEventV1 struct { + ID string `json:"id"` + Message string `json:"message"` + Permanent bool `json:"permanent"` +} diff --git a/enterprise/coderd/usage/publisher_test.go b/enterprise/coderd/usage/publisher_test.go new file mode 100644 index 0000000000000..5e2579a4089ba --- /dev/null +++ b/enterprise/coderd/usage/publisher_test.go @@ -0,0 +1,702 @@ +package usage_test + +import ( + "context" + "database/sql" + "encoding/json" + "net/http" + "net/http/httptest" + "sync/atomic" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/goleak" + "go.uber.org/mock/gomock" + + "cdr.dev/slog/sloggers/slogtest" + "github.com/coder/coder/v2/coderd/database" + "github.com/coder/coder/v2/coderd/database/dbmock" + "github.com/coder/coder/v2/coderd/database/dbtestutil" + "github.com/coder/coder/v2/coderd/database/dbtime" + agplusage "github.com/coder/coder/v2/coderd/usage" + "github.com/coder/coder/v2/enterprise/coderd/coderdenttest" + "github.com/coder/coder/v2/enterprise/coderd/usage" + "github.com/coder/coder/v2/testutil" + "github.com/coder/quartz" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m, testutil.GoleakOptions...) +} + +// TestIntegration tests the collector and publisher by running them with a real +// database. +func TestIntegration(t *testing.T) { + t.Parallel() + const eventCount = 3 + + ctx := testutil.Context(t, testutil.WaitLong) + log := slogtest.Make(t, nil) + db, _ := dbtestutil.NewDB(t) + clock := quartz.NewMock(t) + deploymentID, licenseJWT := configureDeployment(ctx, t, db) + now := time.Now() + + var ( + calls int64 + handler func(req usage.TallymanIngestRequestV1) any + ) + ingestURL := fakeServer(t, tallymanHandler(t, licenseJWT, func(req usage.TallymanIngestRequestV1) any { + callCount := atomic.AddInt64(&calls, 1) + t.Logf("tallyman backend received call %d", callCount) + assert.Equal(t, deploymentID, req.DeploymentID) + + if handler == nil { + t.Errorf("handler is nil") + return usage.TallymanIngestResponseV1{} + } + return handler(req) + })) + + collector := usage.NewCollector( + usage.CollectorWithClock(clock), + ) + // Insert an old event that should never be published. + clock.Set(now.Add(-31 * 24 * time.Hour)) + err := collector.CollectDiscreteUsageEvent(ctx, db, agplusage.DCManagedAgentsV1{ + Count: 31, + }) + require.NoError(t, err) + + // Insert the events we expect to be published. + clock.Set(now.Add(1 * time.Second)) + for i := 0; i < eventCount; i++ { + clock.Advance(time.Second) + err := collector.CollectDiscreteUsageEvent(ctx, db, agplusage.DCManagedAgentsV1{ + Count: uint64(i + 1), // nolint:gosec // these numbers are tiny and will not overflow + }) + require.NoErrorf(t, err, "collecting event %d", i) + } + + publisher := usage.NewTallymanPublisher(ctx, log, db, + usage.PublisherWithClock(clock), + usage.PublisherWithIngestURL(ingestURL), + usage.PublisherWithLicenseKeys(coderdenttest.Keys), + ) + defer publisher.Close() + + // Start the publisher with a trap. + tickerTrap := clock.Trap().NewTicker() + defer tickerTrap.Close() + startErr := make(chan error) + go func() { + err := publisher.Start() + testutil.RequireSend(ctx, t, startErr, err) + }() + tickerCall := tickerTrap.MustWait(ctx) + tickerCall.MustRelease(ctx) + // The initial duration will always be some time between 5m and 17m. + require.GreaterOrEqual(t, tickerCall.Duration, 5*time.Minute) + require.LessOrEqual(t, tickerCall.Duration, 17*time.Minute) + require.NoError(t, testutil.RequireReceive(ctx, t, startErr)) + + // Set up a trap for the ticker.Reset call. + tickerResetTrap := clock.Trap().TickerReset() + defer tickerResetTrap.Close() + + // Configure the handler for the first publish. This handler will accept the + // first event, temporarily reject the second, and permanently reject the + // third. + var temporarilyRejectedEventID string + handler = func(req usage.TallymanIngestRequestV1) any { + // On the first call, accept the first event, temporarily reject the + // second, and permanently reject the third. + acceptedEvents := make([]usage.TallymanIngestAcceptedEventV1, 1) + rejectedEvents := make([]usage.TallymanIngestRejectedEventV1, 2) + if assert.Len(t, req.Events, eventCount) { + assert.JSONEqf(t, jsoninate(t, agplusage.DCManagedAgentsV1{ + Count: 1, + }), string(req.Events[0].EventData), "event data did not match for event %d", 0) + acceptedEvents[0].ID = req.Events[0].ID + + temporarilyRejectedEventID = req.Events[1].ID + assert.JSONEqf(t, jsoninate(t, agplusage.DCManagedAgentsV1{ + Count: 2, + }), string(req.Events[1].EventData), "event data did not match for event %d", 1) + rejectedEvents[0].ID = req.Events[1].ID + rejectedEvents[0].Message = "temporarily rejected" + rejectedEvents[0].Permanent = false + + assert.JSONEqf(t, jsoninate(t, agplusage.DCManagedAgentsV1{ + Count: 3, + }), string(req.Events[2].EventData), "event data did not match for event %d", 2) + rejectedEvents[1].ID = req.Events[2].ID + rejectedEvents[1].Message = "permanently rejected" + rejectedEvents[1].Permanent = true + } + return usage.TallymanIngestResponseV1{ + AcceptedEvents: acceptedEvents, + RejectedEvents: rejectedEvents, + } + } + + // Advance the clock to the initial tick, which should trigger the first + // publish, then wait for the reset call. The duration will always be 17m + // for resets (only the initial tick is variable). + clock.Advance(tickerCall.Duration) + tickerResetCall := tickerResetTrap.MustWait(ctx) + require.Equal(t, 17*time.Minute, tickerResetCall.Duration) + tickerResetCall.MustRelease(ctx) + + // The publisher should have published the events once. + require.Equal(t, int64(1), atomic.LoadInt64(&calls)) + + // Set the handler for the next publish call. This call should only include + // the temporarily rejected event from earlier. This time we'll accept it. + handler = func(req usage.TallymanIngestRequestV1) any { + assert.Len(t, req.Events, 1) + acceptedEvents := make([]usage.TallymanIngestAcceptedEventV1, len(req.Events)) + for i, event := range req.Events { + assert.Equal(t, temporarilyRejectedEventID, event.ID) + acceptedEvents[i].ID = event.ID + } + return usage.TallymanIngestResponseV1{ + AcceptedEvents: acceptedEvents, + RejectedEvents: []usage.TallymanIngestRejectedEventV1{}, + } + } + + // Advance the clock to the next tick and wait for the reset call. + clock.Advance(tickerResetCall.Duration) + tickerResetCall = tickerResetTrap.MustWait(ctx) + tickerResetCall.MustRelease(ctx) + + // The publisher should have published the events again. + require.Equal(t, int64(2), atomic.LoadInt64(&calls)) + + // There should be no more publish calls after this, so set the handler to + // nil. + handler = nil + + // Advance the clock to the next tick. + clock.Advance(tickerResetCall.Duration) + tickerResetTrap.MustWait(ctx).MustRelease(ctx) + + // No publish should have taken place since there are no more events to + // publish. + require.Equal(t, int64(2), atomic.LoadInt64(&calls)) + + require.NoError(t, publisher.Close()) +} + +func TestPublisherNoEligibleLicenses(t *testing.T) { + t.Parallel() + ctx := testutil.Context(t, testutil.WaitLong) + log := slogtest.Make(t, nil) + ctrl := gomock.NewController(t) + db := dbmock.NewMockStore(ctrl) + clock := quartz.NewMock(t) + + // Configure the deployment manually. + deploymentID := uuid.New() + db.EXPECT().GetDeploymentID(gomock.Any()).Return(deploymentID.String(), nil).Times(1) + + var calls int64 + ingestURL := fakeServer(t, tallymanHandler(t, "", func(req usage.TallymanIngestRequestV1) any { + atomic.AddInt64(&calls, 1) + return usage.TallymanIngestResponseV1{ + AcceptedEvents: []usage.TallymanIngestAcceptedEventV1{}, + RejectedEvents: []usage.TallymanIngestRejectedEventV1{}, + } + })) + + publisher := usage.NewTallymanPublisher(ctx, log, db, + usage.PublisherWithClock(clock), + usage.PublisherWithIngestURL(ingestURL), + usage.PublisherWithLicenseKeys(coderdenttest.Keys), + ) + defer publisher.Close() + + // Start the publisher with a trap. + tickerTrap := clock.Trap().NewTicker() + defer tickerTrap.Close() + startErr := make(chan error) + go func() { + err := publisher.Start() + testutil.RequireSend(ctx, t, startErr, err) + }() + tickerCall := tickerTrap.MustWait(ctx) + tickerCall.MustRelease(ctx) + require.NoError(t, testutil.RequireReceive(ctx, t, startErr)) + + // Mock zero licenses. + db.EXPECT().GetUnexpiredLicenses(gomock.Any()).Return([]database.License{}, nil).Times(1) + + // Tick and wait for the reset call. + tickerResetTrap := clock.Trap().TickerReset() + defer tickerResetTrap.Close() + clock.Advance(tickerCall.Duration) + tickerResetCall := tickerResetTrap.MustWait(ctx) + tickerResetCall.MustRelease(ctx) + + // The publisher should not have published the events. + require.Equal(t, int64(0), atomic.LoadInt64(&calls)) + + // Mock a single license with usage publishing disabled. + licenseJWT := coderdenttest.GenerateLicense(t, coderdenttest.LicenseOptions{ + PublishUsageData: false, + }) + db.EXPECT().GetUnexpiredLicenses(gomock.Any()).Return([]database.License{ + { + ID: 1, + JWT: licenseJWT, + UploadedAt: dbtime.Now(), + Exp: dbtime.Now().Add(48 * time.Hour), // fake + UUID: uuid.New(), + }, + }, nil).Times(1) + + // Tick and wait for the reset call. + clock.Advance(tickerResetCall.Duration) + tickerResetTrap.MustWait(ctx).MustRelease(ctx) + + // The publisher should still not have published the events. + require.Equal(t, int64(0), atomic.LoadInt64(&calls)) +} + +// TestPublisherClaimExpiry tests the claim query to ensure that events are not +// claimed if they've recently been claimed by another publisher. +func TestPublisherClaimExpiry(t *testing.T) { + t.Parallel() + ctx := testutil.Context(t, testutil.WaitLong) + log := slogtest.Make(t, nil) + db, _ := dbtestutil.NewDB(t) + clock := quartz.NewMock(t) + _, licenseJWT := configureDeployment(ctx, t, db) + now := time.Now() + + var calls int64 + ingestURL := fakeServer(t, tallymanHandler(t, licenseJWT, func(req usage.TallymanIngestRequestV1) any { + atomic.AddInt64(&calls, 1) + return tallymanAcceptAllHandler(req) + })) + + collector := usage.NewCollector( + usage.CollectorWithClock(clock), + ) + + publisher := usage.NewTallymanPublisher(ctx, log, db, + usage.PublisherWithClock(clock), + usage.PublisherWithIngestURL(ingestURL), + usage.PublisherWithLicenseKeys(coderdenttest.Keys), + usage.PublisherWithInitialDelay(17*time.Minute), + ) + defer publisher.Close() + + // Create an event that was claimed 1h-18m ago. The ticker has a forced + // delay of 17m in this test. + clock.Set(now) + err := collector.CollectDiscreteUsageEvent(ctx, db, agplusage.DCManagedAgentsV1{ + Count: 1, + }) + require.NoError(t, err) + // Claim the event in the past. Claiming it this way via the database + // directly means it won't be marked as published or unclaimed. + events, err := db.SelectUsageEventsForPublishing(ctx, now.Add(-42*time.Minute)) + require.NoError(t, err) + require.Len(t, events, 1) + + // Start the publisher with a trap. + tickerTrap := clock.Trap().NewTicker() + defer tickerTrap.Close() + startErr := make(chan error) + go func() { + err := publisher.Start() + testutil.RequireSend(ctx, t, startErr, err) + }() + tickerCall := tickerTrap.MustWait(ctx) + require.Equal(t, 17*time.Minute, tickerCall.Duration) + tickerCall.MustRelease(ctx) + require.NoError(t, testutil.RequireReceive(ctx, t, startErr)) + + // Set up a trap for the ticker.Reset call. + tickerResetTrap := clock.Trap().TickerReset() + defer tickerResetTrap.Close() + + // Advance the clock to the initial tick, which should trigger the first + // publish, then wait for the reset call. The duration will always be 17m + // for resets (only the initial tick is variable). + clock.Advance(tickerCall.Duration) + tickerResetCall := tickerResetTrap.MustWait(ctx) + require.Equal(t, 17*time.Minute, tickerResetCall.Duration) + tickerResetCall.MustRelease(ctx) + + // No events should have been published since none are eligible. + require.Equal(t, int64(0), atomic.LoadInt64(&calls)) + + // Advance the clock to the next tick and wait for the reset call. + clock.Advance(tickerResetCall.Duration) + tickerResetCall = tickerResetTrap.MustWait(ctx) + tickerResetCall.MustRelease(ctx) + + // The publisher should have published the event, as it's now eligible. + require.Equal(t, int64(1), atomic.LoadInt64(&calls)) +} + +// TestPublisherMissingEvents tests that the publisher notices events that are +// not returned by the Tallyman server and marks them as temporarily rejected. +func TestPublisherMissingEvents(t *testing.T) { + t.Parallel() + ctx := testutil.Context(t, testutil.WaitLong) + log := slogtest.Make(t, nil) + ctrl := gomock.NewController(t) + db := dbmock.NewMockStore(ctrl) + _, licenseJWT := configureMockDeployment(t, db) + clock := quartz.NewMock(t) + now := time.Now() + clock.Set(now) + + var calls int64 + ingestURL := fakeServer(t, tallymanHandler(t, licenseJWT, func(req usage.TallymanIngestRequestV1) any { + atomic.AddInt64(&calls, 1) + return usage.TallymanIngestResponseV1{ + AcceptedEvents: []usage.TallymanIngestAcceptedEventV1{}, + RejectedEvents: []usage.TallymanIngestRejectedEventV1{}, + } + })) + + publisher := usage.NewTallymanPublisher(ctx, log, db, + usage.PublisherWithClock(clock), + usage.PublisherWithIngestURL(ingestURL), + usage.PublisherWithLicenseKeys(coderdenttest.Keys), + ) + + // Expect the publisher to call SelectUsageEventsForPublishing, followed by + // UpdateUsageEventsPostPublish. + events := []database.UsageEvent{ + { + ID: uuid.New().String(), + EventType: database.UsageEventTypeDcManagedAgentsV1, + EventData: []byte(jsoninate(t, agplusage.DCManagedAgentsV1{ + Count: 1, + })), + CreatedAt: now, + PublishedAt: sql.NullTime{}, + PublishStartedAt: sql.NullTime{}, + FailureMessage: sql.NullString{}, + }, + } + db.EXPECT().SelectUsageEventsForPublishing(gomock.Any(), gomock.Any()).Return(events, nil).Times(1) + db.EXPECT().UpdateUsageEventsPostPublish(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, params database.UpdateUsageEventsPostPublishParams) error { + assert.Equal(t, []string{events[0].ID}, params.IDs) + assert.Equal(t, []string{"tallyman did not include the event in the response"}, params.FailureMessages) + assert.Equal(t, []bool{false}, params.SetPublishedAts) + return nil + }, + ).Times(1) + + // Start the publisher with a trap. + tickerTrap := clock.Trap().NewTicker() + defer tickerTrap.Close() + startErr := make(chan error) + go func() { + err := publisher.Start() + testutil.RequireSend(ctx, t, startErr, err) + }() + tickerCall := tickerTrap.MustWait(ctx) + tickerCall.MustRelease(ctx) + require.NoError(t, testutil.RequireReceive(ctx, t, startErr)) + + // Tick and wait for the reset call. + tickerResetTrap := clock.Trap().TickerReset() + defer tickerResetTrap.Close() + clock.Advance(tickerCall.Duration) + tickerResetTrap.MustWait(ctx).MustRelease(ctx) + + // The publisher should have published the events once. + require.Equal(t, int64(1), atomic.LoadInt64(&calls)) + + require.NoError(t, publisher.Close()) +} + +func TestPublisherLicenseSelection(t *testing.T) { + t.Parallel() + ctx := testutil.Context(t, testutil.WaitLong) + log := slogtest.Make(t, nil) + ctrl := gomock.NewController(t) + db := dbmock.NewMockStore(ctrl) + clock := quartz.NewMock(t) + now := time.Now() + + // Configure the deployment manually. + deploymentID := uuid.New() + db.EXPECT().GetDeploymentID(gomock.Any()).Return(deploymentID.String(), nil).Times(1) + + // Insert multiple licenses: + // 1. PublishUsageData false, iat 30m ago (ineligible, publish not enabled) + // 2. PublishUsageData true, iat 1h ago (eligible) + // 3. PublishUsageData true, iat 30m ago, exp 10m ago (ineligible, expired) + badLicense1 := coderdenttest.GenerateLicense(t, coderdenttest.LicenseOptions{ + PublishUsageData: false, + IssuedAt: now.Add(-30 * time.Minute), + }) + expectedLicense := coderdenttest.GenerateLicense(t, coderdenttest.LicenseOptions{ + PublishUsageData: true, + IssuedAt: now.Add(-1 * time.Hour), + }) + badLicense2 := coderdenttest.GenerateLicense(t, coderdenttest.LicenseOptions{ + PublishUsageData: true, + IssuedAt: now.Add(-30 * time.Minute), + ExpiresAt: now.Add(-10 * time.Minute), + }) + // GetUnexpiredLicenses is not supposed to return expired licenses, but for + // the purposes of this test we're going to do it anyway. + db.EXPECT().GetUnexpiredLicenses(gomock.Any()).Return([]database.License{ + { + ID: 1, + JWT: badLicense1, + Exp: now.Add(48 * time.Hour), // fake, should be ignored by publisher code anyway + UUID: uuid.New(), + UploadedAt: now, + }, + { + ID: 2, + JWT: expectedLicense, + Exp: now.Add(48 * time.Hour), // fake + UUID: uuid.New(), + UploadedAt: now, + }, + { + ID: 3, + JWT: badLicense2, + Exp: now.Add(48 * time.Hour), // fake + UUID: uuid.New(), + UploadedAt: now, + }, + }, nil) + + var calls int64 + ingestURL := fakeServer(t, tallymanHandler(t, expectedLicense, func(req usage.TallymanIngestRequestV1) any { + atomic.AddInt64(&calls, 1) + assert.Equal(t, deploymentID, req.DeploymentID) + return tallymanAcceptAllHandler(req) + })) + + publisher := usage.NewTallymanPublisher(ctx, log, db, + usage.PublisherWithClock(clock), + usage.PublisherWithIngestURL(ingestURL), + usage.PublisherWithLicenseKeys(coderdenttest.Keys), + ) + defer publisher.Close() + + // Start the publisher with a trap. + tickerTrap := clock.Trap().NewTicker() + defer tickerTrap.Close() + startErr := make(chan error) + go func() { + err := publisher.Start() + testutil.RequireSend(ctx, t, startErr, err) + }() + tickerCall := tickerTrap.MustWait(ctx) + tickerCall.MustRelease(ctx) + require.NoError(t, testutil.RequireReceive(ctx, t, startErr)) + + // Mock events to be published. + events := []database.UsageEvent{ + { + ID: uuid.New().String(), + EventType: database.UsageEventTypeDcManagedAgentsV1, + EventData: []byte(jsoninate(t, agplusage.DCManagedAgentsV1{ + Count: 1, + })), + }, + } + db.EXPECT().SelectUsageEventsForPublishing(gomock.Any(), gomock.Any()).Return(events, nil).Times(1) + db.EXPECT().UpdateUsageEventsPostPublish(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, params database.UpdateUsageEventsPostPublishParams) error { + assert.Equal(t, []string{events[0].ID}, params.IDs) + assert.Equal(t, []string{""}, params.FailureMessages) + assert.Equal(t, []bool{true}, params.SetPublishedAts) + return nil + }, + ).Times(1) + + // Tick and wait for the reset call. + tickerResetTrap := clock.Trap().TickerReset() + defer tickerResetTrap.Close() + clock.Advance(tickerCall.Duration) + tickerResetTrap.MustWait(ctx).MustRelease(ctx) + + // The publisher should have published the events once. + require.Equal(t, int64(1), atomic.LoadInt64(&calls)) +} + +func TestPublisherTallymanError(t *testing.T) { + t.Parallel() + ctx := testutil.Context(t, testutil.WaitLong) + log := slogtest.Make(t, nil) + ctrl := gomock.NewController(t) + db := dbmock.NewMockStore(ctrl) + clock := quartz.NewMock(t) + now := time.Now() + clock.Set(now) + + _, licenseJWT := configureMockDeployment(t, db) + const errorMessage = "tallyman error" + var calls int64 + ingestURL := fakeServer(t, tallymanHandler(t, licenseJWT, func(req usage.TallymanIngestRequestV1) any { + atomic.AddInt64(&calls, 1) + return usage.TallymanErrorV1{ + Message: errorMessage, + } + })) + + publisher := usage.NewTallymanPublisher(ctx, log, db, + usage.PublisherWithClock(clock), + usage.PublisherWithIngestURL(ingestURL), + usage.PublisherWithLicenseKeys(coderdenttest.Keys), + ) + defer publisher.Close() + + // Start the publisher with a trap. + tickerTrap := clock.Trap().NewTicker() + defer tickerTrap.Close() + startErr := make(chan error) + go func() { + err := publisher.Start() + testutil.RequireSend(ctx, t, startErr, err) + }() + tickerCall := tickerTrap.MustWait(ctx) + tickerCall.MustRelease(ctx) + require.NoError(t, testutil.RequireReceive(ctx, t, startErr)) + + // Mock events to be published. + events := []database.UsageEvent{ + { + ID: uuid.New().String(), + EventType: database.UsageEventTypeDcManagedAgentsV1, + EventData: []byte(jsoninate(t, agplusage.DCManagedAgentsV1{ + Count: 1, + })), + }, + } + db.EXPECT().SelectUsageEventsForPublishing(gomock.Any(), gomock.Any()).Return(events, nil).Times(1) + db.EXPECT().UpdateUsageEventsPostPublish(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, params database.UpdateUsageEventsPostPublishParams) error { + assert.Equal(t, []string{events[0].ID}, params.IDs) + assert.Contains(t, params.FailureMessages[0], errorMessage) + assert.Equal(t, []bool{false}, params.SetPublishedAts) + return nil + }, + ).Times(1) + + // Tick and wait for the reset call. + tickerResetTrap := clock.Trap().TickerReset() + defer tickerResetTrap.Close() + clock.Advance(tickerCall.Duration) + tickerResetTrap.MustWait(ctx).MustRelease(ctx) + + // The publisher should have published the events once. + require.Equal(t, int64(1), atomic.LoadInt64(&calls)) +} + +func jsoninate(t *testing.T, v any) string { + t.Helper() + buf, err := json.Marshal(v) + require.NoError(t, err) + return string(buf) +} + +func configureDeployment(ctx context.Context, t *testing.T, db database.Store) (uuid.UUID, string) { + t.Helper() + deploymentID := uuid.New() + err := db.InsertDeploymentID(ctx, deploymentID.String()) + require.NoError(t, err) + + licenseRaw := coderdenttest.GenerateLicense(t, coderdenttest.LicenseOptions{ + PublishUsageData: true, + }) + _, err = db.InsertLicense(ctx, database.InsertLicenseParams{ + UploadedAt: dbtime.Now(), + JWT: licenseRaw, + Exp: dbtime.Now().Add(48 * time.Hour), + UUID: uuid.New(), + }) + require.NoError(t, err) + + return deploymentID, licenseRaw +} + +func configureMockDeployment(t *testing.T, db *dbmock.MockStore) (uuid.UUID, string) { + t.Helper() + deploymentID := uuid.New() + db.EXPECT().GetDeploymentID(gomock.Any()).Return(deploymentID.String(), nil).Times(1) + + licenseRaw := coderdenttest.GenerateLicense(t, coderdenttest.LicenseOptions{ + PublishUsageData: true, + }) + db.EXPECT().GetUnexpiredLicenses(gomock.Any()).Return([]database.License{ + { + ID: 1, + UploadedAt: dbtime.Now(), + JWT: licenseRaw, + Exp: dbtime.Now().Add(48 * time.Hour), + UUID: uuid.New(), + }, + }, nil) + + return deploymentID, licenseRaw +} + +func fakeServer(t *testing.T, handler http.Handler) string { + t.Helper() + server := httptest.NewServer(handler) + t.Cleanup(server.Close) + return server.URL +} + +func tallymanHandler(t *testing.T, expectLicenseJWT string, handler func(req usage.TallymanIngestRequestV1) any) http.Handler { + t.Helper() + return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + t.Helper() + licenseJWT := r.Header.Get(usage.CoderLicenseJWTHeader) + if expectLicenseJWT != "" && !assert.Equal(t, expectLicenseJWT, licenseJWT, "license JWT in request did not match") { + rw.WriteHeader(http.StatusUnauthorized) + err := json.NewEncoder(rw).Encode(usage.TallymanErrorV1{ + Message: "license JWT in request did not match", + }) + require.NoError(t, err) + return + } + + var req usage.TallymanIngestRequestV1 + err := json.NewDecoder(r.Body).Decode(&req) + require.NoError(t, err) + + resp := handler(req) + switch resp.(type) { + case usage.TallymanErrorV1: + rw.WriteHeader(http.StatusInternalServerError) + default: + rw.WriteHeader(http.StatusOK) + } + err = json.NewEncoder(rw).Encode(resp) + require.NoError(t, err) + }) +} + +func tallymanAcceptAllHandler(req usage.TallymanIngestRequestV1) usage.TallymanIngestResponseV1 { + acceptedEvents := make([]usage.TallymanIngestAcceptedEventV1, len(req.Events)) + for i, event := range req.Events { + acceptedEvents[i].ID = event.ID + } + + return usage.TallymanIngestResponseV1{ + AcceptedEvents: acceptedEvents, + RejectedEvents: []usage.TallymanIngestRejectedEventV1{}, + } +}