Skip to content

Commit ff526cc

Browse files
committed
feat: add metrics to PGPubsub
1 parent e748312 commit ff526cc

File tree

4 files changed

+615
-327
lines changed

4 files changed

+615
-327
lines changed

cli/server.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -673,10 +673,14 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
673673
}()
674674

675675
options.Database = database.New(sqlDB)
676-
options.Pubsub, err = pubsub.New(ctx, logger.Named("pubsub"), sqlDB, dbURL)
676+
ps, err := pubsub.New(ctx, logger.Named("pubsub"), sqlDB, dbURL)
677677
if err != nil {
678678
return xerrors.Errorf("create pubsub: %w", err)
679679
}
680+
options.Pubsub = ps
681+
if options.DeploymentValues.Prometheus.Enable {
682+
options.PrometheusRegistry.MustRegister(ps)
683+
}
680684
defer options.Pubsub.Close()
681685
}
682686

coderd/database/pubsub/pubsub.go

+166-26
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/google/uuid"
1111
"github.com/lib/pq"
12+
"github.com/prometheus/client_golang/prometheus"
1213
"golang.org/x/xerrors"
1314

1415
"cdr.dev/slog"
@@ -162,8 +163,8 @@ func (q *msgQueue) dropped() {
162163
q.cond.Broadcast()
163164
}
164165

165-
// Pubsub implementation using PostgreSQL.
166-
type pgPubsub struct {
166+
// PGPubsub is a pubsub implementation using PostgreSQL.
167+
type PGPubsub struct {
167168
ctx context.Context
168169
cancel context.CancelFunc
169170
logger slog.Logger
@@ -174,29 +175,40 @@ type pgPubsub struct {
174175
queues map[string]map[uuid.UUID]*msgQueue
175176
closedListener bool
176177
closeListenerErr error
178+
179+
publishesTotal *prometheus.CounterVec
180+
subscribesTotal *prometheus.CounterVec
181+
messagesTotal *prometheus.CounterVec
182+
publishedBytesTotal prometheus.Counter
183+
receivedBytesTotal prometheus.Counter
184+
disconnectionsTotal prometheus.Counter
185+
connected prometheus.Gauge
177186
}
178187

179188
// BufferSize is the maximum number of unhandled messages we will buffer
180189
// for a subscriber before dropping messages.
181190
const BufferSize = 2048
182191

183192
// Subscribe calls the listener when an event matching the name is received.
184-
func (p *pgPubsub) Subscribe(event string, listener Listener) (cancel func(), err error) {
193+
func (p *PGPubsub) Subscribe(event string, listener Listener) (cancel func(), err error) {
185194
return p.subscribeQueue(event, newMsgQueue(p.ctx, listener, nil))
186195
}
187196

188-
func (p *pgPubsub) SubscribeWithErr(event string, listener ListenerWithErr) (cancel func(), err error) {
197+
func (p *PGPubsub) SubscribeWithErr(event string, listener ListenerWithErr) (cancel func(), err error) {
189198
return p.subscribeQueue(event, newMsgQueue(p.ctx, nil, listener))
190199
}
191200

192-
func (p *pgPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(), err error) {
201+
func (p *PGPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(), err error) {
193202
p.mut.Lock()
194203
defer p.mut.Unlock()
195204
defer func() {
196205
if err != nil {
197206
// if we hit an error, we need to close the queue so we don't
198207
// leak its goroutine.
199208
newQ.close()
209+
p.subscribesTotal.WithLabelValues("false").Inc()
210+
} else {
211+
p.subscribesTotal.WithLabelValues("true").Inc()
200212
}
201213
}()
202214

@@ -239,20 +251,23 @@ func (p *pgPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(),
239251
}, nil
240252
}
241253

242-
func (p *pgPubsub) Publish(event string, message []byte) error {
254+
func (p *PGPubsub) Publish(event string, message []byte) error {
243255
p.logger.Debug(p.ctx, "publish", slog.F("event", event), slog.F("message_len", len(message)))
244256
// This is safe because we are calling pq.QuoteLiteral. pg_notify doesn't
245257
// support the first parameter being a prepared statement.
246258
//nolint:gosec
247259
_, err := p.db.ExecContext(p.ctx, `select pg_notify(`+pq.QuoteLiteral(event)+`, $1)`, message)
248260
if err != nil {
261+
p.publishesTotal.WithLabelValues("false").Inc()
249262
return xerrors.Errorf("exec pg_notify: %w", err)
250263
}
264+
p.publishesTotal.WithLabelValues("true").Inc()
265+
p.publishedBytesTotal.Add(float64(len(message)))
251266
return nil
252267
}
253268

254269
// Close closes the pubsub instance.
255-
func (p *pgPubsub) Close() error {
270+
func (p *PGPubsub) Close() error {
256271
p.logger.Info(p.ctx, "pubsub is closing")
257272
p.cancel()
258273
err := p.closeListener()
@@ -262,7 +277,7 @@ func (p *pgPubsub) Close() error {
262277
}
263278

264279
// closeListener closes the pgListener, unless it has already been closed.
265-
func (p *pgPubsub) closeListener() error {
280+
func (p *PGPubsub) closeListener() error {
266281
p.mut.Lock()
267282
defer p.mut.Unlock()
268283
if p.closedListener {
@@ -274,7 +289,7 @@ func (p *pgPubsub) closeListener() error {
274289
}
275290

276291
// listen begins receiving messages on the pq listener.
277-
func (p *pgPubsub) listen() {
292+
func (p *PGPubsub) listen() {
278293
defer func() {
279294
p.logger.Info(p.ctx, "pubsub listen stopped receiving notify")
280295
cErr := p.closeListener()
@@ -307,7 +322,14 @@ func (p *pgPubsub) listen() {
307322
}
308323
}
309324

310-
func (p *pgPubsub) listenReceive(notif *pq.Notification) {
325+
func (p *PGPubsub) listenReceive(notif *pq.Notification) {
326+
sizeLabel := messageSizeNormal
327+
if len(notif.Extra) >= colossalThreshold {
328+
sizeLabel = messageSizeColossal
329+
}
330+
p.messagesTotal.WithLabelValues(sizeLabel).Inc()
331+
p.receivedBytesTotal.Add(float64(len(notif.Extra)))
332+
311333
p.mut.Lock()
312334
defer p.mut.Unlock()
313335
queues, ok := p.queues[notif.Channel]
@@ -320,7 +342,7 @@ func (p *pgPubsub) listenReceive(notif *pq.Notification) {
320342
}
321343
}
322344

323-
func (p *pgPubsub) recordReconnect() {
345+
func (p *PGPubsub) recordReconnect() {
324346
p.mut.Lock()
325347
defer p.mut.Unlock()
326348
for _, listeners := range p.queues {
@@ -330,20 +352,23 @@ func (p *pgPubsub) recordReconnect() {
330352
}
331353
}
332354

333-
// New creates a new Pubsub implementation using a PostgreSQL connection.
334-
func New(ctx context.Context, logger slog.Logger, database *sql.DB, connectURL string) (Pubsub, error) {
355+
func (p *PGPubsub) startListener(ctx context.Context, connectURL string) error {
356+
p.connected.Set(0)
335357
// Creates a new listener using pq.
336358
errCh := make(chan error)
337-
listener := pq.NewListener(connectURL, time.Second, time.Minute, func(t pq.ListenerEventType, err error) {
359+
p.pgListener = pq.NewListener(connectURL, time.Second, time.Minute, func(t pq.ListenerEventType, err error) {
338360
switch t {
339361
case pq.ListenerEventConnected:
340-
logger.Info(ctx, "pubsub connected to postgres")
362+
p.logger.Info(ctx, "pubsub connected to postgres")
363+
p.connected.Set(1.0)
341364
case pq.ListenerEventDisconnected:
342-
logger.Error(ctx, "pubsub disconnected from postgres", slog.Error(err))
365+
p.logger.Error(ctx, "pubsub disconnected from postgres", slog.Error(err))
366+
p.connected.Set(0)
343367
case pq.ListenerEventReconnected:
344-
logger.Info(ctx, "pubsub reconnected to postgres")
368+
p.logger.Info(ctx, "pubsub reconnected to postgres")
369+
p.connected.Set(1)
345370
case pq.ListenerEventConnectionAttemptFailed:
346-
logger.Error(ctx, "pubsub failed to connect to postgres", slog.Error(err))
371+
p.logger.Error(ctx, "pubsub failed to connect to postgres", slog.Error(err))
347372
}
348373
// This callback gets events whenever the connection state changes.
349374
// Don't send if the errChannel has already been closed.
@@ -358,26 +383,141 @@ func New(ctx context.Context, logger slog.Logger, database *sql.DB, connectURL s
358383
select {
359384
case err := <-errCh:
360385
if err != nil {
361-
_ = listener.Close()
362-
return nil, xerrors.Errorf("create pq listener: %w", err)
386+
_ = p.pgListener.Close()
387+
return xerrors.Errorf("create pq listener: %w", err)
363388
}
364389
case <-ctx.Done():
365-
_ = listener.Close()
366-
return nil, ctx.Err()
390+
_ = p.pgListener.Close()
391+
return ctx.Err()
367392
}
393+
return nil
394+
}
368395

396+
// these are the metrics we compute implicitly from our existing data structures
397+
var (
398+
currentSubscribersDesc = prometheus.NewDesc(
399+
"coder_pubsub_current_subscribers",
400+
"The current number of active pubsub subscribers",
401+
nil, nil,
402+
)
403+
currentEventsDesc = prometheus.NewDesc(
404+
"coder_pubsub_current_events",
405+
"The current number of pubsub event channels listened for",
406+
nil, nil,
407+
)
408+
)
409+
410+
// We'll track messages as size "normal" and "colossal", where the
411+
// latter are messages larger than 7600 bytes, or 95% of the postgres
412+
// notify limit. If we see a lot of colossal packets that's an indication that
413+
// we might be trying to send too much data over the pubsub and are in danger of
414+
// failing to publish.
415+
const (
416+
colossalThreshold = 7600
417+
messageSizeNormal = "normal"
418+
messageSizeColossal = "colossal"
419+
)
420+
421+
// Describe implements, along with Collect, the prometheus.Collector interface
422+
// for metrics.
423+
func (p *PGPubsub) Describe(descs chan<- *prometheus.Desc) {
424+
// explicit metrics
425+
p.publishesTotal.Describe(descs)
426+
p.subscribesTotal.Describe(descs)
427+
p.messagesTotal.Describe(descs)
428+
p.publishedBytesTotal.Describe(descs)
429+
p.receivedBytesTotal.Describe(descs)
430+
p.disconnectionsTotal.Describe(descs)
431+
p.connected.Describe(descs)
432+
433+
// implicit metrics
434+
descs <- currentSubscribersDesc
435+
descs <- currentEventsDesc
436+
}
437+
438+
// Collect implements, along with Describe, the prometheus.Collector interface
439+
// for metrics
440+
func (p *PGPubsub) Collect(metrics chan<- prometheus.Metric) {
441+
// explicit metrics
442+
p.publishesTotal.Collect(metrics)
443+
p.subscribesTotal.Collect(metrics)
444+
p.messagesTotal.Collect(metrics)
445+
p.publishedBytesTotal.Collect(metrics)
446+
p.receivedBytesTotal.Collect(metrics)
447+
p.disconnectionsTotal.Collect(metrics)
448+
p.connected.Collect(metrics)
449+
450+
// implicit metrics
451+
p.mut.Lock()
452+
events := len(p.queues)
453+
subs := 0
454+
for _, subscriberMap := range p.queues {
455+
subs += len(subscriberMap)
456+
}
457+
p.mut.Unlock()
458+
metrics <- prometheus.MustNewConstMetric(currentSubscribersDesc, prometheus.GaugeValue, float64(subs))
459+
metrics <- prometheus.MustNewConstMetric(currentEventsDesc, prometheus.GaugeValue, float64(events))
460+
}
461+
462+
// New creates a new Pubsub implementation using a PostgreSQL connection.
463+
func New(startCtx context.Context, logger slog.Logger, database *sql.DB, connectURL string) (*PGPubsub, error) {
369464
// Start a new context that will be canceled when the pubsub is closed.
370465
ctx, cancel := context.WithCancel(context.Background())
371-
pgPubsub := &pgPubsub{
466+
p := &PGPubsub{
372467
ctx: ctx,
373468
cancel: cancel,
374469
logger: logger,
375470
listenDone: make(chan struct{}),
376471
db: database,
377-
pgListener: listener,
378472
queues: make(map[string]map[uuid.UUID]*msgQueue),
473+
474+
publishesTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
475+
Namespace: "coder",
476+
Subsystem: "pubsub",
477+
Name: "publishes_total",
478+
Help: "Total number of calls to Publish",
479+
}, []string{"success"}),
480+
subscribesTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
481+
Namespace: "coder",
482+
Subsystem: "pubsub",
483+
Name: "subscribes_total",
484+
Help: "Total number of calls to Subscribe/SubscribeWithErr",
485+
}, []string{"success"}),
486+
messagesTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
487+
Namespace: "coder",
488+
Subsystem: "pubsub",
489+
Name: "messages_total",
490+
Help: "Total number of messages received from postgres",
491+
}, []string{"size"}),
492+
publishedBytesTotal: prometheus.NewCounter(prometheus.CounterOpts{
493+
Namespace: "coder",
494+
Subsystem: "pubsub",
495+
Name: "published_bytes_total",
496+
Help: "Total number of bytes successfully published across all publishes",
497+
}),
498+
receivedBytesTotal: prometheus.NewCounter(prometheus.CounterOpts{
499+
Namespace: "coder",
500+
Subsystem: "pubsub",
501+
Name: "received_bytes_total",
502+
Help: "Total number of bytes received across all messages",
503+
}),
504+
disconnectionsTotal: prometheus.NewCounter(prometheus.CounterOpts{
505+
Namespace: "coder",
506+
Subsystem: "pubsub",
507+
Name: "disconnections_total",
508+
Help: "Total number of times we disconnected unexpectedly from postgres",
509+
}),
510+
connected: prometheus.NewGauge(prometheus.GaugeOpts{
511+
Namespace: "coder",
512+
Subsystem: "pubsub",
513+
Name: "connected",
514+
Help: "Whether we are connected (1) or not connected (0) to postgres",
515+
}),
516+
}
517+
if err := p.startListener(startCtx, connectURL); err != nil {
518+
return nil, err
379519
}
380-
go pgPubsub.listen()
520+
go p.listen()
381521
logger.Info(ctx, "pubsub has started")
382-
return pgPubsub, nil
522+
return p, nil
383523
}

0 commit comments

Comments
 (0)