From 4f174a70e786e7fac8b866a7ecc3f0e3c963916c Mon Sep 17 00:00:00 2001 From: Mathias STRASSER Date: Thu, 8 Nov 2018 09:48:52 +0100 Subject: [PATCH] Add "force-consumption" option --- src/Symfony/Component/Messenger/CHANGELOG.md | 1 + .../Command/ConsumeMessagesCommand.php | 10 +++ .../Receiver/ForceConsumptionReceiverTest.php | 68 +++++++++++++++++++ .../Receiver/ForceConsumptionReceiver.php | 57 ++++++++++++++++ 4 files changed, 136 insertions(+) create mode 100644 src/Symfony/Component/Messenger/Tests/Transport/Receiver/ForceConsumptionReceiverTest.php create mode 100644 src/Symfony/Component/Messenger/Transport/Receiver/ForceConsumptionReceiver.php diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index d1e85da795d8f..4a19e59dcbe2d 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -35,6 +35,7 @@ CHANGELOG * The `ContainerHandlerLocator`, `AbstractHandlerLocator`, `SenderLocator` and `AbstractSenderLocator` classes have been removed * `Envelope::all()` takes a new optional `$stampFqcn` argument and returns the stamps for the specified FQCN, or all stamps by their class name * `Envelope::get()` has been renamed `Envelope::last()` + * Add option `force-consumption` to force the consumption of messages even if an exception is thrown by a message handler 4.1.0 ----- diff --git a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php index 83b834c593035..6067419def23b 100644 --- a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php +++ b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php @@ -20,6 +20,7 @@ use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Console\Style\SymfonyStyle; +use Symfony\Component\Messenger\Transport\Receiver\ForceConsumptionReceiver; use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver; use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver; use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver; @@ -66,6 +67,7 @@ protected function configure(): void new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'), new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'), new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched', $defaultBusName), + new InputOption('force-consumption', 'f', InputOption::VALUE_REQUIRED, 'Force the consumption of messages even if an exception is thrown by a message handler', false), )) ->setDescription('Consumes messages') ->setHelp(<<<'EOF' @@ -84,6 +86,10 @@ protected function configure(): void Use the --time-limit option to stop the worker when the given time limit (in seconds) is reached: php %command.full_name% --time-limit=3600 + +Use the --force-consumption option to force the consumption of messages: + + php %command.full_name% --force-consumption EOF ) ; @@ -155,6 +161,10 @@ protected function execute(InputInterface $input, OutputInterface $output): void $receiver = new StopWhenTimeLimitIsReachedReceiver($receiver, $timeLimit, $this->logger); } + if ($input->getOption('force-consumption')) { + $receiver = new ForceConsumptionReceiver($receiver, $this->logger); + } + $worker = new Worker($receiver, $bus); $worker->run(); } diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Receiver/ForceConsumptionReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Receiver/ForceConsumptionReceiverTest.php new file mode 100644 index 0000000000000..f8eff829c306c --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Transport/Receiver/ForceConsumptionReceiverTest.php @@ -0,0 +1,68 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Transport\Receiver; + +use Exception; +use PHPUnit\Framework\TestCase; +use Psr\Log\LoggerInterface; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Transport\Receiver\ForceConsumptionReceiver; + +class ForceConsumptionReceiverTest extends TestCase +{ + /** + * @dataProvider logProvider + */ + public function testReceiverDoesNotStopWhenExceptionIsThrown(bool $isLoggable) + { + $callable = function ($handler) { + $handler(new Envelope(new DummyMessage('API'))); + }; + + $decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class) + ->setConstructorArgs(array($callable)) + ->enableProxyingToOriginalMethods() + ->getMock() + ; + + $logger = null; + if ($isLoggable) { + $logger = $this->createMock(LoggerInterface::class); + $logger->expects($this->once())->method('alert') + ->with( + $this->equalTo('Receiver reached an exception: "{message}"'), + $this->equalTo(array('message' => 'my exception')) + ); + } + + $decoratedReceiver->expects($this->exactly(2))->method('receive'); + + $timeoutReceiver = new ForceConsumptionReceiver($decoratedReceiver, $logger); + $timeoutReceiver->receive( + function () { + throw new Exception('my exception'); + } + ); + + $timeoutReceiver->receive(function () {}); + } + + public function logProvider() + { + return array( + 'with log' => array(true), + 'without log' => array(false), + ); + } +} diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/ForceConsumptionReceiver.php b/src/Symfony/Component/Messenger/Transport/Receiver/ForceConsumptionReceiver.php new file mode 100644 index 0000000000000..40b55898a87db --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/Receiver/ForceConsumptionReceiver.php @@ -0,0 +1,57 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\Receiver; + +use Psr\Log\LoggerInterface; +use Symfony\Component\Messenger\Envelope; + +/** + * @author Mathias STRASSER + * + * @experimental in 4.2 + */ +final class ForceConsumptionReceiver implements ReceiverInterface +{ + private $decoratedReceiver; + private $logger; + + public function __construct(ReceiverInterface $decoratedReceiver, LoggerInterface $logger = null) + { + $this->decoratedReceiver = $decoratedReceiver; + $this->logger = $logger; + } + + public function receive(callable $handler): void + { + $this->decoratedReceiver->receive( + function (?Envelope $envelope) use ($handler) { + try { + $handler($envelope); + } catch (\Throwable $exception) { + if (null === $this->logger) { + return; + } + + $this->logger->alert( + 'Receiver reached an exception: "{message}"', + array('message' => $exception->getMessage()) + ); + } + } + ); + } + + public function stop(): void + { + $this->decoratedReceiver->stop(); + } +}