@@ -44,7 +44,6 @@ type Manager struct {
44
44
store Store
45
45
log slog.Logger
46
46
47
- notifier * notifier
48
47
handlers map [database.NotificationMethod ]Handler
49
48
method database.NotificationMethod
50
49
helpers template.FuncMap
@@ -53,11 +52,13 @@ type Manager struct {
53
52
54
53
success , failure chan dispatchResult
55
54
56
- runOnce sync.Once
57
- stopOnce sync.Once
58
- doneOnce sync.Once
59
- stop chan any
60
- done chan any
55
+ mu sync.Mutex // Protects following.
56
+ closed bool
57
+ notifier * notifier
58
+
59
+ runOnce sync.Once
60
+ stop chan any
61
+ done chan any
61
62
62
63
// clock is for testing only
63
64
clock quartz.Clock
@@ -138,7 +139,7 @@ func (m *Manager) WithHandlers(reg map[database.NotificationMethod]Handler) {
138
139
// Manager requires system-level permissions to interact with the store.
139
140
// Run is only intended to be run once.
140
141
func (m * Manager ) Run (ctx context.Context ) {
141
- m .log .Info (ctx , "started" )
142
+ m .log .Debug (ctx , "notification manager started" )
142
143
143
144
m .runOnce .Do (func () {
144
145
// Closes when Stop() is called or context is canceled.
@@ -155,31 +156,26 @@ func (m *Manager) Run(ctx context.Context) {
155
156
// events, creating a notifier, and publishing bulk dispatch result updates to the store.
156
157
func (m * Manager ) loop (ctx context.Context ) error {
157
158
defer func () {
158
- m .doneOnce .Do (func () {
159
- close (m .done )
160
- })
161
- m .log .Info (context .Background (), "notification manager stopped" )
159
+ close (m .done )
160
+ m .log .Debug (context .Background (), "notification manager stopped" )
162
161
}()
163
162
164
- // Caught a terminal signal before notifier was created, exit immediately.
165
- select {
166
- case <- m .stop :
167
- m .log .Warn (ctx , "gracefully stopped" )
168
- return xerrors .Errorf ("gracefully stopped" )
169
- case <- ctx .Done ():
170
- m .log .Error (ctx , "ungracefully stopped" , slog .Error (ctx .Err ()))
171
- return xerrors .Errorf ("notifications: %w" , ctx .Err ())
172
- default :
163
+ m .mu .Lock ()
164
+ if m .closed {
165
+ m .mu .Unlock ()
166
+ return xerrors .New ("manager already closed" )
173
167
}
174
168
175
169
var eg errgroup.Group
176
170
177
- // Create a notifier to run concurrently, which will handle dequeueing and dispatching notifications.
178
171
m .notifier = newNotifier (ctx , m .cfg , uuid .New (), m .log , m .store , m .handlers , m .helpers , m .metrics , m .clock )
179
172
eg .Go (func () error {
173
+ // run the notifier which will handle dequeueing and dispatching notifications.
180
174
return m .notifier .run (m .success , m .failure )
181
175
})
182
176
177
+ m .mu .Unlock ()
178
+
183
179
// Periodically flush notification state changes to the store.
184
180
eg .Go (func () error {
185
181
// Every interval, collect the messages in the channels and bulk update them in the store.
@@ -355,48 +351,46 @@ func (m *Manager) syncUpdates(ctx context.Context) {
355
351
356
352
// Stop stops the notifier and waits until it has stopped.
357
353
func (m * Manager ) Stop (ctx context.Context ) error {
358
- var err error
359
- m .stopOnce .Do (func () {
360
- select {
361
- case <- ctx .Done ():
362
- err = ctx .Err ()
363
- return
364
- default :
365
- }
354
+ m .mu .Lock ()
355
+ defer m .mu .Unlock ()
366
356
367
- m .log .Info (context .Background (), "graceful stop requested" )
357
+ if m .closed {
358
+ return nil
359
+ }
360
+ m .closed = true
368
361
369
- // If the notifier hasn't been started, we don't need to wait for anything.
370
- // This is only really during testing when we want to enqueue messages only but not deliver them.
371
- if m .notifier == nil {
372
- m .doneOnce .Do (func () {
373
- close (m .done )
374
- })
375
- } else {
376
- m .notifier .stop ()
377
- }
362
+ m .log .Debug (context .Background (), "graceful stop requested" )
363
+
364
+ // If the notifier hasn't been started, we don't need to wait for anything.
365
+ // This is only really during testing when we want to enqueue messages only but not deliver them.
366
+ if m .notifier != nil {
367
+ m .notifier .stop ()
368
+ }
378
369
379
- // Signal the stop channel to cause loop to exit.
380
- close (m .stop )
370
+ // Signal the stop channel to cause loop to exit.
371
+ close (m .stop )
381
372
382
- // Wait for the manager loop to exit or the context to be canceled, whichever comes first.
383
- select {
384
- case <- ctx .Done ():
385
- var errStr string
386
- if ctx .Err () != nil {
387
- errStr = ctx .Err ().Error ()
388
- }
389
- // For some reason, slog.Error returns {} for a context error.
390
- m .log .Error (context .Background (), "graceful stop failed" , slog .F ("err" , errStr ))
391
- err = ctx .Err ()
392
- return
393
- case <- m .done :
394
- m .log .Info (context .Background (), "gracefully stopped" )
395
- return
396
- }
397
- })
373
+ if m .notifier == nil {
374
+ return nil
375
+ }
398
376
399
- return err
377
+ m .mu .Unlock () // Unlock to avoid blocking loop.
378
+ defer m .mu .Lock () // Re-lock the mutex due to earlier defer.
379
+
380
+ // Wait for the manager loop to exit or the context to be canceled, whichever comes first.
381
+ select {
382
+ case <- ctx .Done ():
383
+ var errStr string
384
+ if ctx .Err () != nil {
385
+ errStr = ctx .Err ().Error ()
386
+ }
387
+ // For some reason, slog.Error returns {} for a context error.
388
+ m .log .Error (context .Background (), "graceful stop failed" , slog .F ("err" , errStr ))
389
+ return ctx .Err ()
390
+ case <- m .done :
391
+ m .log .Debug (context .Background (), "gracefully stopped" )
392
+ return nil
393
+ }
400
394
}
401
395
402
396
type dispatchResult struct {
0 commit comments