Skip to content

Commit d97f0c3

Browse files
committed
WIP
Signed-off-by: Danny Kopping <danny@coder.com>
1 parent 82192a7 commit d97f0c3

File tree

5 files changed

+308
-118
lines changed

5 files changed

+308
-118
lines changed
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package dispatcher
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"math/rand/v2"
7+
"time"
8+
9+
"golang.org/x/xerrors"
10+
11+
"github.com/coder/coder/v2/coderd/database"
12+
)
13+
14+
// SMTPDispatcher is a placeholder dispatcher for email delivery. It carries no state; its Send method is still a
// stub (see the TODO there) rather than a real SMTP client.
type SMTPDispatcher struct{}
15+
16+
func (S *SMTPDispatcher) Send(ctx context.Context, msg database.NotificationMessage, title, body string) error {
17+
select {
18+
case <-ctx.Done():
19+
return ctx.Err()
20+
default:
21+
}
22+
23+
// TODO: implement real smtp dispatcher
24+
t := time.Duration(rand.IntN(500)) * time.Millisecond
25+
if rand.IntN(10) > 8 {
26+
t = t + time.Second*2
27+
}
28+
29+
select {
30+
case <-ctx.Done():
31+
return xerrors.Errorf("dispatch prematurely aborted: %w", ctx.Err())
32+
case <-time.After(t):
33+
default:
34+
}
35+
36+
if rand.IntN(10) < 5 {
37+
return xerrors.New(fmt.Sprintf("%s: oops", msg.ID))
38+
}
39+
return nil
40+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package dispatcher
2+
3+
import (
4+
"context"
5+
6+
"github.com/coder/coder/v2/coderd/database"
7+
)
8+
9+
// dispatcher is the unexported contract for sending one notification message.
type dispatcher interface {
10+
// Send is handed the message record together with its title and body, and reports failure through the
// returned error.
Send(ctx context.Context, msg database.NotificationMessage, title, body string) error // TODO: don't use database type
11+
}

coderd/notifications/notifications.go

Lines changed: 116 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,34 +2,144 @@ package notifications
22

33
import (
44
"context"
5+
"fmt"
6+
"sync"
7+
"time"
58

6-
"github.com/coder/coder/v2/coderd/database"
9+
"github.com/google/uuid"
710
"golang.org/x/sync/errgroup"
11+
"golang.org/x/xerrors"
12+
13+
"github.com/coder/coder/v2/coderd/database"
814
)
915

16+
// MaxAttempts is not referenced in the visible code; presumably it caps how many times a message is attempted
// before it is given up on — confirm against the notifier implementation.
const MaxAttempts = 5 // TODO: configurable
17+
18+
// Manager manages all notifications being enqueued and dispatched.
19+
//
20+
// Manager maintains a group of notifiers: these consume the queue of notification messages in the store.
21+
//
22+
// Notifiers dequeue messages from the store _n_ at a time and concurrently "dispatch" these messages, meaning they are
23+
// sent to their respective receivers (email, webhook, etc).
24+
//
25+
// To reduce load on the store, successful and failed dispatches are accumulated in two separate buffers (success/failure)
26+
// in the Manager, and updates are sent to the store about which messages succeeded or failed every _n_ seconds.
27+
// These buffers are limited in size, which naturally introduces some backpressure; if there are hundreds of messages to be
28+
// sent but they start failing too quickly, the buffers (receive channels) will fill up and block senders, which will
29+
// slow down the dispatch rate.
30+
//
31+
// NOTE: The above backpressure mechanism only works if all notifiers live within the same process, which may not be true
32+
// forever, such as if we split notifiers out into separate targets for greater processing throughput; in this case we
33+
// will need an alternative mechanism for handling backpressure.
1034
type Manager struct {
1135
// db is the store handed to every notifier created by Run.
db Store
36+
// mu guards notifiers: Run appends to the slice from several goroutines while holding it.
mu sync.Mutex
1237
// notifiers holds every notifier started by Run; Stop iterates over it.
notifiers []*notifier
38+
39+
// stop is created by NewManager and closed by Stop to signal a graceful shutdown.
stop chan any
40+
// notifierCtx and notifierCancel are not used in the visible code.
// NOTE(review): presumably intended as a notifier-scoped context and its cancel function — confirm.
notifierCtx context.Context
41+
notifierCancel context.CancelCauseFunc
1342
}
1443

1544
// Store is the persistence dependency of Manager; NewManager accepts any implementation.
type Store interface {
1645
// AcquireNotificationMessages returns a batch of notification messages selected by params.
// NOTE(review): the acquisition/locking semantics live in the implementation and are not visible here.
AcquireNotificationMessages(ctx context.Context, params database.AcquireNotificationMessagesParams) ([]database.NotificationMessage, error)
1746
}
1847

1948
// NewManager returns a Manager that uses db as its store.
//
// This span is a rendered diff: the line marked "-" is the previous body, which set only db; the line marked "+"
// is its replacement, which additionally allocates the stop channel that Stop closes and Run selects on.
func NewManager(db Store) *Manager {
20-
return &Manager{db: db}
49+
return &Manager{db: db, stop: make(chan any)}
2150
}
2251

23-
func (m *Manager) Run(nc int) error {
52+
func (m *Manager) Run(ctx context.Context, nc int) error {
53+
select {
54+
case <-m.stop:
55+
return xerrors.Errorf("gracefully stopped")
56+
case <-ctx.Done():
57+
return xerrors.Errorf("ungraceful stop: %w", ctx.Err())
58+
default:
59+
}
60+
2461
var eg errgroup.Group
2562

63+
var (
64+
success = make(chan dispatchResult, 50)
65+
failure = make(chan dispatchResult, 50)
66+
)
67+
2668
for i := 0; i < nc; i++ {
2769
eg.Go(func() error {
28-
n := newNotifier(m.db)
70+
m.mu.Lock()
71+
n := newNotifier(ctx, m.db)
2972
m.notifiers = append(m.notifiers, n)
30-
return n.run()
73+
m.mu.Unlock()
74+
return n.run(ctx, success, failure)
3175
})
3276
}
3377

34-
return eg.Wait()
78+
go func() {
79+
// Every second, collect the messages in the channels and bulk update them in the database.
80+
tick := time.NewTicker(time.Second)
81+
defer tick.Stop()
82+
for {
83+
select {
84+
case <-ctx.Done():
85+
fmt.Printf("context canceled with %d success and %d failure messages still pending!\n", len(success), len(failure))
86+
return
87+
case <-m.stop:
88+
// Drain before stopping.
89+
m.drain(success, failure)
90+
return
91+
case <-tick.C:
92+
m.drain(success, failure)
93+
}
94+
}
95+
}()
96+
97+
err := eg.Wait()
98+
fmt.Printf("MANAGER DONE: %v\n", err)
99+
return err
100+
}
101+
102+
func (m *Manager) drain(success, failure <-chan dispatchResult) {
103+
var wg sync.WaitGroup
104+
wg.Add(2)
105+
106+
var sCount, fCount int
107+
108+
go func() {
109+
defer wg.Done()
110+
count := len(success)
111+
for i := 0; i < count; i++ {
112+
_ = <-success
113+
// fmt.Printf("[%s] SUCCESS: %s\n", res.notifier, res.msg)
114+
sCount++
115+
}
116+
}()
117+
118+
go func() {
119+
defer wg.Done()
120+
count := len(failure)
121+
for i := 0; i < count; i++ {
122+
_ = <-failure
123+
// fmt.Printf("[%s] FAILURE: %s -> %s (%v)\n", res.notifier, res.msg, res.err, res.retryable)
124+
fCount++
125+
}
126+
}()
127+
128+
wg.Wait()
129+
130+
fmt.Printf("\t>S: %d, F: %d, T: %d\n", sCount, fCount, sCount+fCount)
131+
}
132+
133+
func (m *Manager) Stop() {
134+
for _, n := range m.notifiers {
135+
n.stop()
136+
}
137+
close(m.stop)
138+
}
139+
140+
// dispatchResult is the value notifiers send on the success and failure channels created by Run.
type dispatchResult struct {
141+
// notifier is a UUID; presumably the ID of the notifier that handled the message — confirm.
notifier uuid.UUID
142+
// msg is a UUID; presumably the ID of the dispatched notification message — confirm.
msg uuid.UUID
143+
// err presumably carries the dispatch error for failed results — confirm.
err error
144+
// retryable presumably marks whether a failed dispatch may be attempted again — confirm.
retryable bool
35145
}

coderd/notifications/notifications_test.go

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,47 @@ package notifications_test
22

33
import (
44
"context"
5+
"sync"
56
"testing"
7+
"time"
68

7-
"github.com/coder/coder/v2/coderd/database"
8-
"github.com/coder/coder/v2/coderd/notifications"
99
"github.com/google/uuid"
1010
"github.com/stretchr/testify/require"
11+
"go.uber.org/goleak"
12+
13+
"github.com/coder/coder/v2/coderd/database"
14+
"github.com/coder/coder/v2/coderd/notifications"
1115
)
1216

17+
// TestMain hands the package's test run to goleak, which fails the run if goroutines are still alive after the
// tests have finished.
func TestMain(m *testing.M) {
18+
goleak.VerifyTestMain(m)
19+
}
20+
1321
func TestStuff(t *testing.T) {
1422
n := notifications.NewManager(fakeDB{})
15-
require.NoError(t, n.Run(1))
23+
24+
ctx, cancel := context.WithCancel(context.Background())
25+
t.Cleanup(cancel)
26+
27+
var wg sync.WaitGroup
28+
wg.Add(1)
29+
go func() {
30+
defer wg.Done()
31+
require.ErrorIs(t, n.Run(ctx, 3), context.Canceled)
32+
}()
33+
34+
select {
35+
case <-ctx.Done():
36+
return
37+
case <-time.After(time.Second * 3):
38+
t.Logf("\n\n\n\nCANCELED\n\n\n\n")
39+
cancel()
40+
case <-time.After(time.Second * 5):
41+
t.Logf("\n\n\n\nSTOPPED\n\n\n\n")
42+
n.Stop()
43+
}
44+
45+
wg.Wait()
1646
}
1747

1848
// fakeDB is the stateless stand-in that TestStuff passes to notifications.NewManager as its Store; its methods are
// outside the visible part of this diff.
type fakeDB struct{}

0 commit comments

Comments
 (0)