Skip to content

Commit 899ec80

Browse files
committed
feat: add metrics to PGPubsub
1 parent cc0dc10 commit 899ec80

File tree

4 files changed

+586
-327
lines changed

4 files changed

+586
-327
lines changed

cli/server.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -673,10 +673,14 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
673673
}()
674674

675675
options.Database = database.New(sqlDB)
676-
options.Pubsub, err = pubsub.New(ctx, logger.Named("pubsub"), sqlDB, dbURL)
676+
ps, err := pubsub.New(ctx, logger.Named("pubsub"), sqlDB, dbURL)
677677
if err != nil {
678678
return xerrors.Errorf("create pubsub: %w", err)
679679
}
680+
options.Pubsub = ps
681+
if options.DeploymentValues.Prometheus.Enable {
682+
options.PrometheusRegistry.MustRegister(ps)
683+
}
680684
defer options.Pubsub.Close()
681685
}
682686

coderd/database/pubsub/pubsub.go

+141-26
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/google/uuid"
1111
"github.com/lib/pq"
12+
"github.com/prometheus/client_golang/prometheus"
1213
"golang.org/x/xerrors"
1314

1415
"cdr.dev/slog"
@@ -162,8 +163,8 @@ func (q *msgQueue) dropped() {
162163
q.cond.Broadcast()
163164
}
164165

165-
// Pubsub implementation using PostgreSQL.
166-
type pgPubsub struct {
166+
// PGPubsub is a pubsub implementation using PostgreSQL.
167+
type PGPubsub struct {
167168
ctx context.Context
168169
cancel context.CancelFunc
169170
logger slog.Logger
@@ -174,29 +175,38 @@ type pgPubsub struct {
174175
queues map[string]map[uuid.UUID]*msgQueue
175176
closedListener bool
176177
closeListenerErr error
178+
179+
publishesTotal *prometheus.CounterVec
180+
subscribesTotal *prometheus.CounterVec
181+
messagesTotal *prometheus.CounterVec
182+
disconnectionsTotal prometheus.Counter
183+
connected prometheus.Gauge
177184
}
178185

179186
// BufferSize is the maximum number of unhandled messages we will buffer
180187
// for a subscriber before dropping messages.
181188
const BufferSize = 2048
182189

183190
// Subscribe calls the listener when an event matching the name is received.
184-
func (p *pgPubsub) Subscribe(event string, listener Listener) (cancel func(), err error) {
191+
func (p *PGPubsub) Subscribe(event string, listener Listener) (cancel func(), err error) {
185192
return p.subscribeQueue(event, newMsgQueue(p.ctx, listener, nil))
186193
}
187194

188-
func (p *pgPubsub) SubscribeWithErr(event string, listener ListenerWithErr) (cancel func(), err error) {
195+
func (p *PGPubsub) SubscribeWithErr(event string, listener ListenerWithErr) (cancel func(), err error) {
189196
return p.subscribeQueue(event, newMsgQueue(p.ctx, nil, listener))
190197
}
191198

192-
func (p *pgPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(), err error) {
199+
func (p *PGPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(), err error) {
193200
p.mut.Lock()
194201
defer p.mut.Unlock()
195202
defer func() {
196203
if err != nil {
197204
// if we hit an error, we need to close the queue so we don't
198205
// leak its goroutine.
199206
newQ.close()
207+
p.subscribesTotal.WithLabelValues("false").Inc()
208+
} else {
209+
p.subscribesTotal.WithLabelValues("true").Inc()
200210
}
201211
}()
202212

@@ -239,20 +249,22 @@ func (p *pgPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(),
239249
}, nil
240250
}
241251

242-
func (p *pgPubsub) Publish(event string, message []byte) error {
252+
func (p *PGPubsub) Publish(event string, message []byte) error {
243253
p.logger.Debug(p.ctx, "publish", slog.F("event", event), slog.F("message_len", len(message)))
244254
// This is safe because we are calling pq.QuoteLiteral. pg_notify doesn't
245255
// support the first parameter being a prepared statement.
246256
//nolint:gosec
247257
_, err := p.db.ExecContext(p.ctx, `select pg_notify(`+pq.QuoteLiteral(event)+`, $1)`, message)
248258
if err != nil {
259+
p.publishesTotal.WithLabelValues("false").Inc()
249260
return xerrors.Errorf("exec pg_notify: %w", err)
250261
}
262+
p.publishesTotal.WithLabelValues("true").Inc()
251263
return nil
252264
}
253265

254266
// Close closes the pubsub instance.
255-
func (p *pgPubsub) Close() error {
267+
func (p *PGPubsub) Close() error {
256268
p.logger.Info(p.ctx, "pubsub is closing")
257269
p.cancel()
258270
err := p.closeListener()
@@ -262,7 +274,7 @@ func (p *pgPubsub) Close() error {
262274
}
263275

264276
// closeListener closes the pgListener, unless it has already been closed.
265-
func (p *pgPubsub) closeListener() error {
277+
func (p *PGPubsub) closeListener() error {
266278
p.mut.Lock()
267279
defer p.mut.Unlock()
268280
if p.closedListener {
@@ -274,7 +286,7 @@ func (p *pgPubsub) closeListener() error {
274286
}
275287

276288
// listen begins receiving messages on the pq listener.
277-
func (p *pgPubsub) listen() {
289+
func (p *PGPubsub) listen() {
278290
defer func() {
279291
p.logger.Info(p.ctx, "pubsub listen stopped receiving notify")
280292
cErr := p.closeListener()
@@ -307,7 +319,13 @@ func (p *pgPubsub) listen() {
307319
}
308320
}
309321

310-
func (p *pgPubsub) listenReceive(notif *pq.Notification) {
322+
func (p *PGPubsub) listenReceive(notif *pq.Notification) {
323+
sizeLabel := messageSizeNormal
324+
if len(notif.Extra) >= colossalThreshold {
325+
sizeLabel = messageSizeColossal
326+
}
327+
p.messagesTotal.WithLabelValues(sizeLabel).Inc()
328+
311329
p.mut.Lock()
312330
defer p.mut.Unlock()
313331
queues, ok := p.queues[notif.Channel]
@@ -320,7 +338,7 @@ func (p *pgPubsub) listenReceive(notif *pq.Notification) {
320338
}
321339
}
322340

323-
func (p *pgPubsub) recordReconnect() {
341+
func (p *PGPubsub) recordReconnect() {
324342
p.mut.Lock()
325343
defer p.mut.Unlock()
326344
for _, listeners := range p.queues {
@@ -330,20 +348,23 @@ func (p *pgPubsub) recordReconnect() {
330348
}
331349
}
332350

333-
// New creates a new Pubsub implementation using a PostgreSQL connection.
334-
func New(ctx context.Context, logger slog.Logger, database *sql.DB, connectURL string) (Pubsub, error) {
351+
func (p *PGPubsub) startListener(ctx context.Context, connectURL string) error {
352+
p.connected.Set(0)
335353
// Creates a new listener using pq.
336354
errCh := make(chan error)
337-
listener := pq.NewListener(connectURL, time.Second, time.Minute, func(t pq.ListenerEventType, err error) {
355+
p.pgListener = pq.NewListener(connectURL, time.Second, time.Minute, func(t pq.ListenerEventType, err error) {
338356
switch t {
339357
case pq.ListenerEventConnected:
340-
logger.Info(ctx, "pubsub connected to postgres")
358+
p.logger.Info(ctx, "pubsub connected to postgres")
359+
p.connected.Set(1.0)
341360
case pq.ListenerEventDisconnected:
342-
logger.Error(ctx, "pubsub disconnected from postgres", slog.Error(err))
361+
p.logger.Error(ctx, "pubsub disconnected from postgres", slog.Error(err))
362+
p.connected.Set(0)
343363
case pq.ListenerEventReconnected:
344-
logger.Info(ctx, "pubsub reconnected to postgres")
364+
p.logger.Info(ctx, "pubsub reconnected to postgres")
365+
p.connected.Set(1)
345366
case pq.ListenerEventConnectionAttemptFailed:
346-
logger.Error(ctx, "pubsub failed to connect to postgres", slog.Error(err))
367+
p.logger.Error(ctx, "pubsub failed to connect to postgres", slog.Error(err))
347368
}
348369
// This callback gets events whenever the connection state changes.
349370
// Don't send if the errChannel has already been closed.
@@ -358,26 +379,120 @@ func New(ctx context.Context, logger slog.Logger, database *sql.DB, connectURL s
358379
select {
359380
case err := <-errCh:
360381
if err != nil {
361-
_ = listener.Close()
362-
return nil, xerrors.Errorf("create pq listener: %w", err)
382+
_ = p.pgListener.Close()
383+
return xerrors.Errorf("create pq listener: %w", err)
363384
}
364385
case <-ctx.Done():
365-
_ = listener.Close()
366-
return nil, ctx.Err()
386+
_ = p.pgListener.Close()
387+
return ctx.Err()
367388
}
389+
return nil
390+
}
368391

392+
// These are the metrics we compute implicitly from our existing data structures.
393+
var (
394+
currentSubscribersDesc = prometheus.NewDesc(
395+
"coder_pubsub_current_subscribers",
396+
"The current number of active pubsub subscribers",
397+
nil, nil,
398+
)
399+
currentEventsDesc = prometheus.NewDesc(
400+
"coder_pubsub_current_events",
401+
"The current number of pubsub event channels listened for",
402+
nil, nil,
403+
)
404+
)
405+
406+
// We'll track messages as size "normal" and "colossal", where the
407+
// latter are messages of at least 7600 bytes, or 95% of the postgres
408+
// notify limit. If we see a lot of colossal messages that's an indication that
409+
// we might be trying to send too much data over the pubsub and are in danger of
410+
// failing to publish.
411+
const (
412+
colossalThreshold = 7600
413+
messageSizeNormal = "normal"
414+
messageSizeColossal = "colossal"
415+
)
416+
417+
// Describe implements, along with Collect, the prometheus.Collector interface
418+
// for metrics.
419+
func (p *PGPubsub) Describe(descs chan<- *prometheus.Desc) {
420+
// explicit metrics
421+
p.publishesTotal.Describe(descs)
422+
p.subscribesTotal.Describe(descs)
423+
p.messagesTotal.Describe(descs)
424+
p.disconnectionsTotal.Describe(descs)
425+
p.connected.Describe(descs)
426+
427+
// implicit metrics
428+
descs <- currentSubscribersDesc
429+
descs <- currentEventsDesc
430+
}
431+
432+
// Collect implements, along with Describe, the prometheus.Collector interface
433+
// for metrics.
434+
func (p *PGPubsub) Collect(metrics chan<- prometheus.Metric) {
435+
// explicit metrics
436+
p.publishesTotal.Collect(metrics)
437+
p.subscribesTotal.Collect(metrics)
438+
p.messagesTotal.Collect(metrics)
439+
p.disconnectionsTotal.Collect(metrics)
440+
p.connected.Collect(metrics)
441+
442+
// implicit metrics
443+
p.mut.Lock()
444+
events := len(p.queues)
445+
subs := 0
446+
for _, subscriberMap := range p.queues {
447+
subs += len(subscriberMap)
448+
}
449+
p.mut.Unlock()
450+
metrics <- prometheus.MustNewConstMetric(currentSubscribersDesc, prometheus.GaugeValue, float64(subs))
451+
metrics <- prometheus.MustNewConstMetric(currentEventsDesc, prometheus.GaugeValue, float64(events))
452+
}
453+
454+
// New creates a new PGPubsub, a Pubsub implementation using a PostgreSQL connection.
455+
func New(startCtx context.Context, logger slog.Logger, database *sql.DB, connectURL string) (*PGPubsub, error) {
369456
// Start a new context that will be canceled when the pubsub is closed.
370457
ctx, cancel := context.WithCancel(context.Background())
371-
pgPubsub := &pgPubsub{
458+
p := &PGPubsub{
372459
ctx: ctx,
373460
cancel: cancel,
374461
logger: logger,
375462
listenDone: make(chan struct{}),
376463
db: database,
377-
pgListener: listener,
378464
queues: make(map[string]map[uuid.UUID]*msgQueue),
465+
466+
publishesTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
467+
Namespace: "coder",
468+
Subsystem: "pubsub",
469+
Name: "publishes_total",
470+
}, []string{"success"}),
471+
subscribesTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
472+
Namespace: "coder",
473+
Subsystem: "pubsub",
474+
Name: "subscribes_total",
475+
}, []string{"success"}),
476+
messagesTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
477+
Namespace: "coder",
478+
Subsystem: "pubsub",
479+
Name: "messages_total",
480+
}, []string{"size"}),
481+
disconnectionsTotal: prometheus.NewCounter(prometheus.CounterOpts{
482+
Namespace: "coder",
483+
Subsystem: "pubsub",
484+
Name: "disconnections_total",
485+
}),
486+
connected: prometheus.NewGauge(prometheus.GaugeOpts{
487+
Namespace: "coder",
488+
Subsystem: "pubsub",
489+
Name: "connected",
490+
}),
491+
}
492+
if err := p.startListener(startCtx, connectURL); err != nil {
493+
return nil, err
379494
}
380-
go pgPubsub.listen()
495+
go p.listen()
381496
logger.Info(ctx, "pubsub has started")
382-
return pgPubsub, nil
497+
return p, nil
383498
}

0 commit comments

Comments
 (0)