Skip to content

Commit 9380d8e

Browse files
committed
Only start one notifier since all dispatches are concurrent anyway
Signed-off-by: Danny Kopping <danny@coder.com>
1 parent 9d4c312 commit 9380d8e

File tree

11 files changed

+77
-109
lines changed

11 files changed

+77
-109
lines changed

cli/server.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,11 @@ import (
5555

5656
"cdr.dev/slog"
5757
"cdr.dev/slog/sloggers/sloghuman"
58+
"github.com/coder/pretty"
59+
"github.com/coder/retry"
60+
"github.com/coder/serpent"
61+
"github.com/coder/wgtunnel/tunnelsdk"
62+
5863
"github.com/coder/coder/v2/buildinfo"
5964
"github.com/coder/coder/v2/cli/clilog"
6065
"github.com/coder/coder/v2/cli/cliui"
@@ -99,10 +104,6 @@ import (
99104
"github.com/coder/coder/v2/provisionersdk"
100105
sdkproto "github.com/coder/coder/v2/provisionersdk/proto"
101106
"github.com/coder/coder/v2/tailnet"
102-
"github.com/coder/pretty"
103-
"github.com/coder/retry"
104-
"github.com/coder/serpent"
105-
"github.com/coder/wgtunnel/tunnelsdk"
106107
)
107108

108109
func createOIDCConfig(ctx context.Context, vals *codersdk.DeploymentValues) (*coderd.OIDCConfig, error) {
@@ -999,7 +1000,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
9991000
}
10001001

10011002
// nolint:gocritic // TODO: create own role.
1002-
notificationsManager.Run(dbauthz.AsSystemRestricted(ctx), int(cfg.WorkerCount.Value()))
1003+
notificationsManager.Run(dbauthz.AsSystemRestricted(ctx))
10031004
}
10041005

10051006
// Wrap the server in middleware that redirects to the access URL if

cli/testdata/server-config.yaml.golden

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -534,10 +534,6 @@ notifications:
534534
# this option at its default value.
535535
# (default: 50, type: int)
536536
store-sync-buffer-size: 50
537-
# How many workers should be processing messages in the queue; increase this count
538-
# if notifications are not being processed fast enough.
539-
# (default: 2, type: int)
540-
worker-count: 2
541537
# How long a notifier should lease a message. This is effectively how long a
542538
# notification is 'owned' by a notifier, and once this period expires it will be
543539
# available for lease by another notifier. Leasing is important in order for
@@ -547,8 +543,8 @@ notifications:
547543
# (default: 2m0s, type: duration)
548544
lease-period: 2m0s
549545
# How many notifications a notifier should lease per fetch interval.
550-
# (default: 10, type: int)
551-
lease-count: 10
546+
# (default: 20, type: int)
547+
lease-count: 20
552548
# How often to query the database for queued notifications.
553549
# (default: 15s, type: duration)
554550
fetch-interval: 15s

coderd/apidoc/docs.go

Lines changed: 1 addition & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/apidoc/swagger.json

Lines changed: 1 addition & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/notifications/manager.go

Lines changed: 35 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@ import (
1818

1919
// Manager manages all notifications being enqueued and dispatched.
2020
//
21-
// Manager maintains a group of notifiers: these consume the queue of notification messages in the store.
21+
// Manager maintains a notifier: this consumes the queue of notification messages in the store.
2222
//
23-
// Notifiers dequeue messages from the store _CODER_NOTIFICATIONS_LEASE_COUNT_ at a time and concurrently "dispatch" these messages, meaning they are
24-
// sent by their respective methods (email, webhook, etc).
23+
// The notifier dequeues messages from the store _CODER_NOTIFICATIONS_LEASE_COUNT_ at a time and concurrently "dispatches"
24+
// these messages, meaning they are sent by their respective methods (email, webhook, etc).
2525
//
2626
// To reduce load on the store, successful and failed dispatches are accumulated in two separate buffers (success/failure)
2727
// of size CODER_NOTIFICATIONS_STORE_SYNC_BUFFER_SIZE in the Manager, and updates are sent to the store about which messages
@@ -30,20 +30,19 @@ import (
3030
// sent but they start failing too quickly, the buffers (receive channels) will fill up and block senders, which will
3131
// slow down the dispatch rate.
3232
//
33-
// NOTE: The above backpressure mechanism only works if all notifiers live within the same process, which may not be true
34-
// forever, such as if we split notifiers out into separate targets for greater processing throughput; in this case we
35-
// will need an alternative mechanism for handling backpressure.
33+
// NOTE: The above backpressure mechanism only works within the same process, which may not be true forever, such as if
34+
// we split notifiers out into separate targets for greater processing throughput; in this case we will need an
35+
// alternative mechanism for handling backpressure.
3636
type Manager struct {
3737
cfg codersdk.NotificationsConfig
3838

3939
store Store
4040
log slog.Logger
4141

42-
notifiers []*notifier
43-
notifierMu sync.Mutex
44-
42+
notifier *notifier
4543
handlers map[database.NotificationMethod]Handler
4644

45+
runOnce sync.Once
4746
stopOnce sync.Once
4847
stop chan any
4948
done chan any
@@ -81,25 +80,28 @@ func (m *Manager) WithHandlers(reg map[database.NotificationMethod]Handler) {
8180

8281
// Run initiates the control loop in the background, which spawns a single notifier goroutine.
8382
// Manager requires system-level permissions to interact with the store.
84-
func (m *Manager) Run(ctx context.Context, notifiers int) {
85-
// Closes when Stop() is called or context is canceled.
86-
go func() {
87-
err := m.loop(ctx, notifiers)
88-
if err != nil {
89-
m.log.Error(ctx, "notification manager stopped with error", slog.Error(err))
90-
}
91-
}()
83+
// Run is only intended to be run once.
84+
func (m *Manager) Run(ctx context.Context) {
85+
m.runOnce.Do(func() {
86+
// Closes when Stop() is called or context is canceled.
87+
go func() {
88+
err := m.loop(ctx)
89+
if err != nil {
90+
m.log.Error(ctx, "notification manager stopped with error", slog.Error(err))
91+
}
92+
}()
93+
})
9294
}
9395

9496
// loop contains the main business logic of the notification manager. It is responsible for subscribing to notification
95-
// events, creating notifiers, and publishing bulk dispatch result updates to the store.
96-
func (m *Manager) loop(ctx context.Context, notifiers int) error {
97+
// events, creating a notifier, and publishing bulk dispatch result updates to the store.
98+
func (m *Manager) loop(ctx context.Context) error {
9799
defer func() {
98100
close(m.done)
99101
m.log.Info(context.Background(), "notification manager stopped")
100102
}()
101103

102-
// Caught a terminal signal before notifiers were created, exit immediately.
104+
// Caught a terminal signal before notifier was created, exit immediately.
103105
select {
104106
case <-m.stop:
105107
m.log.Warn(ctx, "gracefully stopped")
@@ -121,21 +123,17 @@ func (m *Manager) loop(ctx context.Context, notifiers int) error {
121123
failure = make(chan dispatchResult, m.cfg.StoreSyncBufferSize)
122124
)
123125

124-
// Create a specific number of notifiers to run, and run them concurrently.
125126
var eg errgroup.Group
126-
m.notifierMu.Lock()
127-
for i := 0; i < notifiers; i++ {
128-
n := newNotifier(m.cfg, uuid.New(), m.log, m.store, m.handlers)
129-
m.notifiers = append(m.notifiers, n)
130-
131-
eg.Go(func() error {
132-
return n.run(ctx, success, failure)
133-
})
134-
}
135-
m.notifierMu.Unlock()
136127

128+
// Create a notifier to run concurrently, which will handle dequeueing and dispatching notifications.
129+
m.notifier = newNotifier(m.cfg, uuid.New(), m.log, m.store, m.handlers)
137130
eg.Go(func() error {
138-
// Every interval, collect the messages in the channels and bulk update them in the database.
131+
return m.notifier.run(ctx, success, failure)
132+
})
133+
134+
// Periodically flush notification state changes to the store.
135+
eg.Go(func() error {
136+
// Every interval, collect the messages in the channels and bulk update them in the store.
139137
tick := time.NewTicker(m.cfg.StoreSyncInterval.Value())
140138
defer tick.Stop()
141139
for {
@@ -281,12 +279,8 @@ func (m *Manager) bulkUpdate(ctx context.Context, success, failure <-chan dispat
281279
wg.Wait()
282280
}
283281

284-
// Stop stops all notifiers and waits until they have stopped.
282+
// Stop stops the notifier and waits until it has stopped.
285283
func (m *Manager) Stop(ctx context.Context) error {
286-
// Prevent notifiers from being modified while we're stopping them.
287-
m.notifierMu.Lock()
288-
defer m.notifierMu.Unlock()
289-
290284
var err error
291285
m.stopOnce.Do(func() {
292286
select {
@@ -298,22 +292,14 @@ func (m *Manager) Stop(ctx context.Context) error {
298292

299293
m.log.Info(context.Background(), "graceful stop requested")
300294

301-
// If the notifiers haven't been started, we don't need to wait for anything.
295+
// If the notifier hasn't been started, we don't need to wait for anything.
302296
// This only really happens during testing, when we want to enqueue messages but not deliver them.
303-
if len(m.notifiers) == 0 {
297+
if m.notifier == nil {
304298
close(m.done)
299+
} else {
300+
m.notifier.stop()
305301
}
306302

307-
// Stop all notifiers.
308-
var eg errgroup.Group
309-
for _, n := range m.notifiers {
310-
eg.Go(func() error {
311-
n.stop()
312-
return nil
313-
})
314-
}
315-
_ = eg.Wait()
316-
317303
// Signal the stop channel to cause loop to exit.
318304
close(m.stop)
319305

coderd/notifications/manager_test.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func TestBufferedUpdates(t *testing.T) {
6060
}
6161

6262
// when
63-
mgr.Run(ctx, 1)
63+
mgr.Run(ctx)
6464

6565
// then
6666

@@ -137,6 +137,19 @@ func TestBuildPayload(t *testing.T) {
137137
}
138138
}
139139

140+
func TestStopBeforeRun(t *testing.T) {
141+
ctx := context.Background()
142+
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true, IgnoredErrorIs: []error{}}).Leveled(slog.LevelDebug)
143+
mgr, err := notifications.NewManager(defaultNotificationsConfig(database.NotificationMethodSmtp), dbmem.New(), logger.Named("notifications-manager"))
144+
require.NoError(t, err)
145+
146+
// Call stop before notifier is started with Run().
147+
require.Eventually(t, func() bool {
148+
assert.NoError(t, mgr.Stop(ctx))
149+
return true
150+
}, testutil.WaitShort, testutil.IntervalFast)
151+
}
152+
140153
type bulkUpdateInterceptor struct {
141154
notifications.Store
142155

coderd/notifications/notifications_test.go

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func TestBasicNotificationRoundtrip(t *testing.T) {
6868
fid, err := enq.Enqueue(ctx, user.UserID, notifications.TemplateWorkspaceDeleted, map[string]string{"type": "failure"}, "test")
6969
require.NoError(t, err)
7070

71-
mgr.Run(ctx, 1)
71+
mgr.Run(ctx)
7272

7373
// then
7474
require.Eventually(t, func() bool { return handler.succeeded == sid.String() }, testutil.WaitLong, testutil.IntervalMedium)
@@ -124,7 +124,7 @@ func TestSMTPDispatch(t *testing.T) {
124124
msgID, err := enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{}, "test")
125125
require.NoError(t, err)
126126

127-
mgr.Run(ctx, 1)
127+
mgr.Run(ctx)
128128

129129
// then
130130
require.Eventually(t, func() bool {
@@ -209,7 +209,7 @@ func TestWebhookDispatch(t *testing.T) {
209209
msgID, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, input, "test")
210210
require.NoError(t, err)
211211

212-
mgr.Run(ctx, 1)
212+
mgr.Run(ctx)
213213

214214
// then
215215
require.Eventually(t, func() bool { return <-sent }, testutil.WaitShort, testutil.IntervalFast)
@@ -289,26 +289,25 @@ func TestBackpressure(t *testing.T) {
289289
require.NoError(t, err)
290290
}
291291

292-
// Start two notifiers.
293-
const notifiers = 2
294-
mgr.Run(ctx, notifiers)
292+
// Start the notifier.
293+
mgr.Run(ctx)
295294

296295
// then
297296

298297
// Wait for 3 fetch intervals, then check progress.
299298
time.Sleep(fetchInterval * 3)
300299

301-
// We expect the notifiers will have dispatched ONLY the initial batch of messages.
302-
// In other words, the notifiers should have dispatched 3 batches by now, but because the buffered updates have not
303-
// been processed there is backpressure.
304-
require.EqualValues(t, notifiers*batchSize, handler.sent.Load()+handler.err.Load())
300+
// We expect the notifier will have dispatched ONLY the initial batch of messages.
301+
// In other words, the notifier should have dispatched 3 batches by now, but because the buffered updates have not
302+
// been processed: there is backpressure.
303+
require.EqualValues(t, batchSize, handler.sent.Load()+handler.err.Load())
305304
// We expect that the store will have received NO updates.
306305
require.EqualValues(t, 0, storeInterceptor.sent.Load()+storeInterceptor.failed.Load())
307306

308307
// However, when we Stop() the manager the backpressure will be relieved and the buffered updates will ALL be flushed,
309-
// since all the goroutines blocked on writing updates to the buffer will be unblocked and will complete.
308+
// since all the goroutines that were blocked (on writing updates to the buffer) will be unblocked and will complete.
310309
require.NoError(t, mgr.Stop(ctx))
311-
require.EqualValues(t, notifiers*batchSize, storeInterceptor.sent.Load()+storeInterceptor.failed.Load())
310+
require.EqualValues(t, batchSize, storeInterceptor.sent.Load()+storeInterceptor.failed.Load())
312311
}
313312

314313
func TestRetries(t *testing.T) {
@@ -394,9 +393,7 @@ func TestRetries(t *testing.T) {
394393
require.NoError(t, err)
395394
}
396395

397-
// Start two notifiers.
398-
const notifiers = 2
399-
mgr.Run(ctx, notifiers)
396+
mgr.Run(ctx)
400397

401398
// then
402399
require.Eventually(t, func() bool {

codersdk/deployment.go

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -467,7 +467,6 @@ type NotificationsConfig struct {
467467
StoreSyncBufferSize serpent.Int64 `json:"sync_buffer_size" typescript:",notnull"`
468468

469469
// Queue.
470-
WorkerCount serpent.Int64 `json:"worker_count"`
471470
LeasePeriod serpent.Duration `json:"lease_period"`
472471
LeaseCount serpent.Int64 `json:"lease_count"`
473472
FetchInterval serpent.Duration `json:"fetch_interval"`
@@ -2196,18 +2195,6 @@ Write out the current server config as YAML to stdout.`,
21962195
YAML: "store-sync-buffer-size",
21972196
Hidden: true, // Hidden because most operators should not need to modify this.
21982197
},
2199-
{
2200-
Name: "Notifications: Worker Count",
2201-
Description: "How many workers should be processing messages in the queue; increase this count if notifications " +
2202-
"are not being processed fast enough.",
2203-
Flag: "notifications-worker-count",
2204-
Env: "CODER_NOTIFICATIONS_WORKER_COUNT",
2205-
Value: &c.Notifications.WorkerCount,
2206-
Default: "2",
2207-
Group: &deploymentGroupNotifications,
2208-
YAML: "worker-count",
2209-
Hidden: true, // Hidden because most operators should not need to modify this.
2210-
},
22112198
{
22122199
Name: "Notifications: Lease Period",
22132200
Description: "How long a notifier should lease a message. This is effectively how long a notification is 'owned' " +
@@ -2230,7 +2217,7 @@ Write out the current server config as YAML to stdout.`,
22302217
Flag: "notifications-lease-count",
22312218
Env: "CODER_NOTIFICATIONS_LEASE_COUNT",
22322219
Value: &c.Notifications.LeaseCount,
2233-
Default: "10",
2220+
Default: "20",
22342221
Group: &deploymentGroupNotifications,
22352222
YAML: "lease-count",
22362223
Hidden: true, // Hidden because most operators should not need to modify this.

docs/api/general.md

Lines changed: 1 addition & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)