Skip to content

[Messenger] ReceiverInterface::handle() to get() & Worker with prioritized transports #30708

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions src/Symfony/Component/Messenger/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ CHANGELOG
to the `Envelope` then find the correct bus when receiving from
the transport. See `ConsumeMessagesCommand`.
* The optional `$busNames` constructor argument of the class `ConsumeMessagesCommand` was removed.
* [BC BREAK] 2 new methods were added to `ReceiverInterface`:
`ack()` and `reject()`.
* [BC BREAK] 3 new methods were added to `ReceiverInterface`:
`ack()`, `reject()` and `get()`. The methods `receive()`
and `stop()` were removed.
* [BC BREAK] Error handling was moved from the receivers into
`Worker`. Implementations of `ReceiverInterface::handle()`
should now allow all exceptions to be thrown, except for transport
Expand All @@ -24,7 +25,9 @@ CHANGELOG
* The default command name for `ConsumeMessagesCommand` was
changed from `messenger:consume-messages` to `messenger:consume`
* `ConsumeMessagesCommand` has two new optional constructor arguments
* `Worker` has 4 new option constructor arguments.
* [BC BREAK] The first argument to Worker changed from a single
`ReceiverInterface` to an array of `ReceiverInterface`.
* `Worker` has 3 new optional constructor arguments.
* The `Worker` class now handles calling `pcntl_signal_dispatch()` the
receiver no longer needs to call this.
* The `AmqpSender` will now retry messages using a dead-letter exchange
Expand Down
77 changes: 50 additions & 27 deletions src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Question\ChoiceQuestion;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Messenger\RoutableMessageBus;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver;
use Symfony\Component\Messenger\Worker;
use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker;
use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker;
use Symfony\Component\Messenger\Worker\StopWhenTimeLimitIsReachedWorker;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;

/**
Expand Down Expand Up @@ -70,10 +71,11 @@ protected function configure(): void

$this
->setDefinition([
new InputArgument('receiver', $defaultReceiverName ? InputArgument::OPTIONAL : InputArgument::REQUIRED, 'Name of the receiver', $defaultReceiverName),
new InputArgument('receivers', InputArgument::IS_ARRAY, 'Names of the receivers/transports to consume in order of priority', $defaultReceiverName ? [$defaultReceiverName] : []),
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
new InputOption('sleep', null, InputOption::VALUE_REQUIRED, 'Seconds to sleep before asking for new messages after no messages were found', 1),
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically.'),
])
->setDescription('Consumes messages')
Expand All @@ -82,6 +84,10 @@ protected function configure(): void

<info>php %command.full_name% <receiver-name></info>

To receive from multiple transports, pass each name:

<info>php %command.full_name% receiver1 receiver2</info>

Use the --limit option to limit the number of messages received:

<info>php %command.full_name% <receiver-name> --limit=10</info>
Expand Down Expand Up @@ -111,16 +117,22 @@ protected function interact(InputInterface $input, OutputInterface $output)
{
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);

if ($this->receiverNames && !$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
if (null === $receiverName) {
$io->block('Missing receiver argument.', null, 'error', ' ', true);
$input->setArgument('receiver', $io->choice('Select one of the available receivers', $this->receiverNames));
} elseif ($alternatives = $this->findAlternatives($receiverName, $this->receiverNames)) {
$io->block(sprintf('Receiver "%s" is not defined.', $receiverName), null, 'error', ' ', true);
if ($io->confirm(sprintf('Do you want to receive from "%s" instead? ', $alternatives[0]), false)) {
$input->setArgument('receiver', $alternatives[0]);
}
if ($this->receiverNames && 0 === \count($input->getArgument('receivers'))) {
$io->block('Which transports/receivers do you want to consume?', null, 'fg=white;bg=blue', ' ', true);

$io->writeln('Choose which receivers you want to consume messages from in order of priority.');
if (\count($this->receiverNames) > 1) {
$io->writeln(sprintf('Hint: to consume from multiple, use a list of their names, e.g. <comment>%s</comment>', implode(', ', $this->receiverNames)));
}

$question = new ChoiceQuestion('Select receivers to consume:', $this->receiverNames, 0);
$question->setMultiselect(true);

$input->setArgument('receivers', $io->askQuestion($question));
}

if (0 === \count($input->getArgument('receivers'))) {
throw new RuntimeException('Please pass at least one receiver.');
}
}

Expand All @@ -135,41 +147,51 @@ protected function execute(InputInterface $input, OutputInterface $output): void
$output->writeln(sprintf('<comment>%s</comment>', $message));
}

if (!$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
}
$receivers = [];
$retryStrategies = [];
foreach ($receiverNames = $input->getArgument('receivers') as $receiverName) {
if (!$this->receiverLocator->has($receiverName)) {
$message = sprintf('The receiver "%s" does not exist.', $receiverName);
if ($this->receiverNames) {
$message .= sprintf(' Valid receivers are: %s.', implode(', ', $this->receiverNames));
}

if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
}
throw new RuntimeException($message);
}

$receiver = $this->receiverLocator->get($receiverName);
$retryStrategy = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
}

$receivers[$receiverName] = $this->receiverLocator->get($receiverName);
$retryStrategies[$receiverName] = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
}

if (null !== $input->getOption('bus')) {
$bus = $this->busLocator->get($input->getOption('bus'));
} else {
$bus = new RoutableMessageBus($this->busLocator);
}

$worker = new Worker($receivers, $bus, $retryStrategies, $this->eventDispatcher, $this->logger);
$stopsWhen = [];
if ($limit = $input->getOption('limit')) {
$stopsWhen[] = "processed {$limit} messages";
$receiver = new StopWhenMessageCountIsExceededReceiver($receiver, $limit, $this->logger);
$worker = new StopWhenMessageCountIsExceededWorker($worker, $limit, $this->logger);
}

if ($memoryLimit = $input->getOption('memory-limit')) {
$stopsWhen[] = "exceeded {$memoryLimit} of memory";
$receiver = new StopWhenMemoryUsageIsExceededReceiver($receiver, $this->convertToBytes($memoryLimit), $this->logger);
$worker = new StopWhenMemoryUsageIsExceededWorker($worker, $this->convertToBytes($memoryLimit), $this->logger);
}

if ($timeLimit = $input->getOption('time-limit')) {
$stopsWhen[] = "been running for {$timeLimit}s";
$receiver = new StopWhenTimeLimitIsReachedReceiver($receiver, $timeLimit, $this->logger);
$worker = new StopWhenTimeLimitIsReachedWorker($worker, $timeLimit, $this->logger);
}

$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
$io->success(sprintf('Consuming messages from transport "%s".', $receiverName));
$io->success(sprintf('Consuming messages from transport%s "%s".', \count($receivers) > 0 ? 's' : '', implode(', ', $receiverNames)));

if ($stopsWhen) {
$last = array_pop($stopsWhen);
Expand All @@ -183,8 +205,9 @@ protected function execute(InputInterface $input, OutputInterface $output): void
$io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
}

$worker = new Worker($receiver, $bus, $receiverName, $retryStrategy, $this->eventDispatcher, $this->logger);
$worker->run();
$worker->run([
'sleep' => $input->getOption('sleep') * 1000000,
]);
}

private function convertToBytes(string $memoryLimit): int
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,8 @@ class ConsumeMessagesCommandTest extends TestCase
public function testConfigurationWithDefaultReceiver()
{
$command = new ConsumeMessagesCommand($this->createMock(ServiceLocator::class), $this->createMock(ServiceLocator::class), null, ['amqp']);
$inputArgument = $command->getDefinition()->getArgument('receiver');
$inputArgument = $command->getDefinition()->getArgument('receivers');
$this->assertFalse($inputArgument->isRequired());
$this->assertSame('amqp', $inputArgument->getDefault());
}

public function testConfigurationWithoutDefaultReceiver()
{
$command = new ConsumeMessagesCommand($this->createMock(ServiceLocator::class), $this->createMock(ServiceLocator::class), null, ['amqp', 'dummy']);
$inputArgument = $command->getDefinition()->getArgument('receiver');
$this->assertTrue($inputArgument->isRequired());
$this->assertNull($inputArgument->getDefault());
$this->assertSame(['amqp'], $inputArgument->getDefault());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -612,11 +612,9 @@ public function __invoke(DummyMessage $message): void

class DummyReceiver implements ReceiverInterface
{
public function receive(callable $handler): void
public function get(): iterable
{
for ($i = 0; $i < 3; ++$i) {
$handler(new Envelope(new DummyMessage("Dummy $i")));
}
yield new Envelope(new DummyMessage('Dummy'));
}

public function stop(): void
Expand Down

This file was deleted.

46 changes: 46 additions & 0 deletions src/Symfony/Component/Messenger/Tests/Fixtures/DummyWorker.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?php

namespace Symfony\Component\Messenger\Tests\Fixtures;

use Symfony\Component\Messenger\WorkerInterface;

class DummyWorker implements WorkerInterface
{
private $isStopped = false;
private $envelopesToReceive;
private $envelopesHandled = 0;

public function __construct(array $envelopesToReceive)
{
$this->envelopesToReceive = $envelopesToReceive;
}

public function run(array $options = [], callable $onHandledCallback = null): void
{
foreach ($this->envelopesToReceive as $envelope) {
if (true === $this->isStopped) {
break;
}

if ($onHandledCallback) {
$onHandledCallback($envelope);
++$this->envelopesHandled;
}
}
}

public function stop(): void
{
$this->isStopped = true;
}

public function isStopped(): bool
{
return $this->isStopped;
}

public function countEnvelopesHandled()
{
return $this->envelopesHandled;
}
}
Loading