
[Messenger] Be able to start a worker for multiple queues with custom consumption priorities #45882



Open
d-ph opened this issue Mar 29, 2022 · 22 comments · May be fixed by #47602

@d-ph

d-ph commented Mar 29, 2022

Description

Hello,

The documentation says:

You can (...) instruct one worker to handle messages in a priority order:

> php bin/console messenger:consume async_priority_high async_priority_low

The worker will always first look for messages waiting on async_priority_high. If there are none, then it will consume messages from async_priority_low.

I would like to propose adding a way to run the messenger:consume worker in which the developer defines the priorities by which the worker consumes queue tasks (in particular, declaring that two or more queues share the same priority). Consider the following messenger:consume invocation, which shows the proposed feature in practice:

php bin/console messenger:consume queue_a^1 queue_b^1 queue_b_low^2 queue_b_lowest^3 queue_c^1

That invocation would start a worker that consumes tasks in the following priority order:

  1. (queue_a, queue_b, queue_c)
  2. queue_b_low
  3. queue_b_lowest

It all boils down to the following if-statement in the code: link. Instead of breaking out of the consumption loop as soon as one queue yields a task, the worker should first poll the remaining queues of the same "priority group level", and only then break (because at least one task was consumed from that group).
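A minimal sketch of the proposed loop change, written in Python rather than PHP and not taken from Symfony's actual Worker code. The queues are plain deques standing in for receivers, and run_once and the groups list are hypothetical names. Each pass polls every queue of a priority group once before returning to the highest group:

```python
from collections import deque
from typing import Callable, Dict, List

def run_once(groups: List[List[str]], queues: Dict[str, deque],
             handle: Callable[[str, str], None]) -> bool:
    """One pass of a hypothetical grouped-priority consumption loop.

    Returns True if at least one message was handled (the caller then starts
    again from the highest-priority group) and False if every queue was empty
    (the caller would then run its idle/sleep logic)."""
    for group in groups:
        handled_in_group = False
        for name in group:  # poll every queue of this group once
            if queues[name]:
                handle(name, queues[name].popleft())
                handled_in_group = True
        if handled_in_group:  # break only after the whole group was polled
            return True
    return False

groups = [["queue_a", "queue_b", "queue_c"], ["queue_b_low"], ["queue_b_lowest"]]
queues = {
    "queue_a": deque(["a1", "a2"]), "queue_b": deque(["b1"]), "queue_c": deque(),
    "queue_b_low": deque(["low1"]), "queue_b_lowest": deque(["lowest1"]),
}
handled: List[str] = []
while run_once(groups, queues, lambda queue, message: handled.append(message)):
    pass
print(handled)  # ['a1', 'b1', 'a2', 'low1', 'lowest1']
```

Note that b1 is handled right after a1: with today's strict ordering, a2 would be consumed before queue_b is polled at all.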

Example

A few more examples on top of the one mentioned above:

php bin/console messenger:consume queue_a queue_b queue_c

This would be implicitly equivalent to

php bin/console messenger:consume queue_a^1 queue_b^2 queue_c^3

php bin/console messenger:consume queue_a^1 queue_b^1 queue_b_low^2 queue_b_lowest^3 queue_c^1 queue_c_low^2 queue_d^10

This would start the worker with the following "priority groups":

  1. (queue_a, queue_b, queue_c)
  2. (queue_b_low, queue_c_low)
  3. queue_b_lowest
  4. queue_d
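The caret syntax could be parsed into priority groups along these lines. This is a hypothetical Python sketch: parse_ranked_receivers is an illustrative name, and giving a token without "^" its 1-based position as its rank is an assumption that reproduces the implicit equivalence described above:

```python
from itertools import groupby
from typing import List, Tuple

def parse_ranked_receivers(args: List[str]) -> List[List[str]]:
    """Turn ["queue_a^1", "queue_b^1", ...] into ordered priority groups."""
    ranked: List[Tuple[int, int, str]] = []
    for position, token in enumerate(args, start=1):
        name, sep, rank = token.partition("^")
        # Assumption: no "^" means rank = position, i.e. today's strict order.
        ranked.append((int(rank) if sep else position, position, name))
    ranked.sort()  # by rank, then by original argument position
    return [[name for _, _, name in group]
            for _, group in groupby(ranked, key=lambda item: item[0])]

print(parse_ranked_receivers("queue_a queue_b queue_c".split()))
# [['queue_a'], ['queue_b'], ['queue_c']]
print(parse_ranked_receivers(
    "queue_a^1 queue_b^1 queue_b_low^2 queue_b_lowest^3 queue_c^1 queue_c_low^2 queue_d^10".split()))
# [['queue_a', 'queue_b', 'queue_c'], ['queue_b_low', 'queue_c_low'], ['queue_b_lowest'], ['queue_d']]
```

Ranks only need to be comparable, not contiguous, which is why queue_d^10 simply becomes the fourth group.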
@Jeroeny
Contributor

Jeroeny commented Apr 4, 2022

I have also found the need for more precise prioritisation of queue consumption. See PR #44294. It takes quite a different approach, but perhaps these efforts can be combined somehow?

@ThomasBeauchataud

ThomasBeauchataud commented May 12, 2022

There are two points here:

  1. Being able to add a custom priority in the command
  2. Being able to define a consumption order among receivers that share the same priority

My proposition for the point 1

Reorder the receivers in the Command by parsing their names and priorities, if the receivers argument contains a transport with a custom priority.

My proposition for the point 2

Allow a mixed strategy (building on PR #44294) that manages both priorities and ordering among receivers with the same priority.
The solution would be to reorganise the receivers list after each handled message, as follows:

  • Initial transport list with priorities: [transportName_1 => 1, transportName_2 => 1, transportName_3 => 1, transportName_4 => 2, transportName_5 => 3]
  • If transportName_1 handles a message, the receivers list is reordered to: [transportName_2 => 1, transportName_3 => 1, transportName_1 => 1, transportName_4 => 2, transportName_5 => 3]
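A sketch of that reordering step in Python, assuming the list is already sorted by priority and each receiver appears once; rotate_after_handle is a hypothetical helper, not part of Messenger. It moves the receiver that just handled a message to the end of its own priority group, which gives round-robin behaviour within a group:

```python
from typing import List, Tuple

Receiver = Tuple[str, int]  # (transport name, priority)

def rotate_after_handle(receivers: List[Receiver], handled: str) -> List[Receiver]:
    """Move `handled` to the end of its priority group, keeping all else in place."""
    idx = next(i for i, (name, _) in enumerate(receivers) if name == handled)
    entry = receivers[idx]
    rest = receivers[:idx] + receivers[idx + 1:]
    # Position right after the last remaining receiver with the same priority;
    # if it is alone in its group, it goes back where it was.
    same = [i for i, (_, priority) in enumerate(rest) if priority == entry[1]]
    insert_at = (max(same) + 1) if same else idx
    return rest[:insert_at] + [entry] + rest[insert_at:]

receivers = [("transportName_1", 1), ("transportName_2", 1), ("transportName_3", 1),
             ("transportName_4", 2), ("transportName_5", 3)]
print([name for name, _ in rotate_after_handle(receivers, "transportName_1")])
# ['transportName_2', 'transportName_3', 'transportName_1', 'transportName_4', 'transportName_5']
```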

@buffcode
Contributor

I'm not a supporter of priorities; I'd rather use weights.

Take a configuration of queueA => 5, queueB => 5, queueC => 10, which has a total weight of 20. queueA and queueB have the same weight (priority), but queueC has double the weight. As such, queueC will be chosen in 50% (10 / 20 = 0.5) of the polls the worker makes, and A and B in 25% (5 / 20 = 0.25) each. This avoids starving A and B when C is a very long queue or its messages take a lot of time.
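One way to realise those percentages is weighted random selection, sketched here with Python's random.choices. The function name pick_queue is hypothetical, and a deterministic weighted round-robin would be an equally valid alternative; this only checks that the poll shares converge to weight / total:

```python
import random
from collections import Counter
from typing import Dict

def pick_queue(weights: Dict[str, int], rng: random.Random) -> str:
    """Pick the queue to poll next with probability weight / total weight."""
    names = list(weights)
    return rng.choices(names, weights=[weights[name] for name in names], k=1)[0]

weights = {"queueA": 5, "queueB": 5, "queueC": 10}  # total weight 20
rng = random.Random(42)  # seeded so the run is reproducible
polls = 20_000
counts = Counter(pick_queue(weights, rng) for _ in range(polls))
shares = {name: counts[name] / polls for name in weights}
print(shares)  # roughly {'queueA': 0.25, 'queueB': 0.25, 'queueC': 0.5}
```

Because every queue keeps a non-zero probability of being polled, a permanently full queueC cannot starve queueA and queueB; a poll that lands on an empty queue simply returns nothing.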

That being said, I'm not in favour of putting either priorities or weights on the command line; I'd rather implement this in a custom Transport / Strategy.

@d-ph
Author

d-ph commented Jul 5, 2022

Fair point, @buffcode, regarding potential queue starvation. I would like to inquire further about your proposal to implement the solution as a custom Transport/Strategy.

Hi, @upyx. I would like to discuss the feedback you provided in this ticket. Like buffcode above, you proposed using a custom Transport or Strategy.

A question to both of you: could you show me what it would take to make the Worker process the queue tasks in the priority order from my original post, please? I.e.:

  1. (queue_a, queue_b, queue_c)
  2. queue_b_low
  3. queue_b_lowest

What exactly is meant by the proposition to introduce a custom Transport/Strategy?

I would add that I personally (and arguably not only I) am looking for a solution where I can make the Worker(s) execute tasks in the order I require through a mere change to the supervisord.conf config file, i.e. a mere change to the Workers' CLI invocation. Picture 5 workers across 3 machines, each meant to consume tasks in a different order. Does your proposal involve 5 different Transport PHP classes?

Thank you.

@upyx
Contributor

upyx commented Jul 6, 2022

Hi there!

As was mentioned, there are many queue consuming strategies: priorities, grouped priorities, weights, counters, etc. It's hardly possible to implement all of them at the same time. And it isn't necessary. We can provide some abstraction for that, and let users write their own logic.

I'll try to make a proof of concept in a week.

@d-ph
Author

d-ph commented Jul 6, 2022

Hello,

Yes, an abstraction for retrieving tasks would satisfy all parties, I agree. Perhaps the following code region could be "extracted" and adapted into an abstract method getNextEnvelope(): ?Envelope, which would be called for as long as it returns an Envelope. If it returns null, the worker would execute its idle sleep code, etc. I haven't thought this idea through in depth, so please treat it as food for thought only.
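The getNextEnvelope() idea can be sketched in Python as follows. This is an illustration, not Symfony code: a plain string stands in for an Envelope, and ExecutionStrategy, StrictPriorityStrategy, worker_loop and the idle callback are hypothetical names. The worker only asks the strategy for the next envelope and falls back to its idle logic when it gets None:

```python
from collections import deque
from typing import Callable, Dict, List, Optional, Protocol

class ExecutionStrategy(Protocol):
    def get_next_envelope(self) -> Optional[str]: ...

class StrictPriorityStrategy:
    """Hypothetical default strategy: first non-empty queue in the given order wins."""

    def __init__(self, queues: Dict[str, deque], order: List[str]) -> None:
        self.queues = queues
        self.order = order

    def get_next_envelope(self) -> Optional[str]:
        for name in self.order:
            if self.queues[name]:
                return self.queues[name].popleft()
        return None  # nothing to do right now

def worker_loop(strategy: ExecutionStrategy, handle: Callable[[str], None],
                idle: Callable[[], bool]) -> None:
    """Ask the strategy for work; on None call idle(), which stands in for the
    worker's sleep logic and returns False to stop the loop."""
    while True:
        envelope = strategy.get_next_envelope()
        if envelope is None:
            if not idle():
                return
            continue
        handle(envelope)

queues = {"high": deque(["h1"]), "low": deque(["l1", "l2"])}
handled: List[str] = []
worker_loop(StrictPriorityStrategy(queues, ["high", "low"]), handled.append, lambda: False)
print(handled)  # ['h1', 'l1', 'l2']
```

Swapping the consumption order then means passing a different strategy object; the loop itself does not change.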

Thanks for looking into a proof of concept. Much appreciated, and I'm looking forward to it.

@upyx
Contributor

upyx commented Jul 14, 2022

@d-ph :

... Perhaps the following code region could be "extracted" ...

Yes! It's the main idea.

I've written some code to explain the concept in more depth. I haven't run it 🙂

The OrderedReceiverAggregate is the aggregate of receivers.
The OrderedPollerInterface is the interface for strategies that change the consumption order; an implementation is passed to the OrderedReceiverAggregate.
The PriorityPoller is the default implementation and behaves just like the Worker does. Originally, the Worker had one receiver; it would be a good idea to replace the array with an aggregate.
The WeightPoller is an alternative implementation that chooses a receiver randomly according to its weight. I'm not sure that's what @buffcode meant 🤷‍♂️

So, what should be done to implement custom ordering:

  • Implement the OrderedPollerInterface
  • Extend or write a factory to inject that implementation in an aggregate
  • Configure services
  • Configure messenger or...
  • Optionally, extend or write a command to run a configured worker by command arguments instead of messenger.yaml

Uh, a lot of work... However, most of it can be automated by compile passes.
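For readers without the PoC at hand, here is a rough Python re-sketch of the aggregate/poller split. It borrows the class names from the comment above but is not upyx's PHP code: the poll() and get() method names and the (receiver name, envelope) pairs are assumptions. The poller is a generator, so envelopes are pulled one at a time, and a message that arrives mid-run on a higher-priority receiver is picked up next:

```python
from collections import deque
from typing import Dict, Iterator, List, Protocol, Tuple

class OrderedPoller(Protocol):
    def poll(self, receivers: Dict[str, deque]) -> Iterator[Tuple[str, str]]: ...

class PriorityPoller:
    """Yields (receiver_name, envelope) pairs, going back to the first
    receiver after every message (strict priority order)."""

    def __init__(self, order: List[str]) -> None:
        self.order = order

    def poll(self, receivers: Dict[str, deque]) -> Iterator[Tuple[str, str]]:
        while True:
            for name in self.order:
                if receivers[name]:
                    yield name, receivers[name].popleft()
                    break  # restart from the highest-priority receiver
            else:
                return  # no receiver had a message: stop so the worker can idle

class OrderedReceiverAggregate:
    """Looks like a single receiver to the worker; the poller decides the order."""

    def __init__(self, receivers: Dict[str, deque], poller: OrderedPoller) -> None:
        self.receivers = receivers
        self.poller = poller

    def get(self) -> Iterator[Tuple[str, str]]:
        return self.poller.poll(self.receivers)

receivers = {"high": deque(["h1"]), "low": deque(["l1", "l2"])}
aggregate = OrderedReceiverAggregate(receivers, PriorityPoller(["high", "low"]))
seen: List[str] = []
for _, envelope in aggregate.get():
    seen.append(envelope)
    if envelope == "l1":
        receivers["high"].append("h2")  # a high-priority message arrives mid-run
print(seen)  # ['h1', 'l1', 'h2', 'l2']
```

A WeightPoller would be another class with the same poll() signature, so the aggregate and worker stay unchanged.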

d-ph pushed a commit to d-ph/symfony that referenced this issue Jul 29, 2022
1. Not tested
2. No error handling.
@d-ph
Author

d-ph commented Jul 29, 2022

Hi @upyx.

I have finally found a bit of time to check your solution. Apologies for the delay.

Now I see what you meant by an aggregate of receivers. The solution uses the Composite Pattern, similar to how on a filesystem both a file and a directory are treated as a file (the latter being a collection/aggregate of files).

Correct me if I'm wrong, but the solution appears to eagerly "acquire" all pending envelopes from a single receiver, because this line in Worker.php immediately asks the (new) aggregate receiver for a complete array of envelopes. I like how OrderedPollerInterface.php allows the poller to use a PHP Generator to obtain envelopes one at a time; however, given how the Worker receives those envelopes, the Poller would be asked to produce a complete array of envelopes in one go, defeating the purpose of using the Generator. It perhaps goes without saying that pulling all pending tasks into the worker is undesirable.

On top of that, and excuse me for being frank here, I dislike how complex the proposed solution is.

  1. It requires a completely new queue transport to be added and configured in messenger. I didn't fully grasp this idea until I saw the code. It really seems like overkill. Excuse the analogy, but it feels like being offered a cow when a glass of milk was asked for.
  2. It introduces a conceptual tree structure of receivers. I won't try to hide that I'm a simple person, and working mentally with this idea is very burdensome for me. Please note that this isn't a black-box piece of code. Devs (like me) will be stepping through this code with Xdebug, and having to remember that in the worker's "main loop" a receiver might be either a single receiver or an aggregate of receivers is a challenge.

I spent a few moments on an alternative solution that uses the Strategy Pattern. All it does is offer a way for a dev to replace the procedure inside the worker's "main loop". Most importantly:

  1. It allows choosing the strategy and configuring it in the CLI command invocation. To me, such ad-hoc flexibility is a big plus.
  2. It seems orders of magnitude simpler conceptually. It literally just makes the "main loop" variable. If I may quote a dev here: "Sometimes all one needs is simply a public static function()".

Again, apologies for making it difficult. Looking forward to your answers and opinion.

@upyx
Contributor

upyx commented Jul 30, 2022

@d-ph you are on the right track.

And yes, using aggregation here is slightly overkill 🙁 But the reason for the high complexity is the overcomplicated middlewares and a Worker with features like "redo failed messages". Originally it looked simple, and it would be nice if it still did. But... we wanted new features and forgot about the architecture 🥲

As things stand, your solution is better. I have a tricky plan to improve Messenger internals. It will take a while and I don't know how the community will react. I've started here #47090

Correct me if I'm wrong but the solution appears to eagerly "acquire" all pending envelopes from a single receiver due to the fact that this line in Worker.php immediately asks the (new) aggregate receiver to obtain a complete array of envelopes

Nested generators pause on every received message and continue after the message has been handled. "This line" gets a Generator that will be iterated later in the foreach loop.
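The laziness point can be checked with a small Python analogue (PHP generators, including yield from, which PHP has supported since 7.0, behave the same way in this respect). The receiver_get and aggregate_get functions are hypothetical stand-ins for a receiver and an aggregate; the fetched list records when a message is actually pulled:

```python
from typing import Iterator, List, Tuple

fetched: List[str] = []  # records every message actually pulled from a "receiver"

def receiver_get(name: str, count: int) -> Iterator[str]:
    for i in range(count):
        message = f"{name}-{i}"
        fetched.append(message)  # runs only when the consumer asks for the next item
        yield message

def aggregate_get(receivers: List[Tuple[str, int]]) -> Iterator[str]:
    for name, count in receivers:
        yield from receiver_get(name, count)  # delegation keeps it lazy

messages = aggregate_get([("high", 3), ("low", 3)])
first = next(messages)
fetched_after_first = list(fetched)
print(first, fetched_after_first)  # high-0 ['high-0']

eager = list(aggregate_get([("high", 3), ("low", 3)]))  # forcing it pulls everything
print(len(eager))  # 6
```

Obtaining the generator object fetches nothing; only iteration does, and only as far as the consumer goes. Materialising it (list() here, iterator_to_array() in PHP) is what would make the acquisition eager.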

@d-ph
Author

d-ph commented Aug 11, 2022

It will take a while and I don't know how the community will react.

OK. So shall we wait for the resolution of your PR (#47090) before we attempt to move on with this ticket? I'm personally not in a great hurry, so I can wait a few weeks.

Nested generators pause on every received message and continue after the message has been handled. "This line" gets a Generator that will be iterated later in the foreach loop.

Yes, you're absolutely right. My bad.

@upyx
Contributor

upyx commented Aug 23, 2022

@d-ph I've closed #47090, it has no positive feedback.

@d-ph
Author

d-ph commented Aug 30, 2022

@upyx

Tough luck.

So would you say that the solution proposed in my PR is sound for the time being (assuming, of course, that nobody else raises any further concerns)?

@upyx
Contributor

upyx commented Sep 1, 2022

@d-ph The proposed PRs #44294 and #46334 are not flexible: they hardcode a few intricate variants that can't be extended. I like your PoC; it solves the problem without big drawbacks.

@d-ph
Author

d-ph commented Sep 2, 2022

@upyx

Thanks for the feedback.

@buffcode

May I ask you to share your opinion on the proposed change, please? The bottom line is that one would be able to start a worker like this:

bin/console messenger:consume queue_a queue_b queue_b_low queue_b_lowest queue_c \
   --strategy="com.symfony.ranked" \
   --strategy-config="{\"ranks\": [1, 1, 2, 3, 1]}"

And it would result in the following "task consumption pattern":

1. (queue_a, queue_b, queue_c)
2. queue_b_low
3. queue_b_lowest

That implementation:

  1. Does not introduce a new CLI syntax (i.e. the caret "^").
  2. Allows anyone to add new strategies, including a weight-based one (which prevents queue task starvation).

Would you say that implementation looks alright to you?

Thank you.

@buffcode
Contributor

buffcode commented Sep 5, 2022

I like the proposition because it seems to introduce no BC break and is open to custom implementations. What seems to be missing is a CompilerPass / auto-registration of WorkerExecutionStrategyInterface implementations in the WorkerExecutionStrategyRegistry, so that a custom strategy can be plugged in.

I dislike using JSON on the CLI, but that's a matter of personal taste, and I don't have a better idea than simply passing the option string down to the strategies and letting them handle it themselves (maybe a strategy doesn't have a complex configuration, or takes just a single option).
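A Python sketch of a name-based strategy registry combined with the "pass the raw option string down" idea. It does not reproduce the PR's WorkerExecutionStrategyRegistry: STRATEGIES, register, ranked and build are hypothetical names, and the register decorator merely stands in for what a compiler pass collecting tagged services would do. Only the strategy name "com.symfony.ranked" and its JSON shape come from this thread:

```python
import json
from itertools import groupby
from typing import Callable, Dict, List

Factory = Callable[[List[str], str], List[List[str]]]
STRATEGIES: Dict[str, Factory] = {}  # hypothetical: strategy name -> factory

def register(name: str):
    """Stand-in for auto-registration (e.g. a compiler pass over tagged services)."""
    def wrap(factory: Factory) -> Factory:
        STRATEGIES[name] = factory
        return factory
    return wrap

@register("com.symfony.ranked")
def ranked(receivers: List[str], raw_config: str) -> List[List[str]]:
    # This strategy chooses JSON; another one could parse raw_config any way it likes.
    ranks = json.loads(raw_config)["ranks"]
    if len(ranks) != len(receivers):
        raise ValueError("expected exactly one rank per receiver")
    ordered = sorted(zip(ranks, range(len(receivers)), receivers))
    return [[name for _, _, name in group] for _, group in groupby(ordered, key=lambda t: t[0])]

def build(name: str, receivers: List[str], raw_config: str) -> List[List[str]]:
    try:
        factory = STRATEGIES[name]
    except KeyError:
        raise ValueError(f"unknown strategy {name!r}; known: {sorted(STRATEGIES)}") from None
    return factory(receivers, raw_config)

print(build("com.symfony.ranked",
            ["queue_a", "queue_b", "queue_b_low", "queue_b_lowest", "queue_c"],
            '{"ranks": [1, 1, 2, 3, 1]}'))
# [['queue_a', 'queue_b', 'queue_c'], ['queue_b_low'], ['queue_b_lowest']]
```

Keeping the config opaque to the registry means a strategy with a single option could accept a bare string instead of JSON.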

In the future it might be interesting to pass a reference to a fixed config (for switching between pre-configured configurations without writing or updating them on the CLI) or even a config provider, so that those configurations can become truly dynamic (e.g. when the system load is high, exclude the processing of some queues but keep others running).
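A rough Python sketch of such a dynamic config provider, assuming it is re-evaluated on every loop pass and returns priority groups. The function name, the optional set and the max_load threshold are hypothetical; the default load source uses os.getloadavg(), which is only available on Unix-like systems, so the example injects a fixed value instead:

```python
import os
from typing import Callable, List, Set

def load_aware_groups(groups: List[List[str]], optional: Set[str], max_load: float,
                      load: Callable[[], float] = lambda: os.getloadavg()[0]) -> List[List[str]]:
    """While the 1-minute load average exceeds max_load, skip the queues in
    `optional` and keep the remaining queues running."""
    if load() <= max_load:
        return groups
    trimmed = [[q for q in group if q not in optional] for group in groups]
    return [group for group in trimmed if group]  # drop groups that became empty

groups = [["queue_a", "queue_b"], ["queue_b_low"], ["reports"]]
print(load_aware_groups(groups, {"queue_b_low", "reports"}, 2.0, load=lambda: 4.0))
# [['queue_a', 'queue_b']]
```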

@d-ph
Author

d-ph commented Sep 7, 2022

@buffcode

What seems to be missing is a CompilerPass

Yeah, it's just a proof of concept at this stage. I'll add it to the MessengerPass and FrameworkBundle's resources/ next. I also still need to check that it works when actually used in a project.

using JSON on the CLI

I had the same qualms about it (especially having to escape the quotation marks). JSON is, however, a well-adopted format (in webdev), and it does solve the problem of "how to pass any hashmap to the command as one argument". Your point about passing the raw string to the strategy as its options, so that it can decide how to deserialise it, is a valid one. I guess going with JSON enforces a predictable standard (at its own cost). Yeah, a grey area here, not worth a long debate when there's already a consensus.
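On the escaping point: in a POSIX shell the backslashes are only needed inside double quotes. A quick check with Python's shlex, which follows POSIX shell quoting rules (so this approximates sh/bash, not e.g. Windows cmd), shows that the escaped form, a single-quoted form, and shlex.quote() all produce the same argument for the --strategy-config option used earlier in this thread:

```python
import json
import shlex

config = json.dumps({"ranks": [1, 1, 2, 3, 1]})  # '{"ranks": [1, 1, 2, 3, 1]}'

escaped = '--strategy-config="{\\"ranks\\": [1, 1, 2, 3, 1]}"'  # as typed in the example above
single_quoted = "--strategy-config='" + config + "'"            # no backslashes needed
auto = "--strategy-config=" + shlex.quote(config)               # quoting chosen automatically

argv_escaped = shlex.split(escaped)
argv_single = shlex.split(single_quoted)
argv_auto = shlex.split(auto)
print(argv_single)  # ['--strategy-config={"ranks": [1, 1, 2, 3, 1]}']
```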

to pass a reference to a fixed config (...) or even config provider

Good ideas. I personally see little value in the fixed-config proposal, since in my mind the strategy options for each worker invocation would be bespoke to that worker, but I certainly see value in the config provider idea, which would let a worker consume tasks from multiple queues based on system load and other runtime parameters.

Thank you for your feedback.

@Jeroeny, @ThomasBeauchataud

Hi guys,

Just a quick question for you two, if you have the time: does my proposal satisfy your use cases?

@Jeroeny
Contributor

Jeroeny commented Sep 7, 2022

Hi, I definitely like your proposition (better than mine), so I hope it can be shaped into a nice PR.

I also like the approach of a reference to a fixed config or config provider (the same way the consume argument references a preconfigured receiver).
It could even sit at the same level as a receiver, because a preconfigured strategy is likely to already know which receivers it needs. A command would then become, for example, messenger:consume my_strategy.
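A hypothetical Python sketch of how the consume arguments could be resolved if preconfigured strategies lived alongside receivers. PRECONFIGURED, KNOWN_RECEIVERS and resolve are illustrative names, and the dictionary shape is an assumption about how such configuration might look, not an existing Messenger option:

```python
from typing import Dict, List, Optional

# Hypothetical preconfigured strategies, as they might be declared in messenger config.
PRECONFIGURED: Dict[str, Dict[str, object]] = {
    "my_strategy": {
        "strategy": "com.symfony.ranked",
        "receivers": ["queue_a", "queue_b", "queue_b_low"],
        "config": '{"ranks": [1, 1, 2]}',
    },
}
KNOWN_RECEIVERS = {"queue_a", "queue_b", "queue_b_low", "queue_b_lowest", "queue_c"}

def resolve(args: List[str]) -> Dict[str, object]:
    """A single argument naming a preconfigured strategy expands to a full worker
    setup; otherwise the arguments are treated as receiver names."""
    if len(args) == 1 and args[0] in PRECONFIGURED:
        return PRECONFIGURED[args[0]]
    unknown = [name for name in args if name not in KNOWN_RECEIVERS]
    if unknown:
        raise ValueError(f"unknown receivers: {unknown}")
    no_strategy: Optional[str] = None  # None = fall back to the default ordering
    return {"strategy": no_strategy, "receivers": args, "config": None}

print(resolve(["my_strategy"])["receivers"])  # ['queue_a', 'queue_b', 'queue_b_low']
```

Since strategy names and receiver names would then share one namespace, such a configuration would need to reject collisions between the two.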

d-ph pushed a commit to d-ph/symfony that referenced this issue Sep 16, 2022
d-ph pushed a commit to d-ph/symfony that referenced this issue Sep 16, 2022
d-ph pushed a commit to d-ph/symfony that referenced this issue Dec 9, 2022
…es with custom consumption priorities. Replay symfony#47209 on top of this code change.
@carsonbot

Thank you for this suggestion.
There has not been a lot of activity here for a while. Would you still like to see this feature?

@buffcode
Copy link
Contributor

buffcode commented Mar 8, 2023

@carsonbot Yes, still interested

@carsonbot carsonbot removed the Stalled label Mar 8, 2023
@carsonbot

Thank you for this suggestion.
There has not been a lot of activity here for a while. Would you still like to see this feature?

@buffcode
Copy link
Contributor

@carsonbot Yes

@carsonbot carsonbot removed the Stalled label Sep 11, 2023
@andrey-bondar

@carsonbot +1

8 participants