Skip to content

Commit 5d02e30

Browse files
committed
feat: add watchdog to pubsub
1 parent 646ac94 commit 5d02e30

File tree

2 files changed

+100
-0
lines changed

2 files changed

+100
-0
lines changed

cli/server.go

+2
Original file line numberDiff line numberDiff line change
@@ -683,6 +683,8 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
683683
options.PrometheusRegistry.MustRegister(ps)
684684
}
685685
defer options.Pubsub.Close()
686+
psWatchdog := pubsub.NewWatchdog(ctx, logger.Named("pswatch"), ps)
687+
defer psWatchdog.Close()
686688
}
687689

688690
if options.DeploymentValues.Prometheus.Enable && options.DeploymentValues.Prometheus.CollectDBMetrics {

coderd/database/pubsub/watchdog.go

+98
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package pubsub
2+
3+
import (
4+
"context"
5+
"runtime/pprof"
6+
"strings"
7+
"sync"
8+
"time"
9+
10+
"cdr.dev/slog"
11+
)
12+
13+
// Settings for the pubsub watchdog heartbeat.
const (
	// eventPubsubWatchdog is the pubsub event name on which the watchdog
	// publishes its heartbeats and subscribes to receive them back.
	eventPubsubWatchdog = "pubsub_watchdog"
	// periodHeartbeat is the interval between heartbeat publishes.
	periodHeartbeat = 15 * time.Second
	// periodTimeout is how long the monitor waits for a heartbeat before
	// reporting a timeout: three heartbeat periods.
	periodTimeout = periodHeartbeat * 3
)
18+
19+
type Watchdog struct {
20+
ctx context.Context
21+
cancel context.CancelFunc
22+
logger slog.Logger
23+
ps Pubsub
24+
wg sync.WaitGroup
25+
}
26+
27+
func NewWatchdog(ctx context.Context, logger slog.Logger, ps Pubsub) *Watchdog {
28+
ctx, cancel := context.WithCancel(ctx)
29+
w := &Watchdog{
30+
ctx: ctx,
31+
cancel: cancel,
32+
logger: logger,
33+
ps: ps,
34+
}
35+
w.wg.Add(2)
36+
go w.publishLoop()
37+
go w.subscribeMonitor()
38+
return w
39+
}
40+
41+
func (w *Watchdog) Close() error {
42+
w.cancel()
43+
w.wg.Wait()
44+
return nil
45+
}
46+
47+
func (w *Watchdog) publishLoop() {
48+
defer w.wg.Done()
49+
tkr := time.NewTicker(periodHeartbeat)
50+
defer tkr.Stop()
51+
for {
52+
select {
53+
case <-w.ctx.Done():
54+
w.logger.Debug(w.ctx, "context done; exiting publishLoop")
55+
return
56+
case <-tkr.C:
57+
err := w.ps.Publish(eventPubsubWatchdog, []byte{})
58+
if err != nil {
59+
w.logger.Warn(w.ctx, "failed to publish heartbeat on pubsub watchdog", slog.Error(err))
60+
}
61+
}
62+
}
63+
}
64+
65+
func (w *Watchdog) subscribeMonitor() {
66+
defer w.wg.Done()
67+
beats := make(chan struct{})
68+
unsub, err := w.ps.Subscribe(eventPubsubWatchdog, func(context.Context, []byte) {
69+
w.logger.Debug(w.ctx, "got heartbeat for pubsub watchdog")
70+
select {
71+
case <-w.ctx.Done():
72+
case beats <- struct{}{}:
73+
}
74+
})
75+
if err != nil {
76+
w.logger.Fatal(w.ctx, "watchdog failed to subscribe", slog.Error(err))
77+
}
78+
defer unsub()
79+
tmr := time.NewTimer(periodTimeout)
80+
defer tmr.Stop()
81+
for {
82+
select {
83+
case <-w.ctx.Done():
84+
w.logger.Debug(w.ctx, "context done; exiting subscribeMonitor")
85+
return
86+
case <-beats:
87+
// c.f. https://pkg.go.dev/time#Timer.Reset
88+
if !tmr.Stop() {
89+
<-tmr.C
90+
}
91+
tmr.Reset(periodTimeout)
92+
case <-tmr.C:
93+
buf := new(strings.Builder)
94+
_ = pprof.Lookup("goroutine").WriteTo(buf, 1)
95+
w.logger.Fatal(w.ctx, "pubsub watchdog timeout", slog.F("goroutines", buf.String()))
96+
}
97+
}
98+
}

0 commit comments

Comments
 (0)