Skip to content

Commit 1bb96b8

Browse files
defelmnq and mafredri authored
fix: resolve flake test on manager (coder#17702)
Fixes coder/internal#544 --------- Co-authored-by: Mathias Fredriksson <mafredri@gmail.com>
1 parent 857587b commit 1bb96b8

File tree

2 files changed

+74
-58
lines changed

2 files changed

+74
-58
lines changed

coderd/notifications/manager.go

+52-58
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ type Manager struct {
4444
store Store
4545
log slog.Logger
4646

47-
notifier *notifier
4847
handlers map[database.NotificationMethod]Handler
4948
method database.NotificationMethod
5049
helpers template.FuncMap
@@ -53,11 +52,13 @@ type Manager struct {
5352

5453
success, failure chan dispatchResult
5554

56-
runOnce sync.Once
57-
stopOnce sync.Once
58-
doneOnce sync.Once
59-
stop chan any
60-
done chan any
55+
mu sync.Mutex // Protects following.
56+
closed bool
57+
notifier *notifier
58+
59+
runOnce sync.Once
60+
stop chan any
61+
done chan any
6162

6263
// clock is for testing only
6364
clock quartz.Clock
@@ -138,7 +139,7 @@ func (m *Manager) WithHandlers(reg map[database.NotificationMethod]Handler) {
138139
// Manager requires system-level permissions to interact with the store.
139140
// Run is only intended to be run once.
140141
func (m *Manager) Run(ctx context.Context) {
141-
m.log.Info(ctx, "started")
142+
m.log.Debug(ctx, "notification manager started")
142143

143144
m.runOnce.Do(func() {
144145
// Closes when Stop() is called or context is canceled.
@@ -155,31 +156,26 @@ func (m *Manager) Run(ctx context.Context) {
155156
// events, creating a notifier, and publishing bulk dispatch result updates to the store.
156157
func (m *Manager) loop(ctx context.Context) error {
157158
defer func() {
158-
m.doneOnce.Do(func() {
159-
close(m.done)
160-
})
161-
m.log.Info(context.Background(), "notification manager stopped")
159+
close(m.done)
160+
m.log.Debug(context.Background(), "notification manager stopped")
162161
}()
163162

164-
// Caught a terminal signal before notifier was created, exit immediately.
165-
select {
166-
case <-m.stop:
167-
m.log.Warn(ctx, "gracefully stopped")
168-
return xerrors.Errorf("gracefully stopped")
169-
case <-ctx.Done():
170-
m.log.Error(ctx, "ungracefully stopped", slog.Error(ctx.Err()))
171-
return xerrors.Errorf("notifications: %w", ctx.Err())
172-
default:
163+
m.mu.Lock()
164+
if m.closed {
165+
m.mu.Unlock()
166+
return xerrors.New("manager already closed")
173167
}
174168

175169
var eg errgroup.Group
176170

177-
// Create a notifier to run concurrently, which will handle dequeueing and dispatching notifications.
178171
m.notifier = newNotifier(ctx, m.cfg, uuid.New(), m.log, m.store, m.handlers, m.helpers, m.metrics, m.clock)
179172
eg.Go(func() error {
173+
// run the notifier which will handle dequeueing and dispatching notifications.
180174
return m.notifier.run(m.success, m.failure)
181175
})
182176

177+
m.mu.Unlock()
178+
183179
// Periodically flush notification state changes to the store.
184180
eg.Go(func() error {
185181
// Every interval, collect the messages in the channels and bulk update them in the store.
@@ -355,48 +351,46 @@ func (m *Manager) syncUpdates(ctx context.Context) {
355351

356352
// Stop stops the notifier and waits until it has stopped.
357353
func (m *Manager) Stop(ctx context.Context) error {
358-
var err error
359-
m.stopOnce.Do(func() {
360-
select {
361-
case <-ctx.Done():
362-
err = ctx.Err()
363-
return
364-
default:
365-
}
354+
m.mu.Lock()
355+
defer m.mu.Unlock()
366356

367-
m.log.Info(context.Background(), "graceful stop requested")
357+
if m.closed {
358+
return nil
359+
}
360+
m.closed = true
368361

369-
// If the notifier hasn't been started, we don't need to wait for anything.
370-
// This is only really during testing when we want to enqueue messages only but not deliver them.
371-
if m.notifier == nil {
372-
m.doneOnce.Do(func() {
373-
close(m.done)
374-
})
375-
} else {
376-
m.notifier.stop()
377-
}
362+
m.log.Debug(context.Background(), "graceful stop requested")
363+
364+
// If the notifier hasn't been started, we don't need to wait for anything.
365+
// This is only really during testing when we want to enqueue messages only but not deliver them.
366+
if m.notifier != nil {
367+
m.notifier.stop()
368+
}
378369

379-
// Signal the stop channel to cause loop to exit.
380-
close(m.stop)
370+
// Signal the stop channel to cause loop to exit.
371+
close(m.stop)
381372

382-
// Wait for the manager loop to exit or the context to be canceled, whichever comes first.
383-
select {
384-
case <-ctx.Done():
385-
var errStr string
386-
if ctx.Err() != nil {
387-
errStr = ctx.Err().Error()
388-
}
389-
// For some reason, slog.Error returns {} for a context error.
390-
m.log.Error(context.Background(), "graceful stop failed", slog.F("err", errStr))
391-
err = ctx.Err()
392-
return
393-
case <-m.done:
394-
m.log.Info(context.Background(), "gracefully stopped")
395-
return
396-
}
397-
})
373+
if m.notifier == nil {
374+
return nil
375+
}
398376

399-
return err
377+
m.mu.Unlock() // Unlock to avoid blocking loop.
378+
defer m.mu.Lock() // Re-lock the mutex due to earlier defer.
379+
380+
// Wait for the manager loop to exit or the context to be canceled, whichever comes first.
381+
select {
382+
case <-ctx.Done():
383+
var errStr string
384+
if ctx.Err() != nil {
385+
errStr = ctx.Err().Error()
386+
}
387+
// For some reason, slog.Error returns {} for a context error.
388+
m.log.Error(context.Background(), "graceful stop failed", slog.F("err", errStr))
389+
return ctx.Err()
390+
case <-m.done:
391+
m.log.Debug(context.Background(), "gracefully stopped")
392+
return nil
393+
}
400394
}
401395

402396
type dispatchResult struct {

coderd/notifications/manager_test.go

+22
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,28 @@ func TestStopBeforeRun(t *testing.T) {
182182
}, testutil.WaitShort, testutil.IntervalFast)
183183
}
184184

185+
func TestRunStopRace(t *testing.T) {
186+
t.Parallel()
187+
188+
// SETUP
189+
190+
// nolint:gocritic // Unit test.
191+
ctx := dbauthz.AsSystemRestricted(testutil.Context(t, testutil.WaitMedium))
192+
store, ps := dbtestutil.NewDB(t)
193+
logger := testutil.Logger(t)
194+
195+
// GIVEN: a standard manager
196+
mgr, err := notifications.NewManager(defaultNotificationsConfig(database.NotificationMethodSmtp), store, ps, defaultHelpers(), createMetrics(), logger.Named("notifications-manager"))
197+
require.NoError(t, err)
198+
199+
// Start Run and Stop after each other (run does "go loop()").
200+
// This is to catch a (now fixed) race condition where the manager
201+
// would be accessed/stopped while it was being created/starting up.
202+
mgr.Run(ctx)
203+
err = mgr.Stop(ctx)
204+
require.NoError(t, err)
205+
}
206+
185207
type syncInterceptor struct {
186208
notifications.Store
187209

0 commit comments

Comments
 (0)