-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
[Messenger] Add per-message priority #41574
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: 7.4
Are you sure you want to change the base?
Conversation
Hey! I see that this is your first PR. That is great! Welcome! Symfony has a contribution guide which I suggest you to read. In short:
Review the GitHub status checks of your pull request and try to solve the reported issues. If some tests are failing, try to see if they are failing because of this change. When two Symfony core team members approve this change, it will be merged and you will become an official Symfony contributor! I am going to sit back now and wait for the reviews. Cheers! Carsonbot |
public function testSendWithDelay() | ||
{ | ||
$envelope = (new Envelope(new DummyMessage('Oy')))->with(new DelayStamp(1000)); | ||
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]]; | ||
|
||
$serializer = $this->createMock(SerializerInterface::class); | ||
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded); | ||
|
||
$connection = $this->createMock(Connection::class); | ||
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 1000); | ||
|
||
$sender = new AmqpSender($connection, $serializer); | ||
$sender->send($envelope); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not really a part of this feature, but there was no test for DelayStamp in AMQP transport.
I see two ways to deal with lower/higher number issue:
I'd personally prefer the latter, but let's discuss it first. |
Took this one as far as I can, would appreciate reviews. RabbitMQ doesn't allow to change Still trying to figure out if this can be done for Redis adapter without BC breaks. |
a47be79
to
6232247
Compare
@carsonbot find me a reviewer please Status: needs review |
@X-Coder264 could maybe review this PR? |
Hey! I think @tyx has recently worked with this code. Maybe they can help review this? Cheers! Carsonbot |
Thank you for this PR! Have you tested the AMQP implementation? I've built almost the same implementation today (creating queues with priority option, decorated the MessageBusInterface to AmqpStamp certain messages based on their parameters). Consider the following:
The RabbitMQ documentation has a piece on how this is mostly expected behavior unless QoS is used or the prefetch count is reduced (https://www.rabbitmq.com/priority.html#interaction-with-consumers). In addition to that, the current Symfony documentation states that the prefetch count is deprecated since it has no effect: Setting it manually in Connection.php also does not fix this. I'm just adding a comment for now since I had no time to test your PR. Maybe I'm doing something completely wrong, but the implementation is very similar. Have you faced this issue as well? |
Hi @rjhllr , thanks for highlighting this possible issue. As far as I can tell by I've tried to emulate your scenario using my PR's branch: class MyEvent
{
public $priorityAsText;
public function __construct(string $priorityAsText)
{
$this->priorityAsText = $priorityAsText;
}
} class MyEventHandler implements MessageHandlerInterface
{
public function __invoke(MyEvent $event)
{
sleep(1);
dump($event->priorityAsText);
}
} class MyEventSpawnCommand extends Command
{
// <...>
protected function execute(InputInterface $input, OutputInterface $output): int
{
for ($i = 0; $i < 5; $i++) {
$event = new MyEvent("LOW");
$this->eventBus->dispatch($event);
}
sleep(3);
for ($i = 0; $i < 5; $i++) {
$event = new MyEvent("HIGH");
$this->eventBus->dispatch($event, [new PriorityStamp(3)]);
}
return Command::SUCCESS;
}
} First I start
But I'm not sure I understood the "each message takes 1s to complete" correctly. Does this test cover your case? |
738a0f4
to
c862a50
Compare
c862a50
to
7df8ee2
Compare
Hi @sroze , you're marked as a code owner, could you take a look at this one, please? |
7df8ee2
to
f332667
Compare
f332667
to
43e7ee8
Compare
43e7ee8
to
0cf3e57
Compare
Hi @OskarStark , Sorry to bother you, but maybe you could help me to find someone to review this feature. Regarding the checks, fabbot is asking for a dot after colon, I believe that's incorrect. And I'm not quite sure what happened with PHPUnit 8.0 low-deps suite. |
Hi You can ignore the test for now. And fabbot is indeed a false positive. |
c05887a
to
6cd62b6
Compare
673d7a0
to
f7613e2
Compare
f7613e2
to
38a7364
Compare
Rebased to 6.2, but I guess it still needs a review |
Todo
Usage example
Problems
These two transports have quite different priority systems.
RabbitMQ:
x-max-priority
to be set when queue is created, you can't change it later0 .. 255
Beanstalkd: