Skip to content

Commit 31c5c83

Browse files
committed
feat: add watchdog to pubsub
1 parent c7f52b7 commit 31c5c83

File tree

3 files changed

+274
-0
lines changed

3 files changed

+274
-0
lines changed

cli/server.go

+9
Original file line numberDiff line numberDiff line change
@@ -655,6 +655,10 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
655655
options.OIDCConfig = oc
656656
}
657657

658+
// We'll read from this channel in the select below that tracks shutdown. If it remains
659+
// nil, that case of the select will just never fire, but it's important not to have a
660+
// "bare" read on this channel.
661+
var pubsubWatchdogTimeout <-chan struct{}
658662
if vals.InMemoryDatabase {
659663
// This is only used for testing.
660664
options.Database = dbmem.New()
@@ -683,6 +687,9 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
683687
options.PrometheusRegistry.MustRegister(ps)
684688
}
685689
defer options.Pubsub.Close()
690+
psWatchdog := pubsub.NewWatchdog(ctx, logger.Named("pswatch"), ps)
691+
pubsubWatchdogTimeout = psWatchdog.Timeout()
692+
defer psWatchdog.Close()
686693
}
687694

688695
if options.DeploymentValues.Prometheus.Enable && options.DeploymentValues.Prometheus.CollectDBMetrics {
@@ -1031,6 +1038,8 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
10311038
_, _ = io.WriteString(inv.Stdout, cliui.Bold("Interrupt caught, gracefully exiting. Use ctrl+\\ to force quit"))
10321039
case <-tunnelDone:
10331040
exitErr = xerrors.New("dev tunnel closed unexpectedly")
1041+
case <-pubsubWatchdogTimeout:
1042+
exitErr = xerrors.New("pubsub Watchdog timed out")
10341043
case exitErr = <-errCh:
10351044
}
10361045
if exitErr != nil && !xerrors.Is(exitErr, context.Canceled) {

coderd/database/pubsub/watchdog.go

+141
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package pubsub
2+
3+
import (
4+
"context"
5+
"runtime/pprof"
6+
"strings"
7+
"sync"
8+
"time"
9+
10+
"github.com/benbjohnson/clock"
11+
12+
"cdr.dev/slog"
13+
)
14+
15+
const (
16+
// EventPubsubWatchdog is the pubsub event name on which the watchdog publishes its heartbeats
// and on which it subscribes to receive them.
EventPubsubWatchdog = "pubsub_watchdog"
17+
// periodHeartbeat is the interval between heartbeat publishes.
periodHeartbeat = 15 * time.Second
18+
// periodTimeout is the time without receiving a heartbeat (from any publisher) before we
19+
// consider the watchdog to have timed out. There is a tradeoff here between avoiding
20+
// disruption due to a short-lived issue connecting to the postgres database, and restarting
21+
// before the consequences of a non-working pubsub are noticed by end users (e.g. being unable
22+
// to connect to their workspaces).
23+
periodTimeout = 5 * time.Minute
24+
)
25+
26+
// Watchdog publishes a periodic heartbeat on a Pubsub and listens for it on the same Pubsub. If
// no heartbeat is received within periodTimeout, the channel returned by Timeout() is closed.
type Watchdog struct {
27+
// ctx is derived from the context passed to the constructor; both goroutines exit when it is done.
ctx context.Context
28+
// cancel cancels ctx; it is called by Close.
cancel context.CancelFunc
29+
logger slog.Logger
30+
// ps is the pubsub being monitored.
ps Pubsub
31+
// wg tracks the publish and subscribe goroutines so that Close can wait for them.
wg sync.WaitGroup
32+
// timeout is closed when the watchdog trips (heartbeat timeout or failure to subscribe).
timeout chan struct{}
33+
34+
// for testing
35+
clock clock.Clock
36+
}
37+
38+
func NewWatchdog(ctx context.Context, logger slog.Logger, ps Pubsub) *Watchdog {
39+
return NewWatchdogWithClock(ctx, logger, ps, clock.New())
40+
}
41+
42+
// NewWatchdogWithClock returns a watchdog with the given clock. Product code should always call NewWatchDog.
43+
func NewWatchdogWithClock(ctx context.Context, logger slog.Logger, ps Pubsub, c clock.Clock) *Watchdog {
44+
ctx, cancel := context.WithCancel(ctx)
45+
w := &Watchdog{
46+
ctx: ctx,
47+
cancel: cancel,
48+
logger: logger,
49+
ps: ps,
50+
timeout: make(chan struct{}),
51+
clock: c,
52+
}
53+
w.wg.Add(2)
54+
go w.publishLoop()
55+
go w.subscribeMonitor()
56+
return w
57+
}
58+
59+
func (w *Watchdog) Close() error {
60+
w.cancel()
61+
w.wg.Wait()
62+
return nil
63+
}
64+
65+
// Timeout returns a channel that is closed if the watchdog times out. Note that the Timeout() chan
66+
// will NOT be closed if the Watchdog is Close'd or its context expires, so it is important to read
67+
// from the Timeout() chan in a select e.g.
68+
//
69+
// w := NewWatchDog(ctx, logger, ps)
70+
// select {
71+
// case <-ctx.Done():
72+
// case <-w.Timeout():
73+
//
74+
// FreakOut()
75+
// }
76+
func (w *Watchdog) Timeout() <-chan struct{} {
77+
return w.timeout
78+
}
79+
80+
func (w *Watchdog) publishLoop() {
81+
defer w.wg.Done()
82+
tkr := w.clock.Ticker(periodHeartbeat)
83+
defer tkr.Stop()
84+
// immediate publish after starting the ticker. This helps testing so that we can tell from
85+
// the outside that the ticker is started.
86+
err := w.ps.Publish(EventPubsubWatchdog, []byte{})
87+
if err != nil {
88+
w.logger.Warn(w.ctx, "failed to publish heartbeat on pubsub watchdog", slog.Error(err))
89+
}
90+
for {
91+
select {
92+
case <-w.ctx.Done():
93+
w.logger.Debug(w.ctx, "context done; exiting publishLoop")
94+
return
95+
case <-tkr.C:
96+
err := w.ps.Publish(EventPubsubWatchdog, []byte{})
97+
if err != nil {
98+
w.logger.Warn(w.ctx, "failed to publish heartbeat on pubsub watchdog", slog.Error(err))
99+
}
100+
}
101+
}
102+
}
103+
104+
func (w *Watchdog) subscribeMonitor() {
105+
defer w.wg.Done()
106+
beats := make(chan struct{})
107+
unsub, err := w.ps.Subscribe(EventPubsubWatchdog, func(context.Context, []byte) {
108+
w.logger.Debug(w.ctx, "got heartbeat for pubsub watchdog")
109+
select {
110+
case <-w.ctx.Done():
111+
case beats <- struct{}{}:
112+
}
113+
})
114+
if err != nil {
115+
w.logger.Critical(w.ctx, "watchdog failed to subscribe", slog.Error(err))
116+
close(w.timeout)
117+
return
118+
}
119+
defer unsub()
120+
tmr := w.clock.Timer(periodTimeout)
121+
defer tmr.Stop()
122+
for {
123+
select {
124+
case <-w.ctx.Done():
125+
w.logger.Debug(w.ctx, "context done; exiting subscribeMonitor")
126+
return
127+
case <-beats:
128+
// c.f. https://pkg.go.dev/time#Timer.Reset
129+
if !tmr.Stop() {
130+
<-tmr.C
131+
}
132+
tmr.Reset(periodTimeout)
133+
case <-tmr.C:
134+
buf := new(strings.Builder)
135+
_ = pprof.Lookup("goroutine").WriteTo(buf, 1)
136+
w.logger.Critical(w.ctx, "pubsub watchdog timeout", slog.F("goroutines", buf.String()))
137+
close(w.timeout)
138+
return
139+
}
140+
}
141+
}
+124
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package pubsub_test
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/benbjohnson/clock"
8+
"github.com/stretchr/testify/require"
9+
10+
"cdr.dev/slog"
11+
"cdr.dev/slog/sloggers/slogtest"
12+
"github.com/coder/coder/v2/coderd/database/pubsub"
13+
"github.com/coder/coder/v2/testutil"
14+
)
15+
16+
func TestWatchdog_NoTimeout(t *testing.T) {
17+
t.Parallel()
18+
ctx := testutil.Context(t, time.Hour)
19+
mClock := clock.NewMock()
20+
start := time.Date(2024, 2, 5, 8, 7, 6, 5, time.UTC)
21+
mClock.Set(start)
22+
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
23+
fPS := newFakePubsub()
24+
uut := pubsub.NewWatchdogWithClock(ctx, logger, fPS, mClock)
25+
26+
sub := testutil.RequireRecvCtx(ctx, t, fPS.subs)
27+
require.Equal(t, pubsub.EventPubsubWatchdog, sub.event)
28+
p := testutil.RequireRecvCtx(ctx, t, fPS.pubs)
29+
require.Equal(t, pubsub.EventPubsubWatchdog, p)
30+
31+
// 5 min / 15 sec = 20, so do 21 ticks
32+
for i := 0; i < 21; i++ {
33+
mClock.Add(15 * time.Second)
34+
p = testutil.RequireRecvCtx(ctx, t, fPS.pubs)
35+
require.Equal(t, pubsub.EventPubsubWatchdog, p)
36+
mClock.Add(30 * time.Millisecond) // reasonable round-trip
37+
// forward the beat
38+
sub.listener(ctx, []byte{})
39+
// we shouldn't time out
40+
select {
41+
case <-uut.Timeout():
42+
t.Fatal("watchdog tripped")
43+
default:
44+
// OK!
45+
}
46+
}
47+
48+
err := uut.Close()
49+
require.NoError(t, err)
50+
}
51+
52+
func TestWatchdog_Timeout(t *testing.T) {
53+
t.Parallel()
54+
ctx := testutil.Context(t, testutil.WaitShort)
55+
mClock := clock.NewMock()
56+
start := time.Date(2024, 2, 5, 8, 7, 6, 5, time.UTC)
57+
mClock.Set(start)
58+
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
59+
fPS := newFakePubsub()
60+
uut := pubsub.NewWatchdogWithClock(ctx, logger, fPS, mClock)
61+
62+
sub := testutil.RequireRecvCtx(ctx, t, fPS.subs)
63+
require.Equal(t, pubsub.EventPubsubWatchdog, sub.event)
64+
p := testutil.RequireRecvCtx(ctx, t, fPS.pubs)
65+
require.Equal(t, pubsub.EventPubsubWatchdog, p)
66+
67+
// 5 min / 15 sec = 20, so do 19 ticks without timing out
68+
for i := 0; i < 19; i++ {
69+
mClock.Add(15 * time.Second)
70+
p = testutil.RequireRecvCtx(ctx, t, fPS.pubs)
71+
require.Equal(t, pubsub.EventPubsubWatchdog, p)
72+
mClock.Add(30 * time.Millisecond) // reasonable round-trip
73+
// we DO NOT forward the heartbeat
74+
// we shouldn't time out
75+
select {
76+
case <-uut.Timeout():
77+
t.Fatal("watchdog tripped")
78+
default:
79+
// OK!
80+
}
81+
}
82+
mClock.Add(15 * time.Second)
83+
p = testutil.RequireRecvCtx(ctx, t, fPS.pubs)
84+
require.Equal(t, pubsub.EventPubsubWatchdog, p)
85+
testutil.RequireRecvCtx(ctx, t, uut.Timeout())
86+
87+
err := uut.Close()
88+
require.NoError(t, err)
89+
}
90+
91+
type subscribe struct {
92+
event string
93+
listener pubsub.Listener
94+
}
95+
96+
type fakePubsub struct {
97+
pubs chan string
98+
subs chan subscribe
99+
}
100+
101+
func (f *fakePubsub) Subscribe(event string, listener pubsub.Listener) (func(), error) {
102+
f.subs <- subscribe{event, listener}
103+
return func() {}, nil
104+
}
105+
106+
func (*fakePubsub) SubscribeWithErr(string, pubsub.ListenerWithErr) (func(), error) {
107+
panic("should not be called")
108+
}
109+
110+
func (*fakePubsub) Close() error {
111+
panic("should not be called")
112+
}
113+
114+
func (f *fakePubsub) Publish(event string, _ []byte) error {
115+
f.pubs <- event
116+
return nil
117+
}
118+
119+
func newFakePubsub() *fakePubsub {
120+
return &fakePubsub{
121+
pubs: make(chan string),
122+
subs: make(chan subscribe),
123+
}
124+
}

0 commit comments

Comments
 (0)