-
Notifications
You must be signed in to change notification settings - Fork 924
fix: fix Listen/Unlisten race on Pubsub #15315
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
This stack of pull requests is managed by Graphite. Learn more about stacking. Join @spikecurtis and the rest of your teammates on |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Solid fix! I think down the line this code could be/become hard to reason about as others look at it or make changes, but I don't have a better idea currently that doesn't have potential performance implications.
p.qMu.Lock() | ||
defer p.qMu.Unlock() | ||
qSet, ok := p.queues[event] | ||
if ok && len(qSet.m) == 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we wanted to be really careful here we could check that the channel on qSet is still ours. It'll work without the check though, just the delete could (very) theoretically be done by another routine, which would still only be a semantic difference.
@@ -188,6 +187,19 @@ func (l pqListenerShim) NotifyChan() <-chan *pq.Notification { | |||
return l.Notify | |||
} | |||
|
|||
type queueSet struct { | |||
m map[*msgQueue]struct{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion: m => q/queues. I feel this would explain better what one is looking at when viewing the code.
Yeah, I'm not feeling like it's a great design, but I also couldn't come up with anything better. 2 designs I rejected:
|
79d7fab
to
a6ca3e1
Compare
Yeah, I think you were right to reject those options, what you ended up is still better. My idea was to have a single goroutine responsible for coordinating sub/unsub, basically anything queues related. We'd still have to solve for not doing blocking operations synchronously, though. Could look something like this: func (p *PGPubsub) listenLoop() {
var queues map[string]... // p.queues no longer needs locking
for {
r := <-p.listen
switch req.(type) {
case listen:
if _, ok := queues[r.event]; !ok {
// ~go p.pqListener.Listen(); r.err <- err
} else {
r.err <- nil
}
case unlisten:
// like listen, but unlisten
case receive:
// enqueue notif
}
}
func (p *PGPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(), err error) {
errc := make(chan error, 1)
p.listen <- listen{"event", newQ, errc}
_ = <-errc
return func() {
p.listen <- unlisten{"event", newQ, errc}
}, nil
} |
Fixes #15312
When we need to
Unlisten()
for an event, instead of immediately removing the event from thep.queues
, we store a channel to signal any goroutines trying to Subscribe to the same event when we are done. OnSubscribe
, if the channel is present, wait for it before callingListen
to ensure the ordering is correct.