diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index 937a9fcb4dd8d..76e839e7ce40e 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -32,6 +32,7 @@ CHANGELOG * Add `WrappedExceptionsInterface` interface for exceptions that hold multiple individual exceptions * Deprecate `HandlerFailedException::getNestedExceptions()`, `HandlerFailedException::getNestedExceptionsOfClass()` and `DelayedMessageHandlingException::getExceptions()` which are replaced by a new `getWrappedExceptions()` method + * Separate `WorkerRunningEvent` into `WorkerBusyEvent` and `WorkerIdleEvent` 6.3 --- diff --git a/src/Symfony/Component/Messenger/Event/WorkerBusyEvent.php b/src/Symfony/Component/Messenger/Event/WorkerBusyEvent.php new file mode 100644 index 0000000000000..feca10aabd4f4 --- /dev/null +++ b/src/Symfony/Component/Messenger/Event/WorkerBusyEvent.php @@ -0,0 +1,27 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Event; + +use Symfony\Component\Messenger\Worker; + +/** + * Dispatched after the worker processed a message. + * + * @author Jeroen Spee + */ +final class WorkerBusyEvent extends WorkerRunningEvent +{ + public function __construct(Worker $worker) + { + parent::__construct($worker, false); + } +} diff --git a/src/Symfony/Component/Messenger/Event/WorkerIdleEvent.php b/src/Symfony/Component/Messenger/Event/WorkerIdleEvent.php new file mode 100644 index 0000000000000..40e92292e88f0 --- /dev/null +++ b/src/Symfony/Component/Messenger/Event/WorkerIdleEvent.php @@ -0,0 +1,27 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Event; + +use Symfony\Component\Messenger\Worker; + +/** + * Dispatched after the worker didn't receive any message from its receivers. + * + * @author Jeroen Spee + */ +final class WorkerIdleEvent extends WorkerRunningEvent +{ + public function __construct(Worker $worker) + { + parent::__construct($worker, true); + } +} diff --git a/src/Symfony/Component/Messenger/Event/WorkerRunningEvent.php b/src/Symfony/Component/Messenger/Event/WorkerRunningEvent.php index 14be2ab0be780..f70b7532813a3 100644 --- a/src/Symfony/Component/Messenger/Event/WorkerRunningEvent.php +++ b/src/Symfony/Component/Messenger/Event/WorkerRunningEvent.php @@ -14,11 +14,11 @@ use Symfony\Component\Messenger\Worker; /** - * Dispatched after the worker processed a message or didn't receive a message at all. + * Events that are dispatched after the worker processed a message or didn't receive a message at all. * * @author Tobias Schultze */ -final class WorkerRunningEvent +class WorkerRunningEvent { private Worker $worker; private bool $isWorkerIdle; diff --git a/src/Symfony/Component/Messenger/EventListener/ResetServicesListener.php b/src/Symfony/Component/Messenger/EventListener/ResetServicesListener.php index 12bd1db4cca83..3869dddf21c86 100644 --- a/src/Symfony/Component/Messenger/EventListener/ResetServicesListener.php +++ b/src/Symfony/Component/Messenger/EventListener/ResetServicesListener.php @@ -13,7 +13,7 @@ use Symfony\Component\EventDispatcher\EventSubscriberInterface; use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter; -use Symfony\Component\Messenger\Event\WorkerRunningEvent; +use Symfony\Component\Messenger\Event\WorkerBusyEvent; use Symfony\Component\Messenger\Event\WorkerStoppedEvent; /** @@ -28,11 +28,9 @@ public function __construct(ServicesResetter $servicesResetter) $this->servicesResetter = $servicesResetter; } - public function resetServices(WorkerRunningEvent $event): void + public function resetServices(WorkerBusyEvent $event): void { - if (!$event->isWorkerIdle()) { - $this->servicesResetter->reset(); - } + $this->servicesResetter->reset(); } public function resetServicesAtStop(WorkerStoppedEvent $event): void @@ -43,7 +41,7 @@ public function resetServicesAtStop(WorkerStoppedEvent $event): void public static function getSubscribedEvents(): array { return [ - WorkerRunningEvent::class => ['resetServices', -1024], + WorkerBusyEvent::class => ['resetServices', -1024], WorkerStoppedEvent::class => ['resetServicesAtStop', -1024], ]; } diff --git a/src/Symfony/Component/Messenger/EventListener/StopWorkerOnFailureLimitListener.php b/src/Symfony/Component/Messenger/EventListener/StopWorkerOnFailureLimitListener.php index 36f514be887dd..3b8acfb7c3b6d 100644 --- a/src/Symfony/Component/Messenger/EventListener/StopWorkerOnFailureLimitListener.php +++ b/src/Symfony/Component/Messenger/EventListener/StopWorkerOnFailureLimitListener.php @@ -13,8 +13,8 @@ use Psr\Log\LoggerInterface; use Symfony\Component\EventDispatcher\EventSubscriberInterface; +use Symfony\Component\Messenger\Event\WorkerBusyEvent; use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; -use Symfony\Component\Messenger\Event\WorkerRunningEvent; use Symfony\Component\Messenger\Exception\InvalidArgumentException; /** @@ -41,9 +41,9 @@ public function onMessageFailed(WorkerMessageFailedEvent $event): void ++$this->failedMessages; } - public function onWorkerRunning(WorkerRunningEvent $event): void + public function onWorkerBusy(WorkerBusyEvent $event): void { - if (!$event->isWorkerIdle() && $this->failedMessages >= $this->maximumNumberOfFailures) { + if ($this->failedMessages >= $this->maximumNumberOfFailures) { $this->failedMessages = 0; $event->getWorker()->stop(); @@ -55,7 +55,7 @@ public static function getSubscribedEvents(): array { return [ WorkerMessageFailedEvent::class => 'onMessageFailed', - WorkerRunningEvent::class => 'onWorkerRunning', + WorkerBusyEvent::class => 'onWorkerBusy', ]; } } diff --git a/src/Symfony/Component/Messenger/EventListener/StopWorkerOnMessageLimitListener.php b/src/Symfony/Component/Messenger/EventListener/StopWorkerOnMessageLimitListener.php index 0f2b2a3f0b05b..f14a43b1ea7fc 100644 --- a/src/Symfony/Component/Messenger/EventListener/StopWorkerOnMessageLimitListener.php +++ b/src/Symfony/Component/Messenger/EventListener/StopWorkerOnMessageLimitListener.php @@ -13,7 +13,7 @@ use Psr\Log\LoggerInterface; use Symfony\Component\EventDispatcher\EventSubscriberInterface; -use Symfony\Component\Messenger\Event\WorkerRunningEvent; +use Symfony\Component\Messenger\Event\WorkerBusyEvent; use Symfony\Component\Messenger\Exception\InvalidArgumentException; /** @@ -36,9 +36,9 @@ public function __construct(int $maximumNumberOfMessages, LoggerInterface $logge } } - public function onWorkerRunning(WorkerRunningEvent $event): void + public function onWorkerBusy(WorkerBusyEvent $event): void { - if (!$event->isWorkerIdle() && ++$this->receivedMessages >= $this->maximumNumberOfMessages) { + if (++$this->receivedMessages >= $this->maximumNumberOfMessages) { $this->receivedMessages = 0; $event->getWorker()->stop(); @@ -49,7 +49,7 @@ public function onWorkerRunning(WorkerRunningEvent $event): void public static function getSubscribedEvents(): array { return [ - WorkerRunningEvent::class => 'onWorkerRunning', + WorkerBusyEvent::class => 'onWorkerBusy', ]; } } diff --git a/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnCustomStopExceptionListenerTest.php b/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnCustomStopExceptionListenerTest.php index c9e6c6d5db887..a5c095a51b38d 100644 --- a/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnCustomStopExceptionListenerTest.php +++ b/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnCustomStopExceptionListenerTest.php @@ -13,8 +13,8 @@ use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Event\WorkerBusyEvent; use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; -use Symfony\Component\Messenger\Event\WorkerRunningEvent; use Symfony\Component\Messenger\EventListener\StopWorkerOnCustomStopExceptionListener; use Symfony\Component\Messenger\Exception\HandlerFailedException; use Symfony\Component\Messenger\Exception\StopWorkerException; @@ -50,7 +50,7 @@ public function test(\Throwable $throwable, bool $shouldStop) $worker = $this->createMock(Worker::class); $worker->expects($shouldStop ? $this->once() : $this->never())->method('stop'); - $runningEvent = new WorkerRunningEvent($worker, false); + $runningEvent = new WorkerBusyEvent($worker); $listener->onWorkerRunning($runningEvent); } diff --git a/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnFailureLimitListenerTest.php b/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnFailureLimitListenerTest.php index 9d776a39e53b4..2b033c0f72105 100644 --- a/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnFailureLimitListenerTest.php +++ b/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnFailureLimitListenerTest.php @@ -14,8 +14,8 @@ use PHPUnit\Framework\TestCase; use Psr\Log\LoggerInterface; use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Event\WorkerBusyEvent; use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; -use Symfony\Component\Messenger\Event\WorkerRunningEvent; use Symfony\Component\Messenger\EventListener\StopWorkerOnFailureLimitListener; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; use Symfony\Component\Messenger\Worker; @@ -31,17 +31,17 @@ public function testWorkerStopsWhenMaximumCountReached(int $max, bool $shouldSto $worker->expects($shouldStop ? $this->atLeastOnce() : $this->never())->method('stop'); $failedEvent = $this->createFailedEvent(); - $runningEvent = new WorkerRunningEvent($worker, false); + $runningEvent = new WorkerBusyEvent($worker); $failureLimitListener = new StopWorkerOnFailureLimitListener($max); // simulate three messages (of which 2 failed) $failureLimitListener->onMessageFailed($failedEvent); - $failureLimitListener->onWorkerRunning($runningEvent); + $failureLimitListener->onWorkerBusy($runningEvent); - $failureLimitListener->onWorkerRunning($runningEvent); + $failureLimitListener->onWorkerBusy($runningEvent); $failureLimitListener->onMessageFailed($failedEvent); - $failureLimitListener->onWorkerRunning($runningEvent); + $failureLimitListener->onWorkerBusy($runningEvent); } public static function countProvider(): iterable @@ -62,11 +62,11 @@ public function testWorkerLogsMaximumCountReachedWhenLoggerIsGiven() ); $worker = $this->createMock(Worker::class); - $event = new WorkerRunningEvent($worker, false); + $event = new WorkerBusyEvent($worker); $failureLimitListener = new StopWorkerOnFailureLimitListener(1, $logger); $failureLimitListener->onMessageFailed($this->createFailedEvent()); - $failureLimitListener->onWorkerRunning($event); + $failureLimitListener->onWorkerBusy($event); } private function createFailedEvent(): WorkerMessageFailedEvent diff --git a/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnMemoryLimitListenerTest.php b/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnMemoryLimitListenerTest.php index 8ae5be4be80a8..3558d541a3dd6 100644 --- a/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnMemoryLimitListenerTest.php +++ b/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnMemoryLimitListenerTest.php @@ -13,7 +13,7 @@ use PHPUnit\Framework\TestCase; use Psr\Log\LoggerInterface; -use Symfony\Component\Messenger\Event\WorkerRunningEvent; +use Symfony\Component\Messenger\Event\WorkerBusyEvent; use Symfony\Component\Messenger\EventListener\StopWorkerOnMemoryLimitListener; use Symfony\Component\Messenger\Worker; @@ -28,7 +28,7 @@ public function testWorkerStopsWhenMemoryLimitExceeded(int $memoryUsage, int $me $worker = $this->createMock(Worker::class); $worker->expects($shouldStop ? $this->once() : $this->never())->method('stop'); - $event = new WorkerRunningEvent($worker, false); + $event = new WorkerBusyEvent($worker); $memoryLimitListener = new StopWorkerOnMemoryLimitListener($memoryLimit, null, $memoryResolver); $memoryLimitListener->onWorkerRunning($event); @@ -50,7 +50,7 @@ public function testWorkerLogsMemoryExceededWhenLoggerIsGiven() $memoryResolver = fn () => 70; $worker = $this->createMock(Worker::class); - $event = new WorkerRunningEvent($worker, false); + $event = new WorkerBusyEvent($worker); $memoryLimitListener = new StopWorkerOnMemoryLimitListener(64, $logger, $memoryResolver); $memoryLimitListener->onWorkerRunning($event); diff --git a/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnMessageLimitListenerTest.php b/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnMessageLimitListenerTest.php index 8ce9a198f08c1..fa8d169467c8f 100644 --- a/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnMessageLimitListenerTest.php +++ b/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnMessageLimitListenerTest.php @@ -13,7 +13,7 @@ use PHPUnit\Framework\TestCase; use Psr\Log\LoggerInterface; -use Symfony\Component\Messenger\Event\WorkerRunningEvent; +use Symfony\Component\Messenger\Event\WorkerBusyEvent; use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener; use Symfony\Component\Messenger\Worker; @@ -26,13 +26,13 @@ public function testWorkerStopsWhenMaximumCountExceeded(int $max, bool $shouldSt { $worker = $this->createMock(Worker::class); $worker->expects($shouldStop ? $this->atLeastOnce() : $this->never())->method('stop'); - $event = new WorkerRunningEvent($worker, false); + $event = new WorkerBusyEvent($worker); $maximumCountListener = new StopWorkerOnMessageLimitListener($max); // simulate three messages processed - $maximumCountListener->onWorkerRunning($event); - $maximumCountListener->onWorkerRunning($event); - $maximumCountListener->onWorkerRunning($event); + $maximumCountListener->onWorkerBusy($event); + $maximumCountListener->onWorkerBusy($event); + $maximumCountListener->onWorkerBusy($event); } public static function countProvider(): iterable @@ -53,9 +53,9 @@ public function testWorkerLogsMaximumCountExceededWhenLoggerIsGiven() ); $worker = $this->createMock(Worker::class); - $event = new WorkerRunningEvent($worker, false); + $event = new WorkerBusyEvent($worker); $maximumCountListener = new StopWorkerOnMessageLimitListener(1, $logger); - $maximumCountListener->onWorkerRunning($event); + $maximumCountListener->onWorkerBusy($event); } } diff --git a/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnRestartSignalListenerTest.php b/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnRestartSignalListenerTest.php index 3b83f04268ce5..6d709be514d1f 100644 --- a/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnRestartSignalListenerTest.php +++ b/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnRestartSignalListenerTest.php @@ -14,7 +14,7 @@ use PHPUnit\Framework\TestCase; use Psr\Cache\CacheItemInterface; use Psr\Cache\CacheItemPoolInterface; -use Symfony\Component\Messenger\Event\WorkerRunningEvent; +use Symfony\Component\Messenger\Event\WorkerBusyEvent; use Symfony\Component\Messenger\EventListener\StopWorkerOnRestartSignalListener; use Symfony\Component\Messenger\Worker; @@ -36,7 +36,7 @@ public function testWorkerStopsWhenMemoryLimitExceeded(?int $lastRestartTimeOffs $worker = $this->createMock(Worker::class); $worker->expects($shouldStop ? $this->once() : $this->never())->method('stop'); - $event = new WorkerRunningEvent($worker, false); + $event = new WorkerBusyEvent($worker); $stopOnSignalListener = new StopWorkerOnRestartSignalListener($cachePool); $stopOnSignalListener->onWorkerStarted(); @@ -60,7 +60,7 @@ public function testWorkerDoesNotStopIfRestartNotInCache() $worker = $this->createMock(Worker::class); $worker->expects($this->never())->method('stop'); - $event = new WorkerRunningEvent($worker, false); + $event = new WorkerBusyEvent($worker); $stopOnSignalListener = new StopWorkerOnRestartSignalListener($cachePool); $stopOnSignalListener->onWorkerStarted(); diff --git a/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnTimeLimitListenerTest.php b/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnTimeLimitListenerTest.php index 90f76da61226a..a80901001a68a 100644 --- a/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnTimeLimitListenerTest.php +++ b/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnTimeLimitListenerTest.php @@ -13,7 +13,7 @@ use PHPUnit\Framework\TestCase; use Psr\Log\LoggerInterface; -use Symfony\Component\Messenger\Event\WorkerRunningEvent; +use Symfony\Component\Messenger\Event\WorkerBusyEvent; use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener; use Symfony\Component\Messenger\Worker; @@ -30,7 +30,7 @@ public function testWorkerStopsWhenTimeLimitIsReached() $worker = $this->createMock(Worker::class); $worker->expects($this->once())->method('stop'); - $event = new WorkerRunningEvent($worker, false); + $event = new WorkerBusyEvent($worker); $timeoutListener = new StopWorkerOnTimeLimitListener(1, $logger); $timeoutListener->onWorkerStarted(); diff --git a/src/Symfony/Component/Messenger/Tests/WorkerTest.php b/src/Symfony/Component/Messenger/Tests/WorkerTest.php index f13cbcf4ef793..120b1e22e05af 100644 --- a/src/Symfony/Component/Messenger/Tests/WorkerTest.php +++ b/src/Symfony/Component/Messenger/Tests/WorkerTest.php @@ -18,6 +18,7 @@ use Symfony\Component\EventDispatcher\EventDispatcher; use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter; use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Event\WorkerBusyEvent; use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent; use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent; @@ -81,8 +82,8 @@ public function __construct() public function dispatch(object $event): object { - if ($event instanceof WorkerRunningEvent) { - $this->listener->onWorkerRunning($event); + if ($event instanceof WorkerBusyEvent) { + $this->listener->onWorkerBusy($event); } return $event; @@ -141,7 +142,7 @@ public function testWorkerResetsTransportsIfResetServicesListenerIsCalled() $dispatcher = new EventDispatcher(); $dispatcher->addSubscriber(new ResetServicesListener(new ServicesResetter(new \ArrayIterator([$resettableReceiver]), ['reset']))); - $dispatcher->addListener(WorkerRunningEvent::class, function (WorkerRunningEvent $event) { + $dispatcher->addListener(WorkerBusyEvent::class, function (WorkerBusyEvent $event) { $event->getWorker()->stop(); }); @@ -201,10 +202,11 @@ public function testWorkerDispatchesEventsOnSuccess() $this->isInstanceOf(WorkerMessageReceivedEvent::class), $this->isInstanceOf(WorkerMessageHandledEvent::class), $this->isInstanceOf(WorkerRunningEvent::class), + $this->isInstanceOf(WorkerBusyEvent::class), $this->isInstanceOf(WorkerStoppedEvent::class), ]; - $eventDispatcher->expects($this->exactly(5)) + $eventDispatcher->expects($this->exactly(6)) ->method('dispatch') ->willReturnCallback(function ($event) use (&$series) { array_shift($series)->evaluate($event); @@ -255,10 +257,11 @@ public function testWorkerDispatchesEventsOnError() $this->isInstanceOf(WorkerMessageReceivedEvent::class), $this->isInstanceOf(WorkerMessageFailedEvent::class), $this->isInstanceOf(WorkerRunningEvent::class), + $this->isInstanceOf(WorkerBusyEvent::class), $this->isInstanceOf(WorkerStoppedEvent::class), ]; - $eventDispatcher->expects($this->exactly(5)) + $eventDispatcher->expects($this->exactly(6)) ->method('dispatch') ->willReturnCallback(function ($event) use (&$series) { array_shift($series)->evaluate($event); diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php index eb0af634769b9..4ad4f3d7e21e9 100644 --- a/src/Symfony/Component/Messenger/Worker.php +++ b/src/Symfony/Component/Messenger/Worker.php @@ -15,6 +15,8 @@ use Psr\Log\LoggerInterface; use Symfony\Component\Clock\Clock; use Symfony\Component\Clock\ClockInterface; +use Symfony\Component\Messenger\Event\WorkerBusyEvent; +use Symfony\Component\Messenger\Event\WorkerIdleEvent; use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent; use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent; @@ -108,6 +110,7 @@ public function run(array $options = []): void $this->rateLimit($transportName); $this->handleMessage($envelope, $transportName); $this->eventDispatcher?->dispatch(new WorkerRunningEvent($this, false)); + $this->eventDispatcher?->dispatch(new WorkerBusyEvent($this)); if ($this->shouldStop) { break 2; @@ -130,6 +133,7 @@ public function run(array $options = []): void if (!$envelopeHandled) { $this->eventDispatcher?->dispatch(new WorkerRunningEvent($this, true)); + $this->eventDispatcher?->dispatch(new WorkerIdleEvent($this)); if (0 < $sleep = (int) ($options['sleep'] - 1e6 * ($this->clock->now()->format('U.u') - $envelopeHandledStart->format('U.u')))) { $this->clock->sleep($sleep / 1e6);