@@ -2,34 +2,144 @@ package notifications
2
2
3
3
import (
4
4
"context"
5
+ "fmt"
6
+ "sync"
7
+ "time"
5
8
6
- "github.com/coder/coder/v2/coderd/database "
9
+ "github.com/google/uuid "
7
10
"golang.org/x/sync/errgroup"
11
+ "golang.org/x/xerrors"
12
+
13
+ "github.com/coder/coder/v2/coderd/database"
8
14
)
9
15
16
const (
	// MaxAttempts is a fixed, package-wide attempt limit.
	//
	// NOTE(review): presumably the maximum number of times a single notification
	// message is dispatched before it is given up on — confirm against the
	// notifier/dispatch code, which is not visible here.
	//
	// TODO: configurable
	MaxAttempts = 5
)
17
+
18
+ // Manager manages all notifications being enqueued and dispatched.
19
+ //
20
+ // Manager maintains a group of notifiers: these consume the queue of notification messages in the store.
21
+ //
22
+ // Notifiers dequeue messages from the store _n_ at a time and concurrently "dispatch" these messages, meaning they are
23
+ // sent to their respective receivers (email, webhook, etc).
24
+ //
25
+ // To reduce load on the store, successful and failed dispatches are accumulated in two separate buffers (success/failure)
26
+ // in the Manager, and updates are sent to the store about which messages succeeded or failed every _n_ seconds.
27
+ // These buffers are limited in size, naturally introduces some backpressure; if there are hundreds of messages to be
28
+ // sent but they start failing too quickly, the buffers (receive channels) will fill up and block senders, which will
29
+ // slow down the dispatch rate.
30
+ //
31
+ // NOTE: The above backpressure mechanism only works if all notifiers live within the same process, which may not be true
32
+ // forever, such as if we split notifiers out into separate targets for greater processing throughput; in this case we
33
+ // will need an alternative mechanism for handling backpressure.
10
34
type Manager struct {
11
35
db Store
36
+ mu sync.Mutex
12
37
notifiers []* notifier
38
+
39
+ stop chan any
40
+ notifierCtx context.Context
41
+ notifierCancel context.CancelCauseFunc
13
42
}
14
43
15
44
type Store interface {
16
45
AcquireNotificationMessages (ctx context.Context , params database.AcquireNotificationMessagesParams ) ([]database.NotificationMessage , error )
17
46
}
18
47
19
48
func NewManager (db Store ) * Manager {
20
- return & Manager {db : db }
49
+ return & Manager {db : db , stop : make ( chan any ) }
21
50
}
22
51
23
- func (m * Manager ) Run (nc int ) error {
52
+ func (m * Manager ) Run (ctx context.Context , nc int ) error {
53
+ select {
54
+ case <- m .stop :
55
+ return xerrors .Errorf ("gracefully stopped" )
56
+ case <- ctx .Done ():
57
+ return xerrors .Errorf ("ungraceful stop: %w" , ctx .Err ())
58
+ default :
59
+ }
60
+
24
61
var eg errgroup.Group
25
62
63
+ var (
64
+ success = make (chan dispatchResult , 50 )
65
+ failure = make (chan dispatchResult , 50 )
66
+ )
67
+
26
68
for i := 0 ; i < nc ; i ++ {
27
69
eg .Go (func () error {
28
- n := newNotifier (m .db )
70
+ m .mu .Lock ()
71
+ n := newNotifier (ctx , m .db )
29
72
m .notifiers = append (m .notifiers , n )
30
- return n .run ()
73
+ m .mu .Unlock ()
74
+ return n .run (ctx , success , failure )
31
75
})
32
76
}
33
77
34
- return eg .Wait ()
78
+ go func () {
79
+ // Every second, collect the messages in the channels and bulk update them in the database.
80
+ tick := time .NewTicker (time .Second )
81
+ defer tick .Stop ()
82
+ for {
83
+ select {
84
+ case <- ctx .Done ():
85
+ fmt .Printf ("context canceled with %d success and %d failure messages still pending!\n " , len (success ), len (failure ))
86
+ return
87
+ case <- m .stop :
88
+ // Drain before stopping.
89
+ m .drain (success , failure )
90
+ return
91
+ case <- tick .C :
92
+ m .drain (success , failure )
93
+ }
94
+ }
95
+ }()
96
+
97
+ err := eg .Wait ()
98
+ fmt .Printf ("MANAGER DONE: %v\n " , err )
99
+ return err
100
+ }
101
+
102
+ func (m * Manager ) drain (success , failure <- chan dispatchResult ) {
103
+ var wg sync.WaitGroup
104
+ wg .Add (2 )
105
+
106
+ var sCount , fCount int
107
+
108
+ go func () {
109
+ defer wg .Done ()
110
+ count := len (success )
111
+ for i := 0 ; i < count ; i ++ {
112
+ _ = <- success
113
+ // fmt.Printf("[%s] SUCCESS: %s\n", res.notifier, res.msg)
114
+ sCount ++
115
+ }
116
+ }()
117
+
118
+ go func () {
119
+ defer wg .Done ()
120
+ count := len (failure )
121
+ for i := 0 ; i < count ; i ++ {
122
+ _ = <- failure
123
+ // fmt.Printf("[%s] FAILURE: %s -> %s (%v)\n", res.notifier, res.msg, res.err, res.retryable)
124
+ fCount ++
125
+ }
126
+ }()
127
+
128
+ wg .Wait ()
129
+
130
+ fmt .Printf ("\t >S: %d, F: %d, T: %d\n " , sCount , fCount , sCount + fCount )
131
+ }
132
+
133
+ func (m * Manager ) Stop () {
134
+ for _ , n := range m .notifiers {
135
+ n .stop ()
136
+ }
137
+ close (m .stop )
138
+ }
139
+
140
+ type dispatchResult struct {
141
+ notifier uuid.UUID
142
+ msg uuid.UUID
143
+ err error
144
+ retryable bool
35
145
}
0 commit comments