@@ -162,13 +162,15 @@ func (q *msgQueue) dropped() {
162
162
163
163
// Pubsub implementation using PostgreSQL.
164
164
type pgPubsub struct {
165
- ctx context.Context
166
- cancel context.CancelFunc
167
- listenDone chan struct {}
168
- pgListener * pq.Listener
169
- db * sql.DB
170
- mut sync.Mutex
171
- queues map [string ]map [uuid.UUID ]* msgQueue
165
+ ctx context.Context
166
+ cancel context.CancelFunc
167
+ listenDone chan struct {}
168
+ pgListener * pq.Listener
169
+ db * sql.DB
170
+ mut sync.Mutex
171
+ queues map [string ]map [uuid.UUID ]* msgQueue
172
+ closedListener bool
173
+ closeListenerErr error
172
174
}
173
175
174
176
// BufferSize is the maximum number of unhandled messages we will buffer
@@ -240,15 +242,29 @@ func (p *pgPubsub) Publish(event string, message []byte) error {
240
242
// Close closes the pubsub instance.
241
243
func (p * pgPubsub ) Close () error {
242
244
p .cancel ()
243
- err := p .pgListener . Close ()
245
+ err := p .closeListener ()
244
246
<- p .listenDone
245
247
return err
246
248
}
247
249
250
+ // closeListener closes the pgListener, unless it has already been closed.
251
+ func (p * pgPubsub ) closeListener () error {
252
+ p .mut .Lock ()
253
+ defer p .mut .Unlock ()
254
+ if p .closedListener {
255
+ return p .closeListenerErr
256
+ }
257
+ p .closeListenerErr = p .pgListener .Close ()
258
+ p .closedListener = true
259
+ return p .closeListenerErr
260
+ }
261
+
248
262
// listen begins receiving messages on the pq listener.
249
263
func (p * pgPubsub ) listen () {
250
- defer close (p .listenDone )
251
- defer p .pgListener .Close ()
264
+ defer func () {
265
+ _ = p .closeListener ()
266
+ close (p .listenDone )
267
+ }()
252
268
253
269
var (
254
270
notif * pq.Notification
0 commit comments