Skip to content

[WIP][Messenger] Multiple queue support + prioritization #30699

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from

Conversation

weaverryan
Copy link
Member

Q A
Branch? master
Bug fix? no
New feature? yes
BC breaks? WIP
Deprecations? no
Tests pass? not yet
Fixed tickets Part of #30558
License MIT
Doc PR TODO

Messenger is currently missing an idea of "prioritizing" messages on the queue. Inspired by Laravel, this adds the ability to add different messages to different "queues" and then process those queues in order.

First, put your messages into whatever "queues" you want, which sort of act like "categories":

$bus->dispatch(new Envelope($message), new QueueNameStamp('low')));
$bus->dispatch(new Envelope($message), new QueueNameStamp('high')));

And when consuming, say which queues you want to consume and in which order:

./bin/console messenger:consume -vv --queues=high,low

All messages will be processed from high before going to low. I chose this approach, versus just putting everything in one queue with a priority (also possible) because it allows you to create multiple workers, each which work on only 1 (or several) queues. For example, you could have 5 workers working on the high queue and one working on low.

TODO:

  • Tests
  • Implementation in Worker & Receiver is a mess

/**
* @author Ryan Weaver <ryan@symfonycasts.com>
*/
class QueueNameStamp implements StampInterface
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about defining it as final? 🤔

@nicolas-grekas
Copy link
Member

nicolas-grekas commented Mar 26, 2019

Does the definition of the "queue" belong to the place that calls dispatch (ie user code)
or does it belong to the routing layer?
Shouldn't this be handled by using virtual transports instead? Why "queue" btw?

@weaverryan
Copy link
Member Author

Does the definition of the "queue" belong to the place that calls dispatch (ie user code)
or does it belong to the routing layer?

That's a fair question. The message->transport routing is a natural fit for config, because you're using keys that are only defined in config - e.g. "send to the amqp transport - amqp is defined in config, so it fits nicely to "route" itfrom config (it would look a little funny if you were to mention this string in a controller or service). But, with queues, these aren't defined anywhere - you can just "invent them" at the time you dispatch - and whatever transport the message goes to will handle that. But, I'm not absolutely convinced one way or another.

Shouldn't this be handled by using virtual transports instead?

This is certainly a possibility - a "virtual" transport that basically "adds the queue name metadata to the message and forwards to the real transport".

Overall, it's a matter of:
A) Configuration at dispatch time via stamp
vs
B) Configuration in config via routing

The first gives nice, clear control per message. But it's harder to apply a stamp automatically to all classes that implement some interface or to messages dispatched by a 3rd party (you'd need an event listener). The second would require a new concept (virtual transports) and puts more config into YAML... which I don't love ;).

Why "queue" btw?

I'm not an AMQP expert, but AMQP has queues, and that's literally what you're doing here: putting each message into a different AMQP queue, then consuming on a queue-by-queue basis. For DoctrineTransport, it's actually just a column on the table (queue). But using the "standard" name from AMQP as the standard name/concept I think makes sense.

@nicolas-grekas
Copy link
Member

The second would require a new concept (virtual transports)

This PR is already a new concept - the stamp + the "queue" concept.
A virtual transport is less of a new concept as its ... a transport :)
But the real question is: where belongs the responsibility of deciding the queue/transport?
I don't think the dispatch point is the correct place - exactly same as deciding the transport doesn't belong there.
There is senderLocator - that's where this routing belongs, no need for a new concept here - a variation of the existing one fits to me.

@nicolas-grekas nicolas-grekas added this to the next milestone Mar 26, 2019
@sroze
Copy link
Contributor

sroze commented Mar 27, 2019

But, with queues, these aren't defined anywhere - you can just "invent them" at the time you dispatch

That's for AMQP-only. If you use Google Pub/Sub, Amazon SQS and so on, I believe you need to define them upfront.

where belongs the responsibility of deciding the queue/transport?
I don't think the dispatch point is the correct place

I agree this this.

I don't believe we need any new concept for this. Not even a "virtual transport". Why can't they just be to different transports, named high_priority_amqp and low_priority_amqp? The "only" thing we need to add is the ability for the Worker to consume messages from multiple transports at a time.

@weaverryan
Copy link
Member Author

I think @sroze is right. I'm closing this PR in favor of #30708, which I can further enhance to allow for handling multiple transports inside a single Worker.

@weaverryan weaverryan closed this Mar 27, 2019
@weaverryan weaverryan deleted the queue-name-stamp branch March 27, 2019 12:32
fabpot added a commit that referenced this pull request Mar 30, 2019
…ker with prioritized transports (weaverryan)

This PR was squashed before being merged into the 4.3-dev branch (closes #30708).

Discussion
----------

[Messenger] ReceiverInterface::handle() to get() & Worker with prioritized transports

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | Helps with #30699
| License       | MIT
| Doc PR        | TODO

Highlights:

* `messenger:consume` can now consume messages from multiple transports with priority ❗️

```
bin/console messenger:consume amqp_high amqp_medium amqp_low
```

* How long you want to sleep before checking more messages is now an option to `messenger:consume`
* `ReceiverInterface::receive()` is replaced with `ReceiverInterface::get()`
* Logic for looping & sleeping is moved into `Worker`

Commits
-------

e800bd5 [Messenger] ReceiverInterface::handle() to get() & Worker with prioritized transports
@nicolas-grekas nicolas-grekas modified the milestones: next, 4.3 Apr 30, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants