-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
[WIP][Messenger] Multiple queue support + prioritization #30699
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
ed0d6e9
to
01380a4
Compare
/** | ||
* @author Ryan Weaver <ryan@symfonycasts.com> | ||
*/ | ||
class QueueNameStamp implements StampInterface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about defining it as final
? 🤔
Does the definition of the "queue" belong to the place that calls dispatch (ie user code) |
That's a fair question. The message->transport routing is a natural fit for config, because you're using keys that are only defined in config - e.g. "send to the
This is certainly a possibility - a "virtual" transport that basically "adds the queue name metadata to the message and forwards to the real transport". Overall, it's a matter of: The first gives nice, clear control per message. But it's harder to apply a stamp automatically to all classes that implement some interface or to messages dispatched by a 3rd party (you'd need an event listener). The second would require a new concept (virtual transports) and puts more config into YAML... which I don't love ;).
I'm not an AMQP expert, but AMQP has queues, and that's literally what you're doing here: putting each message into a different AMQP queue, then consuming on a queue-by-queue basis. For DoctrineTransport, it's actually just a column on the table ( |
This PR is already a new concept - the stamp + the "queue" concept. |
That's for AMQP-only. If you use Google Pub/Sub, Amazon SQS and so on, I believe you need to define them upfront.
I agree this this. I don't believe we need any new concept for this. Not even a "virtual transport". Why can't they just be to different transports, named |
…ker with prioritized transports (weaverryan) This PR was squashed before being merged into the 4.3-dev branch (closes #30708). Discussion ---------- [Messenger] ReceiverInterface::handle() to get() & Worker with prioritized transports | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | BC breaks? | no | Deprecations? | no | Tests pass? | yes | Fixed tickets | Helps with #30699 | License | MIT | Doc PR | TODO Highlights: * `messenger:consume` can now consume messages from multiple transports with priority ❗️ ``` bin/console messenger:consume amqp_high amqp_medium amqp_low ``` * How long you want to sleep before checking more messages is now an option to `messenger:consume` * `ReceiverInterface::receive()` is replaced with `ReceiverInterface::get()` * Logic for looping & sleeping is moved into `Worker` Commits ------- e800bd5 [Messenger] ReceiverInterface::handle() to get() & Worker with prioritized transports
Messenger is currently missing an idea of "prioritizing" messages on the queue. Inspired by Laravel, this adds the ability to add different messages to different "queues" and then process those queues in order.
First, put your messages into whatever "queues" you want, which sort of act like "categories":
And when consuming, say which queues you want to consume and in which order:
./bin/console messenger:consume -vv --queues=high,low
All messages will be processed from
high
before going tolow
. I chose this approach, versus just putting everything in one queue with a priority (also possible) because it allows you to create multiple workers, each which work on only 1 (or several) queues. For example, you could have 5 workers working on thehigh
queue and one working onlow
.TODO: