Skip to content

fix: fix Listen/Unlisten race on Pubsub #15315

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 1, 2024

Conversation

spikecurtis
Copy link
Contributor

@spikecurtis spikecurtis commented Nov 1, 2024

Fixes #15312

When we need to Unlisten() for an event, instead of immediately removing the event from the p.queues, we store a channel to signal any goroutines trying to Subscribe to the same event when we are done. On Subscribe, if the channel is present, wait for it before calling Listen to ensure the ordering is correct.

Copy link
Contributor Author

This stack of pull requests is managed by Graphite. Learn more about stacking.

Join @spikecurtis and the rest of your teammates on Graphite Graphite

@spikecurtis spikecurtis requested a review from mafredri November 1, 2024 09:16
@spikecurtis spikecurtis marked this pull request as ready for review November 1, 2024 09:16
Copy link
Member

@mafredri mafredri left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Solid fix! I think down the line this code could be/become hard to reason about as others look at it or make changes, but I don't have a better idea currently that doesn't have potential performance implications.

p.qMu.Lock()
defer p.qMu.Unlock()
qSet, ok := p.queues[event]
if ok && len(qSet.m) == 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we wanted to be really careful here we could check that the channel on qSet is still ours. It'll work without the check though, just the delete could (very) theoretically be done by another routine, which would still only be a semantic difference.

@@ -188,6 +187,19 @@ func (l pqListenerShim) NotifyChan() <-chan *pq.Notification {
return l.Notify
}

type queueSet struct {
m map[*msgQueue]struct{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: m => q/queues. I feel this would explain better what one is looking at when viewing the code.

Copy link
Contributor Author

Yeah, I'm not feeling like it's a great design, but I also couldn't come up with anything better.

2 designs I rejected:

  1. I considered having the p.queues be the source of truth with a background goroutine reconciling the Listen/Unlisten, sort of like we do elsewhere in the product, but the problem is that when you Subscribe, you don't want to return until you know the pubsub is listening. So, that kind of eventually consistent model won't work.

  2. We could go back to how things were and hold the p.qMu while calling Listen/Unlisten. We'd need to ensure that we keep reading from the notification channel, even if the p.qMu is locked. One way to do that is by creating an unbounded FIFO queue of notifications, so we could keep reading from the channel, even if we can't immediately process the notifications due to the lock. An unbounded queue sounds dangerous though.

@spikecurtis spikecurtis force-pushed the spike/15312-listen-unlisten branch from 79d7fab to a6ca3e1 Compare November 1, 2024 10:16
@spikecurtis spikecurtis merged commit 005ea53 into main Nov 1, 2024
27 checks passed
@spikecurtis spikecurtis deleted the spike/15312-listen-unlisten branch November 1, 2024 10:35
@github-actions github-actions bot locked and limited conversation to collaborators Nov 1, 2024
@mafredri
Copy link
Member

mafredri commented Nov 1, 2024

Yeah, I think you were right to reject those options, what you ended up is still better.

My idea was to have a single goroutine responsible for coordinating sub/unsub, basically anything queues related. We'd still have to solve for not doing blocking operations synchronously, though.

Could look something like this:

func (p *PGPubsub) listenLoop() {
	var queues map[string]... // p.queues no longer needs locking
	for {
		r := <-p.listen
		switch req.(type) {
		case listen:
			if _, ok := queues[r.event]; !ok {
				// ~go p.pqListener.Listen(); r.err <- err
			} else {
				r.err <- nil
			}
		case unlisten:
			// like listen, but unlisten
		case receive:
			// enqueue notif
	}
}

func (p *PGPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(), err error) {
	errc := make(chan error, 1)
	p.listen <- listen{"event", newQ, errc}
	_ = <-errc
	return func() {
		p.listen <- unlisten{"event", newQ, errc}
	}, nil
}

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Listen/Unlisten race in Pubsub
2 participants