Skip to content

[Messenger] Allow to limit consumer to specific queues #38973

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/composer-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"preferred-install": {
"symfony/form": "source",
"symfony/http-kernel": "source",
"symfony/messenger": "source",
"symfony/notifier": "source",
"symfony/validator": "source",
"*": "dist"
Expand Down
3 changes: 2 additions & 1 deletion src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ CHANGELOG
5.3
---

* Deprecated the `prefetch_count` parameter, it has no effect and will be removed in Symfony 6.0.
* Deprecated the `prefetch_count` parameter, it has no effect and will be removed in Symfony 6.0.
* `AmqpReceiver` implements `QueueReceiverInterface` to fetch messages from a specific set of queues.

5.2.0
-----
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

Expand All @@ -25,7 +25,7 @@
*
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class AmqpReceiver implements ReceiverInterface, MessageCountAwareInterface
class AmqpReceiver implements QueueReceiverInterface, MessageCountAwareInterface
{
private $serializer;
private $connection;
Expand All @@ -41,7 +41,15 @@ public function __construct(Connection $connection, SerializerInterface $seriali
*/
public function get(): iterable
{
foreach ($this->connection->getQueueNames() as $queueName) {
yield from $this->getFromQueues($this->connection->getQueueNames());
}

/**
* {@inheritdoc}
*/
public function getFromQueues(array $queueNames): iterable
{
foreach ($queueNames as $queueName) {
yield from $this->getEnvelope($queueName);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/Symfony/Component/Messenger/Bridge/Amqp/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"require": {
"php": ">=7.2.5",
"symfony/deprecation-contracts": "^2.1",
"symfony/messenger": "^5.1"
"symfony/messenger": "^5.3"
},
"require-dev": {
"symfony/event-dispatcher": "^4.4|^5.0",
Expand Down
1 change: 1 addition & 0 deletions src/Symfony/Component/Messenger/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ CHANGELOG
---

* `InMemoryTransport` can perform message serialization through dsn `in-memory://?serialize=true`.
* Added `queues` option to `Worker` to only fetch messages from a specific queue from a receiver implementing `QueueReceiverInterface`.

5.2.0
-----
Expand Down
13 changes: 11 additions & 2 deletions src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ protected function configure(): void
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can handle new messages'),
new InputOption('sleep', null, InputOption::VALUE_REQUIRED, 'Seconds to sleep before asking for new messages after no messages were found', 1),
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically)'),
new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
])
->setDescription(self::$defaultDescription)
->setHelp(<<<'EOF'
Expand Down Expand Up @@ -104,6 +105,10 @@ protected function configure(): void
messages didn't originate from Messenger:

<info>php %command.full_name% <receiver-name> --bus=event_bus</info>

Use the --queues option to limit a receiver to only certain queues (only supported by some receivers):

<info>php %command.full_name% <receiver-name> --queues=fasttrack</info>
EOF
)
;
Expand Down Expand Up @@ -195,9 +200,13 @@ protected function execute(InputInterface $input, OutputInterface $output)
$bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus;

$worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger);
$worker->run([
$options = [
'sleep' => $input->getOption('sleep') * 1000000,
]);
];
if ($queues = $input->getOption('queues')) {
$options['queues'] = $queues;
}
$worker->run($options);

return 0;
}
Expand Down
37 changes: 37 additions & 0 deletions src/Symfony/Component/Messenger/Tests/WorkerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
use Symfony\Component\Messenger\Event\WorkerStartedEvent;
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
use Symfony\Component\Messenger\Exception\RuntimeException;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Stamp\StampInterface;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Worker;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
Expand Down Expand Up @@ -245,6 +247,41 @@ public function testWorkerWithMultipleReceivers()
$this->assertSame([$envelope1, $envelope2, $envelope3, $envelope4, $envelope5, $envelope6], $processedEnvelopes);
}

public function testWorkerLimitQueues()
{
$envelope = [new Envelope(new DummyMessage('message1'))];
$receiver = $this->createMock(QueueReceiverInterface::class);
$receiver->expects($this->once())
->method('getFromQueues')
->with(['foo'])
->willReturn($envelope)
;
$receiver->expects($this->never())
->method('get')
;

$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();

$dispatcher = new EventDispatcher();
$dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));

$worker = new Worker(['transport' => $receiver], $bus, $dispatcher);
$worker->run(['queues' => ['foo']]);
}

public function testWorkerLimitQueuesUnsupported()
{
$receiver1 = $this->createMock(QueueReceiverInterface::class);
$receiver2 = $this->createMock(ReceiverInterface::class);

$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();

$worker = new Worker(['transport1' => $receiver1, 'transport2' => $receiver2], $bus);
$this->expectException(RuntimeException::class);
$this->expectExceptionMessage(sprintf('Receiver for "transport2" does not implement "%s".', QueueReceiverInterface::class));
$worker->run(['queues' => ['foo']]);
}

public function testWorkerMessageReceivedEventMutability()
{
$envelope = new Envelope(new DummyMessage('Hello'));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Transport\Receiver;

use Symfony\Component\Messenger\Envelope;

/**
* Some transports may have multiple queues. This interface is used to read from only some queues.
*
* @author David Buchmann <mail@davidbu.ch>
*
* @experimental in 5.3
*/
interface QueueReceiverInterface extends ReceiverInterface
{
/**
* Get messages from the specified queue names instead of consuming from all queues.
*
* @param string[] $queueNames
*
* @return Envelope[]
*/
public function getFromQueues(array $queueNames): iterable;
}
19 changes: 18 additions & 1 deletion src/Symfony/Component/Messenger/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
use Symfony\Component\Messenger\Exception\RuntimeException;
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;

Expand Down Expand Up @@ -57,6 +59,7 @@ public function __construct(array $receivers, MessageBusInterface $bus, EventDis
*
* Valid options are:
* * sleep (default: 1000000): Time in microseconds to sleep after no messages are found
* * queues: The queue names to consume from, instead of consuming from all queues. When this is used, all receivers must implement the QueueReceiverInterface
*/
public function run(array $options = []): void
{
Expand All @@ -65,11 +68,25 @@ public function run(array $options = []): void
$options = array_merge([
'sleep' => 1000000,
], $options);
$queueNames = $options['queues'] ?? false;

if ($queueNames) {
// if queue names are specified, all receivers must implement the QueueReceiverInterface
foreach ($this->receivers as $transportName => $receiver) {
if (!$receiver instanceof QueueReceiverInterface) {
throw new RuntimeException(sprintf('Receiver for "%s" does not implement "%s".', $transportName, QueueReceiverInterface::class));
}
}
}

while (false === $this->shouldStop) {
$envelopeHandled = false;
foreach ($this->receivers as $transportName => $receiver) {
$envelopes = $receiver->get();
if ($queueNames) {
$envelopes = $receiver->getFromQueues($queueNames);
} else {
$envelopes = $receiver->get();
}

foreach ($envelopes as $envelope) {
$envelopeHandled = true;
Expand Down