@@ -11,7 +11,6 @@ import (
 	"sync/atomic"
 	"time"
 
-	"github.com/google/uuid"
 	"github.com/lib/pq"
 	"github.com/prometheus/client_golang/prometheus"
 	"golang.org/x/xerrors"
@@ -188,6 +187,19 @@ func (l pqListenerShim) NotifyChan() <-chan *pq.Notification {
 	return l.Notify
 }
 
+type queueSet struct {
+	m map[*msgQueue]struct{}
+	// unlistenInProgress will be non-nil if another goroutine is unlistening for the event this
+	// queueSet corresponds to. If non-nil, that goroutine will close the channel when it is done.
+	unlistenInProgress chan struct{}
+}
+
+func newQueueSet() *queueSet {
+	return &queueSet{
+		m: make(map[*msgQueue]struct{}),
+	}
+}
+
 // PGPubsub is a pubsub implementation using PostgreSQL.
 type PGPubsub struct {
 	logger slog.Logger
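
The queueSet above replaces the old two-level map keyed by uuid.UUID. A *msgQueue pointer is already a unique map key, so a map with empty struct{} values serves as a set and the github.com/google/uuid import can be dropped from this file. Below is a minimal, self-contained sketch of that pointer-keyed set idiom; the item type is a hypothetical stand-in for *msgQueue, not part of the change.

package main

import "fmt"

// item stands in for *msgQueue: every pointer is its own identity, so no
// generated ID is needed to add, remove, or test membership.
type item struct{ name string }

func main() {
	set := make(map[*item]struct{})

	a, b := &item{"a"}, &item{"b"}
	set[a] = struct{}{} // add
	set[b] = struct{}{}

	delete(set, a) // remove by pointer identity

	if _, ok := set[b]; ok { // membership test
		fmt.Println("b still subscribed; set size:", len(set))
	}
}
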
@@ -196,7 +208,7 @@ type PGPubsub struct {
 	db         *sql.DB
 
 	qMu    sync.Mutex
-	queues map[string]map[uuid.UUID]*msgQueue
+	queues map[string]*queueSet
 
 	// making the close state its own mutex domain simplifies closing logic so
 	// that we don't have to hold the qMu --- which could block processing
@@ -243,6 +255,48 @@ func (p *PGPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(),
 		}
 	}()
 
+	var (
+		unlistenInProgress <-chan struct{}
+		// MUST hold the p.qMu lock to manipulate this!
+		qs *queueSet
+	)
+	func() {
+		p.qMu.Lock()
+		defer p.qMu.Unlock()
+
+		var ok bool
+		if qs, ok = p.queues[event]; !ok {
+			qs = newQueueSet()
+			p.queues[event] = qs
+		}
+		qs.m[newQ] = struct{}{}
+		unlistenInProgress = qs.unlistenInProgress
+	}()
+	// NOTE there cannot be any `return` statements between here and the next +-+, otherwise the
+	// assumptions the defer makes could be violated
+	if unlistenInProgress != nil {
+		// We have to wait here because we don't want our `Listen` call to happen before the other
+		// goroutine calls `Unlisten`. That would result in this subscription not getting any
+		// events. c.f. https://github.com/coder/coder/issues/15312
+		p.logger.Debug(context.Background(), "waiting for Unlisten in progress", slog.F("event", event))
+		<-unlistenInProgress
+		p.logger.Debug(context.Background(), "unlistening complete", slog.F("event", event))
+	}
+	// +-+ (see above)
+	defer func() {
+		if err != nil {
+			p.qMu.Lock()
+			defer p.qMu.Unlock()
+			delete(qs.m, newQ)
+			if len(qs.m) == 0 {
+				// we know that newQ was in the queueSet since we last unlocked, so there cannot
+				// have been any _new_ goroutines trying to Unlisten(). Therefore, if the queueSet
+				// is now empty, it's safe to delete.
+				delete(p.queues, event)
+			}
+		}
+	}()
+
 	// The pgListener waits for the response to `LISTEN` on a mainloop that also dispatches
 	// notifies. We need to avoid holding the mutex while this happens, since holding the mutex
 	// blocks reading notifications and can deadlock the pgListener.
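
The unlistenInProgress channel added above is the core of the fix for https://github.com/coder/coder/issues/15312: a new subscriber that finds a teardown in flight must wait for the pending Unlisten to complete before its own Listen is issued, otherwise the LISTEN could land first and be undone by the straggling UNLISTEN. Below is a stripped-down, hypothetical model of that handshake; the sleep stands in for the blocking UNLISTEN round trip, and none of these names come from the real code.

package main

import (
	"fmt"
	"sync"
	"time"
)

// registry models the queueSet handshake: the last unsubscriber publishes a
// channel under the lock, and a new subscriber that sees it waits (outside
// the lock) until teardown is done, so its LISTEN can never race ahead of
// the pending UNLISTEN.
type registry struct {
	mu         sync.Mutex
	inProgress chan struct{} // non-nil while an unlisten is running
}

func (r *registry) subscribe() {
	r.mu.Lock()
	wait := r.inProgress
	r.mu.Unlock()

	if wait != nil {
		fmt.Println("subscribe: waiting for unlisten to finish")
		<-wait // closed by the unsubscriber once UNLISTEN has completed
	}
	fmt.Println("subscribe: safe to LISTEN now")
}

func (r *registry) unsubscribe() {
	r.mu.Lock()
	done := make(chan struct{})
	r.inProgress = done
	r.mu.Unlock()

	time.Sleep(100 * time.Millisecond) // stands in for the blocking UNLISTEN
	close(done)                        // release any waiting subscribers

	r.mu.Lock()
	r.inProgress = nil
	r.mu.Unlock()
}

func main() {
	r := &registry{}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); r.unsubscribe() }()
	time.Sleep(10 * time.Millisecond) // let the unsubscribe get in first
	go func() { defer wg.Done(); r.subscribe() }()
	wg.Wait()
}
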
@@ -258,32 +312,40 @@ func (p *PGPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(),
 	if err != nil {
 		return nil, xerrors.Errorf("listen: %w", err)
 	}
-	p.qMu.Lock()
-	defer p.qMu.Unlock()
 
-	var eventQs map[uuid.UUID]*msgQueue
-	var ok bool
-	if eventQs, ok = p.queues[event]; !ok {
-		eventQs = make(map[uuid.UUID]*msgQueue)
-		p.queues[event] = eventQs
-	}
-	id := uuid.New()
-	eventQs[id] = newQ
 	return func() {
-		p.qMu.Lock()
-		listeners := p.queues[event]
-		q := listeners[id]
-		q.close()
-		delete(listeners, id)
-		if len(listeners) == 0 {
-			delete(p.queues, event)
-		}
-		listenerCount := len(listeners)
-		p.qMu.Unlock()
-		// as above, we must not hold the lock while calling into pgListener
+		var unlistening chan struct{}
+		func() {
+			p.qMu.Lock()
+			defer p.qMu.Unlock()
+			newQ.close()
+			qSet, ok := p.queues[event]
+			if !ok {
+				p.logger.Critical(context.Background(), "event was removed before cancel", slog.F("event", event))
+				return
+			}
+			delete(qSet.m, newQ)
+			if len(qSet.m) == 0 {
+				unlistening = make(chan struct{})
+				qSet.unlistenInProgress = unlistening
+			}
+		}()
 
-		if listenerCount == 0 {
+		// as above, we must not hold the lock while calling into pgListener
+		if unlistening != nil {
 			uErr := p.pgListener.Unlisten(event)
+			close(unlistening)
+			// we can now delete the queueSet if it is empty.
+			func() {
+				p.qMu.Lock()
+				defer p.qMu.Unlock()
+				qSet, ok := p.queues[event]
+				if ok && len(qSet.m) == 0 {
+					p.logger.Debug(context.Background(), "removing queueSet", slog.F("event", event))
+					delete(p.queues, event)
+				}
+			}()
+
 			p.closeMu.Lock()
 			defer p.closeMu.Unlock()
 			if uErr != nil && !p.closedListener {
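
The rewritten cancel function also shows the lock discipline this file keeps repeating: never hold qMu across a call into pgListener, and re-validate shared state after reacquiring the lock, because other goroutines may have subscribed in the meantime. A simplified sketch of that shape follows, using a hypothetical refcount map in place of the real queueSet:

package main

import (
	"fmt"
	"sync"
)

// cancel mutates shared state under the lock, releases the lock for the
// blocking call into the listener, then reacquires it and re-checks before
// deleting: a new subscriber may have registered while we were blocked.
func cancel(mu *sync.Mutex, refs map[string]int, event string, unlisten func(string)) {
	mu.Lock()
	refs[event]--
	last := refs[event] == 0
	mu.Unlock() // never hold the lock across unlisten: it would block the notification mainloop

	if !last {
		return
	}
	unlisten(event) // may take arbitrarily long

	mu.Lock()
	defer mu.Unlock()
	if refs[event] == 0 { // re-check: the world may have changed while unlocked
		delete(refs, event)
	}
}

func main() {
	var mu sync.Mutex
	refs := map[string]int{"workspace_events": 1}
	cancel(&mu, refs, "workspace_events", func(e string) { fmt.Println("UNLISTEN", e) })
	fmt.Println("remaining events:", len(refs))
}
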
@@ -361,21 +423,21 @@ func (p *PGPubsub) listenReceive(notif *pq.Notification) {
 	p.qMu.Lock()
 	defer p.qMu.Unlock()
-	queues, ok := p.queues[notif.Channel]
+	qSet, ok := p.queues[notif.Channel]
 	if !ok {
 		return
 	}
 	extra := []byte(notif.Extra)
-	for _, q := range queues {
+	for q := range qSet.m {
 		q.enqueue(extra)
 	}
 }
 
 func (p *PGPubsub) recordReconnect() {
 	p.qMu.Lock()
 	defer p.qMu.Unlock()
-	for _, listeners := range p.queues {
-		for _, q := range listeners {
+	for _, qSet := range p.queues {
+		for q := range qSet.m {
 			q.dropped()
 		}
 	}
@@ -590,8 +652,8 @@ func (p *PGPubsub) Collect(metrics chan<- prometheus.Metric) {
 	p.qMu.Lock()
 	events := len(p.queues)
 	subs := 0
-	for _, subscriberMap := range p.queues {
-		subs += len(subscriberMap)
+	for _, qSet := range p.queues {
+		subs += len(qSet.m)
 	}
 	p.qMu.Unlock()
 	metrics <- prometheus.MustNewConstMetric(currentSubscribersDesc, prometheus.GaugeValue, float64(subs))
@@ -629,7 +691,7 @@ func newWithoutListener(logger slog.Logger, db *sql.DB) *PGPubsub {
 		logger:          logger,
 		listenDone:      make(chan struct{}),
 		db:              db,
-		queues:          make(map[string]map[uuid.UUID]*msgQueue),
+		queues:          make(map[string]*queueSet),
 		latencyMeasurer: NewLatencyMeasurer(logger.Named("latency-measurer")),
 
 		publishesTotal: prometheus.NewCounterVec(prometheus.CounterOpts{