Skip to content

Commit c466c9f

Browse files
authored
Merge branch 'main' into restructure-new
2 parents 0903742 + 4843062 commit c466c9f

File tree

21 files changed

+805
-499
lines changed

21 files changed

+805
-499
lines changed

.github/workflows/ci.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ jobs:
184184
185185
# Check for any typos
186186
- name: Check for typos
187-
uses: crate-ci/typos@v1.23.6
187+
uses: crate-ci/typos@v1.24.3
188188
with:
189189
config: .github/workflows/typos.toml
190190

cli/root.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ func (r *RootCmd) Command(subcommands []*serpent.Command) (*serpent.Command, err
256256
cmd.Use = fmt.Sprintf("%s %s %s", tokens[0], flags, tokens[1])
257257
})
258258

259-
// Add alises when appropriate.
259+
// Add aliases when appropriate.
260260
cmd.Walk(func(cmd *serpent.Command) {
261261
// TODO: we should really be consistent about naming.
262262
if cmd.Name() == "delete" || cmd.Name() == "remove" {
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
DELETE FROM notification_templates WHERE id = '29a09665-2a4c-403f-9648-54301670e7be';

coderd/notifications/dispatch/smtp.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ type SMTPHandler struct {
5353
cfg codersdk.NotificationsEmailConfig
5454
log slog.Logger
5555

56-
loginWarnOnce sync.Once
56+
noAuthWarnOnce sync.Once
57+
loginWarnOnce sync.Once
5758

5859
helpers template.FuncMap
5960
}
@@ -136,14 +137,20 @@ func (s *SMTPHandler) dispatch(subject, htmlBody, plainBody, to string) Delivery
136137

137138
// Check for authentication capabilities.
138139
if ok, avail := c.Extension("AUTH"); ok {
139-
// Ensure the auth mechanisms available are ones we can use.
140+
// Ensure the auth mechanisms available are ones we can use, and create a SASL client.
140141
auth, err := s.auth(ctx, avail)
141142
if err != nil {
142143
return true, xerrors.Errorf("determine auth mechanism: %w", err)
143144
}
144145

145-
// If so, use the auth mechanism to authenticate.
146-
if auth != nil {
146+
if auth == nil {
147+
// If we get here, no SASL client (which handles authentication) was returned.
148+
// This is expected if auth is supported by the smarthost BUT no authentication details were configured.
149+
s.noAuthWarnOnce.Do(func() {
150+
s.log.Warn(ctx, "skipping auth; no authentication client created")
151+
})
152+
} else {
153+
// We have a SASL client, use it to authenticate.
147154
if err := c.Auth(auth); err != nil {
148155
return true, xerrors.Errorf("%T auth: %w", auth, err)
149156
}
@@ -431,6 +438,12 @@ func (s *SMTPHandler) loadCertificate() (*tls.Certificate, error) {
431438
func (s *SMTPHandler) auth(ctx context.Context, mechs string) (sasl.Client, error) {
432439
username := s.cfg.Auth.Username.String()
433440

441+
// All auth mechanisms require username, so if one is not defined then don't return an auth client.
442+
if username == "" {
443+
// nolint:nilnil // This is a valid response.
444+
return nil, nil
445+
}
446+
434447
var errs error
435448
list := strings.Split(mechs, " ")
436449
for _, mech := range list {

coderd/notifications/dispatch/smtp_test.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,6 @@ func TestSMTP(t *testing.T) {
203203
retryable: false,
204204
},
205205
{
206-
// No auth, no problem!
207206
name: "No auth mechanisms supported, none configured",
208207
authMechs: []string{},
209208
cfg: codersdk.NotificationsEmailConfig{
@@ -213,6 +212,16 @@ func TestSMTP(t *testing.T) {
213212
toAddrs: []string{to},
214213
expectedAuthMeth: "",
215214
},
215+
{
216+
name: "Auth mechanisms supported optionally, none configured",
217+
authMechs: []string{sasl.Login, sasl.Plain},
218+
cfg: codersdk.NotificationsEmailConfig{
219+
Hello: hello,
220+
From: from,
221+
},
222+
toAddrs: []string{to},
223+
expectedAuthMeth: "",
224+
},
216225
/**
217226
* TLS connections
218227
*/

coderd/notifications/manager.go

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"golang.org/x/xerrors"
1212

1313
"cdr.dev/slog"
14+
"github.com/coder/quartz"
1415

1516
"github.com/coder/coder/v2/coderd/database"
1617
"github.com/coder/coder/v2/coderd/notifications/dispatch"
@@ -54,13 +55,25 @@ type Manager struct {
5455
stopOnce sync.Once
5556
stop chan any
5657
done chan any
58+
59+
// clock is for testing only
60+
clock quartz.Clock
61+
}
62+
63+
type ManagerOption func(*Manager)
64+
65+
// WithTestClock is used in testing to set the quartz clock on the manager
66+
func WithTestClock(clock quartz.Clock) ManagerOption {
67+
return func(m *Manager) {
68+
m.clock = clock
69+
}
5770
}
5871

5972
// NewManager instantiates a new Manager instance which coordinates notification enqueuing and delivery.
6073
//
6174
// helpers is a map of template helpers which are used to customize notification messages to use global settings like
6275
// access URL etc.
63-
func NewManager(cfg codersdk.NotificationsConfig, store Store, helpers template.FuncMap, metrics *Metrics, log slog.Logger) (*Manager, error) {
76+
func NewManager(cfg codersdk.NotificationsConfig, store Store, helpers template.FuncMap, metrics *Metrics, log slog.Logger, opts ...ManagerOption) (*Manager, error) {
6477
// TODO(dannyk): add the ability to use multiple notification methods.
6578
var method database.NotificationMethod
6679
if err := method.Scan(cfg.Method.String()); err != nil {
@@ -74,7 +87,7 @@ func NewManager(cfg codersdk.NotificationsConfig, store Store, helpers template.
7487
return nil, ErrInvalidDispatchTimeout
7588
}
7689

77-
return &Manager{
90+
m := &Manager{
7891
log: log,
7992
cfg: cfg,
8093
store: store,
@@ -95,7 +108,13 @@ func NewManager(cfg codersdk.NotificationsConfig, store Store, helpers template.
95108
done: make(chan any),
96109

97110
handlers: defaultHandlers(cfg, helpers, log),
98-
}, nil
111+
112+
clock: quartz.NewReal(),
113+
}
114+
for _, o := range opts {
115+
o(m)
116+
}
117+
return m, nil
99118
}
100119

101120
// defaultHandlers builds a set of known handlers; panics if any error occurs as these handlers should be valid at compile time.
@@ -150,15 +169,15 @@ func (m *Manager) loop(ctx context.Context) error {
150169
var eg errgroup.Group
151170

152171
// Create a notifier to run concurrently, which will handle dequeueing and dispatching notifications.
153-
m.notifier = newNotifier(m.cfg, uuid.New(), m.log, m.store, m.handlers, m.metrics)
172+
m.notifier = newNotifier(m.cfg, uuid.New(), m.log, m.store, m.handlers, m.metrics, m.clock)
154173
eg.Go(func() error {
155174
return m.notifier.run(ctx, m.success, m.failure)
156175
})
157176

158177
// Periodically flush notification state changes to the store.
159178
eg.Go(func() error {
160179
// Every interval, collect the messages in the channels and bulk update them in the store.
161-
tick := time.NewTicker(m.cfg.StoreSyncInterval.Value())
180+
tick := m.clock.NewTicker(m.cfg.StoreSyncInterval.Value(), "Manager", "storeSync")
162181
defer tick.Stop()
163182
for {
164183
select {

coderd/notifications/metrics_test.go

Lines changed: 18 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -221,13 +221,16 @@ func TestPendingUpdatesMetric(t *testing.T) {
221221

222222
// GIVEN: a notification manager whose store updates are intercepted so we can read the number of pending updates set in the metric
223223
cfg := defaultNotificationsConfig(method)
224-
cfg.FetchInterval = serpent.Duration(time.Millisecond * 50)
225224
cfg.RetryInterval = serpent.Duration(time.Hour) // Delay retries so they don't interfere.
226225
cfg.StoreSyncInterval = serpent.Duration(time.Millisecond * 100)
227226

228227
syncer := &syncInterceptor{Store: api.Database}
229228
interceptor := newUpdateSignallingInterceptor(syncer)
230-
mgr, err := notifications.NewManager(cfg, interceptor, defaultHelpers(), metrics, api.Logger.Named("manager"))
229+
mClock := quartz.NewMock(t)
230+
trap := mClock.Trap().NewTicker("Manager", "storeSync")
231+
defer trap.Close()
232+
mgr, err := notifications.NewManager(cfg, interceptor, defaultHelpers(), metrics, api.Logger.Named("manager"),
233+
notifications.WithTestClock(mClock))
231234
require.NoError(t, err)
232235
t.Cleanup(func() {
233236
assert.NoError(t, mgr.Stop(ctx))
@@ -249,6 +252,7 @@ func TestPendingUpdatesMetric(t *testing.T) {
249252
require.NoError(t, err)
250253

251254
mgr.Run(ctx)
255+
trap.MustWait(ctx).Release() // ensures ticker has been set
252256

253257
// THEN:
254258
// Wait until the handler has dispatched the given notifications.
@@ -259,17 +263,20 @@ func TestPendingUpdatesMetric(t *testing.T) {
259263
return len(handler.succeeded) == 1 && len(handler.failed) == 1
260264
}, testutil.WaitShort, testutil.IntervalFast)
261265

262-
// Wait until we intercept the calls to sync the pending updates to the store.
263-
success := testutil.RequireRecvCtx(testutil.Context(t, testutil.WaitShort), t, interceptor.updateSuccess)
264-
failure := testutil.RequireRecvCtx(testutil.Context(t, testutil.WaitShort), t, interceptor.updateFailure)
265-
266-
// Wait for the metric to be updated with the expected count of metrics.
266+
// Both handler calls should be pending in the metrics.
267267
require.Eventually(t, func() bool {
268-
return promtest.ToFloat64(metrics.PendingUpdates) == float64(success+failure)
268+
return promtest.ToFloat64(metrics.PendingUpdates) == float64(2)
269269
}, testutil.WaitShort, testutil.IntervalFast)
270270

271-
// Unpause the interceptor so the updates can proceed.
272-
interceptor.unpause()
271+
// THEN:
272+
// Trigger syncing updates
273+
mClock.Advance(cfg.StoreSyncInterval.Value()).MustWait(ctx)
274+
275+
// Wait until we intercept the calls to sync the pending updates to the store.
276+
success := testutil.RequireRecvCtx(testutil.Context(t, testutil.WaitShort), t, interceptor.updateSuccess)
277+
require.EqualValues(t, 1, success)
278+
failure := testutil.RequireRecvCtx(testutil.Context(t, testutil.WaitShort), t, interceptor.updateFailure)
279+
require.EqualValues(t, 1, failure)
273280

274281
// Validate that the store synced the expected number of updates.
275282
require.Eventually(t, func() bool {
@@ -464,43 +471,25 @@ func fingerprintLabels(lbs ...string) model.Fingerprint {
464471
// signaled by the caller so it can continue.
465472
type updateSignallingInterceptor struct {
466473
notifications.Store
467-
468-
pause chan any
469-
470474
updateSuccess chan int
471475
updateFailure chan int
472476
}
473477

474478
func newUpdateSignallingInterceptor(interceptor notifications.Store) *updateSignallingInterceptor {
475479
return &updateSignallingInterceptor{
476-
Store: interceptor,
477-
478-
pause: make(chan any, 1),
479-
480+
Store: interceptor,
480481
updateSuccess: make(chan int, 1),
481482
updateFailure: make(chan int, 1),
482483
}
483484
}
484485

485-
func (u *updateSignallingInterceptor) unpause() {
486-
close(u.pause)
487-
}
488-
489486
func (u *updateSignallingInterceptor) BulkMarkNotificationMessagesSent(ctx context.Context, arg database.BulkMarkNotificationMessagesSentParams) (int64, error) {
490487
u.updateSuccess <- len(arg.IDs)
491-
492-
// Wait until signaled so we have a chance to read the number of pending updates.
493-
<-u.pause
494-
495488
return u.Store.BulkMarkNotificationMessagesSent(ctx, arg)
496489
}
497490

498491
func (u *updateSignallingInterceptor) BulkMarkNotificationMessagesFailed(ctx context.Context, arg database.BulkMarkNotificationMessagesFailedParams) (int64, error) {
499492
u.updateFailure <- len(arg.IDs)
500-
501-
// Wait until signaled so we have a chance to read the number of pending updates.
502-
<-u.pause
503-
504493
return u.Store.BulkMarkNotificationMessagesFailed(ctx, arg)
505494
}
506495

coderd/notifications/notifier.go

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"encoding/json"
66
"sync"
7-
"time"
87

98
"github.com/google/uuid"
109
"golang.org/x/sync/errgroup"
@@ -15,6 +14,7 @@ import (
1514
"github.com/coder/coder/v2/coderd/notifications/render"
1615
"github.com/coder/coder/v2/coderd/notifications/types"
1716
"github.com/coder/coder/v2/codersdk"
17+
"github.com/coder/quartz"
1818

1919
"cdr.dev/slog"
2020

@@ -29,26 +29,33 @@ type notifier struct {
2929
log slog.Logger
3030
store Store
3131

32-
tick *time.Ticker
32+
tick *quartz.Ticker
3333
stopOnce sync.Once
3434
quit chan any
3535
done chan any
3636

3737
handlers map[database.NotificationMethod]Handler
3838
metrics *Metrics
39+
40+
// clock is for testing
41+
clock quartz.Clock
3942
}
4043

41-
func newNotifier(cfg codersdk.NotificationsConfig, id uuid.UUID, log slog.Logger, db Store, hr map[database.NotificationMethod]Handler, metrics *Metrics) *notifier {
44+
func newNotifier(cfg codersdk.NotificationsConfig, id uuid.UUID, log slog.Logger, db Store,
45+
hr map[database.NotificationMethod]Handler, metrics *Metrics, clock quartz.Clock,
46+
) *notifier {
47+
tick := clock.NewTicker(cfg.FetchInterval.Value(), "notifier", "fetchInterval")
4248
return &notifier{
4349
id: id,
4450
cfg: cfg,
4551
log: log.Named("notifier").With(slog.F("notifier_id", id)),
4652
quit: make(chan any),
4753
done: make(chan any),
48-
tick: time.NewTicker(cfg.FetchInterval.Value()),
54+
tick: tick,
4955
store: db,
5056
handlers: hr,
5157
metrics: metrics,
58+
clock: clock,
5259
}
5360
}
5461

@@ -245,10 +252,10 @@ func (n *notifier) deliver(ctx context.Context, msg database.AcquireNotification
245252
n.metrics.InflightDispatches.WithLabelValues(string(msg.Method), msg.TemplateID.String()).Inc()
246253
n.metrics.QueuedSeconds.WithLabelValues(string(msg.Method)).Observe(msg.QueuedSeconds)
247254

248-
start := time.Now()
255+
start := n.clock.Now()
249256
retryable, err := deliver(ctx, msg.ID)
250257

251-
n.metrics.DispatcherSendSeconds.WithLabelValues(string(msg.Method)).Observe(time.Since(start).Seconds())
258+
n.metrics.DispatcherSendSeconds.WithLabelValues(string(msg.Method)).Observe(n.clock.Since(start).Seconds())
252259
n.metrics.InflightDispatches.WithLabelValues(string(msg.Method), msg.TemplateID.String()).Dec()
253260

254261
if err != nil {
@@ -291,7 +298,7 @@ func (n *notifier) newSuccessfulDispatch(msg database.AcquireNotificationMessage
291298
return dispatchResult{
292299
notifier: n.id,
293300
msg: msg.ID,
294-
ts: dbtime.Now(),
301+
ts: dbtime.Time(n.clock.Now().UTC()),
295302
}
296303
}
297304

@@ -311,7 +318,7 @@ func (n *notifier) newFailedDispatch(msg database.AcquireNotificationMessagesRow
311318
return dispatchResult{
312319
notifier: n.id,
313320
msg: msg.ID,
314-
ts: dbtime.Now(),
321+
ts: dbtime.Time(n.clock.Now().UTC()),
315322
err: err,
316323
retryable: retryable,
317324
}
@@ -321,7 +328,7 @@ func (n *notifier) newInhibitedDispatch(msg database.AcquireNotificationMessages
321328
return dispatchResult{
322329
notifier: n.id,
323330
msg: msg.ID,
324-
ts: dbtime.Now(),
331+
ts: dbtime.Time(n.clock.Now().UTC()),
325332
retryable: false,
326333
inhibited: true,
327334
}

0 commit comments

Comments
 (0)