Skip to content

Commit 1d28ccf

Browse files
committed
feat(coderd/notifications): notify pubsub on enqueue
1 parent 7a8e8bc commit 1d28ccf

File tree

4 files changed

+55
-1
lines changed

4 files changed

+55
-1
lines changed

coderd/notifications/enqueuer.go

+9
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ var (
2727
ErrDuplicate = xerrors.New("duplicate notification")
2828
)
2929

30+
const EventNotificationEnqueued = "notification_enqueued"
31+
3032
type InvalidDefaultNotificationMethodError struct {
3133
Method string
3234
}
@@ -83,6 +85,13 @@ func (s *StoreEnqueuer) Enqueue(ctx context.Context, userID, templateID uuid.UUI
8385
// Enqueue queues a notification message for later delivery.
8486
// Messages will be dequeued by a notifier later and dispatched.
8587
func (s *StoreEnqueuer) EnqueueWithData(ctx context.Context, userID, templateID uuid.UUID, labels map[string]string, data map[string]any, createdBy string, targets ...uuid.UUID) ([]uuid.UUID, error) {
88+
defer func() {
89+
// Publish an event to notify that a notification has been enqueued.
90+
// Failure to publish is acceptable, as the fetcher will still process the
91+
// message on its next run.
92+
// TODO(Cian): debounce this to maybe once per second or so?
93+
_ = s.ps.Publish(EventNotificationEnqueued, nil)
94+
}()
8695
metadata, err := s.store.FetchNewMessageMetadata(ctx, database.FetchNewMessageMetadataParams{
8796
UserID: userID,
8897
NotificationTemplateID: templateID,

coderd/notifications/fetcher_internal_test.go

+23
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"golang.org/x/xerrors"
1212

1313
"github.com/coder/coder/v2/coderd/database/dbmock"
14+
"github.com/coder/coder/v2/coderd/database/pubsub/psmock"
1415
)
1516

1617
func TestNotifier_FetchHelpers(t *testing.T) {
@@ -21,9 +22,11 @@ func TestNotifier_FetchHelpers(t *testing.T) {
2122

2223
ctrl := gomock.NewController(t)
2324
dbmock := dbmock.NewMockStore(ctrl)
25+
psmock := psmock.NewMockPubsub(ctrl)
2426

2527
n := &notifier{
2628
store: dbmock,
29+
ps: psmock,
2730
helpers: template.FuncMap{},
2831
}
2932

@@ -48,9 +51,11 @@ func TestNotifier_FetchHelpers(t *testing.T) {
4851

4952
ctrl := gomock.NewController(t)
5053
dbmock := dbmock.NewMockStore(ctrl)
54+
psmock := psmock.NewMockPubsub(ctrl)
5155

5256
n := &notifier{
5357
store: dbmock,
58+
ps: psmock,
5459
helpers: template.FuncMap{},
5560
}
5661

@@ -67,9 +72,11 @@ func TestNotifier_FetchHelpers(t *testing.T) {
6772

6873
ctrl := gomock.NewController(t)
6974
dbmock := dbmock.NewMockStore(ctrl)
75+
psmock := psmock.NewMockPubsub(ctrl)
7076

7177
n := &notifier{
7278
store: dbmock,
79+
ps: psmock,
7380
helpers: template.FuncMap{},
7481
}
7582

@@ -90,9 +97,11 @@ func TestNotifier_FetchAppName(t *testing.T) {
9097

9198
ctrl := gomock.NewController(t)
9299
dbmock := dbmock.NewMockStore(ctrl)
100+
psmock := psmock.NewMockPubsub(ctrl)
93101

94102
n := &notifier{
95103
store: dbmock,
104+
ps: psmock,
96105
}
97106

98107
dbmock.EXPECT().GetApplicationName(gomock.Any()).Return("ACME Inc.", nil)
@@ -107,9 +116,11 @@ func TestNotifier_FetchAppName(t *testing.T) {
107116
t.Parallel()
108117
ctrl := gomock.NewController(t)
109118
dbmock := dbmock.NewMockStore(ctrl)
119+
psmock := psmock.NewMockPubsub(ctrl)
110120

111121
n := &notifier{
112122
store: dbmock,
123+
ps: psmock,
113124
}
114125

115126
dbmock.EXPECT().GetApplicationName(gomock.Any()).Return("", sql.ErrNoRows)
@@ -125,9 +136,11 @@ func TestNotifier_FetchAppName(t *testing.T) {
125136

126137
ctrl := gomock.NewController(t)
127138
dbmock := dbmock.NewMockStore(ctrl)
139+
psmock := psmock.NewMockPubsub(ctrl)
128140

129141
n := &notifier{
130142
store: dbmock,
143+
ps: psmock,
131144
}
132145

133146
dbmock.EXPECT().GetApplicationName(gomock.Any()).Return("", nil)
@@ -143,9 +156,11 @@ func TestNotifier_FetchAppName(t *testing.T) {
143156

144157
ctrl := gomock.NewController(t)
145158
dbmock := dbmock.NewMockStore(ctrl)
159+
psmock := psmock.NewMockPubsub(ctrl)
146160

147161
n := &notifier{
148162
store: dbmock,
163+
ps: psmock,
149164
}
150165

151166
dbmock.EXPECT().GetApplicationName(gomock.Any()).Return("", xerrors.New("internal error"))
@@ -164,9 +179,11 @@ func TestNotifier_FetchLogoURL(t *testing.T) {
164179

165180
ctrl := gomock.NewController(t)
166181
dbmock := dbmock.NewMockStore(ctrl)
182+
psmock := psmock.NewMockPubsub(ctrl)
167183

168184
n := &notifier{
169185
store: dbmock,
186+
ps: psmock,
170187
}
171188

172189
dbmock.EXPECT().GetLogoURL(gomock.Any()).Return("https://example.com/logo.png", nil)
@@ -181,9 +198,11 @@ func TestNotifier_FetchLogoURL(t *testing.T) {
181198
t.Parallel()
182199
ctrl := gomock.NewController(t)
183200
dbmock := dbmock.NewMockStore(ctrl)
201+
psmock := psmock.NewMockPubsub(ctrl)
184202

185203
n := &notifier{
186204
store: dbmock,
205+
ps: psmock,
187206
}
188207

189208
dbmock.EXPECT().GetLogoURL(gomock.Any()).Return("", sql.ErrNoRows)
@@ -199,9 +218,11 @@ func TestNotifier_FetchLogoURL(t *testing.T) {
199218

200219
ctrl := gomock.NewController(t)
201220
dbmock := dbmock.NewMockStore(ctrl)
221+
psmock := psmock.NewMockPubsub(ctrl)
202222

203223
n := &notifier{
204224
store: dbmock,
225+
ps: psmock,
205226
}
206227

207228
dbmock.EXPECT().GetLogoURL(gomock.Any()).Return("", nil)
@@ -217,9 +238,11 @@ func TestNotifier_FetchLogoURL(t *testing.T) {
217238

218239
ctrl := gomock.NewController(t)
219240
dbmock := dbmock.NewMockStore(ctrl)
241+
psmock := psmock.NewMockPubsub(ctrl)
220242

221243
n := &notifier{
222244
store: dbmock,
245+
ps: psmock,
223246
}
224247

225248
dbmock.EXPECT().GetLogoURL(gomock.Any()).Return("", xerrors.New("internal error"))

coderd/notifications/manager.go

+1
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ func NewManager(cfg codersdk.NotificationsConfig, store Store, ps pubsub.Pubsub,
9494
log: log,
9595
cfg: cfg,
9696
store: store,
97+
ps: ps,
9798

9899
// Buffer successful/failed notification dispatches in memory to reduce load on the store.
99100
//

coderd/notifications/notifier.go

+22-1
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ func (n *notifier) run(success chan<- dispatchResult, failure chan<- dispatchRes
139139
}
140140
}()
141141

142-
// run the ticker with the graceful context, so we stop fetching after stop() is called
142+
// Periodically trigger the processing loop.
143143
tick := n.clock.TickerFunc(n.gracefulCtx, n.cfg.FetchInterval.Value(), func() error {
144144
c := make(chan struct{})
145145
loopTick <- c
@@ -149,6 +149,27 @@ func (n *notifier) run(success chan<- dispatchResult, failure chan<- dispatchRes
149149
return nil
150150
}, "notifier", "fetchInterval")
151151

152+
// Also signal the processing loop when a notification is enqueued.
153+
if stopListen, err := n.ps.Subscribe(EventNotificationEnqueued, func(ctx context.Context, _ []byte) {
154+
c := make(chan struct{})
155+
select {
156+
case <-ctx.Done():
157+
return
158+
// This is a no-op if the notifier is paused.
159+
case loopTick <- c:
160+
default:
161+
// If the loop is busy, don't send a notification.
162+
n.log.Debug(ctx, "notifier busy, skipping notification")
163+
return
164+
}
165+
}); err != nil {
166+
// Intentionally not making this a fatal error. The notifier will still run,
167+
// albeit without notification events.
168+
n.log.Error(n.outerCtx, "failed to subscribe to notification events", slog.Error(err))
169+
} else {
170+
defer stopListen()
171+
}
172+
152173
// Note the order of operations here.
153174
_ = tick.Wait() // will block until gracefulCtx is done
154175
close(loopTick) // happens immediately

0 commit comments

Comments
 (0)