Skip to content

Commit 613e074

Browse files
committed
Avoid race by exposing number of pending updates
Signed-off-by: Danny Kopping <danny@coder.com>
1 parent aff9e6c commit 613e074

File tree

2 files changed

+55
-30
lines changed

coderd/notifications/manager.go

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ type Manager struct {
4444
notifier *notifier
4545
handlers map[database.NotificationMethod]Handler
4646

47+
success, failure chan dispatchResult
48+
4749
runOnce sync.Once
4850
stopOnce sync.Once
4951
stop chan any
@@ -67,6 +69,15 @@ func NewManager(cfg codersdk.NotificationsConfig, store Store, log slog.Logger)
6769
cfg: cfg,
6870
store: store,
6971

72+
// Buffer successful/failed notification dispatches in memory to reduce load on the store.
73+
//
74+
// We keep separate buffers for success/failure right now because the bulk updates are already a bit janky,
75+
// see BulkMarkNotificationMessagesSent/BulkMarkNotificationMessagesFailed. If we had the ability to batch updates,
76+
// like is offered in https://docs.sqlc.dev/en/stable/reference/query-annotations.html#batchmany, we'd have a cleaner
77+
// approach to this - but for now this will work fine.
78+
success: make(chan dispatchResult, cfg.StoreSyncBufferSize),
79+
failure: make(chan dispatchResult, cfg.StoreSyncBufferSize),
80+
7081
stop: make(chan any),
7182
done: make(chan any),
7283

@@ -123,23 +134,12 @@ func (m *Manager) loop(ctx context.Context) error {
123134
default:
124135
}
125136

126-
var (
127-
// Buffer successful/failed notification dispatches in memory to reduce load on the store.
128-
//
129-
// We keep separate buffered for success/failure right now because the bulk updates are already a bit janky,
130-
// see BulkMarkNotificationMessagesSent/BulkMarkNotificationMessagesFailed. If we had the ability to batch updates,
131-
// like is offered in https://docs.sqlc.dev/en/stable/reference/query-annotations.html#batchmany, we'd have a cleaner
132-
// approach to this - but for now this will work fine.
133-
success = make(chan dispatchResult, m.cfg.StoreSyncBufferSize)
134-
failure = make(chan dispatchResult, m.cfg.StoreSyncBufferSize)
135-
)
136-
137137
var eg errgroup.Group
138138

139139
// Create a notifier to run concurrently, which will handle dequeueing and dispatching notifications.
140140
m.notifier = newNotifier(m.cfg, uuid.New(), m.log, m.store, m.handlers)
141141
eg.Go(func() error {
142-
return m.notifier.run(ctx, success, failure)
142+
return m.notifier.run(ctx, m.success, m.failure)
143143
})
144144

145145
// Periodically flush notification state changes to the store.
@@ -162,21 +162,21 @@ func (m *Manager) loop(ctx context.Context) error {
162162
// TODO: mention the above tradeoff in documentation.
163163
m.log.Warn(ctx, "exiting ungracefully", slog.Error(ctx.Err()))
164164

165-
if len(success)+len(failure) > 0 {
165+
if len(m.success)+len(m.failure) > 0 {
166166
m.log.Warn(ctx, "context canceled with pending updates in buffer, these messages will be sent again after lease expires",
167-
slog.F("success_count", len(success)), slog.F("failure_count", len(failure)))
167+
slog.F("success_count", len(m.success)), slog.F("failure_count", len(m.failure)))
168168
}
169169
return ctx.Err()
170170
case <-m.stop:
171-
if len(success)+len(failure) > 0 {
171+
if len(m.success)+len(m.failure) > 0 {
172172
m.log.Warn(ctx, "flushing buffered updates before stop",
173-
slog.F("success_count", len(success)), slog.F("failure_count", len(failure)))
174-
m.bulkUpdate(ctx, success, failure)
173+
slog.F("success_count", len(m.success)), slog.F("failure_count", len(m.failure)))
174+
m.bulkUpdate(ctx)
175175
m.log.Warn(ctx, "flushing updates done")
176176
}
177177
return nil
178178
case <-tick.C:
179-
m.bulkUpdate(ctx, success, failure)
179+
m.bulkUpdate(ctx)
180180
}
181181
}
182182
})
@@ -188,16 +188,22 @@ func (m *Manager) loop(ctx context.Context) error {
188188
return err
189189
}
190190

191+
// BufferedUpdatesCount returns the number of buffered updates which are currently waiting to be flushed to the store.
192+
// The returned values are for success & failure, respectively.
193+
func (m *Manager) BufferedUpdatesCount() (success int, failure int) {
194+
return len(m.success), len(m.failure)
195+
}
196+
191197
// bulkUpdate updates messages in the store based on the given successful and failed message dispatch results.
192-
func (m *Manager) bulkUpdate(ctx context.Context, success, failure <-chan dispatchResult) {
198+
func (m *Manager) bulkUpdate(ctx context.Context) {
193199
select {
194200
case <-ctx.Done():
195201
return
196202
default:
197203
}
198204

199-
nSuccess := len(success)
200-
nFailure := len(failure)
205+
nSuccess := len(m.success)
206+
nFailure := len(m.failure)
201207

202208
// Nothing to do.
203209
if nSuccess+nFailure == 0 {
@@ -217,12 +223,12 @@ func (m *Manager) bulkUpdate(ctx context.Context, success, failure <-chan dispat
217223
// will be processed on the next bulk update.
218224

219225
for i := 0; i < nSuccess; i++ {
220-
res := <-success
226+
res := <-m.success
221227
successParams.IDs = append(successParams.IDs, res.msg)
222228
successParams.SentAts = append(successParams.SentAts, res.ts)
223229
}
224230
for i := 0; i < nFailure; i++ {
225-
res := <-failure
231+
res := <-m.failure
226232

227233
status := database.NotificationMessageStatusPermanentFailure
228234
if res.retryable {

coderd/notifications/manager_test.go

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ import (
55
"encoding/json"
66
"sync/atomic"
77
"testing"
8+
"time"
89

10+
"github.com/coder/serpent"
911
"github.com/google/uuid"
1012
"github.com/stretchr/testify/assert"
1113
"github.com/stretchr/testify/require"
@@ -31,11 +33,14 @@ func TestBufferedUpdates(t *testing.T) {
3133
if !dbtestutil.WillUsePostgres() {
3234
t.Skip("This test requires postgres")
3335
}
36+
3437
ctx, logger, db := setup(t)
3538
interceptor := &bulkUpdateInterceptor{Store: db}
36-
3739
santa := &santaHandler{}
40+
3841
cfg := defaultNotificationsConfig(database.NotificationMethodSmtp)
42+
cfg.StoreSyncInterval = serpent.Duration(time.Hour) // Ensure we don't sync the store automatically.
43+
3944
mgr, err := notifications.NewManager(cfg, interceptor, logger.Named("notifications-manager"))
4045
require.NoError(t, err)
4146
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{
@@ -47,20 +52,34 @@ func TestBufferedUpdates(t *testing.T) {
4752
user := dbgen.User(t, db, database.User{})
4853

4954
// given
50-
_, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{"nice": "true"}, "")
55+
_, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{"nice": "true"}, "") // Will succeed.
5156
require.NoError(t, err)
52-
_, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{"nice": "true"}, "")
57+
_, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{"nice": "true"}, "") // Will succeed.
5358
require.NoError(t, err)
54-
_, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{"nice": "false"}, "")
59+
_, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{"nice": "false"}, "") // Will fail.
5560
require.NoError(t, err)
5661

5762
// when
5863
mgr.Run(ctx)
5964

6065
// then
6166

67+
const (
68+
expectedSuccess = 2
69+
expectedFailure = 1
70+
)
71+
6272
// Wait for messages to be dispatched.
63-
require.Eventually(t, func() bool { return santa.naughty.Load() == 1 && santa.nice.Load() == 2 }, testutil.WaitMedium, testutil.IntervalFast)
73+
require.Eventually(t, func() bool {
74+
return santa.naughty.Load() == expectedFailure &&
75+
santa.nice.Load() == expectedSuccess
76+
}, testutil.WaitMedium, testutil.IntervalFast)
77+
78+
// Wait for the expected number of buffered updates to be accumulated.
79+
require.Eventually(t, func() bool {
80+
success, failure := mgr.BufferedUpdatesCount()
81+
return success == expectedSuccess && failure == expectedFailure
82+
}, testutil.WaitShort, testutil.IntervalFast)
6483

6584
// Stop the manager which forces an update of buffered updates.
6685
require.NoError(t, mgr.Stop(ctx))
@@ -73,8 +92,8 @@ func TestBufferedUpdates(t *testing.T) {
7392
ct.FailNow()
7493
}
7594

76-
assert.EqualValues(ct, 1, interceptor.failed.Load())
77-
assert.EqualValues(ct, 2, interceptor.sent.Load())
95+
assert.EqualValues(ct, expectedFailure, interceptor.failed.Load())
96+
assert.EqualValues(ct, expectedSuccess, interceptor.sent.Load())
7897
}, testutil.WaitMedium, testutil.IntervalFast)
7998
}
8099

0 commit comments

Comments (0)