From 257ec8746542a73c6e0620a40cf8697ba441b4a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Pineau?= Date: Wed, 23 Dec 2020 17:57:53 +0100 Subject: [PATCH] [Messenger] Added StopWorkerException --- .../Resources/config/messenger.php | 4 ++ .../Bundle/FrameworkBundle/composer.json | 4 +- src/Symfony/Component/Messenger/CHANGELOG.md | 5 ++ ...topWorkerOnCustomStopExceptionListener.php | 57 +++++++++++++++++++ .../Exception/StopWorkerException.php | 23 ++++++++ .../StopWorkerExceptionInterface.php | 19 +++++++ ...orkerOnCustomStopExceptionListenerTest.php | 57 +++++++++++++++++++ 7 files changed, 167 insertions(+), 2 deletions(-) create mode 100644 src/Symfony/Component/Messenger/EventListener/StopWorkerOnCustomStopExceptionListener.php create mode 100644 src/Symfony/Component/Messenger/Exception/StopWorkerException.php create mode 100644 src/Symfony/Component/Messenger/Exception/StopWorkerExceptionInterface.php create mode 100644 src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnCustomStopExceptionListenerTest.php diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php index 61a993b255174..d7953122fbe51 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php @@ -20,6 +20,7 @@ use Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener; use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener; use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener; +use Symfony\Component\Messenger\EventListener\StopWorkerOnCustomStopExceptionListener; use Symfony\Component\Messenger\EventListener\StopWorkerOnRestartSignalListener; use Symfony\Component\Messenger\EventListener\StopWorkerOnSigtermSignalListener; use Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware; @@ -193,6 +194,9 @@ ->set('messenger.listener.stop_worker_on_sigterm_signal_listener', StopWorkerOnSigtermSignalListener::class) ->tag('kernel.event_subscriber') + ->set('messenger.listener.stop_worker_on_stop_exception_listener', StopWorkerOnCustomStopExceptionListener::class) + ->tag('kernel.event_subscriber') + ->set('messenger.routable_message_bus', RoutableMessageBus::class) ->args([ abstract_arg('message bus locator'), diff --git a/src/Symfony/Bundle/FrameworkBundle/composer.json b/src/Symfony/Bundle/FrameworkBundle/composer.json index d72d6cb29e31b..fe597ca11303e 100644 --- a/src/Symfony/Bundle/FrameworkBundle/composer.json +++ b/src/Symfony/Bundle/FrameworkBundle/composer.json @@ -48,7 +48,7 @@ "symfony/http-client": "^4.4|^5.0|^6.0", "symfony/lock": "^4.4|^5.0|^6.0", "symfony/mailer": "^5.2|^6.0", - "symfony/messenger": "^5.2|^6.0", + "symfony/messenger": "^5.4|^6.0", "symfony/mime": "^4.4|^5.0|^6.0", "symfony/notifier": "^5.3|^6.0", "symfony/allmysms-notifier": "^5.3|^6.0", @@ -115,7 +115,7 @@ "symfony/form": "<5.2", "symfony/lock": "<4.4", "symfony/mailer": "<5.2", - "symfony/messenger": "<4.4", + "symfony/messenger": "<5.4", "symfony/mime": "<4.4", "symfony/property-info": "<4.4", "symfony/property-access": "<5.3", diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index 381e88c4b8783..fed0bc8b76f93 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -1,6 +1,11 @@ CHANGELOG ========= +5.4 +--- + + * Add `StopWorkerExceptionInterface` and its implementation `StopWorkerException` to stop the worker. + 5.3 --- diff --git a/src/Symfony/Component/Messenger/EventListener/StopWorkerOnCustomStopExceptionListener.php b/src/Symfony/Component/Messenger/EventListener/StopWorkerOnCustomStopExceptionListener.php new file mode 100644 index 0000000000000..ee8daec763459 --- /dev/null +++ b/src/Symfony/Component/Messenger/EventListener/StopWorkerOnCustomStopExceptionListener.php @@ -0,0 +1,57 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\EventListener; + +use Symfony\Component\EventDispatcher\EventSubscriberInterface; +use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; +use Symfony\Component\Messenger\Event\WorkerRunningEvent; +use Symfony\Component\Messenger\Exception\HandlerFailedException; +use Symfony\Component\Messenger\Exception\StopWorkerExceptionInterface; + +/** + * @author Grégoire Pineau + */ +class StopWorkerOnCustomStopExceptionListener implements EventSubscriberInterface +{ + private $stop = false; + + public function onMessageFailed(WorkerMessageFailedEvent $event): void + { + $th = $event->getThrowable(); + if ($th instanceof StopWorkerExceptionInterface) { + $this->stop = true; + } + if ($th instanceof HandlerFailedException) { + foreach ($th->getNestedExceptions() as $e) { + if ($e instanceof StopWorkerExceptionInterface) { + $this->stop = true; + break; + } + } + } + } + + public function onWorkerRunning(WorkerRunningEvent $event): void + { + if ($this->stop) { + $event->getWorker()->stop(); + } + } + + public static function getSubscribedEvents(): array + { + return [ + WorkerMessageFailedEvent::class => 'onMessageFailed', + WorkerRunningEvent::class => 'onWorkerRunning', + ]; + } +} diff --git a/src/Symfony/Component/Messenger/Exception/StopWorkerException.php b/src/Symfony/Component/Messenger/Exception/StopWorkerException.php new file mode 100644 index 0000000000000..c2100c28d8fb4 --- /dev/null +++ b/src/Symfony/Component/Messenger/Exception/StopWorkerException.php @@ -0,0 +1,23 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Exception; + +/** + * @author Grégoire Pineau + */ +class StopWorkerException extends RuntimeException implements StopWorkerExceptionInterface +{ + public function __construct(string $message = 'Worker should stop.', ?\Throwable $previous = null) + { + parent::__construct($message, 0, $previous); + } +} diff --git a/src/Symfony/Component/Messenger/Exception/StopWorkerExceptionInterface.php b/src/Symfony/Component/Messenger/Exception/StopWorkerExceptionInterface.php new file mode 100644 index 0000000000000..16019e87180e1 --- /dev/null +++ b/src/Symfony/Component/Messenger/Exception/StopWorkerExceptionInterface.php @@ -0,0 +1,19 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Exception; + +/** + * @author Grégoire Pineau + */ +interface StopWorkerExceptionInterface extends \Throwable +{ +} diff --git a/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnCustomStopExceptionListenerTest.php b/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnCustomStopExceptionListenerTest.php new file mode 100644 index 0000000000000..9450567bedb7f --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnCustomStopExceptionListenerTest.php @@ -0,0 +1,57 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\EventListener; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; +use Symfony\Component\Messenger\Event\WorkerRunningEvent; +use Symfony\Component\Messenger\EventListener\StopWorkerOnCustomStopExceptionListener; +use Symfony\Component\Messenger\Exception\HandlerFailedException; +use Symfony\Component\Messenger\Exception\StopWorkerException; +use Symfony\Component\Messenger\Exception\StopWorkerExceptionInterface; +use Symfony\Component\Messenger\Worker; + +class StopWorkerOnCustomStopExceptionListenerTest extends TestCase +{ + public function provideTests(): \Generator + { + yield 'it should not stop (1)' => [new \Exception(), false]; + yield 'it should not stop (2)' => [new HandlerFailedException(new Envelope(new \stdClass()), [new \Exception()]), false]; + + $t = new class() extends \Exception implements StopWorkerExceptionInterface {}; + yield 'it should stop with custom exception' => [$t, true]; + yield 'it should stop with core exception' => [new StopWorkerException(), true]; + + yield 'it should stop with custom exception wrapped (1)' => [new HandlerFailedException(new Envelope(new \stdClass()), [new StopWorkerException()]), true]; + yield 'it should stop with custom exception wrapped (2)' => [new HandlerFailedException(new Envelope(new \stdClass()), [new \Exception(), new StopWorkerException()]), true]; + yield 'it should stop with core exception wrapped (1)' => [new HandlerFailedException(new Envelope(new \stdClass()), [$t]), true]; + yield 'it should stop with core exception wrapped (2)' => [new HandlerFailedException(new Envelope(new \stdClass()), [new \Exception(), $t]), true]; + } + + /** @dataProvider provideTests */ + public function test(\Throwable $throwable, bool $shouldStop) + { + $listener = new StopWorkerOnCustomStopExceptionListener(); + + $envelope = new Envelope(new \stdClass()); + $failedEvent = new WorkerMessageFailedEvent($envelope, 'my_receiver', $throwable); + + $listener->onMessageFailed($failedEvent); + + $worker = $this->createMock(Worker::class); + $worker->expects($shouldStop ? $this->once() : $this->never())->method('stop'); + $runningEvent = new WorkerRunningEvent($worker, false); + + $listener->onWorkerRunning($runningEvent); + } +}