-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
[Messenger] ReceiverInterface::handle() to get() & Worker with prioritized transports #30708
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
e7c770a
to
d5fd552
Compare
2ee34de
to
357cdb9
Compare
} | ||
$handled(null); | ||
|
||
usleep(1000000); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Two changes here:
- I changed from .2 seconds to 1 second. Waiting only 200ms between when there is not a message is quite rapid. For reference, in Laravel, this defaults to 3 and is configurable on the command/worker.
357cdb9
to
2d1f17b
Compare
This is ready! tl;dr In recent changes, the |
2d1f17b
to
fa4776f
Compare
The reasoning of having |
src/Symfony/Component/Messenger/Worker/StopWhenTimeLimitIsReachedWorker.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Worker/StopWhenMessageCountIsExceededWorker.php
Outdated
Show resolved
Hide resolved
3864a11
to
8866804
Compare
* Stop receiving some messages. | ||
*/ | ||
public function stop(): void; | ||
public function get(): iterable; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The iterable, like always, was kind of a pain, compared to just returning one. But, it is more flexible and end-users won't touch this method much anyways.
8866804
to
9c9c4e0
Compare
This is ready for review! Description updated. Better viewed (especially the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I really like this.
I am not a fan of priority in a message system, but this is priority of transports. That is a different thing. Im all 👍
(after my comments are addressed/answered)
f5ebe36
to
8356476
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good. I’m happy with the updated doc block on ReceiverInterface::get().
Thank you
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor typos
src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php
Show resolved
Hide resolved
8356476
to
e800bd5
Compare
Thank you @weaverryan. |
…ker with prioritized transports (weaverryan) This PR was squashed before being merged into the 4.3-dev branch (closes #30708). Discussion ---------- [Messenger] ReceiverInterface::handle() to get() & Worker with prioritized transports | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | BC breaks? | no | Deprecations? | no | Tests pass? | yes | Fixed tickets | Helps with #30699 | License | MIT | Doc PR | TODO Highlights: * `messenger:consume` can now consume messages from multiple transports with priority ❗️ ``` bin/console messenger:consume amqp_high amqp_medium amqp_low ``` * How long you want to sleep before checking more messages is now an option to `messenger:consume` * `ReceiverInterface::receive()` is replaced with `ReceiverInterface::get()` * Logic for looping & sleeping is moved into `Worker` Commits ------- e800bd5 [Messenger] ReceiverInterface::handle() to get() & Worker with prioritized transports
typos fixed in f4176b0 |
@@ -25,21 +25,25 @@ interface ReceiverInterface | |||
/** | |||
* Receive some messages to the given handler. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doc is outdated: There is no "given handler"
…erInterface::get` phpdoc. (sroze) This PR was merged into the 4.3-dev branch. Discussion ---------- [Messenger] Remove the mention of handler in the `ReceiverInterface::get` phpdoc. | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | no | BC breaks? | no | Deprecations? | no | Tests pass? | yes | Fixed tickets | #30708 (review) | License | MIT | Doc PR | ø As spotted by @Tobion, we don't have an handler as an argument anymore. Commits ------- 9c63112 Remove the mention of handler in the phpdoc.
…ryan) This PR was squashed before being merged into the 4.3-dev branch (closes #30754). Discussion ---------- [Messenger] New messenger:stop-workers Command | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | BC breaks? | no | Deprecations? | no | Tests pass? | yes | Fixed tickets | Kinda of #29451 | License | MIT | Doc PR | symfony/symfony-docs#11236 o/ me again. This requires and is built on top of #30708 When you deploy, all workers need to be stopped and restarted. That's not currently possible, unless you manually track the pids and send a SIGTERM signal. We can make that much easier :). Now run: ``` bin/console messenger:stop-workers ``` And it will signal to all workers (even if they're distributed on other servers) that they should stop, once they finish processing their current message. This is done via a key in `cache.app`. Cheers! Commits ------- 5897162 [Messenger] New messenger:stop-workers Command
Just thinking out loud for reference: The prioritized transports can be used to prioritize messages by publishing messages to different transports and consuming them in order. But it's a different way than rabbitmq uses message priority. If you want to use rabbitmq priorities instead, you can do that by setting the priority attribute on the |
Highlights:
messenger:consume
can now consume messages from multiple transports with priority ❗️messenger:consume
ReceiverInterface::receive()
is replaced withReceiverInterface::get()
Worker