Skip to content

Commit acf2f3f

Browse files
committed
fix: close pg PubSub listener to avoid race
1 parent 2d61d53 commit acf2f3f

File tree

2 files changed

+26
-10
lines changed

2 files changed

+26
-10
lines changed

coderd/database/pubsub/pubsub.go

+26-10
Original file line numberDiff line numberDiff line change
@@ -162,13 +162,15 @@ func (q *msgQueue) dropped() {
162162

163163
// Pubsub implementation using PostgreSQL.
164164
type pgPubsub struct {
165-
ctx context.Context
166-
cancel context.CancelFunc
167-
listenDone chan struct{}
168-
pgListener *pq.Listener
169-
db *sql.DB
170-
mut sync.Mutex
171-
queues map[string]map[uuid.UUID]*msgQueue
165+
ctx context.Context
166+
cancel context.CancelFunc
167+
listenDone chan struct{}
168+
pgListener *pq.Listener
169+
db *sql.DB
170+
mut sync.Mutex
171+
queues map[string]map[uuid.UUID]*msgQueue
172+
closedListener bool
173+
closeListenerErr error
172174
}
173175

174176
// BufferSize is the maximum number of unhandled messages we will buffer
@@ -240,15 +242,29 @@ func (p *pgPubsub) Publish(event string, message []byte) error {
240242
// Close closes the pubsub instance.
241243
func (p *pgPubsub) Close() error {
242244
p.cancel()
243-
err := p.pgListener.Close()
245+
err := p.closeListener()
244246
<-p.listenDone
245247
return err
246248
}
247249

250+
// closeListener closes the pgListener, unless it has already been closed.
251+
func (p *pgPubsub) closeListener() error {
252+
p.mut.Lock()
253+
defer p.mut.Unlock()
254+
if p.closedListener {
255+
return p.closeListenerErr
256+
}
257+
p.closeListenerErr = p.pgListener.Close()
258+
p.closedListener = true
259+
return p.closeListenerErr
260+
}
261+
248262
// listen begins receiving messages on the pq listener.
249263
func (p *pgPubsub) listen() {
250-
defer close(p.listenDone)
251-
defer p.pgListener.Close()
264+
defer func() {
265+
_ = p.closeListener()
266+
close(p.listenDone)
267+
}()
252268

253269
var (
254270
notif *pq.Notification
-3 Bytes
Binary file not shown.

0 commit comments

Comments
 (0)