Skip to content

feat: add notification deduplication trigger #14172

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: add notification deduplication trigger
Signed-off-by: Danny Kopping <danny@coder.com>
  • Loading branch information
dannykopping committed Aug 5, 2024
commit 675aa8fee99053fc9f1f7d404f954985b0e26026
4 changes: 3 additions & 1 deletion cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ import (
"github.com/coder/serpent"
"github.com/coder/wgtunnel/tunnelsdk"

"github.com/coder/quartz"

"github.com/coder/coder/v2/buildinfo"
"github.com/coder/coder/v2/cli/clilog"
"github.com/coder/coder/v2/cli/cliui"
Expand Down Expand Up @@ -995,7 +997,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
metrics := notifications.NewMetrics(options.PrometheusRegistry)

// The enqueuer is responsible for enqueueing notifications to the given store.
enqueuer, err := notifications.NewStoreEnqueuer(cfg, options.Database, templateHelpers(options), logger.Named("notifications.enqueuer"))
enqueuer, err := notifications.NewStoreEnqueuer(cfg, options.Database, templateHelpers(options), logger.Named("notifications.enqueuer"), quartz.NewReal())
if err != nil {
return xerrors.Errorf("failed to instantiate notification store enqueuer: %w", err)
}
Expand Down
27 changes: 26 additions & 1 deletion coderd/database/dump.sql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
DROP TRIGGER IF EXISTS update_notification_message_dedupe_hash ON notification_messages;
DROP FUNCTION IF EXISTS compute_notification_message_dedupe_hash();
ALTER TABLE IF EXISTS notification_messages
DROP COLUMN IF EXISTS dedupe_hash;
33 changes: 33 additions & 0 deletions coderd/database/migrations/000239_notifications_dedupe.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
-- Add a column to store the hash.
ALTER TABLE IF EXISTS notification_messages
ADD COLUMN IF NOT EXISTS dedupe_hash TEXT NULL;

COMMENT ON COLUMN notification_messages.dedupe_hash IS 'Auto-generated by insert/update trigger, used to prevent duplicate notifications from being enqueued on the same day';

-- Ensure that multiple notifications with identical hashes cannot be inserted into the table.
CREATE UNIQUE INDEX ON notification_messages (dedupe_hash);

-- Computes a hash from all unique messages fields and the current day; this will help prevent duplicate messages from being sent within the same day.
-- It is possible that a message could be sent at 23:59:59 and again at 00:00:00, but this should be good enough for now.
-- This could have been a unique index, but we cannot immutably create an index on a timestamp with a timezone.
CREATE OR REPLACE FUNCTION compute_notification_message_dedupe_hash() RETURNS TRIGGER AS
$$
BEGIN
NEW.dedupe_hash := MD5(CONCAT_WS(':',
NEW.notification_template_id,
NEW.user_id,
NEW.method,
NEW.payload::text,
ARRAY_TO_STRING(NEW.targets, ','),
DATE_TRUNC('day', NEW.created_at AT TIME ZONE 'UTC')::text
));
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

COMMENT ON FUNCTION compute_notification_message_dedupe_hash IS 'Computes a unique hash which will be used to prevent duplicate messages from being enqueued on the same day';
CREATE TRIGGER update_notification_message_dedupe_hash
BEFORE INSERT OR UPDATE
ON notification_messages
FOR EACH ROW
EXECUTE FUNCTION compute_notification_message_dedupe_hash();
2 changes: 2 additions & 0 deletions coderd/database/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 8 additions & 4 deletions coderd/database/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions coderd/database/queries/notifications.sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ WHERE nt.id = @notification_template_id
AND u.id = @user_id;

-- name: EnqueueNotificationMessage :exec
INSERT INTO notification_messages (id, notification_template_id, user_id, method, payload, targets, created_by)
INSERT INTO notification_messages (id, notification_template_id, user_id, method, payload, targets, created_by, created_at)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added created_at to be passed in to make the dedupe hash testable by using Quartz to advance the clock.

VALUES (@id,
@notification_template_id,
@user_id,
@method::notification_method,
@payload::jsonb,
@targets,
@created_by);
@created_by,
@created_at);

-- Acquires the lease for a given count of notification messages, to enable concurrent dequeuing and subsequent sending.
-- Only rows that aren't already leased (or ones which are leased but have exceeded their lease period) are returned.
Expand Down
1 change: 1 addition & 0 deletions coderd/database/unique_constraint.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 18 additions & 2 deletions coderd/notifications/enqueuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/google/uuid"
"golang.org/x/xerrors"

"github.com/coder/quartz"

"cdr.dev/slog"

"github.com/coder/coder/v2/coderd/database"
Expand All @@ -17,7 +19,10 @@ import (
"github.com/coder/coder/v2/codersdk"
)

var ErrCannotEnqueueDisabledNotification = xerrors.New("user has disabled this notification")
var (
ErrCannotEnqueueDisabledNotification = xerrors.New("user has disabled this notification")
ErrDuplicate = xerrors.New("duplicate notification")
)

type StoreEnqueuer struct {
store Store
Expand All @@ -27,10 +32,12 @@ type StoreEnqueuer struct {
// helpers holds a map of template funcs which are used when rendering templates. These need to be passed in because
// the template funcs will return values which are inappropriately encapsulated in this struct.
helpers template.FuncMap
// Used to manipulate time in tests.
clock quartz.Clock
}

// NewStoreEnqueuer creates an Enqueuer implementation which can persist notification messages in the store.
func NewStoreEnqueuer(cfg codersdk.NotificationsConfig, store Store, helpers template.FuncMap, log slog.Logger) (*StoreEnqueuer, error) {
func NewStoreEnqueuer(cfg codersdk.NotificationsConfig, store Store, helpers template.FuncMap, log slog.Logger, clock quartz.Clock) (*StoreEnqueuer, error) {
var method database.NotificationMethod
if err := method.Scan(cfg.Method.String()); err != nil {
return nil, xerrors.Errorf("given notification method %q is invalid", cfg.Method)
Expand All @@ -41,6 +48,7 @@ func NewStoreEnqueuer(cfg codersdk.NotificationsConfig, store Store, helpers tem
log: log,
defaultMethod: method,
helpers: helpers,
clock: clock,
}, nil
}

Expand Down Expand Up @@ -81,6 +89,7 @@ func (s *StoreEnqueuer) Enqueue(ctx context.Context, userID, templateID uuid.UUI
Payload: input,
Targets: targets,
CreatedBy: createdBy,
CreatedAt: s.clock.Now().UTC(), // mimicking dbtime.Now()
})
if err != nil {
// We have a trigger on the notification_messages table named `inhibit_enqueue_if_disabled` which prevents messages
Expand All @@ -92,6 +101,13 @@ func (s *StoreEnqueuer) Enqueue(ctx context.Context, userID, templateID uuid.UUI
return nil, ErrCannotEnqueueDisabledNotification
}

// If the enqueue fails due to a dedupe hash conflict, this means that a notification has already been enqueued
// today with identical properties. It's far simpler to prevent duplicate sends in this central manner, rather than
// having each notification enqueue handle its own logic.
if database.IsUniqueViolation(err, database.UniqueNotificationMessagesDedupeHashIndex) {
return nil, ErrDuplicate
}

s.log.Warn(ctx, "failed to enqueue notification", slog.F("template_id", templateID), slog.F("input", input), slog.Error(err))
return nil, xerrors.Errorf("enqueue notification: %w", err)
}
Expand Down
9 changes: 6 additions & 3 deletions coderd/notifications/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"

"github.com/coder/quartz"

"github.com/coder/serpent"

"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbgen"
"github.com/coder/coder/v2/coderd/notifications"
"github.com/coder/coder/v2/coderd/notifications/dispatch"
"github.com/coder/coder/v2/coderd/notifications/types"
"github.com/coder/coder/v2/testutil"
"github.com/coder/serpent"
)

func TestBufferedUpdates(t *testing.T) {
Expand All @@ -39,7 +42,7 @@ func TestBufferedUpdates(t *testing.T) {
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{
database.NotificationMethodSmtp: santa,
})
enq, err := notifications.NewStoreEnqueuer(cfg, interceptor, defaultHelpers(), logger.Named("notifications-enqueuer"))
enq, err := notifications.NewStoreEnqueuer(cfg, interceptor, defaultHelpers(), logger.Named("notifications-enqueuer"), quartz.NewReal())
require.NoError(t, err)

user := dbgen.User(t, db, database.User{})
Expand Down Expand Up @@ -127,7 +130,7 @@ func TestBuildPayload(t *testing.T) {
}
})

enq, err := notifications.NewStoreEnqueuer(defaultNotificationsConfig(database.NotificationMethodSmtp), interceptor, helpers, logger.Named("notifications-enqueuer"))
enq, err := notifications.NewStoreEnqueuer(defaultNotificationsConfig(database.NotificationMethodSmtp), interceptor, helpers, logger.Named("notifications-enqueuer"), quartz.NewReal())
require.NoError(t, err)

// WHEN: a notification is enqueued
Expand Down
10 changes: 6 additions & 4 deletions coderd/notifications/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/coder/quartz"

"github.com/coder/serpent"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another instance of import grouping, this is the job of the tooling tbh (and it's failing), so up to you if you want to fix it 😄. (There are actually a few more of these too but I'll leave those uncommented.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll leave this for now; we need the tooling to be updated as you suggest.
Thanks for pointing this out though


"github.com/coder/coder/v2/coderd/database"
Expand Down Expand Up @@ -61,7 +63,7 @@ func TestMetrics(t *testing.T) {
method: handler,
})

enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"))
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
require.NoError(t, err)

user := createSampleUser(t, store)
Expand Down Expand Up @@ -228,7 +230,7 @@ func TestPendingUpdatesMetric(t *testing.T) {
method: handler,
})

enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"))
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
require.NoError(t, err)

user := createSampleUser(t, store)
Expand Down Expand Up @@ -305,7 +307,7 @@ func TestInflightDispatchesMetric(t *testing.T) {
method: delayer,
})

enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"))
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
require.NoError(t, err)

user := createSampleUser(t, store)
Expand Down Expand Up @@ -384,7 +386,7 @@ func TestCustomMethodMetricCollection(t *testing.T) {
customMethod: webhookHandler,
})

enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"))
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
require.NoError(t, err)

user := createSampleUser(t, store)
Expand Down
Loading
Loading