
Commit 4671ebb

feat: measure pubsub latencies and expose metrics (coder#13126)
1 parent: e14f8fb

File tree: 5 files changed, +326 -38 lines

coderd/database/pubsub/latency.go

Lines changed: 74 additions & 0 deletions
(new file)

package pubsub

import (
    "bytes"
    "context"
    "fmt"
    "time"

    "github.com/google/uuid"
    "golang.org/x/xerrors"

    "cdr.dev/slog"
)

// LatencyMeasurer is used to measure the send & receive latencies of the underlying Pubsub implementation. We use these
// measurements to export metrics which can indicate when a Pubsub implementation's queue is overloaded and/or full.
type LatencyMeasurer struct {
    // Create unique pubsub channel names so that multiple coderd replicas do not clash when performing latency measurements.
    channel uuid.UUID
    logger  slog.Logger
}

// LatencyMessageLength is the length of a UUIDv4 encoded to hex.
const LatencyMessageLength = 36

func NewLatencyMeasurer(logger slog.Logger) *LatencyMeasurer {
    return &LatencyMeasurer{
        channel: uuid.New(),
        logger:  logger,
    }
}

// Measure takes a given Pubsub implementation, publishes a message & immediately receives it, and returns the observed latency.
func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) (send, recv time.Duration, err error) {
    var (
        start time.Time
        res   = make(chan time.Duration, 1)
    )

    msg := []byte(uuid.New().String())
    lm.logger.Debug(ctx, "performing measurement", slog.F("msg", msg))

    cancel, err := p.Subscribe(lm.latencyChannelName(), func(ctx context.Context, in []byte) {
        if !bytes.Equal(in, msg) {
            lm.logger.Warn(ctx, "received unexpected message", slog.F("got", in), slog.F("expected", msg))
            return
        }

        res <- time.Since(start)
    })
    if err != nil {
        return -1, -1, xerrors.Errorf("failed to subscribe: %w", err)
    }
    defer cancel()

    start = time.Now()
    err = p.Publish(lm.latencyChannelName(), msg)
    if err != nil {
        return -1, -1, xerrors.Errorf("failed to publish: %w", err)
    }

    send = time.Since(start)
    select {
    case <-ctx.Done():
        lm.logger.Error(ctx, "context canceled before message could be received", slog.Error(ctx.Err()), slog.F("msg", msg))
        return send, -1, ctx.Err()
    case recv = <-res:
        return send, recv, nil
    }
}

func (lm *LatencyMeasurer) latencyChannelName() string {
    return fmt.Sprintf("latency-measure:%s", lm.channel)
}
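The technique above is a self-probe: subscribe on a channel unique to this measurer, publish a UUID payload, time the Publish call as the send latency, and time the wait for the callback as the receive latency, discarding any payload that is not the one just sent. Here is a minimal, self-contained sketch of the same round-trip idea against a toy in-process pubsub (miniPubsub and everything around it is invented for illustration; it is not part of this commit or of coder's API):

package main

import (
    "bytes"
    "fmt"
    "sync"
    "time"
)

// miniPubsub is a toy in-process pubsub used only for this sketch.
type miniPubsub struct {
    mu        sync.Mutex
    listeners map[string][]func([]byte)
}

func newMiniPubsub() *miniPubsub {
    return &miniPubsub{listeners: map[string][]func([]byte){}}
}

func (m *miniPubsub) Subscribe(channel string, fn func([]byte)) {
    m.mu.Lock()
    defer m.mu.Unlock()
    m.listeners[channel] = append(m.listeners[channel], fn)
}

func (m *miniPubsub) Publish(channel string, msg []byte) {
    m.mu.Lock()
    fns := append([]func([]byte){}, m.listeners[channel]...)
    m.mu.Unlock()
    for _, fn := range fns {
        go fn(msg) // deliver asynchronously, like a real pubsub
    }
}

func main() {
    ps := newMiniPubsub()
    msg := []byte("probe-1")
    res := make(chan time.Duration, 1)

    var start time.Time
    ps.Subscribe("latency-measure", func(in []byte) {
        // Ignore foreign payloads: something else may race on the channel.
        if !bytes.Equal(in, msg) {
            return
        }
        res <- time.Since(start)
    })

    start = time.Now()
    ps.Publish("latency-measure", msg)
    send := time.Since(start)

    recv := <-res
    fmt.Printf("send=%v recv=%v\n", send, recv)
}

The payload comparison is what makes the probe safe against stray traffic on the channel; the tests further down exercise exactly that case.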

coderd/database/pubsub/pubsub.go

Lines changed: 57 additions & 4 deletions
@@ -7,6 +7,7 @@ import (
 	"io"
 	"net"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/google/uuid"
@@ -28,6 +29,9 @@ type ListenerWithErr func(ctx context.Context, message []byte, err error)
 // might have been dropped.
 var ErrDroppedMessages = xerrors.New("dropped messages")
 
+// LatencyMeasureTimeout defines how often to trigger a new background latency measurement.
+const LatencyMeasureTimeout = time.Second * 10
+
 // Pubsub is a generic interface for broadcasting and receiving messages.
 // Implementors should assume high-availability with the backing implementation.
 type Pubsub interface {
@@ -205,6 +209,10 @@ type PGPubsub struct {
 	receivedBytesTotal  prometheus.Counter
 	disconnectionsTotal prometheus.Counter
 	connected           prometheus.Gauge
+
+	latencyMeasurer       *LatencyMeasurer
+	latencyMeasureCounter atomic.Int64
+	latencyErrCounter     atomic.Int64
 }
 
 // BufferSize is the maximum number of unhandled messages we will buffer
@@ -478,6 +486,30 @@
 	)
 )
 
+// additional metrics collected out-of-band
+var (
+	pubsubSendLatencyDesc = prometheus.NewDesc(
+		"coder_pubsub_send_latency_seconds",
+		"The time taken to send a message into a pubsub event channel",
+		nil, nil,
+	)
+	pubsubRecvLatencyDesc = prometheus.NewDesc(
+		"coder_pubsub_receive_latency_seconds",
+		"The time taken to receive a message from a pubsub event channel",
+		nil, nil,
+	)
+	pubsubLatencyMeasureCountDesc = prometheus.NewDesc(
+		"coder_pubsub_latency_measures_total",
+		"The number of pubsub latency measurements",
+		nil, nil,
+	)
+	pubsubLatencyMeasureErrDesc = prometheus.NewDesc(
+		"coder_pubsub_latency_measure_errs_total",
+		"The number of pubsub latency measurement failures",
+		nil, nil,
+	)
+)
+
 // We'll track messages as size "normal" and "colossal", where the
 // latter are messages larger than 7600 bytes, or 95% of the postgres
 // notify limit. If we see a lot of colossal packets that's an indication that
@@ -504,6 +536,12 @@ func (p *PGPubsub) Describe(descs chan<- *prometheus.Desc) {
 	// implicit metrics
 	descs <- currentSubscribersDesc
 	descs <- currentEventsDesc
+
+	// additional metrics
+	descs <- pubsubSendLatencyDesc
+	descs <- pubsubRecvLatencyDesc
+	descs <- pubsubLatencyMeasureCountDesc
+	descs <- pubsubLatencyMeasureErrDesc
 }
 
 // Collect implements, along with Describe, the prometheus.Collector interface
@@ -528,6 +566,20 @@ func (p *PGPubsub) Collect(metrics chan<- prometheus.Metric) {
 	p.qMu.Unlock()
 	metrics <- prometheus.MustNewConstMetric(currentSubscribersDesc, prometheus.GaugeValue, float64(subs))
 	metrics <- prometheus.MustNewConstMetric(currentEventsDesc, prometheus.GaugeValue, float64(events))
+
+	// additional metrics
+	ctx, cancel := context.WithTimeout(context.Background(), LatencyMeasureTimeout)
+	defer cancel()
+	send, recv, err := p.latencyMeasurer.Measure(ctx, p)
+
+	metrics <- prometheus.MustNewConstMetric(pubsubLatencyMeasureCountDesc, prometheus.CounterValue, float64(p.latencyMeasureCounter.Add(1)))
+	if err != nil {
+		p.logger.Warn(context.Background(), "failed to measure latency", slog.Error(err))
+		metrics <- prometheus.MustNewConstMetric(pubsubLatencyMeasureErrDesc, prometheus.CounterValue, float64(p.latencyErrCounter.Add(1)))
+		return
+	}
+	metrics <- prometheus.MustNewConstMetric(pubsubSendLatencyDesc, prometheus.GaugeValue, send.Seconds())
+	metrics <- prometheus.MustNewConstMetric(pubsubRecvLatencyDesc, prometheus.GaugeValue, recv.Seconds())
 }
 
 // New creates a new Pubsub implementation using a PostgreSQL connection.
@@ -544,10 +596,11 @@ func New(startCtx context.Context, logger slog.Logger, database *sql.DB, connect
 // newWithoutListener creates a new PGPubsub without creating the pqListener.
 func newWithoutListener(logger slog.Logger, database *sql.DB) *PGPubsub {
 	return &PGPubsub{
-		logger:     logger,
-		listenDone: make(chan struct{}),
-		db:         database,
-		queues:     make(map[string]map[uuid.UUID]*msgQueue),
+		logger:          logger,
+		listenDone:      make(chan struct{}),
+		db:              database,
+		queues:          make(map[string]map[uuid.UUID]*msgQueue),
+		latencyMeasurer: NewLatencyMeasurer(logger.Named("latency-measurer")),
 
 		publishesTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
 			Namespace: "coder",
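For orientation, this is the scrape-driven pattern the new Collect code follows: the measurement runs at Gather time and the results are emitted as const metrics. Because prometheus.MustNewConstMetric rebuilds the sample on every scrape, the monotonically increasing counts must live in atomic.Int64 fields rather than prometheus.Counter values. A runnable sketch, with a hypothetical probeCollector and a fake measure function standing in for PGPubsub and LatencyMeasurer.Measure:

package main

import (
    "fmt"
    "sync/atomic"
    "time"

    "github.com/prometheus/client_golang/prometheus"
)

var (
    latencyDesc = prometheus.NewDesc(
        "demo_probe_latency_seconds",
        "Round-trip latency observed at scrape time",
        nil, nil,
    )
    measuresDesc = prometheus.NewDesc(
        "demo_probe_measures_total",
        "Number of latency measurements taken",
        nil, nil,
    )
)

// probeCollector measures on every scrape instead of on a background timer,
// mirroring how PGPubsub.Collect drives LatencyMeasurer.Measure.
type probeCollector struct {
    measures atomic.Int64
    measure  func() (time.Duration, error) // stand-in for LatencyMeasurer.Measure
}

func (c *probeCollector) Describe(descs chan<- *prometheus.Desc) {
    descs <- latencyDesc
    descs <- measuresDesc
}

func (c *probeCollector) Collect(metrics chan<- prometheus.Metric) {
    latency, err := c.measure()
    // A counter exposed via MustNewConstMetric needs its own monotonic state,
    // hence the atomic counter rather than a prometheus.Counter.
    metrics <- prometheus.MustNewConstMetric(measuresDesc, prometheus.CounterValue, float64(c.measures.Add(1)))
    if err != nil {
        return // skip the latency gauge on a failed measurement
    }
    metrics <- prometheus.MustNewConstMetric(latencyDesc, prometheus.GaugeValue, latency.Seconds())
}

func main() {
    reg := prometheus.NewRegistry()
    reg.MustRegister(&probeCollector{measure: func() (time.Duration, error) {
        start := time.Now()
        time.Sleep(time.Millisecond) // pretend to round-trip a message
        return time.Since(start), nil
    }})

    families, err := reg.Gather()
    if err != nil {
        panic(err)
    }
    for _, mf := range families {
        fmt.Println(mf.GetName())
    }
}

A side effect of this design, visible in the tests below, is that every scrape itself publishes and subscribes, so the pubsub's own traffic counters grow with each Gather.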

coderd/database/pubsub/pubsub_linux_test.go

Lines changed: 111 additions & 0 deletions
@@ -3,6 +3,7 @@
 package pubsub_test
 
 import (
+	"bytes"
 	"context"
 	"database/sql"
 	"fmt"
@@ -15,6 +16,8 @@ import (
 	"github.com/stretchr/testify/require"
 	"golang.org/x/xerrors"
 
+	"cdr.dev/slog/sloggers/sloghuman"
+
 	"cdr.dev/slog"
 	"cdr.dev/slog/sloggers/slogtest"
 	"github.com/coder/coder/v2/coderd/database/dbtestutil"
@@ -294,3 +297,111 @@ func TestPubsub_Disconnect(t *testing.T) {
 	}
 	require.True(t, gotDroppedErr)
 }
+
+func TestMeasureLatency(t *testing.T) {
+	t.Parallel()
+
+	newPubsub := func() (pubsub.Pubsub, func()) {
+		ctx, cancel := context.WithCancel(context.Background())
+		logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+		connectionURL, closePg, err := dbtestutil.Open()
+		require.NoError(t, err)
+		db, err := sql.Open("postgres", connectionURL)
+		require.NoError(t, err)
+		ps, err := pubsub.New(ctx, logger, db, connectionURL)
+		require.NoError(t, err)
+
+		return ps, func() {
+			_ = ps.Close()
+			_ = db.Close()
+			closePg()
+			cancel()
+		}
+	}
+
+	t.Run("MeasureLatency", func(t *testing.T) {
+		t.Parallel()
+
+		logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+		ps, done := newPubsub()
+		defer done()
+
+		ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
+		defer cancel()
+
+		send, recv, err := pubsub.NewLatencyMeasurer(logger).Measure(ctx, ps)
+		require.NoError(t, err)
+		require.Greater(t, send.Seconds(), 0.0)
+		require.Greater(t, recv.Seconds(), 0.0)
+	})
+
+	t.Run("MeasureLatencyRecvTimeout", func(t *testing.T) {
+		t.Parallel()
+
+		logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+		ps, done := newPubsub()
+		defer done()
+
+		ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Hour))
+		defer cancel()
+
+		send, recv, err := pubsub.NewLatencyMeasurer(logger).Measure(ctx, ps)
+		require.ErrorContains(t, err, context.DeadlineExceeded.Error())
+		require.Greater(t, send.Seconds(), 0.0)
+		require.EqualValues(t, recv, time.Duration(-1))
+	})
+
+	t.Run("MeasureLatencyNotifyRace", func(t *testing.T) {
+		t.Parallel()
+
+		var buf bytes.Buffer
+		logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
+		logger = logger.AppendSinks(sloghuman.Sink(&buf))
+
+		lm := pubsub.NewLatencyMeasurer(logger)
+		ps, done := newPubsub()
+		defer done()
+
+		racy := newRacyPubsub(ps)
+		ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
+		defer cancel()
+
+		send, recv, err := lm.Measure(ctx, racy)
+		assert.NoError(t, err)
+		assert.Greater(t, send.Seconds(), 0.0)
+		assert.Greater(t, recv.Seconds(), 0.0)
+
+		logger.Sync()
+		assert.Contains(t, buf.String(), "received unexpected message")
+	})
+}
+
+// racyPubsub simulates a race on the same channel by publishing two messages (one expected, one not).
+// This is used to verify that a subscriber will only listen for the message it explicitly expects.
+type racyPubsub struct {
+	pubsub.Pubsub
+}
+
+func newRacyPubsub(ps pubsub.Pubsub) *racyPubsub {
+	return &racyPubsub{ps}
+}
+
+func (s *racyPubsub) Subscribe(event string, listener pubsub.Listener) (cancel func(), err error) {
+	return s.Pubsub.Subscribe(event, listener)
+}
+
+func (s *racyPubsub) SubscribeWithErr(event string, listener pubsub.ListenerWithErr) (cancel func(), err error) {
+	return s.Pubsub.SubscribeWithErr(event, listener)
+}
+
+func (s *racyPubsub) Publish(event string, message []byte) error {
+	err := s.Pubsub.Publish(event, []byte("nonsense"))
+	if err != nil {
+		return xerrors.Errorf("failed to send simulated race: %w", err)
+	}
+	return s.Pubsub.Publish(event, message)
+}
+
+func (s *racyPubsub) Close() error {
+	return s.Pubsub.Close()
+}
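One design note on this test: racyPubsub relies on Go interface embedding, so only Publish strictly needs an override; the Subscribe, SubscribeWithErr, and Close forwarders above are redundant given the embedded pubsub.Pubsub, though they do make the intent explicit. A stripped-down sketch of the pattern, where Bus, memBus, and racyBus are invented for illustration:

package main

import "fmt"

// Bus is a cut-down stand-in for the pubsub.Pubsub interface.
type Bus interface {
    Subscribe(event string, fn func(message []byte)) (cancel func(), err error)
    Publish(event string, message []byte) error
}

// memBus is a trivial synchronous implementation for the sketch.
type memBus struct {
    listeners map[string][]func([]byte)
}

func newMemBus() *memBus { return &memBus{listeners: map[string][]func([]byte){}} }

func (m *memBus) Subscribe(event string, fn func([]byte)) (func(), error) {
    m.listeners[event] = append(m.listeners[event], fn)
    return func() {}, nil
}

func (m *memBus) Publish(event string, message []byte) error {
    for _, fn := range m.listeners[event] {
        fn(message)
    }
    return nil
}

// racyBus embeds the Bus interface: Subscribe falls through to the wrapped
// value untouched, while Publish is overridden to inject a conflicting
// message before the real one, mimicking a concurrent publisher.
type racyBus struct {
    Bus
}

func (r racyBus) Publish(event string, message []byte) error {
    if err := r.Bus.Publish(event, []byte("nonsense")); err != nil {
        return err
    }
    return r.Bus.Publish(event, message)
}

func main() {
    var bus Bus = racyBus{newMemBus()}
    _, _ = bus.Subscribe("latency-measure", func(msg []byte) {
        fmt.Printf("received: %s\n", msg) // prints "nonsense", then "probe"
    })
    _ = bus.Publish("latency-measure", []byte("probe"))
}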

coderd/database/pubsub/pubsub_test.go

Lines changed: 28 additions & 11 deletions
@@ -39,7 +39,11 @@ func TestPGPubsub_Metrics(t *testing.T) {
 	err = registry.Register(uut)
 	require.NoError(t, err)
 
+	// each Gather measures pubsub latency by publishing a message & subscribing to it
+	var gatherCount float64
+
 	metrics, err := registry.Gather()
+	gatherCount++
 	require.NoError(t, err)
 	require.True(t, testutil.PromGaugeHasValue(t, metrics, 0, "coder_pubsub_current_events"))
 	require.True(t, testutil.PromGaugeHasValue(t, metrics, 0, "coder_pubsub_current_subscribers"))
@@ -59,19 +63,26 @@
 	_ = testutil.RequireRecvCtx(ctx, t, messageChannel)
 
 	require.Eventually(t, func() bool {
+		latencyBytes := gatherCount * pubsub.LatencyMessageLength
 		metrics, err = registry.Gather()
+		gatherCount++
 		assert.NoError(t, err)
 		return testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_current_events") &&
 			testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_current_subscribers") &&
 			testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_connected") &&
-			testutil.PromCounterHasValue(t, metrics, 1, "coder_pubsub_publishes_total", "true") &&
-			testutil.PromCounterHasValue(t, metrics, 1, "coder_pubsub_subscribes_total", "true") &&
-			testutil.PromCounterHasValue(t, metrics, 1, "coder_pubsub_messages_total", "normal") &&
-			testutil.PromCounterHasValue(t, metrics, 7, "coder_pubsub_received_bytes_total") &&
-			testutil.PromCounterHasValue(t, metrics, 7, "coder_pubsub_published_bytes_total")
+			testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_publishes_total", "true") &&
+			testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_subscribes_total", "true") &&
+			testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_messages_total", "normal") &&
+			testutil.PromCounterHasValue(t, metrics, float64(len(data))+latencyBytes, "coder_pubsub_received_bytes_total") &&
+			testutil.PromCounterHasValue(t, metrics, float64(len(data))+latencyBytes, "coder_pubsub_published_bytes_total") &&
+			testutil.PromGaugeAssertion(t, metrics, func(in float64) bool { return in > 0 }, "coder_pubsub_send_latency_seconds") &&
+			testutil.PromGaugeAssertion(t, metrics, func(in float64) bool { return in > 0 }, "coder_pubsub_receive_latency_seconds") &&
+			testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_latency_measures_total") &&
+			!testutil.PromCounterGathered(t, metrics, "coder_pubsub_latency_measure_errs_total")
 	}, testutil.WaitShort, testutil.IntervalFast)
 
-	colossalData := make([]byte, 7600)
+	colossalSize := 7600
+	colossalData := make([]byte, colossalSize)
 	for i := range colossalData {
 		colossalData[i] = 'q'
 	}
@@ -89,16 +100,22 @@
 	_ = testutil.RequireRecvCtx(ctx, t, messageChannel)
 
 	require.Eventually(t, func() bool {
+		latencyBytes := gatherCount * pubsub.LatencyMessageLength
 		metrics, err = registry.Gather()
+		gatherCount++
 		assert.NoError(t, err)
 		return testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_current_events") &&
 			testutil.PromGaugeHasValue(t, metrics, 2, "coder_pubsub_current_subscribers") &&
 			testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_connected") &&
-			testutil.PromCounterHasValue(t, metrics, 2, "coder_pubsub_publishes_total", "true") &&
-			testutil.PromCounterHasValue(t, metrics, 2, "coder_pubsub_subscribes_total", "true") &&
-			testutil.PromCounterHasValue(t, metrics, 1, "coder_pubsub_messages_total", "normal") &&
+			testutil.PromCounterHasValue(t, metrics, 1+gatherCount, "coder_pubsub_publishes_total", "true") &&
+			testutil.PromCounterHasValue(t, metrics, 1+gatherCount, "coder_pubsub_subscribes_total", "true") &&
+			testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_messages_total", "normal") &&
 			testutil.PromCounterHasValue(t, metrics, 1, "coder_pubsub_messages_total", "colossal") &&
-			testutil.PromCounterHasValue(t, metrics, 7607, "coder_pubsub_received_bytes_total") &&
-			testutil.PromCounterHasValue(t, metrics, 7607, "coder_pubsub_published_bytes_total")
+			testutil.PromCounterHasValue(t, metrics, float64(colossalSize+len(data))+latencyBytes, "coder_pubsub_received_bytes_total") &&
+			testutil.PromCounterHasValue(t, metrics, float64(colossalSize+len(data))+latencyBytes, "coder_pubsub_published_bytes_total") &&
+			testutil.PromGaugeAssertion(t, metrics, func(in float64) bool { return in > 0 }, "coder_pubsub_send_latency_seconds") &&
+			testutil.PromGaugeAssertion(t, metrics, func(in float64) bool { return in > 0 }, "coder_pubsub_receive_latency_seconds") &&
+			testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_latency_measures_total") &&
+			!testutil.PromCounterGathered(t, metrics, "coder_pubsub_latency_measure_errs_total")
 	}, testutil.WaitShort, testutil.IntervalFast)
 }
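To make the byte accounting concrete: each registry.Gather() publishes one latency probe of pubsub.LatencyMessageLength (36) bytes in addition to the test's own payloads, so the expected values of the byte counters grow with the number of gathers. A small worked example (the concrete gatherCount values are illustrative only, since require.Eventually may gather more than once before its condition holds):

package main

import "fmt"

// latencyMessageLength mirrors pubsub.LatencyMessageLength: one UUID string per probe.
const latencyMessageLength = 36.0

func main() {
    dataLen := 7.0         // len(data) published in the first phase of the test
    colossalSize := 7600.0 // second-phase payload size

    // First Eventually iteration: one prior Gather has already run, so
    // gatherCount is 1 when latencyBytes is computed.
    gatherCount := 1.0
    fmt.Println(dataLen + gatherCount*latencyMessageLength) // 7 + 36 = 43 expected bytes

    // Second phase: both payloads plus every probe published so far count.
    gatherCount = 2.0
    fmt.Println(colossalSize + dataLen + gatherCount*latencyMessageLength) // 7607 + 72 = 7679
}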
