9
9
10
10
"github.com/google/uuid"
11
11
"github.com/lib/pq"
12
+ "github.com/prometheus/client_golang/prometheus"
12
13
"golang.org/x/xerrors"
13
14
14
15
"cdr.dev/slog"
@@ -162,8 +163,8 @@ func (q *msgQueue) dropped() {
162
163
q .cond .Broadcast ()
163
164
}
164
165
165
- // Pubsub implementation using PostgreSQL.
166
- type pgPubsub struct {
166
+ // PGPubsub is a pubsub implementation using PostgreSQL.
167
+ type PGPubsub struct {
167
168
ctx context.Context
168
169
cancel context.CancelFunc
169
170
logger slog.Logger
@@ -174,29 +175,40 @@ type pgPubsub struct {
174
175
queues map [string ]map [uuid.UUID ]* msgQueue
175
176
closedListener bool
176
177
closeListenerErr error
178
+
179
+ publishesTotal * prometheus.CounterVec
180
+ subscribesTotal * prometheus.CounterVec
181
+ messagesTotal * prometheus.CounterVec
182
+ publishedBytesTotal prometheus.Counter
183
+ receivedBytesTotal prometheus.Counter
184
+ disconnectionsTotal prometheus.Counter
185
+ connected prometheus.Gauge
177
186
}
178
187
179
188
// BufferSize is the maximum number of unhandled messages we will buffer
180
189
// for a subscriber before dropping messages.
181
190
const BufferSize = 2048
182
191
183
192
// Subscribe calls the listener when an event matching the name is received.
184
- func (p * pgPubsub ) Subscribe (event string , listener Listener ) (cancel func (), err error ) {
193
+ func (p * PGPubsub ) Subscribe (event string , listener Listener ) (cancel func (), err error ) {
185
194
return p .subscribeQueue (event , newMsgQueue (p .ctx , listener , nil ))
186
195
}
187
196
188
- func (p * pgPubsub ) SubscribeWithErr (event string , listener ListenerWithErr ) (cancel func (), err error ) {
197
+ func (p * PGPubsub ) SubscribeWithErr (event string , listener ListenerWithErr ) (cancel func (), err error ) {
189
198
return p .subscribeQueue (event , newMsgQueue (p .ctx , nil , listener ))
190
199
}
191
200
192
- func (p * pgPubsub ) subscribeQueue (event string , newQ * msgQueue ) (cancel func (), err error ) {
201
+ func (p * PGPubsub ) subscribeQueue (event string , newQ * msgQueue ) (cancel func (), err error ) {
193
202
p .mut .Lock ()
194
203
defer p .mut .Unlock ()
195
204
defer func () {
196
205
if err != nil {
197
206
// if we hit an error, we need to close the queue so we don't
198
207
// leak its goroutine.
199
208
newQ .close ()
209
+ p .subscribesTotal .WithLabelValues ("false" ).Inc ()
210
+ } else {
211
+ p .subscribesTotal .WithLabelValues ("true" ).Inc ()
200
212
}
201
213
}()
202
214
@@ -239,20 +251,23 @@ func (p *pgPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(),
239
251
}, nil
240
252
}
241
253
242
- func (p * pgPubsub ) Publish (event string , message []byte ) error {
254
+ func (p * PGPubsub ) Publish (event string , message []byte ) error {
243
255
p .logger .Debug (p .ctx , "publish" , slog .F ("event" , event ), slog .F ("message_len" , len (message )))
244
256
// This is safe because we are calling pq.QuoteLiteral. pg_notify doesn't
245
257
// support the first parameter being a prepared statement.
246
258
//nolint:gosec
247
259
_ , err := p .db .ExecContext (p .ctx , `select pg_notify(` + pq .QuoteLiteral (event )+ `, $1)` , message )
248
260
if err != nil {
261
+ p .publishesTotal .WithLabelValues ("false" ).Inc ()
249
262
return xerrors .Errorf ("exec pg_notify: %w" , err )
250
263
}
264
+ p .publishesTotal .WithLabelValues ("true" ).Inc ()
265
+ p .publishedBytesTotal .Add (float64 (len (message )))
251
266
return nil
252
267
}
253
268
254
269
// Close closes the pubsub instance.
255
- func (p * pgPubsub ) Close () error {
270
+ func (p * PGPubsub ) Close () error {
256
271
p .logger .Info (p .ctx , "pubsub is closing" )
257
272
p .cancel ()
258
273
err := p .closeListener ()
@@ -262,7 +277,7 @@ func (p *pgPubsub) Close() error {
262
277
}
263
278
264
279
// closeListener closes the pgListener, unless it has already been closed.
265
- func (p * pgPubsub ) closeListener () error {
280
+ func (p * PGPubsub ) closeListener () error {
266
281
p .mut .Lock ()
267
282
defer p .mut .Unlock ()
268
283
if p .closedListener {
@@ -274,7 +289,7 @@ func (p *pgPubsub) closeListener() error {
274
289
}
275
290
276
291
// listen begins receiving messages on the pq listener.
277
- func (p * pgPubsub ) listen () {
292
+ func (p * PGPubsub ) listen () {
278
293
defer func () {
279
294
p .logger .Info (p .ctx , "pubsub listen stopped receiving notify" )
280
295
cErr := p .closeListener ()
@@ -307,7 +322,14 @@ func (p *pgPubsub) listen() {
307
322
}
308
323
}
309
324
310
- func (p * pgPubsub ) listenReceive (notif * pq.Notification ) {
325
+ func (p * PGPubsub ) listenReceive (notif * pq.Notification ) {
326
+ sizeLabel := messageSizeNormal
327
+ if len (notif .Extra ) >= colossalThreshold {
328
+ sizeLabel = messageSizeColossal
329
+ }
330
+ p .messagesTotal .WithLabelValues (sizeLabel ).Inc ()
331
+ p .receivedBytesTotal .Add (float64 (len (notif .Extra )))
332
+
311
333
p .mut .Lock ()
312
334
defer p .mut .Unlock ()
313
335
queues , ok := p .queues [notif .Channel ]
@@ -320,7 +342,7 @@ func (p *pgPubsub) listenReceive(notif *pq.Notification) {
320
342
}
321
343
}
322
344
323
- func (p * pgPubsub ) recordReconnect () {
345
+ func (p * PGPubsub ) recordReconnect () {
324
346
p .mut .Lock ()
325
347
defer p .mut .Unlock ()
326
348
for _ , listeners := range p .queues {
@@ -330,20 +352,23 @@ func (p *pgPubsub) recordReconnect() {
330
352
}
331
353
}
332
354
333
- // New creates a new Pubsub implementation using a PostgreSQL connection.
334
- func New ( ctx context. Context , logger slog. Logger , database * sql. DB , connectURL string ) ( Pubsub , error ) {
355
+ func ( p * PGPubsub ) startListener ( ctx context. Context , connectURL string ) error {
356
+ p . connected . Set ( 0 )
335
357
// Creates a new listener using pq.
336
358
errCh := make (chan error )
337
- listener : = pq .NewListener (connectURL , time .Second , time .Minute , func (t pq.ListenerEventType , err error ) {
359
+ p . pgListener = pq .NewListener (connectURL , time .Second , time .Minute , func (t pq.ListenerEventType , err error ) {
338
360
switch t {
339
361
case pq .ListenerEventConnected :
340
- logger .Info (ctx , "pubsub connected to postgres" )
362
+ p .logger .Info (ctx , "pubsub connected to postgres" )
363
+ p .connected .Set (1.0 )
341
364
case pq .ListenerEventDisconnected :
342
- logger .Error (ctx , "pubsub disconnected from postgres" , slog .Error (err ))
365
+ p .logger .Error (ctx , "pubsub disconnected from postgres" , slog .Error (err ))
366
+ p .connected .Set (0 )
343
367
case pq .ListenerEventReconnected :
344
- logger .Info (ctx , "pubsub reconnected to postgres" )
368
+ p .logger .Info (ctx , "pubsub reconnected to postgres" )
369
+ p .connected .Set (1 )
345
370
case pq .ListenerEventConnectionAttemptFailed :
346
- logger .Error (ctx , "pubsub failed to connect to postgres" , slog .Error (err ))
371
+ p . logger .Error (ctx , "pubsub failed to connect to postgres" , slog .Error (err ))
347
372
}
348
373
// This callback gets events whenever the connection state changes.
349
374
// Don't send if the errChannel has already been closed.
@@ -358,26 +383,141 @@ func New(ctx context.Context, logger slog.Logger, database *sql.DB, connectURL s
358
383
select {
359
384
case err := <- errCh :
360
385
if err != nil {
361
- _ = listener .Close ()
362
- return nil , xerrors .Errorf ("create pq listener: %w" , err )
386
+ _ = p . pgListener .Close ()
387
+ return xerrors .Errorf ("create pq listener: %w" , err )
363
388
}
364
389
case <- ctx .Done ():
365
- _ = listener .Close ()
366
- return nil , ctx .Err ()
390
+ _ = p . pgListener .Close ()
391
+ return ctx .Err ()
367
392
}
393
+ return nil
394
+ }
368
395
396
+ // these are the metrics we compute implicitly from our existing data structures
397
+ var (
398
+ currentSubscribersDesc = prometheus .NewDesc (
399
+ "coder_pubsub_current_subscribers" ,
400
+ "The current number of active pubsub subscribers" ,
401
+ nil , nil ,
402
+ )
403
+ currentEventsDesc = prometheus .NewDesc (
404
+ "coder_pubsub_current_events" ,
405
+ "The current number of pubsub event channels listened for" ,
406
+ nil , nil ,
407
+ )
408
+ )
409
+
410
+ // We'll track messages as size "normal" and "colossal", where the
411
+ // latter are messages larger than 7600 bytes, or 95% of the postgres
412
+ // notify limit. If we see a lot of colossal packets that's an indication that
413
+ // we might be trying to send too much data over the pubsub and are in danger of
414
+ // failing to publish.
415
+ const (
416
+ colossalThreshold = 7600
417
+ messageSizeNormal = "normal"
418
+ messageSizeColossal = "colossal"
419
+ )
420
+
421
+ // Describe implements, along with Collect, the prometheus.Collector interface
422
+ // for metrics.
423
+ func (p * PGPubsub ) Describe (descs chan <- * prometheus.Desc ) {
424
+ // explicit metrics
425
+ p .publishesTotal .Describe (descs )
426
+ p .subscribesTotal .Describe (descs )
427
+ p .messagesTotal .Describe (descs )
428
+ p .publishedBytesTotal .Describe (descs )
429
+ p .receivedBytesTotal .Describe (descs )
430
+ p .disconnectionsTotal .Describe (descs )
431
+ p .connected .Describe (descs )
432
+
433
+ // implicit metrics
434
+ descs <- currentSubscribersDesc
435
+ descs <- currentEventsDesc
436
+ }
437
+
438
+ // Collect implements, along with Describe, the prometheus.Collector interface
439
+ // for metrics
440
+ func (p * PGPubsub ) Collect (metrics chan <- prometheus.Metric ) {
441
+ // explicit metrics
442
+ p .publishesTotal .Collect (metrics )
443
+ p .subscribesTotal .Collect (metrics )
444
+ p .messagesTotal .Collect (metrics )
445
+ p .publishedBytesTotal .Collect (metrics )
446
+ p .receivedBytesTotal .Collect (metrics )
447
+ p .disconnectionsTotal .Collect (metrics )
448
+ p .connected .Collect (metrics )
449
+
450
+ // implicit metrics
451
+ p .mut .Lock ()
452
+ events := len (p .queues )
453
+ subs := 0
454
+ for _ , subscriberMap := range p .queues {
455
+ subs += len (subscriberMap )
456
+ }
457
+ p .mut .Unlock ()
458
+ metrics <- prometheus .MustNewConstMetric (currentSubscribersDesc , prometheus .GaugeValue , float64 (subs ))
459
+ metrics <- prometheus .MustNewConstMetric (currentEventsDesc , prometheus .GaugeValue , float64 (events ))
460
+ }
461
+
462
+ // New creates a new Pubsub implementation using a PostgreSQL connection.
463
+ func New (startCtx context.Context , logger slog.Logger , database * sql.DB , connectURL string ) (* PGPubsub , error ) {
369
464
// Start a new context that will be canceled when the pubsub is closed.
370
465
ctx , cancel := context .WithCancel (context .Background ())
371
- pgPubsub := & pgPubsub {
466
+ p := & PGPubsub {
372
467
ctx : ctx ,
373
468
cancel : cancel ,
374
469
logger : logger ,
375
470
listenDone : make (chan struct {}),
376
471
db : database ,
377
- pgListener : listener ,
378
472
queues : make (map [string ]map [uuid.UUID ]* msgQueue ),
473
+
474
+ publishesTotal : prometheus .NewCounterVec (prometheus.CounterOpts {
475
+ Namespace : "coder" ,
476
+ Subsystem : "pubsub" ,
477
+ Name : "publishes_total" ,
478
+ Help : "Total number of calls to Publish" ,
479
+ }, []string {"success" }),
480
+ subscribesTotal : prometheus .NewCounterVec (prometheus.CounterOpts {
481
+ Namespace : "coder" ,
482
+ Subsystem : "pubsub" ,
483
+ Name : "subscribes_total" ,
484
+ Help : "Total number of calls to Subscribe/SubscribeWithErr" ,
485
+ }, []string {"success" }),
486
+ messagesTotal : prometheus .NewCounterVec (prometheus.CounterOpts {
487
+ Namespace : "coder" ,
488
+ Subsystem : "pubsub" ,
489
+ Name : "messages_total" ,
490
+ Help : "Total number of messages received from postgres" ,
491
+ }, []string {"size" }),
492
+ publishedBytesTotal : prometheus .NewCounter (prometheus.CounterOpts {
493
+ Namespace : "coder" ,
494
+ Subsystem : "pubsub" ,
495
+ Name : "published_bytes_total" ,
496
+ Help : "Total number of bytes successfully published across all publishes" ,
497
+ }),
498
+ receivedBytesTotal : prometheus .NewCounter (prometheus.CounterOpts {
499
+ Namespace : "coder" ,
500
+ Subsystem : "pubsub" ,
501
+ Name : "received_bytes_total" ,
502
+ Help : "Total number of bytes received across all messages" ,
503
+ }),
504
+ disconnectionsTotal : prometheus .NewCounter (prometheus.CounterOpts {
505
+ Namespace : "coder" ,
506
+ Subsystem : "pubsub" ,
507
+ Name : "disconnections_total" ,
508
+ Help : "Total number of times we disconnected unexpectedly from postgres" ,
509
+ }),
510
+ connected : prometheus .NewGauge (prometheus.GaugeOpts {
511
+ Namespace : "coder" ,
512
+ Subsystem : "pubsub" ,
513
+ Name : "connected" ,
514
+ Help : "Whether we are connected (1) or not connected (0) to postgres" ,
515
+ }),
516
+ }
517
+ if err := p .startListener (startCtx , connectURL ); err != nil {
518
+ return nil , err
379
519
}
380
- go pgPubsub .listen ()
520
+ go p .listen ()
381
521
logger .Info (ctx , "pubsub has started" )
382
- return pgPubsub , nil
522
+ return p , nil
383
523
}
0 commit comments