Skip to content

[Messenger] ReceiverInterface::handle() to get() & Worker with prioritized transports #30708

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 30, 2019

Conversation

weaverryan
Copy link
Member

@weaverryan weaverryan commented Mar 26, 2019

Q A
Branch? master
Bug fix? no
New feature? yes
BC breaks? no
Deprecations? no
Tests pass? yes
Fixed tickets Helps with #30699
License MIT
Doc PR TODO

Highlights:

  • messenger:consume can now consume messages from multiple transports with priority ❗️
bin/console messenger:consume amqp_high amqp_medium amqp_low
  • How long you want to sleep before checking more messages is now an option to messenger:consume
  • ReceiverInterface::receive() is replaced with ReceiverInterface::get()
  • Logic for looping & sleeping is moved into Worker

@weaverryan weaverryan requested a review from sroze as a code owner March 26, 2019 13:16
@weaverryan weaverryan force-pushed the receiver-no-handler branch from e7c770a to d5fd552 Compare March 26, 2019 13:17
@nicolas-grekas nicolas-grekas added this to the next milestone Mar 26, 2019
@weaverryan weaverryan force-pushed the receiver-no-handler branch 5 times, most recently from 2ee34de to 357cdb9 Compare March 27, 2019 03:16
}
$handled(null);

usleep(1000000);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two changes here:

  1. I changed from .2 seconds to 1 second. Waiting only 200ms between when there is not a message is quite rapid. For reference, in Laravel, this defaults to 3 and is configurable on the command/worker.

@weaverryan weaverryan force-pushed the receiver-no-handler branch from 357cdb9 to 2d1f17b Compare March 27, 2019 03:25
@weaverryan
Copy link
Member Author

weaverryan commented Mar 27, 2019

This is ready!

tl;dr In recent changes, the ReceiverInterface is not really the layer that's handling the "looping and waiting for messages" layer - the Worker is doing that. This completes that transformation. It will also allow us to create a Worker that listens to multiple queues at once, in a priority order (#30699).

@sroze
Copy link
Contributor

sroze commented Mar 27, 2019

The reasoning of having handle($handler) was for the transports to be able to catch the exceptions from the handlers and deal with the errors (i.e. ack, nack, etc...). Since we moved this to the worker, I'm happy to move back to get one message. Though, why wouldn't we return an iterable instead to match the behaviour of prefetching (i.e. some transports would have loaded X messages, not just one)?

@weaverryan weaverryan force-pushed the receiver-no-handler branch 2 times, most recently from 3864a11 to 8866804 Compare March 28, 2019 03:14
@weaverryan weaverryan changed the title [Messenger] Changes ReceiverInterface::handle() to get() [Messenger] ReceiverInterface::handle() to get() & Worker with prioritized transports Mar 28, 2019
* Stop receiving some messages.
*/
public function stop(): void;
public function get(): iterable;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The iterable, like always, was kind of a pain, compared to just returning one. But, it is more flexible and end-users won't touch this method much anyways.

@weaverryan weaverryan force-pushed the receiver-no-handler branch from 8866804 to 9c9c4e0 Compare March 28, 2019 03:24
@weaverryan
Copy link
Member Author

This is ready for review! Description updated.

Better viewed (especially the Worker with ?w=1). The 3 StopWhen*** classes are not new - but were moved & changed enough (they decorate WorkerInterface instead of ReceiverInerface so they look new.

Copy link
Member

@Nyholm Nyholm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really like this.

I am not a fan of priority in a message system, but this is priority of transports. That is a different thing. Im all 👍
(after my comments are addressed/answered)

Copy link
Member

@Nyholm Nyholm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good. I’m happy with the updated doc block on ReceiverInterface::get().

Thank you

Copy link
Member

@fabpot fabpot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor typos

@fabpot fabpot force-pushed the receiver-no-handler branch from 8356476 to e800bd5 Compare March 30, 2019 07:01
@fabpot
Copy link
Member

fabpot commented Mar 30, 2019

Thank you @weaverryan.

@fabpot fabpot merged commit e800bd5 into symfony:master Mar 30, 2019
fabpot added a commit that referenced this pull request Mar 30, 2019
…ker with prioritized transports (weaverryan)

This PR was squashed before being merged into the 4.3-dev branch (closes #30708).

Discussion
----------

[Messenger] ReceiverInterface::handle() to get() & Worker with prioritized transports

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | Helps with #30699
| License       | MIT
| Doc PR        | TODO

Highlights:

* `messenger:consume` can now consume messages from multiple transports with priority ❗️

```
bin/console messenger:consume amqp_high amqp_medium amqp_low
```

* How long you want to sleep before checking more messages is now an option to `messenger:consume`
* `ReceiverInterface::receive()` is replaced with `ReceiverInterface::get()`
* Logic for looping & sleeping is moved into `Worker`

Commits
-------

e800bd5 [Messenger] ReceiverInterface::handle() to get() & Worker with prioritized transports
@fabpot
Copy link
Member

fabpot commented Mar 30, 2019

typos fixed in f4176b0

@@ -25,21 +25,25 @@ interface ReceiverInterface
/**
* Receive some messages to the given handler.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doc is outdated: There is no "given handler"

fabpot added a commit that referenced this pull request Mar 31, 2019
…erInterface::get` phpdoc. (sroze)

This PR was merged into the 4.3-dev branch.

Discussion
----------

[Messenger] Remove the mention of handler in the `ReceiverInterface::get` phpdoc.

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | no
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | #30708 (review)
| License       | MIT
| Doc PR        | ø

As spotted by @Tobion, we don't have an handler as an argument anymore.

Commits
-------

9c63112 Remove the mention of handler in the phpdoc.
@weaverryan weaverryan deleted the receiver-no-handler branch March 31, 2019 14:42
fabpot added a commit that referenced this pull request Mar 31, 2019
…ryan)

This PR was squashed before being merged into the 4.3-dev branch (closes #30754).

Discussion
----------

[Messenger] New messenger:stop-workers Command

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | Kinda of #29451
| License       | MIT
| Doc PR        | symfony/symfony-docs#11236

o/ me again.

This requires and is built on top of #30708

When you deploy, all workers need to be stopped and restarted. That's not currently possible, unless you manually track the pids and send a SIGTERM signal. We can make that much easier :).

Now run:

```
bin/console messenger:stop-workers
```

And it will signal to all workers (even if they're distributed on other servers) that they should stop, once they finish processing their current message. This is done via a key in `cache.app`.

Cheers!

Commits
-------

5897162 [Messenger] New messenger:stop-workers Command
@nicolas-grekas nicolas-grekas modified the milestones: next, 4.3 Apr 30, 2019
@Tobion
Copy link
Contributor

Tobion commented May 6, 2019

Just thinking out loud for reference: The prioritized transports can be used to prioritize messages by publishing messages to different transports and consuming them in order. But it's a different way than rabbitmq uses message priority. If you want to use rabbitmq priorities instead, you can do that by setting the priority attribute on the AmqpStamp as introduced in #30913

@fabpot fabpot mentioned this pull request May 9, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants