diff --git a/coderd/database/dbauthz/dbauthz.go b/coderd/database/dbauthz/dbauthz.go index cc638cceb9dab..098922527c81f 100644 --- a/coderd/database/dbauthz/dbauthz.go +++ b/coderd/database/dbauthz/dbauthz.go @@ -817,6 +817,13 @@ func (q *querier) AcquireLock(ctx context.Context, id int64) error { return q.db.AcquireLock(ctx, id) } +func (q *querier) AcquireNotificationMessages(ctx context.Context, arg database.AcquireNotificationMessagesParams) ([]database.AcquireNotificationMessagesRow, error) { + if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceSystem); err != nil { + return nil, err + } + return q.db.AcquireNotificationMessages(ctx, arg) +} + // TODO: We need to create a ProvisionerJob resource type func (q *querier) AcquireProvisionerJob(ctx context.Context, arg database.AcquireProvisionerJobParams) (database.ProvisionerJob, error) { // if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceSystem); err != nil { @@ -861,6 +868,20 @@ func (q *querier) BatchUpdateWorkspaceLastUsedAt(ctx context.Context, arg databa return q.db.BatchUpdateWorkspaceLastUsedAt(ctx, arg) } +func (q *querier) BulkMarkNotificationMessagesFailed(ctx context.Context, arg database.BulkMarkNotificationMessagesFailedParams) (int64, error) { + if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceSystem); err != nil { + return 0, err + } + return q.db.BulkMarkNotificationMessagesFailed(ctx, arg) +} + +func (q *querier) BulkMarkNotificationMessagesSent(ctx context.Context, arg database.BulkMarkNotificationMessagesSentParams) (int64, error) { + if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceSystem); err != nil { + return 0, err + } + return q.db.BulkMarkNotificationMessagesSent(ctx, arg) +} + func (q *querier) CleanTailnetCoordinators(ctx context.Context) error { if err := q.authorizeContext(ctx, policy.ActionDelete, rbac.ResourceTailnetCoordinator); err != nil { return err @@ -1010,6 +1031,13 @@ func (q *querier) DeleteOAuth2ProviderAppTokensByAppAndUserID(ctx context.Contex return q.db.DeleteOAuth2ProviderAppTokensByAppAndUserID(ctx, arg) } +func (q *querier) DeleteOldNotificationMessages(ctx context.Context) error { + if err := q.authorizeContext(ctx, policy.ActionDelete, rbac.ResourceSystem); err != nil { + return err + } + return q.db.DeleteOldNotificationMessages(ctx) +} + func (q *querier) DeleteOldProvisionerDaemons(ctx context.Context) error { if err := q.authorizeContext(ctx, policy.ActionDelete, rbac.ResourceSystem); err != nil { return err @@ -1114,6 +1142,13 @@ func (q *querier) DeleteWorkspaceAgentPortSharesByTemplate(ctx context.Context, return q.db.DeleteWorkspaceAgentPortSharesByTemplate(ctx, templateID) } +func (q *querier) EnqueueNotificationMessage(ctx context.Context, arg database.EnqueueNotificationMessageParams) (database.NotificationMessage, error) { + if err := q.authorizeContext(ctx, policy.ActionCreate, rbac.ResourceSystem); err != nil { + return database.NotificationMessage{}, err + } + return q.db.EnqueueNotificationMessage(ctx, arg) +} + func (q *querier) FavoriteWorkspace(ctx context.Context, id uuid.UUID) error { fetch := func(ctx context.Context, id uuid.UUID) (database.Workspace, error) { return q.db.GetWorkspaceByID(ctx, id) @@ -1121,6 +1156,13 @@ func (q *querier) FavoriteWorkspace(ctx context.Context, id uuid.UUID) error { return update(q.log, q.auth, fetch, q.db.FavoriteWorkspace)(ctx, id) } +func (q *querier) FetchNewMessageMetadata(ctx context.Context, arg database.FetchNewMessageMetadataParams) (database.FetchNewMessageMetadataRow, error) { + if err := q.authorizeContext(ctx, policy.ActionRead, rbac.ResourceSystem); err != nil { + return database.FetchNewMessageMetadataRow{}, err + } + return q.db.FetchNewMessageMetadata(ctx, arg) +} + func (q *querier) GetAPIKeyByID(ctx context.Context, id string) (database.APIKey, error) { return fetch(q.log, q.auth, q.db.GetAPIKeyByID)(ctx, id) } diff --git a/coderd/database/dbauthz/dbauthz_test.go b/coderd/database/dbauthz/dbauthz_test.go index 9288f52260d78..837cc1c9f69dc 100644 --- a/coderd/database/dbauthz/dbauthz_test.go +++ b/coderd/database/dbauthz/dbauthz_test.go @@ -2467,6 +2467,32 @@ func (s *MethodTestSuite) TestSystemFunctions() { AgentID: uuid.New(), }).Asserts(tpl, policy.ActionCreate) })) + s.Run("AcquireNotificationMessages", s.Subtest(func(db database.Store, check *expects) { + // TODO: update this test once we have a specific role for notifications + check.Args(database.AcquireNotificationMessagesParams{}).Asserts(rbac.ResourceSystem, policy.ActionUpdate) + })) + s.Run("BulkMarkNotificationMessagesFailed", s.Subtest(func(db database.Store, check *expects) { + // TODO: update this test once we have a specific role for notifications + check.Args(database.BulkMarkNotificationMessagesFailedParams{}).Asserts(rbac.ResourceSystem, policy.ActionUpdate) + })) + s.Run("BulkMarkNotificationMessagesSent", s.Subtest(func(db database.Store, check *expects) { + // TODO: update this test once we have a specific role for notifications + check.Args(database.BulkMarkNotificationMessagesSentParams{}).Asserts(rbac.ResourceSystem, policy.ActionUpdate) + })) + s.Run("DeleteOldNotificationMessages", s.Subtest(func(db database.Store, check *expects) { + // TODO: update this test once we have a specific role for notifications + check.Args().Asserts(rbac.ResourceSystem, policy.ActionDelete) + })) + s.Run("EnqueueNotificationMessage", s.Subtest(func(db database.Store, check *expects) { + // TODO: update this test once we have a specific role for notifications + check.Args(database.EnqueueNotificationMessageParams{ + Method: database.NotificationMethodWebhook, + }).Asserts(rbac.ResourceSystem, policy.ActionCreate) + })) + s.Run("FetchNewMessageMetadata", s.Subtest(func(db database.Store, check *expects) { + // TODO: update this test once we have a specific role for notifications + check.Args(database.FetchNewMessageMetadataParams{}).Asserts(rbac.ResourceSystem, policy.ActionRead) + })) } func (s *MethodTestSuite) TestOAuth2ProviderApps() { diff --git a/coderd/database/dbmem/dbmem.go b/coderd/database/dbmem/dbmem.go index 830ce9e7241f4..6258e69888aca 100644 --- a/coderd/database/dbmem/dbmem.go +++ b/coderd/database/dbmem/dbmem.go @@ -907,6 +907,15 @@ func (*FakeQuerier) AcquireLock(_ context.Context, _ int64) error { return xerrors.New("AcquireLock must only be called within a transaction") } +func (*FakeQuerier) AcquireNotificationMessages(_ context.Context, arg database.AcquireNotificationMessagesParams) ([]database.AcquireNotificationMessagesRow, error) { + err := validateDatabaseType(arg) + if err != nil { + return nil, err + } + // nolint:nilnil // Irrelevant. + return nil, nil +} + func (q *FakeQuerier) AcquireProvisionerJob(_ context.Context, arg database.AcquireProvisionerJobParams) (database.ProvisionerJob, error) { if err := validateDatabaseType(arg); err != nil { return database.ProvisionerJob{}, err @@ -1169,6 +1178,22 @@ func (q *FakeQuerier) BatchUpdateWorkspaceLastUsedAt(_ context.Context, arg data return nil } +func (*FakeQuerier) BulkMarkNotificationMessagesFailed(_ context.Context, arg database.BulkMarkNotificationMessagesFailedParams) (int64, error) { + err := validateDatabaseType(arg) + if err != nil { + return 0, err + } + return -1, nil +} + +func (*FakeQuerier) BulkMarkNotificationMessagesSent(_ context.Context, arg database.BulkMarkNotificationMessagesSentParams) (int64, error) { + err := validateDatabaseType(arg) + if err != nil { + return 0, err + } + return -1, nil +} + func (*FakeQuerier) CleanTailnetCoordinators(_ context.Context) error { return ErrUnimplemented } @@ -1504,6 +1529,10 @@ func (q *FakeQuerier) DeleteOAuth2ProviderAppTokensByAppAndUserID(_ context.Cont return nil } +func (*FakeQuerier) DeleteOldNotificationMessages(_ context.Context) error { + return nil +} + func (q *FakeQuerier) DeleteOldProvisionerDaemons(_ context.Context) error { q.mutex.Lock() defer q.mutex.Unlock() @@ -1737,6 +1766,14 @@ func (q *FakeQuerier) DeleteWorkspaceAgentPortSharesByTemplate(_ context.Context return nil } +func (*FakeQuerier) EnqueueNotificationMessage(_ context.Context, arg database.EnqueueNotificationMessageParams) (database.NotificationMessage, error) { + err := validateDatabaseType(arg) + if err != nil { + return database.NotificationMessage{}, err + } + return database.NotificationMessage{}, nil +} + func (q *FakeQuerier) FavoriteWorkspace(_ context.Context, arg uuid.UUID) error { err := validateDatabaseType(arg) if err != nil { @@ -1756,6 +1793,14 @@ func (q *FakeQuerier) FavoriteWorkspace(_ context.Context, arg uuid.UUID) error return nil } +func (*FakeQuerier) FetchNewMessageMetadata(_ context.Context, arg database.FetchNewMessageMetadataParams) (database.FetchNewMessageMetadataRow, error) { + err := validateDatabaseType(arg) + if err != nil { + return database.FetchNewMessageMetadataRow{}, err + } + return database.FetchNewMessageMetadataRow{}, nil +} + func (q *FakeQuerier) GetAPIKeyByID(_ context.Context, id string) (database.APIKey, error) { q.mutex.RLock() defer q.mutex.RUnlock() diff --git a/coderd/database/dbmetrics/dbmetrics.go b/coderd/database/dbmetrics/dbmetrics.go index e45072cd71cdb..fbaf7d4fc0b4e 100644 --- a/coderd/database/dbmetrics/dbmetrics.go +++ b/coderd/database/dbmetrics/dbmetrics.go @@ -88,6 +88,13 @@ func (m metricsStore) AcquireLock(ctx context.Context, pgAdvisoryXactLock int64) return err } +func (m metricsStore) AcquireNotificationMessages(ctx context.Context, arg database.AcquireNotificationMessagesParams) ([]database.AcquireNotificationMessagesRow, error) { + start := time.Now() + r0, r1 := m.s.AcquireNotificationMessages(ctx, arg) + m.queryLatencies.WithLabelValues("AcquireNotificationMessages").Observe(time.Since(start).Seconds()) + return r0, r1 +} + func (m metricsStore) AcquireProvisionerJob(ctx context.Context, arg database.AcquireProvisionerJobParams) (database.ProvisionerJob, error) { start := time.Now() provisionerJob, err := m.s.AcquireProvisionerJob(ctx, arg) @@ -123,6 +130,20 @@ func (m metricsStore) BatchUpdateWorkspaceLastUsedAt(ctx context.Context, arg da return r0 } +func (m metricsStore) BulkMarkNotificationMessagesFailed(ctx context.Context, arg database.BulkMarkNotificationMessagesFailedParams) (int64, error) { + start := time.Now() + r0, r1 := m.s.BulkMarkNotificationMessagesFailed(ctx, arg) + m.queryLatencies.WithLabelValues("BulkMarkNotificationMessagesFailed").Observe(time.Since(start).Seconds()) + return r0, r1 +} + +func (m metricsStore) BulkMarkNotificationMessagesSent(ctx context.Context, arg database.BulkMarkNotificationMessagesSentParams) (int64, error) { + start := time.Now() + r0, r1 := m.s.BulkMarkNotificationMessagesSent(ctx, arg) + m.queryLatencies.WithLabelValues("BulkMarkNotificationMessagesSent").Observe(time.Since(start).Seconds()) + return r0, r1 +} + func (m metricsStore) CleanTailnetCoordinators(ctx context.Context) error { start := time.Now() err := m.s.CleanTailnetCoordinators(ctx) @@ -263,6 +284,13 @@ func (m metricsStore) DeleteOAuth2ProviderAppTokensByAppAndUserID(ctx context.Co return r0 } +func (m metricsStore) DeleteOldNotificationMessages(ctx context.Context) error { + start := time.Now() + r0 := m.s.DeleteOldNotificationMessages(ctx) + m.queryLatencies.WithLabelValues("DeleteOldNotificationMessages").Observe(time.Since(start).Seconds()) + return r0 +} + func (m metricsStore) DeleteOldProvisionerDaemons(ctx context.Context) error { start := time.Now() r0 := m.s.DeleteOldProvisionerDaemons(ctx) @@ -354,6 +382,13 @@ func (m metricsStore) DeleteWorkspaceAgentPortSharesByTemplate(ctx context.Conte return r0 } +func (m metricsStore) EnqueueNotificationMessage(ctx context.Context, arg database.EnqueueNotificationMessageParams) (database.NotificationMessage, error) { + start := time.Now() + r0, r1 := m.s.EnqueueNotificationMessage(ctx, arg) + m.queryLatencies.WithLabelValues("EnqueueNotificationMessage").Observe(time.Since(start).Seconds()) + return r0, r1 +} + func (m metricsStore) FavoriteWorkspace(ctx context.Context, arg uuid.UUID) error { start := time.Now() r0 := m.s.FavoriteWorkspace(ctx, arg) @@ -361,6 +396,13 @@ func (m metricsStore) FavoriteWorkspace(ctx context.Context, arg uuid.UUID) erro return r0 } +func (m metricsStore) FetchNewMessageMetadata(ctx context.Context, arg database.FetchNewMessageMetadataParams) (database.FetchNewMessageMetadataRow, error) { + start := time.Now() + r0, r1 := m.s.FetchNewMessageMetadata(ctx, arg) + m.queryLatencies.WithLabelValues("FetchNewMessageMetadata").Observe(time.Since(start).Seconds()) + return r0, r1 +} + func (m metricsStore) GetAPIKeyByID(ctx context.Context, id string) (database.APIKey, error) { start := time.Now() apiKey, err := m.s.GetAPIKeyByID(ctx, id) diff --git a/coderd/database/dbmock/dbmock.go b/coderd/database/dbmock/dbmock.go index 4b952461c44a6..7f00a57587216 100644 --- a/coderd/database/dbmock/dbmock.go +++ b/coderd/database/dbmock/dbmock.go @@ -58,6 +58,21 @@ func (mr *MockStoreMockRecorder) AcquireLock(arg0, arg1 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AcquireLock", reflect.TypeOf((*MockStore)(nil).AcquireLock), arg0, arg1) } +// AcquireNotificationMessages mocks base method. +func (m *MockStore) AcquireNotificationMessages(arg0 context.Context, arg1 database.AcquireNotificationMessagesParams) ([]database.AcquireNotificationMessagesRow, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AcquireNotificationMessages", arg0, arg1) + ret0, _ := ret[0].([]database.AcquireNotificationMessagesRow) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// AcquireNotificationMessages indicates an expected call of AcquireNotificationMessages. +func (mr *MockStoreMockRecorder) AcquireNotificationMessages(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AcquireNotificationMessages", reflect.TypeOf((*MockStore)(nil).AcquireNotificationMessages), arg0, arg1) +} + // AcquireProvisionerJob mocks base method. func (m *MockStore) AcquireProvisionerJob(arg0 context.Context, arg1 database.AcquireProvisionerJobParams) (database.ProvisionerJob, error) { m.ctrl.T.Helper() @@ -131,6 +146,36 @@ func (mr *MockStoreMockRecorder) BatchUpdateWorkspaceLastUsedAt(arg0, arg1 any) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BatchUpdateWorkspaceLastUsedAt", reflect.TypeOf((*MockStore)(nil).BatchUpdateWorkspaceLastUsedAt), arg0, arg1) } +// BulkMarkNotificationMessagesFailed mocks base method. +func (m *MockStore) BulkMarkNotificationMessagesFailed(arg0 context.Context, arg1 database.BulkMarkNotificationMessagesFailedParams) (int64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "BulkMarkNotificationMessagesFailed", arg0, arg1) + ret0, _ := ret[0].(int64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// BulkMarkNotificationMessagesFailed indicates an expected call of BulkMarkNotificationMessagesFailed. +func (mr *MockStoreMockRecorder) BulkMarkNotificationMessagesFailed(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BulkMarkNotificationMessagesFailed", reflect.TypeOf((*MockStore)(nil).BulkMarkNotificationMessagesFailed), arg0, arg1) +} + +// BulkMarkNotificationMessagesSent mocks base method. +func (m *MockStore) BulkMarkNotificationMessagesSent(arg0 context.Context, arg1 database.BulkMarkNotificationMessagesSentParams) (int64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "BulkMarkNotificationMessagesSent", arg0, arg1) + ret0, _ := ret[0].(int64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// BulkMarkNotificationMessagesSent indicates an expected call of BulkMarkNotificationMessagesSent. +func (mr *MockStoreMockRecorder) BulkMarkNotificationMessagesSent(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BulkMarkNotificationMessagesSent", reflect.TypeOf((*MockStore)(nil).BulkMarkNotificationMessagesSent), arg0, arg1) +} + // CleanTailnetCoordinators mocks base method. func (m *MockStore) CleanTailnetCoordinators(arg0 context.Context) error { m.ctrl.T.Helper() @@ -413,6 +458,20 @@ func (mr *MockStoreMockRecorder) DeleteOAuth2ProviderAppTokensByAppAndUserID(arg return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteOAuth2ProviderAppTokensByAppAndUserID", reflect.TypeOf((*MockStore)(nil).DeleteOAuth2ProviderAppTokensByAppAndUserID), arg0, arg1) } +// DeleteOldNotificationMessages mocks base method. +func (m *MockStore) DeleteOldNotificationMessages(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteOldNotificationMessages", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteOldNotificationMessages indicates an expected call of DeleteOldNotificationMessages. +func (mr *MockStoreMockRecorder) DeleteOldNotificationMessages(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteOldNotificationMessages", reflect.TypeOf((*MockStore)(nil).DeleteOldNotificationMessages), arg0) +} + // DeleteOldProvisionerDaemons mocks base method. func (m *MockStore) DeleteOldProvisionerDaemons(arg0 context.Context) error { m.ctrl.T.Helper() @@ -599,6 +658,21 @@ func (mr *MockStoreMockRecorder) DeleteWorkspaceAgentPortSharesByTemplate(arg0, return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteWorkspaceAgentPortSharesByTemplate", reflect.TypeOf((*MockStore)(nil).DeleteWorkspaceAgentPortSharesByTemplate), arg0, arg1) } +// EnqueueNotificationMessage mocks base method. +func (m *MockStore) EnqueueNotificationMessage(arg0 context.Context, arg1 database.EnqueueNotificationMessageParams) (database.NotificationMessage, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "EnqueueNotificationMessage", arg0, arg1) + ret0, _ := ret[0].(database.NotificationMessage) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// EnqueueNotificationMessage indicates an expected call of EnqueueNotificationMessage. +func (mr *MockStoreMockRecorder) EnqueueNotificationMessage(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnqueueNotificationMessage", reflect.TypeOf((*MockStore)(nil).EnqueueNotificationMessage), arg0, arg1) +} + // FavoriteWorkspace mocks base method. func (m *MockStore) FavoriteWorkspace(arg0 context.Context, arg1 uuid.UUID) error { m.ctrl.T.Helper() @@ -613,6 +687,21 @@ func (mr *MockStoreMockRecorder) FavoriteWorkspace(arg0, arg1 any) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FavoriteWorkspace", reflect.TypeOf((*MockStore)(nil).FavoriteWorkspace), arg0, arg1) } +// FetchNewMessageMetadata mocks base method. +func (m *MockStore) FetchNewMessageMetadata(arg0 context.Context, arg1 database.FetchNewMessageMetadataParams) (database.FetchNewMessageMetadataRow, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchNewMessageMetadata", arg0, arg1) + ret0, _ := ret[0].(database.FetchNewMessageMetadataRow) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FetchNewMessageMetadata indicates an expected call of FetchNewMessageMetadata. +func (mr *MockStoreMockRecorder) FetchNewMessageMetadata(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchNewMessageMetadata", reflect.TypeOf((*MockStore)(nil).FetchNewMessageMetadata), arg0, arg1) +} + // GetAPIKeyByID mocks base method. func (m *MockStore) GetAPIKeyByID(arg0 context.Context, arg1 string) (database.APIKey, error) { m.ctrl.T.Helper() diff --git a/coderd/database/dbpurge/dbpurge.go b/coderd/database/dbpurge/dbpurge.go index a6ad0a125d5f2..2bcfefdca79ff 100644 --- a/coderd/database/dbpurge/dbpurge.go +++ b/coderd/database/dbpurge/dbpurge.go @@ -58,6 +58,9 @@ func New(ctx context.Context, logger slog.Logger, db database.Store) io.Closer { if err := tx.DeleteOldProvisionerDaemons(ctx); err != nil { return xerrors.Errorf("failed to delete old provisioner daemons: %w", err) } + if err := tx.DeleteOldNotificationMessages(ctx); err != nil { + return xerrors.Errorf("failed to delete old notification messages: %w", err) + } logger.Info(ctx, "purged old database entries", slog.F("duration", time.Since(start))) diff --git a/coderd/database/dump.sql b/coderd/database/dump.sql index 0ca4c7ac18c99..0b51a6c300205 100644 --- a/coderd/database/dump.sql +++ b/coderd/database/dump.sql @@ -78,6 +78,20 @@ CREATE TYPE name_organization_pair AS ( organization_id uuid ); +CREATE TYPE notification_message_status AS ENUM ( + 'pending', + 'leased', + 'sent', + 'permanent_failure', + 'temporary_failure', + 'unknown' +); + +CREATE TYPE notification_method AS ENUM ( + 'smtp', + 'webhook' +); + CREATE TYPE parameter_destination_scheme AS ENUM ( 'none', 'environment_variable', @@ -534,6 +548,34 @@ CREATE SEQUENCE licenses_id_seq ALTER SEQUENCE licenses_id_seq OWNED BY licenses.id; +CREATE TABLE notification_messages ( + id uuid NOT NULL, + notification_template_id uuid NOT NULL, + user_id uuid NOT NULL, + method notification_method NOT NULL, + status notification_message_status DEFAULT 'pending'::notification_message_status NOT NULL, + status_reason text, + created_by text NOT NULL, + payload jsonb NOT NULL, + attempt_count integer DEFAULT 0, + targets uuid[], + created_at timestamp with time zone DEFAULT CURRENT_TIMESTAMP NOT NULL, + updated_at timestamp with time zone, + leased_until timestamp with time zone, + next_retry_after timestamp with time zone +); + +CREATE TABLE notification_templates ( + id uuid NOT NULL, + name text NOT NULL, + title_template text NOT NULL, + body_template text NOT NULL, + actions jsonb, + "group" text +); + +COMMENT ON TABLE notification_templates IS 'Templates from which to create notification messages.'; + CREATE TABLE oauth2_provider_app_codes ( id uuid NOT NULL, created_at timestamp with time zone NOT NULL, @@ -1473,6 +1515,15 @@ ALTER TABLE ONLY licenses ALTER TABLE ONLY licenses ADD CONSTRAINT licenses_pkey PRIMARY KEY (id); +ALTER TABLE ONLY notification_messages + ADD CONSTRAINT notification_messages_pkey PRIMARY KEY (id); + +ALTER TABLE ONLY notification_templates + ADD CONSTRAINT notification_templates_name_key UNIQUE (name); + +ALTER TABLE ONLY notification_templates + ADD CONSTRAINT notification_templates_pkey PRIMARY KEY (id); + ALTER TABLE ONLY oauth2_provider_app_codes ADD CONSTRAINT oauth2_provider_app_codes_pkey PRIMARY KEY (id); @@ -1652,6 +1703,8 @@ CREATE INDEX idx_custom_roles_id ON custom_roles USING btree (id); CREATE UNIQUE INDEX idx_custom_roles_name_lower ON custom_roles USING btree (lower(name)); +CREATE INDEX idx_notification_messages_status ON notification_messages USING btree (status); + CREATE INDEX idx_organization_member_organization_id_uuid ON organization_members USING btree (organization_id); CREATE INDEX idx_organization_member_user_id_uuid ON organization_members USING btree (user_id); @@ -1769,6 +1822,12 @@ ALTER TABLE ONLY jfrog_xray_scans ALTER TABLE ONLY jfrog_xray_scans ADD CONSTRAINT jfrog_xray_scans_workspace_id_fkey FOREIGN KEY (workspace_id) REFERENCES workspaces(id) ON DELETE CASCADE; +ALTER TABLE ONLY notification_messages + ADD CONSTRAINT notification_messages_notification_template_id_fkey FOREIGN KEY (notification_template_id) REFERENCES notification_templates(id) ON DELETE CASCADE; + +ALTER TABLE ONLY notification_messages + ADD CONSTRAINT notification_messages_user_id_fkey FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE; + ALTER TABLE ONLY oauth2_provider_app_codes ADD CONSTRAINT oauth2_provider_app_codes_app_id_fkey FOREIGN KEY (app_id) REFERENCES oauth2_provider_apps(id) ON DELETE CASCADE; diff --git a/coderd/database/foreign_key_constraint.go b/coderd/database/foreign_key_constraint.go index 2a8f1738d3cb8..3a9557a9758dd 100644 --- a/coderd/database/foreign_key_constraint.go +++ b/coderd/database/foreign_key_constraint.go @@ -15,6 +15,8 @@ const ( ForeignKeyGroupsOrganizationID ForeignKeyConstraint = "groups_organization_id_fkey" // ALTER TABLE ONLY groups ADD CONSTRAINT groups_organization_id_fkey FOREIGN KEY (organization_id) REFERENCES organizations(id) ON DELETE CASCADE; ForeignKeyJfrogXrayScansAgentID ForeignKeyConstraint = "jfrog_xray_scans_agent_id_fkey" // ALTER TABLE ONLY jfrog_xray_scans ADD CONSTRAINT jfrog_xray_scans_agent_id_fkey FOREIGN KEY (agent_id) REFERENCES workspace_agents(id) ON DELETE CASCADE; ForeignKeyJfrogXrayScansWorkspaceID ForeignKeyConstraint = "jfrog_xray_scans_workspace_id_fkey" // ALTER TABLE ONLY jfrog_xray_scans ADD CONSTRAINT jfrog_xray_scans_workspace_id_fkey FOREIGN KEY (workspace_id) REFERENCES workspaces(id) ON DELETE CASCADE; + ForeignKeyNotificationMessagesNotificationTemplateID ForeignKeyConstraint = "notification_messages_notification_template_id_fkey" // ALTER TABLE ONLY notification_messages ADD CONSTRAINT notification_messages_notification_template_id_fkey FOREIGN KEY (notification_template_id) REFERENCES notification_templates(id) ON DELETE CASCADE; + ForeignKeyNotificationMessagesUserID ForeignKeyConstraint = "notification_messages_user_id_fkey" // ALTER TABLE ONLY notification_messages ADD CONSTRAINT notification_messages_user_id_fkey FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE; ForeignKeyOauth2ProviderAppCodesAppID ForeignKeyConstraint = "oauth2_provider_app_codes_app_id_fkey" // ALTER TABLE ONLY oauth2_provider_app_codes ADD CONSTRAINT oauth2_provider_app_codes_app_id_fkey FOREIGN KEY (app_id) REFERENCES oauth2_provider_apps(id) ON DELETE CASCADE; ForeignKeyOauth2ProviderAppCodesUserID ForeignKeyConstraint = "oauth2_provider_app_codes_user_id_fkey" // ALTER TABLE ONLY oauth2_provider_app_codes ADD CONSTRAINT oauth2_provider_app_codes_user_id_fkey FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE; ForeignKeyOauth2ProviderAppSecretsAppID ForeignKeyConstraint = "oauth2_provider_app_secrets_app_id_fkey" // ALTER TABLE ONLY oauth2_provider_app_secrets ADD CONSTRAINT oauth2_provider_app_secrets_app_id_fkey FOREIGN KEY (app_id) REFERENCES oauth2_provider_apps(id) ON DELETE CASCADE; diff --git a/coderd/database/migrations/000221_notifications.down.sql b/coderd/database/migrations/000221_notifications.down.sql new file mode 100644 index 0000000000000..a7cd8a5f6a4c3 --- /dev/null +++ b/coderd/database/migrations/000221_notifications.down.sql @@ -0,0 +1,4 @@ +DROP TABLE IF EXISTS notification_messages; +DROP TABLE IF EXISTS notification_templates; +DROP TYPE IF EXISTS notification_method; +DROP TYPE IF EXISTS notification_message_status; \ No newline at end of file diff --git a/coderd/database/migrations/000221_notifications.up.sql b/coderd/database/migrations/000221_notifications.up.sql new file mode 100644 index 0000000000000..567ed87d80764 --- /dev/null +++ b/coderd/database/migrations/000221_notifications.up.sql @@ -0,0 +1,65 @@ +CREATE TYPE notification_message_status AS ENUM ( + 'pending', + 'leased', + 'sent', + 'permanent_failure', + 'temporary_failure', + 'unknown' + ); + +CREATE TYPE notification_method AS ENUM ( + 'smtp', + 'webhook' + ); + +CREATE TABLE notification_templates +( + id uuid NOT NULL, + name text NOT NULL, + title_template text NOT NULL, + body_template text NOT NULL, + actions jsonb, + "group" text, + PRIMARY KEY (id), + UNIQUE (name) +); + +COMMENT ON TABLE notification_templates IS 'Templates from which to create notification messages.'; + +CREATE TABLE notification_messages +( + id uuid NOT NULL, + notification_template_id uuid NOT NULL, + user_id uuid NOT NULL, + method notification_method NOT NULL, + status notification_message_status NOT NULL DEFAULT 'pending'::notification_message_status, + status_reason text, + created_by text NOT NULL, + payload jsonb NOT NULL, + attempt_count int DEFAULT 0, + targets uuid[], + created_at timestamp with time zone NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at timestamp with time zone, + leased_until timestamp with time zone, + next_retry_after timestamp with time zone, + PRIMARY KEY (id), + FOREIGN KEY (notification_template_id) REFERENCES notification_templates (id) ON DELETE CASCADE, + FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE +); + +CREATE INDEX idx_notification_messages_status ON notification_messages (status); + +-- TODO: autogenerate constants which reference the UUIDs +INSERT INTO notification_templates (id, name, title_template, body_template, "group", actions) +VALUES ('f517da0b-cdc9-410f-ab89-a86107c420ed', 'Workspace Deleted', E'Workspace "{{.Labels.name}}" deleted', + E'Hi {{.UserName}}\n\nYour workspace **{{.Labels.name}}** was deleted.\nThe specified reason was "**{{.Labels.reason}}**".', + 'Workspace Events', '[ + { + "label": "View workspaces", + "url": "{{ base_url }}/workspaces" + }, + { + "label": "View templates", + "url": "{{ base_url }}/templates" + } + ]'::jsonb); diff --git a/coderd/database/migrations/testdata/fixtures/000221_notifications.up.sql b/coderd/database/migrations/testdata/fixtures/000221_notifications.up.sql new file mode 100644 index 0000000000000..a3bd8a73f2566 --- /dev/null +++ b/coderd/database/migrations/testdata/fixtures/000221_notifications.up.sql @@ -0,0 +1,21 @@ +DO +$$ + DECLARE + template text; + BEGIN + SELECT 'You successfully did {{.thing}}!' INTO template; + + INSERT INTO notification_templates (id, name, title_template, body_template, "group") + VALUES ('a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11', 'A', template, template, 'Group 1'), + ('b0eebc99-9c0b-4ef8-bb6d-6bb9bd380a12', 'B', template, template, 'Group 1'), + ('c0eebc99-9c0b-4ef8-bb6d-6bb9bd380a13', 'C', template, template, 'Group 2'); + + INSERT INTO users(id, email, username, hashed_password, created_at, updated_at, status, rbac_roles, deleted) + VALUES ('fc1511ef-4fcf-4a3b-98a1-8df64160e35a', 'githubuser@coder.com', 'githubuser', '\x', '2022-11-02 13:05:21.445455+02', '2022-11-02 13:05:21.445455+02', 'active', '{}', false) ON CONFLICT DO NOTHING; + + INSERT INTO notification_messages (id, notification_template_id, user_id, method, created_by, payload) + VALUES ( + gen_random_uuid(), 'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11', 'fc1511ef-4fcf-4a3b-98a1-8df64160e35a', 'smtp'::notification_method, 'test', '{}' + ); + END +$$; diff --git a/coderd/database/models.go b/coderd/database/models.go index aea9837e92e89..d7f1ab9972a61 100644 --- a/coderd/database/models.go +++ b/coderd/database/models.go @@ -660,6 +660,134 @@ func AllLoginTypeValues() []LoginType { } } +type NotificationMessageStatus string + +const ( + NotificationMessageStatusPending NotificationMessageStatus = "pending" + NotificationMessageStatusLeased NotificationMessageStatus = "leased" + NotificationMessageStatusSent NotificationMessageStatus = "sent" + NotificationMessageStatusPermanentFailure NotificationMessageStatus = "permanent_failure" + NotificationMessageStatusTemporaryFailure NotificationMessageStatus = "temporary_failure" + NotificationMessageStatusUnknown NotificationMessageStatus = "unknown" +) + +func (e *NotificationMessageStatus) Scan(src interface{}) error { + switch s := src.(type) { + case []byte: + *e = NotificationMessageStatus(s) + case string: + *e = NotificationMessageStatus(s) + default: + return fmt.Errorf("unsupported scan type for NotificationMessageStatus: %T", src) + } + return nil +} + +type NullNotificationMessageStatus struct { + NotificationMessageStatus NotificationMessageStatus `json:"notification_message_status"` + Valid bool `json:"valid"` // Valid is true if NotificationMessageStatus is not NULL +} + +// Scan implements the Scanner interface. +func (ns *NullNotificationMessageStatus) Scan(value interface{}) error { + if value == nil { + ns.NotificationMessageStatus, ns.Valid = "", false + return nil + } + ns.Valid = true + return ns.NotificationMessageStatus.Scan(value) +} + +// Value implements the driver Valuer interface. +func (ns NullNotificationMessageStatus) Value() (driver.Value, error) { + if !ns.Valid { + return nil, nil + } + return string(ns.NotificationMessageStatus), nil +} + +func (e NotificationMessageStatus) Valid() bool { + switch e { + case NotificationMessageStatusPending, + NotificationMessageStatusLeased, + NotificationMessageStatusSent, + NotificationMessageStatusPermanentFailure, + NotificationMessageStatusTemporaryFailure, + NotificationMessageStatusUnknown: + return true + } + return false +} + +func AllNotificationMessageStatusValues() []NotificationMessageStatus { + return []NotificationMessageStatus{ + NotificationMessageStatusPending, + NotificationMessageStatusLeased, + NotificationMessageStatusSent, + NotificationMessageStatusPermanentFailure, + NotificationMessageStatusTemporaryFailure, + NotificationMessageStatusUnknown, + } +} + +type NotificationMethod string + +const ( + NotificationMethodSmtp NotificationMethod = "smtp" + NotificationMethodWebhook NotificationMethod = "webhook" +) + +func (e *NotificationMethod) Scan(src interface{}) error { + switch s := src.(type) { + case []byte: + *e = NotificationMethod(s) + case string: + *e = NotificationMethod(s) + default: + return fmt.Errorf("unsupported scan type for NotificationMethod: %T", src) + } + return nil +} + +type NullNotificationMethod struct { + NotificationMethod NotificationMethod `json:"notification_method"` + Valid bool `json:"valid"` // Valid is true if NotificationMethod is not NULL +} + +// Scan implements the Scanner interface. +func (ns *NullNotificationMethod) Scan(value interface{}) error { + if value == nil { + ns.NotificationMethod, ns.Valid = "", false + return nil + } + ns.Valid = true + return ns.NotificationMethod.Scan(value) +} + +// Value implements the driver Valuer interface. +func (ns NullNotificationMethod) Value() (driver.Value, error) { + if !ns.Valid { + return nil, nil + } + return string(ns.NotificationMethod), nil +} + +func (e NotificationMethod) Valid() bool { + switch e { + case NotificationMethodSmtp, + NotificationMethodWebhook: + return true + } + return false +} + +func AllNotificationMethodValues() []NotificationMethod { + return []NotificationMethod{ + NotificationMethodSmtp, + NotificationMethodWebhook, + } +} + type ParameterDestinationScheme string const ( @@ -1885,6 +2013,33 @@ type License struct { UUID uuid.UUID `db:"uuid" json:"uuid"` } +type NotificationMessage struct { + ID uuid.UUID `db:"id" json:"id"` + NotificationTemplateID uuid.UUID `db:"notification_template_id" json:"notification_template_id"` + UserID uuid.UUID `db:"user_id" json:"user_id"` + Method NotificationMethod `db:"method" json:"method"` + Status NotificationMessageStatus `db:"status" json:"status"` + StatusReason sql.NullString `db:"status_reason" json:"status_reason"` + CreatedBy string `db:"created_by" json:"created_by"` + Payload []byte `db:"payload" json:"payload"` + AttemptCount sql.NullInt32 `db:"attempt_count" json:"attempt_count"` + Targets []uuid.UUID `db:"targets" json:"targets"` + CreatedAt time.Time `db:"created_at" json:"created_at"` + UpdatedAt sql.NullTime `db:"updated_at" json:"updated_at"` + LeasedUntil sql.NullTime `db:"leased_until" json:"leased_until"` + NextRetryAfter sql.NullTime `db:"next_retry_after" json:"next_retry_after"` +} + +// Templates from which to create notification messages. +type NotificationTemplate struct { + ID uuid.UUID `db:"id" json:"id"` + Name string `db:"name" json:"name"` + TitleTemplate string `db:"title_template" json:"title_template"` + BodyTemplate string `db:"body_template" json:"body_template"` + Actions []byte `db:"actions" json:"actions"` + Group sql.NullString `db:"group" json:"group"` +} + // A table used to configure apps that can use Coder as an OAuth2 provider, the reverse of what we are calling external authentication. type OAuth2ProviderApp struct { ID uuid.UUID `db:"id" json:"id"` diff --git a/coderd/database/querier.go b/coderd/database/querier.go index f244f52026eb0..179a5e06039ff 100644 --- a/coderd/database/querier.go +++ b/coderd/database/querier.go @@ -17,6 +17,18 @@ type sqlcQuerier interface { // This must be called from within a transaction. The lock will be automatically // released when the transaction ends. AcquireLock(ctx context.Context, pgAdvisoryXactLock int64) error + // Acquires the lease for a given count of notification messages, to enable concurrent dequeuing and subsequent sending. + // Only rows that aren't already leased (or ones which are leased but have exceeded their lease period) are returned. + // + // A "lease" here refers to a notifier taking ownership of a notification_messages row. A lease survives for the duration + // of CODER_NOTIFICATIONS_LEASE_PERIOD. Once a message is delivered, its status is updated and the lease expires (set to NULL). + // If a message exceeds its lease, that implies the notifier did not shutdown cleanly, or the table update failed somehow, + // and the row will then be eligible to be dequeued by another notifier. + // + // SKIP LOCKED is used to jump over locked rows. This prevents multiple notifiers from acquiring the same messages. + // See: https://www.postgresql.org/docs/9.5/sql-select.html#SQL-FOR-UPDATE-SHARE + // + AcquireNotificationMessages(ctx context.Context, arg AcquireNotificationMessagesParams) ([]AcquireNotificationMessagesRow, error) // Acquires the lock for a single job that isn't started, completed, // canceled, and that matches an array of provisioner types. // @@ -45,6 +57,8 @@ type sqlcQuerier interface { // referenced by the latest build of a workspace. ArchiveUnusedTemplateVersions(ctx context.Context, arg ArchiveUnusedTemplateVersionsParams) ([]uuid.UUID, error) BatchUpdateWorkspaceLastUsedAt(ctx context.Context, arg BatchUpdateWorkspaceLastUsedAtParams) error + BulkMarkNotificationMessagesFailed(ctx context.Context, arg BulkMarkNotificationMessagesFailedParams) (int64, error) + BulkMarkNotificationMessagesSent(ctx context.Context, arg BulkMarkNotificationMessagesSentParams) (int64, error) CleanTailnetCoordinators(ctx context.Context) error CleanTailnetLostPeers(ctx context.Context) error CleanTailnetTunnels(ctx context.Context) error @@ -65,6 +79,8 @@ type sqlcQuerier interface { DeleteOAuth2ProviderAppCodesByAppAndUserID(ctx context.Context, arg DeleteOAuth2ProviderAppCodesByAppAndUserIDParams) error DeleteOAuth2ProviderAppSecretByID(ctx context.Context, id uuid.UUID) error DeleteOAuth2ProviderAppTokensByAppAndUserID(ctx context.Context, arg DeleteOAuth2ProviderAppTokensByAppAndUserIDParams) error + // Delete all notification messages which have not been updated for over a week. + DeleteOldNotificationMessages(ctx context.Context) error // Delete provisioner daemons that have been created at least a week ago // and have not connected to coderd since a week. // A provisioner daemon with "zeroed" last_seen_at column indicates possible @@ -84,7 +100,10 @@ type sqlcQuerier interface { DeleteTailnetTunnel(ctx context.Context, arg DeleteTailnetTunnelParams) (DeleteTailnetTunnelRow, error) DeleteWorkspaceAgentPortShare(ctx context.Context, arg DeleteWorkspaceAgentPortShareParams) error DeleteWorkspaceAgentPortSharesByTemplate(ctx context.Context, templateID uuid.UUID) error + EnqueueNotificationMessage(ctx context.Context, arg EnqueueNotificationMessageParams) (NotificationMessage, error) FavoriteWorkspace(ctx context.Context, id uuid.UUID) error + // This is used to build up the notification_message's JSON payload. + FetchNewMessageMetadata(ctx context.Context, arg FetchNewMessageMetadataParams) (FetchNewMessageMetadataRow, error) GetAPIKeyByID(ctx context.Context, id string) (APIKey, error) // there is no unique constraint on empty token names GetAPIKeyByName(ctx context.Context, arg GetAPIKeyByNameParams) (APIKey, error) diff --git a/coderd/database/queries.sql.go b/coderd/database/queries.sql.go index 3902b2e5f8461..61b697a934744 100644 --- a/coderd/database/queries.sql.go +++ b/coderd/database/queries.sql.go @@ -3285,6 +3285,297 @@ func (q *sqlQuerier) TryAcquireLock(ctx context.Context, pgTryAdvisoryXactLock i return pg_try_advisory_xact_lock, err } +const acquireNotificationMessages = `-- name: AcquireNotificationMessages :many +WITH acquired AS ( + UPDATE + notification_messages + SET updated_at = NOW(), + status = 'leased'::notification_message_status, + status_reason = 'Leased by notifier ' || $1::uuid, + leased_until = NOW() + CONCAT($2::int, ' seconds')::interval + WHERE id IN (SELECT nm.id + FROM notification_messages AS nm + WHERE ( + ( + -- message is in acquirable states + nm.status IN ( + 'pending'::notification_message_status, + 'temporary_failure'::notification_message_status + ) + ) + -- or somehow the message was left in leased for longer than its lease period + OR ( + nm.status = 'leased'::notification_message_status + AND nm.leased_until < NOW() + ) + ) + AND ( + -- exclude all messages which have exceeded the max attempts; these will be purged later + nm.attempt_count IS NULL OR nm.attempt_count < $3::int + ) + -- if set, do not retry until we've exceeded the wait time + AND ( + CASE + WHEN nm.next_retry_after IS NOT NULL THEN nm.next_retry_after < NOW() + ELSE true + END + ) + ORDER BY nm.created_at ASC + -- Ensure that multiple concurrent readers cannot retrieve the same rows + FOR UPDATE OF nm + SKIP LOCKED + LIMIT $4) + RETURNING id, notification_template_id, user_id, method, status, status_reason, created_by, payload, attempt_count, targets, created_at, updated_at, leased_until, next_retry_after) +SELECT + -- message + nm.id, + nm.payload, + nm.method, + nm.created_by, + -- template + nt.title_template, + nt.body_template +FROM acquired nm + JOIN notification_templates nt ON nm.notification_template_id = nt.id +` + +type AcquireNotificationMessagesParams struct { + NotifierID uuid.UUID `db:"notifier_id" json:"notifier_id"` + LeaseSeconds int32 `db:"lease_seconds" json:"lease_seconds"` + MaxAttemptCount int32 `db:"max_attempt_count" json:"max_attempt_count"` + Count int32 `db:"count" json:"count"` +} + +type AcquireNotificationMessagesRow struct { + ID uuid.UUID `db:"id" json:"id"` + Payload json.RawMessage `db:"payload" json:"payload"` + Method NotificationMethod `db:"method" json:"method"` + CreatedBy string `db:"created_by" json:"created_by"` + TitleTemplate string `db:"title_template" json:"title_template"` + BodyTemplate string `db:"body_template" json:"body_template"` +} + +// Acquires the lease for a given count of notification messages, to enable concurrent dequeuing and subsequent sending. +// Only rows that aren't already leased (or ones which are leased but have exceeded their lease period) are returned. +// +// A "lease" here refers to a notifier taking ownership of a notification_messages row. A lease survives for the duration +// of CODER_NOTIFICATIONS_LEASE_PERIOD. Once a message is delivered, its status is updated and the lease expires (set to NULL). +// If a message exceeds its lease, that implies the notifier did not shutdown cleanly, or the table update failed somehow, +// and the row will then be eligible to be dequeued by another notifier. +// +// SKIP LOCKED is used to jump over locked rows. This prevents multiple notifiers from acquiring the same messages. +// See: https://www.postgresql.org/docs/9.5/sql-select.html#SQL-FOR-UPDATE-SHARE +func (q *sqlQuerier) AcquireNotificationMessages(ctx context.Context, arg AcquireNotificationMessagesParams) ([]AcquireNotificationMessagesRow, error) { + rows, err := q.db.QueryContext(ctx, acquireNotificationMessages, + arg.NotifierID, + arg.LeaseSeconds, + arg.MaxAttemptCount, + arg.Count, + ) + if err != nil { + return nil, err + } + defer rows.Close() + var items []AcquireNotificationMessagesRow + for rows.Next() { + var i AcquireNotificationMessagesRow + if err := rows.Scan( + &i.ID, + &i.Payload, + &i.Method, + &i.CreatedBy, + &i.TitleTemplate, + &i.BodyTemplate, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const bulkMarkNotificationMessagesFailed = `-- name: BulkMarkNotificationMessagesFailed :execrows +UPDATE notification_messages +SET updated_at = subquery.failed_at, + attempt_count = attempt_count + 1, + status = CASE + WHEN attempt_count + 1 < $1::int THEN subquery.status + ELSE 'permanent_failure'::notification_message_status END, + status_reason = subquery.status_reason, + leased_until = NULL, + next_retry_after = CASE + WHEN (attempt_count + 1 < $1::int) + THEN NOW() + CONCAT($2::int, ' seconds')::interval END +FROM (SELECT UNNEST($3::uuid[]) AS id, + UNNEST($4::timestamptz[]) AS failed_at, + UNNEST($5::notification_message_status[]) AS status, + UNNEST($6::text[]) AS status_reason) AS subquery +WHERE notification_messages.id = subquery.id +` + +type BulkMarkNotificationMessagesFailedParams struct { + MaxAttempts int32 `db:"max_attempts" json:"max_attempts"` + RetryInterval int32 `db:"retry_interval" json:"retry_interval"` + IDs []uuid.UUID `db:"ids" json:"ids"` + FailedAts []time.Time `db:"failed_ats" json:"failed_ats"` + Statuses []NotificationMessageStatus `db:"statuses" json:"statuses"` + StatusReasons []string `db:"status_reasons" json:"status_reasons"` +} + +func (q *sqlQuerier) BulkMarkNotificationMessagesFailed(ctx context.Context, arg BulkMarkNotificationMessagesFailedParams) (int64, error) { + result, err := q.db.ExecContext(ctx, bulkMarkNotificationMessagesFailed, + arg.MaxAttempts, + arg.RetryInterval, + pq.Array(arg.IDs), + pq.Array(arg.FailedAts), + pq.Array(arg.Statuses), + pq.Array(arg.StatusReasons), + ) + if err != nil { + return 0, err + } + return result.RowsAffected() +} + +const bulkMarkNotificationMessagesSent = `-- name: BulkMarkNotificationMessagesSent :execrows +UPDATE notification_messages +SET updated_at = new_values.sent_at, + attempt_count = attempt_count + 1, + status = 'sent'::notification_message_status, + status_reason = NULL, + leased_until = NULL, + next_retry_after = NULL +FROM (SELECT UNNEST($1::uuid[]) AS id, + UNNEST($2::timestamptz[]) AS sent_at) + AS new_values +WHERE notification_messages.id = new_values.id +` + +type BulkMarkNotificationMessagesSentParams struct { + IDs []uuid.UUID `db:"ids" json:"ids"` + SentAts []time.Time `db:"sent_ats" json:"sent_ats"` +} + +func (q *sqlQuerier) BulkMarkNotificationMessagesSent(ctx context.Context, arg BulkMarkNotificationMessagesSentParams) (int64, error) { + result, err := q.db.ExecContext(ctx, bulkMarkNotificationMessagesSent, pq.Array(arg.IDs), pq.Array(arg.SentAts)) + if err != nil { + return 0, err + } + return result.RowsAffected() +} + +const deleteOldNotificationMessages = `-- name: DeleteOldNotificationMessages :exec +DELETE +FROM notification_messages +WHERE id IN + (SELECT id + FROM notification_messages AS nested + WHERE nested.updated_at < NOW() - INTERVAL '7 days') +` + +// Delete all notification messages which have not been updated for over a week. +func (q *sqlQuerier) DeleteOldNotificationMessages(ctx context.Context) error { + _, err := q.db.ExecContext(ctx, deleteOldNotificationMessages) + return err +} + +const enqueueNotificationMessage = `-- name: EnqueueNotificationMessage :one +INSERT INTO notification_messages (id, notification_template_id, user_id, method, payload, targets, created_by) +VALUES ($1, + $2, + $3, + $4::notification_method, + $5::jsonb, + $6, + $7) +RETURNING id, notification_template_id, user_id, method, status, status_reason, created_by, payload, attempt_count, targets, created_at, updated_at, leased_until, next_retry_after +` + +type EnqueueNotificationMessageParams struct { + ID uuid.UUID `db:"id" json:"id"` + NotificationTemplateID uuid.UUID `db:"notification_template_id" json:"notification_template_id"` + UserID uuid.UUID `db:"user_id" json:"user_id"` + Method NotificationMethod `db:"method" json:"method"` + Payload json.RawMessage `db:"payload" json:"payload"` + Targets []uuid.UUID `db:"targets" json:"targets"` + CreatedBy string `db:"created_by" json:"created_by"` +} + +func (q *sqlQuerier) EnqueueNotificationMessage(ctx context.Context, arg EnqueueNotificationMessageParams) (NotificationMessage, error) { + row := q.db.QueryRowContext(ctx, enqueueNotificationMessage, + arg.ID, + arg.NotificationTemplateID, + arg.UserID, + arg.Method, + arg.Payload, + pq.Array(arg.Targets), + arg.CreatedBy, + ) + var i NotificationMessage + err := row.Scan( + &i.ID, + &i.NotificationTemplateID, + &i.UserID, + &i.Method, + &i.Status, + &i.StatusReason, + &i.CreatedBy, + &i.Payload, + &i.AttemptCount, + pq.Array(&i.Targets), + &i.CreatedAt, + &i.UpdatedAt, + &i.LeasedUntil, + &i.NextRetryAfter, + ) + return i, err +} + +const fetchNewMessageMetadata = `-- name: FetchNewMessageMetadata :one +SELECT nt.name AS notification_name, + nt.actions AS actions, + u.id AS user_id, + u.email AS user_email, + COALESCE(NULLIF(u.name, ''), NULLIF(u.username, ''))::text AS user_name +FROM notification_templates nt, + users u +WHERE nt.id = $1 + AND u.id = $2 +` + +type FetchNewMessageMetadataParams struct { + NotificationTemplateID uuid.UUID `db:"notification_template_id" json:"notification_template_id"` + UserID uuid.UUID `db:"user_id" json:"user_id"` +} + +type FetchNewMessageMetadataRow struct { + NotificationName string `db:"notification_name" json:"notification_name"` + Actions []byte `db:"actions" json:"actions"` + UserID uuid.UUID `db:"user_id" json:"user_id"` + UserEmail string `db:"user_email" json:"user_email"` + UserName string `db:"user_name" json:"user_name"` +} + +// This is used to build up the notification_message's JSON payload. +func (q *sqlQuerier) FetchNewMessageMetadata(ctx context.Context, arg FetchNewMessageMetadataParams) (FetchNewMessageMetadataRow, error) { + row := q.db.QueryRowContext(ctx, fetchNewMessageMetadata, arg.NotificationTemplateID, arg.UserID) + var i FetchNewMessageMetadataRow + err := row.Scan( + &i.NotificationName, + &i.Actions, + &i.UserID, + &i.UserEmail, + &i.UserName, + ) + return i, err +} + const deleteOAuth2ProviderAppByID = `-- name: DeleteOAuth2ProviderAppByID :exec DELETE FROM oauth2_provider_apps WHERE id = $1 ` diff --git a/coderd/database/queries/notifications.sql b/coderd/database/queries/notifications.sql new file mode 100644 index 0000000000000..8cc31e0661927 --- /dev/null +++ b/coderd/database/queries/notifications.sql @@ -0,0 +1,127 @@ +-- name: FetchNewMessageMetadata :one +-- This is used to build up the notification_message's JSON payload. +SELECT nt.name AS notification_name, + nt.actions AS actions, + u.id AS user_id, + u.email AS user_email, + COALESCE(NULLIF(u.name, ''), NULLIF(u.username, ''))::text AS user_name +FROM notification_templates nt, + users u +WHERE nt.id = @notification_template_id + AND u.id = @user_id; + +-- name: EnqueueNotificationMessage :one +INSERT INTO notification_messages (id, notification_template_id, user_id, method, payload, targets, created_by) +VALUES (@id, + @notification_template_id, + @user_id, + @method::notification_method, + @payload::jsonb, + @targets, + @created_by) +RETURNING *; + +-- Acquires the lease for a given count of notification messages, to enable concurrent dequeuing and subsequent sending. +-- Only rows that aren't already leased (or ones which are leased but have exceeded their lease period) are returned. +-- +-- A "lease" here refers to a notifier taking ownership of a notification_messages row. A lease survives for the duration +-- of CODER_NOTIFICATIONS_LEASE_PERIOD. Once a message is delivered, its status is updated and the lease expires (set to NULL). +-- If a message exceeds its lease, that implies the notifier did not shutdown cleanly, or the table update failed somehow, +-- and the row will then be eligible to be dequeued by another notifier. +-- +-- SKIP LOCKED is used to jump over locked rows. This prevents multiple notifiers from acquiring the same messages. +-- See: https://www.postgresql.org/docs/9.5/sql-select.html#SQL-FOR-UPDATE-SHARE +-- +-- name: AcquireNotificationMessages :many +WITH acquired AS ( + UPDATE + notification_messages + SET updated_at = NOW(), + status = 'leased'::notification_message_status, + status_reason = 'Leased by notifier ' || sqlc.arg('notifier_id')::uuid, + leased_until = NOW() + CONCAT(sqlc.arg('lease_seconds')::int, ' seconds')::interval + WHERE id IN (SELECT nm.id + FROM notification_messages AS nm + WHERE ( + ( + -- message is in acquirable states + nm.status IN ( + 'pending'::notification_message_status, + 'temporary_failure'::notification_message_status + ) + ) + -- or somehow the message was left in leased for longer than its lease period + OR ( + nm.status = 'leased'::notification_message_status + AND nm.leased_until < NOW() + ) + ) + AND ( + -- exclude all messages which have exceeded the max attempts; these will be purged later + nm.attempt_count IS NULL OR nm.attempt_count < sqlc.arg('max_attempt_count')::int + ) + -- if set, do not retry until we've exceeded the wait time + AND ( + CASE + WHEN nm.next_retry_after IS NOT NULL THEN nm.next_retry_after < NOW() + ELSE true + END + ) + ORDER BY nm.created_at ASC + -- Ensure that multiple concurrent readers cannot retrieve the same rows + FOR UPDATE OF nm + SKIP LOCKED + LIMIT sqlc.arg('count')) + RETURNING *) +SELECT + -- message + nm.id, + nm.payload, + nm.method, + nm.created_by, + -- template + nt.title_template, + nt.body_template +FROM acquired nm + JOIN notification_templates nt ON nm.notification_template_id = nt.id; + +-- name: BulkMarkNotificationMessagesFailed :execrows +UPDATE notification_messages +SET updated_at = subquery.failed_at, + attempt_count = attempt_count + 1, + status = CASE + WHEN attempt_count + 1 < @max_attempts::int THEN subquery.status + ELSE 'permanent_failure'::notification_message_status END, + status_reason = subquery.status_reason, + leased_until = NULL, + next_retry_after = CASE + WHEN (attempt_count + 1 < @max_attempts::int) + THEN NOW() + CONCAT(@retry_interval::int, ' seconds')::interval END +FROM (SELECT UNNEST(@ids::uuid[]) AS id, + UNNEST(@failed_ats::timestamptz[]) AS failed_at, + UNNEST(@statuses::notification_message_status[]) AS status, + UNNEST(@status_reasons::text[]) AS status_reason) AS subquery +WHERE notification_messages.id = subquery.id; + +-- name: BulkMarkNotificationMessagesSent :execrows +UPDATE notification_messages +SET updated_at = new_values.sent_at, + attempt_count = attempt_count + 1, + status = 'sent'::notification_message_status, + status_reason = NULL, + leased_until = NULL, + next_retry_after = NULL +FROM (SELECT UNNEST(@ids::uuid[]) AS id, + UNNEST(@sent_ats::timestamptz[]) AS sent_at) + AS new_values +WHERE notification_messages.id = new_values.id; + +-- Delete all notification messages which have not been updated for over a week. +-- name: DeleteOldNotificationMessages :exec +DELETE +FROM notification_messages +WHERE id IN + (SELECT id + FROM notification_messages AS nested + WHERE nested.updated_at < NOW() - INTERVAL '7 days'); + diff --git a/coderd/database/sqlc.yaml b/coderd/database/sqlc.yaml index 67d7e52b06b6d..5d6f4419d5b8b 100644 --- a/coderd/database/sqlc.yaml +++ b/coderd/database/sqlc.yaml @@ -64,6 +64,12 @@ sql: - column: "template_usage_stats.app_usage_mins" go_type: type: "StringMapOfInt" + - column: "notification_templates.actions" + go_type: + type: "[]byte" + - column: "notification_messages.payload" + go_type: + type: "[]byte" rename: template: TemplateTable template_with_user: Template diff --git a/coderd/database/unique_constraint.go b/coderd/database/unique_constraint.go index cbae30279c5e9..d090af80626b8 100644 --- a/coderd/database/unique_constraint.go +++ b/coderd/database/unique_constraint.go @@ -23,6 +23,9 @@ const ( UniqueJfrogXrayScansPkey UniqueConstraint = "jfrog_xray_scans_pkey" // ALTER TABLE ONLY jfrog_xray_scans ADD CONSTRAINT jfrog_xray_scans_pkey PRIMARY KEY (agent_id, workspace_id); UniqueLicensesJWTKey UniqueConstraint = "licenses_jwt_key" // ALTER TABLE ONLY licenses ADD CONSTRAINT licenses_jwt_key UNIQUE (jwt); UniqueLicensesPkey UniqueConstraint = "licenses_pkey" // ALTER TABLE ONLY licenses ADD CONSTRAINT licenses_pkey PRIMARY KEY (id); + UniqueNotificationMessagesPkey UniqueConstraint = "notification_messages_pkey" // ALTER TABLE ONLY notification_messages ADD CONSTRAINT notification_messages_pkey PRIMARY KEY (id); + UniqueNotificationTemplatesNameKey UniqueConstraint = "notification_templates_name_key" // ALTER TABLE ONLY notification_templates ADD CONSTRAINT notification_templates_name_key UNIQUE (name); + UniqueNotificationTemplatesPkey UniqueConstraint = "notification_templates_pkey" // ALTER TABLE ONLY notification_templates ADD CONSTRAINT notification_templates_pkey PRIMARY KEY (id); UniqueOauth2ProviderAppCodesPkey UniqueConstraint = "oauth2_provider_app_codes_pkey" // ALTER TABLE ONLY oauth2_provider_app_codes ADD CONSTRAINT oauth2_provider_app_codes_pkey PRIMARY KEY (id); UniqueOauth2ProviderAppCodesSecretPrefixKey UniqueConstraint = "oauth2_provider_app_codes_secret_prefix_key" // ALTER TABLE ONLY oauth2_provider_app_codes ADD CONSTRAINT oauth2_provider_app_codes_secret_prefix_key UNIQUE (secret_prefix); UniqueOauth2ProviderAppSecretsPkey UniqueConstraint = "oauth2_provider_app_secrets_pkey" // ALTER TABLE ONLY oauth2_provider_app_secrets ADD CONSTRAINT oauth2_provider_app_secrets_pkey PRIMARY KEY (id);