9
9
10
10
"github.com/google/uuid"
11
11
"github.com/lib/pq"
12
+ "github.com/prometheus/client_golang/prometheus"
12
13
"golang.org/x/xerrors"
13
14
14
15
"cdr.dev/slog"
@@ -162,8 +163,8 @@ func (q *msgQueue) dropped() {
162
163
q .cond .Broadcast ()
163
164
}
164
165
165
- // Pubsub implementation using PostgreSQL.
166
- type pgPubsub struct {
166
+ // PGPubsub is a pubsub implementation using PostgreSQL.
167
+ type PGPubsub struct {
167
168
ctx context.Context
168
169
cancel context.CancelFunc
169
170
logger slog.Logger
@@ -174,29 +175,38 @@ type pgPubsub struct {
174
175
queues map [string ]map [uuid.UUID ]* msgQueue
175
176
closedListener bool
176
177
closeListenerErr error
178
+
179
+ publishesTotal * prometheus.CounterVec
180
+ subscribesTotal * prometheus.CounterVec
181
+ messagesTotal * prometheus.CounterVec
182
+ disconnectionsTotal prometheus.Counter
183
+ connected prometheus.Gauge
177
184
}
178
185
179
186
// BufferSize is the maximum number of unhandled messages we will buffer
// for a single subscriber before dropping messages.
const BufferSize = 2048
182
189
183
190
// Subscribe calls the listener when an event matching the name is received.
184
- func (p * pgPubsub ) Subscribe (event string , listener Listener ) (cancel func (), err error ) {
191
+ func (p * PGPubsub ) Subscribe (event string , listener Listener ) (cancel func (), err error ) {
185
192
return p .subscribeQueue (event , newMsgQueue (p .ctx , listener , nil ))
186
193
}
187
194
188
- func (p * pgPubsub ) SubscribeWithErr (event string , listener ListenerWithErr ) (cancel func (), err error ) {
195
+ func (p * PGPubsub ) SubscribeWithErr (event string , listener ListenerWithErr ) (cancel func (), err error ) {
189
196
return p .subscribeQueue (event , newMsgQueue (p .ctx , nil , listener ))
190
197
}
191
198
192
- func (p * pgPubsub ) subscribeQueue (event string , newQ * msgQueue ) (cancel func (), err error ) {
199
+ func (p * PGPubsub ) subscribeQueue (event string , newQ * msgQueue ) (cancel func (), err error ) {
193
200
p .mut .Lock ()
194
201
defer p .mut .Unlock ()
195
202
defer func () {
196
203
if err != nil {
197
204
// if we hit an error, we need to close the queue so we don't
198
205
// leak its goroutine.
199
206
newQ .close ()
207
+ p .subscribesTotal .WithLabelValues ("false" ).Inc ()
208
+ } else {
209
+ p .subscribesTotal .WithLabelValues ("true" ).Inc ()
200
210
}
201
211
}()
202
212
@@ -239,20 +249,22 @@ func (p *pgPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(),
239
249
}, nil
240
250
}
241
251
242
- func (p * pgPubsub ) Publish (event string , message []byte ) error {
252
+ func (p * PGPubsub ) Publish (event string , message []byte ) error {
243
253
p .logger .Debug (p .ctx , "publish" , slog .F ("event" , event ), slog .F ("message_len" , len (message )))
244
254
// This is safe because we are calling pq.QuoteLiteral. pg_notify doesn't
245
255
// support the first parameter being a prepared statement.
246
256
//nolint:gosec
247
257
_ , err := p .db .ExecContext (p .ctx , `select pg_notify(` + pq .QuoteLiteral (event )+ `, $1)` , message )
248
258
if err != nil {
259
+ p .publishesTotal .WithLabelValues ("false" ).Inc ()
249
260
return xerrors .Errorf ("exec pg_notify: %w" , err )
250
261
}
262
+ p .publishesTotal .WithLabelValues ("true" ).Inc ()
251
263
return nil
252
264
}
253
265
254
266
// Close closes the pubsub instance.
255
- func (p * pgPubsub ) Close () error {
267
+ func (p * PGPubsub ) Close () error {
256
268
p .logger .Info (p .ctx , "pubsub is closing" )
257
269
p .cancel ()
258
270
err := p .closeListener ()
@@ -262,7 +274,7 @@ func (p *pgPubsub) Close() error {
262
274
}
263
275
264
276
// closeListener closes the pgListener, unless it has already been closed.
265
- func (p * pgPubsub ) closeListener () error {
277
+ func (p * PGPubsub ) closeListener () error {
266
278
p .mut .Lock ()
267
279
defer p .mut .Unlock ()
268
280
if p .closedListener {
@@ -274,7 +286,7 @@ func (p *pgPubsub) closeListener() error {
274
286
}
275
287
276
288
// listen begins receiving messages on the pq listener.
277
- func (p * pgPubsub ) listen () {
289
+ func (p * PGPubsub ) listen () {
278
290
defer func () {
279
291
p .logger .Info (p .ctx , "pubsub listen stopped receiving notify" )
280
292
cErr := p .closeListener ()
@@ -307,7 +319,13 @@ func (p *pgPubsub) listen() {
307
319
}
308
320
}
309
321
310
- func (p * pgPubsub ) listenReceive (notif * pq.Notification ) {
322
+ func (p * PGPubsub ) listenReceive (notif * pq.Notification ) {
323
+ sizeLabel := messageSizeNormal
324
+ if len (notif .Extra ) >= colossalThreshold {
325
+ sizeLabel = messageSizeColossal
326
+ }
327
+ p .messagesTotal .WithLabelValues (sizeLabel ).Inc ()
328
+
311
329
p .mut .Lock ()
312
330
defer p .mut .Unlock ()
313
331
queues , ok := p .queues [notif .Channel ]
@@ -320,7 +338,7 @@ func (p *pgPubsub) listenReceive(notif *pq.Notification) {
320
338
}
321
339
}
322
340
323
- func (p * pgPubsub ) recordReconnect () {
341
+ func (p * PGPubsub ) recordReconnect () {
324
342
p .mut .Lock ()
325
343
defer p .mut .Unlock ()
326
344
for _ , listeners := range p .queues {
@@ -330,20 +348,23 @@ func (p *pgPubsub) recordReconnect() {
330
348
}
331
349
}
332
350
333
- // New creates a new Pubsub implementation using a PostgreSQL connection.
334
- func New ( ctx context. Context , logger slog. Logger , database * sql. DB , connectURL string ) ( Pubsub , error ) {
351
+ func ( p * PGPubsub ) startListener ( ctx context. Context , connectURL string ) error {
352
+ p . connected . Set ( 0 )
335
353
// Creates a new listener using pq.
336
354
errCh := make (chan error )
337
- listener : = pq .NewListener (connectURL , time .Second , time .Minute , func (t pq.ListenerEventType , err error ) {
355
+ p . pgListener = pq .NewListener (connectURL , time .Second , time .Minute , func (t pq.ListenerEventType , err error ) {
338
356
switch t {
339
357
case pq .ListenerEventConnected :
340
- logger .Info (ctx , "pubsub connected to postgres" )
358
+ p .logger .Info (ctx , "pubsub connected to postgres" )
359
+ p .connected .Set (1.0 )
341
360
case pq .ListenerEventDisconnected :
342
- logger .Error (ctx , "pubsub disconnected from postgres" , slog .Error (err ))
361
+ p .logger .Error (ctx , "pubsub disconnected from postgres" , slog .Error (err ))
362
+ p .connected .Set (0 )
343
363
case pq .ListenerEventReconnected :
344
- logger .Info (ctx , "pubsub reconnected to postgres" )
364
+ p .logger .Info (ctx , "pubsub reconnected to postgres" )
365
+ p .connected .Set (1 )
345
366
case pq .ListenerEventConnectionAttemptFailed :
346
- logger .Error (ctx , "pubsub failed to connect to postgres" , slog .Error (err ))
367
+ p . logger .Error (ctx , "pubsub failed to connect to postgres" , slog .Error (err ))
347
368
}
348
369
// This callback gets events whenever the connection state changes.
349
370
// Don't send if the errChannel has already been closed.
@@ -358,26 +379,120 @@ func New(ctx context.Context, logger slog.Logger, database *sql.DB, connectURL s
358
379
select {
359
380
case err := <- errCh :
360
381
if err != nil {
361
- _ = listener .Close ()
362
- return nil , xerrors .Errorf ("create pq listener: %w" , err )
382
+ _ = p . pgListener .Close ()
383
+ return xerrors .Errorf ("create pq listener: %w" , err )
363
384
}
364
385
case <- ctx .Done ():
365
- _ = listener .Close ()
366
- return nil , ctx .Err ()
386
+ _ = p . pgListener .Close ()
387
+ return ctx .Err ()
367
388
}
389
+ return nil
390
+ }
368
391
392
// Descriptors for the metrics we compute implicitly from our existing data
// structures: rather than being tracked by a dedicated counter or gauge, their
// values are derived from the queues map each time Collect runs.
var (
	// currentSubscribersDesc describes the gauge reporting the number of
	// subscriber queues summed over all events.
	currentSubscribersDesc = prometheus.NewDesc(
		"coder_pubsub_current_subscribers",
		"The current number of active pubsub subscribers",
		nil, nil,
	)
	// currentEventsDesc describes the gauge reporting the number of distinct
	// event names present in the queues map.
	currentEventsDesc = prometheus.NewDesc(
		"coder_pubsub_current_events",
		"The current number of pubsub event channels listened for",
		nil, nil,
	)
)
405
+
406
// We track received messages under a size label of either "normal" or
// "colossal". A message is colossal when its payload is at least
// colossalThreshold (7600) bytes, or 95% of the postgres notify limit. If we
// see a lot of colossal messages that's an indication that we might be trying
// to send too much data over the pubsub and are in danger of failing to
// publish.
const (
	colossalThreshold   = 7600
	messageSizeNormal   = "normal"
	messageSizeColossal = "colossal"
)
416
+
417
+ // Describe implements, along with Collect, the prometheus.Collector interface
418
+ // for metrics.
419
+ func (p * PGPubsub ) Describe (descs chan <- * prometheus.Desc ) {
420
+ // explicit metrics
421
+ p .publishesTotal .Describe (descs )
422
+ p .subscribesTotal .Describe (descs )
423
+ p .messagesTotal .Describe (descs )
424
+ p .disconnectionsTotal .Describe (descs )
425
+ p .connected .Describe (descs )
426
+
427
+ // implicit metrics
428
+ descs <- currentSubscribersDesc
429
+ descs <- currentEventsDesc
430
+ }
431
+
432
+ // Collect implements, along with Describe, the prometheus.Collector interface
433
+ // for metrics
434
+ func (p * PGPubsub ) Collect (metrics chan <- prometheus.Metric ) {
435
+ // explicit metrics
436
+ p .publishesTotal .Collect (metrics )
437
+ p .subscribesTotal .Collect (metrics )
438
+ p .messagesTotal .Collect (metrics )
439
+ p .disconnectionsTotal .Collect (metrics )
440
+ p .connected .Collect (metrics )
441
+
442
+ // implicit metrics
443
+ p .mut .Lock ()
444
+ events := len (p .queues )
445
+ subs := 0
446
+ for _ , subscriberMap := range p .queues {
447
+ subs += len (subscriberMap )
448
+ }
449
+ p .mut .Unlock ()
450
+ metrics <- prometheus .MustNewConstMetric (currentSubscribersDesc , prometheus .GaugeValue , float64 (subs ))
451
+ metrics <- prometheus .MustNewConstMetric (currentEventsDesc , prometheus .GaugeValue , float64 (events ))
452
+ }
453
+
454
+ // New creates a new Pubsub implementation using a PostgreSQL connection.
455
+ func New (startCtx context.Context , logger slog.Logger , database * sql.DB , connectURL string ) (* PGPubsub , error ) {
369
456
// Start a new context that will be canceled when the pubsub is closed.
370
457
ctx , cancel := context .WithCancel (context .Background ())
371
- pgPubsub := & pgPubsub {
458
+ p := & PGPubsub {
372
459
ctx : ctx ,
373
460
cancel : cancel ,
374
461
logger : logger ,
375
462
listenDone : make (chan struct {}),
376
463
db : database ,
377
- pgListener : listener ,
378
464
queues : make (map [string ]map [uuid.UUID ]* msgQueue ),
465
+
466
+ publishesTotal : prometheus .NewCounterVec (prometheus.CounterOpts {
467
+ Namespace : "coder" ,
468
+ Subsystem : "pubsub" ,
469
+ Name : "publishes_total" ,
470
+ }, []string {"success" }),
471
+ subscribesTotal : prometheus .NewCounterVec (prometheus.CounterOpts {
472
+ Namespace : "coder" ,
473
+ Subsystem : "pubsub" ,
474
+ Name : "subscribes_total" ,
475
+ }, []string {"success" }),
476
+ messagesTotal : prometheus .NewCounterVec (prometheus.CounterOpts {
477
+ Namespace : "coder" ,
478
+ Subsystem : "pubsub" ,
479
+ Name : "messages_total" ,
480
+ }, []string {"size" }),
481
+ disconnectionsTotal : prometheus .NewCounter (prometheus.CounterOpts {
482
+ Namespace : "coder" ,
483
+ Subsystem : "pubsub" ,
484
+ Name : "disconnections_total" ,
485
+ }),
486
+ connected : prometheus .NewGauge (prometheus.GaugeOpts {
487
+ Namespace : "coder" ,
488
+ Subsystem : "pubsub" ,
489
+ Name : "connected" ,
490
+ }),
491
+ }
492
+ if err := p .startListener (startCtx , connectURL ); err != nil {
493
+ return nil , err
379
494
}
380
- go pgPubsub .listen ()
495
+ go p .listen ()
381
496
logger .Info (ctx , "pubsub has started" )
382
- return pgPubsub , nil
497
+ return p , nil
383
498
}
0 commit comments