From 81e52b2f4edb4cdf3afc3110ad7f7b3b49035242 Mon Sep 17 00:00:00 2001 From: Nicolas Grekas Date: Wed, 6 Oct 2021 23:01:20 +0200 Subject: [PATCH] [Messenger] allow processing messages in batches --- .../Transport/AmqpExtIntegrationTest.php | 3 +- src/Symfony/Component/Messenger/CHANGELOG.md | 1 + .../Messenger/Handler/Acknowledger.php | 83 ++++++++++ .../Handler/BatchHandlerInterface.php | 34 ++++ .../Messenger/Handler/BatchHandlerTrait.php | 75 +++++++++ .../Messenger/Handler/HandlerDescriptor.php | 56 +++---- .../Middleware/HandleMessageMiddleware.php | 63 ++++++- .../Component/Messenger/Stamp/AckStamp.php | 35 ++++ .../Stamp/FlushBatchHandlersStamp.php | 30 ++++ .../Messenger/Stamp/NoAutoAckStamp.php | 32 ++++ .../Tests/Handler/HandlersLocatorTest.php | 8 +- .../HandleMessageMiddlewareTest.php | 132 +++++++++++++++ .../Component/Messenger/Tests/WorkerTest.php | 154 +++++++++++++++++- src/Symfony/Component/Messenger/Worker.php | 127 +++++++++++---- 14 files changed, 758 insertions(+), 75 deletions(-) create mode 100644 src/Symfony/Component/Messenger/Handler/Acknowledger.php create mode 100644 src/Symfony/Component/Messenger/Handler/BatchHandlerInterface.php create mode 100644 src/Symfony/Component/Messenger/Handler/BatchHandlerTrait.php create mode 100644 src/Symfony/Component/Messenger/Stamp/AckStamp.php create mode 100644 src/Symfony/Component/Messenger/Stamp/FlushBatchHandlersStamp.php create mode 100644 src/Symfony/Component/Messenger/Stamp/NoAutoAckStamp.php diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpExtIntegrationTest.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpExtIntegrationTest.php index ef123266e0061..aa551e4e85080 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpExtIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpExtIntegrationTest.php @@ -215,7 +215,8 @@ public function testItReceivesSignals() with stamps: [ "Symfony\\Component\\Messenger\\Bridge\\Amqp\\Transport\\AmqpReceivedStamp", "Symfony\\Component\\Messenger\\Stamp\\ReceivedStamp", - "Symfony\\Component\\Messenger\\Stamp\\ConsumedByWorkerStamp" + "Symfony\\Component\\Messenger\\Stamp\\ConsumedByWorkerStamp", + "Symfony\\Component\\Messenger\\Stamp\\AckStamp" ] Done. diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index 28badcc6e60f6..ede59fef6ceef 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -5,6 +5,7 @@ CHANGELOG --- * Add `AsMessageHandler` attribute for declaring message handlers on PHP 8. + * Add support for handling messages in batches with `BatchHandlerInterface` and corresponding trait * Add `StopWorkerExceptionInterface` and its implementation `StopWorkerException` to stop the worker. * Add support for resetting container services after each messenger message. * Added `WorkerMetadata` class which allows you to access the configuration details of a worker, like `queueNames` and `transportNames` it consumes from. diff --git a/src/Symfony/Component/Messenger/Handler/Acknowledger.php b/src/Symfony/Component/Messenger/Handler/Acknowledger.php new file mode 100644 index 0000000000000..a2317b78369fe --- /dev/null +++ b/src/Symfony/Component/Messenger/Handler/Acknowledger.php @@ -0,0 +1,83 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Handler; + +use Symfony\Component\Messenger\Exception\LogicException; + +/** + * @author Nicolas Grekas + */ +class Acknowledger +{ + private $handlerClass; + private $ack; + private $error = null; + private $result = null; + + /** + * @param null|\Closure(\Throwable|null, mixed):void $ack + */ + public function __construct(string $handlerClass, \Closure $ack = null) + { + $this->handlerClass = $handlerClass; + $this->ack = $ack ?? static function () {}; + } + + /** + * @param mixed $result + */ + public function ack($result = null): void + { + $this->doAck(null, $result); + } + + public function nack(\Throwable $error): void + { + $this->doAck($error); + } + + public function getError(): ?\Throwable + { + return $this->error; + } + + /** + * @return mixed + */ + public function getResult() + { + return $this->result; + } + + public function isAcknowledged(): bool + { + return null === $this->ack; + } + + public function __destruct() + { + if ($this->ack instanceof \Closure) { + throw new LogicException(sprintf('The acknowledger was not called by the "%s" batch handler.', $this->handlerClass)); + } + } + + private function doAck(\Throwable $e = null, $result = null): void + { + if (!$ack = $this->ack) { + throw new LogicException(sprintf('The acknowledger cannot be called twice by the "%s" batch handler.', $this->handlerClass)); + } + $this->ack = null; + $this->error = $e; + $this->result = $result; + $ack($e, $result); + } +} diff --git a/src/Symfony/Component/Messenger/Handler/BatchHandlerInterface.php b/src/Symfony/Component/Messenger/Handler/BatchHandlerInterface.php new file mode 100644 index 0000000000000..ad053dac1d8d3 --- /dev/null +++ b/src/Symfony/Component/Messenger/Handler/BatchHandlerInterface.php @@ -0,0 +1,34 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Handler; + +/** + * @author Nicolas Grekas + */ +interface BatchHandlerInterface +{ + /** + * @param Acknowledger|null $ack The function to call to ack/nack the $message. + * The message should be handled synchronously when null. + * + * @return mixed The number of pending messages in the batch if $ack is not null, + * the result from handling the message otherwise + */ + //public function __invoke(object $message, Acknowledger $ack = null): mixed; + + /** + * Flushes any pending buffers. + * + * @param bool $force Whether flushing is required; it can be skipped if not + */ + public function flush(bool $force): void; +} diff --git a/src/Symfony/Component/Messenger/Handler/BatchHandlerTrait.php b/src/Symfony/Component/Messenger/Handler/BatchHandlerTrait.php new file mode 100644 index 0000000000000..be7124dd38893 --- /dev/null +++ b/src/Symfony/Component/Messenger/Handler/BatchHandlerTrait.php @@ -0,0 +1,75 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Handler; + +use Symfony\Component\Messenger\Exception\LogicException; + +/** + * @author Nicolas Grekas + */ +trait BatchHandlerTrait +{ + private $jobs = []; + + /** + * {@inheritdoc} + */ + public function flush(bool $force): void + { + if ($jobs = $this->jobs) { + $this->jobs = []; + $this->process($jobs); + } + } + + /** + * @param Acknowledger|null $ack The function to call to ack/nack the $message. + * The message should be handled synchronously when null. + * + * @return mixed The number of pending messages in the batch if $ack is not null, + * the result from handling the message otherwise + */ + private function handle(object $message, ?Acknowledger $ack) + { + if (null === $ack) { + $ack = new Acknowledger(get_debug_type($this)); + $this->jobs[] = [$message, $ack]; + $this->flush(true); + + return $ack->getResult(); + } + + $this->jobs[] = [$message, $ack]; + if (!$this->shouldFlush()) { + return \count($this->jobs); + } + + $this->flush(true); + + return 0; + } + + private function shouldFlush(): bool + { + return 10 <= \count($this->jobs); + } + + /** + * Completes the jobs in the list. + * + * @list $jobs A list of pairs of messages and their corresponding acknowledgers + */ + private function process(array $jobs): void + { + throw new LogicException(sprintf('"%s" should implement abstract method "process()".', get_debug_type($this))); + } +} diff --git a/src/Symfony/Component/Messenger/Handler/HandlerDescriptor.php b/src/Symfony/Component/Messenger/Handler/HandlerDescriptor.php index 0f5bf28e5fc80..6acb2c2f377bf 100644 --- a/src/Symfony/Component/Messenger/Handler/HandlerDescriptor.php +++ b/src/Symfony/Component/Messenger/Handler/HandlerDescriptor.php @@ -19,12 +19,34 @@ final class HandlerDescriptor { private $handler; + private $name; + private $batchHandler; private $options; public function __construct(callable $handler, array $options = []) { + if (!$handler instanceof \Closure) { + $handler = \Closure::fromCallable($handler); + } + $this->handler = $handler; $this->options = $options; + + $r = new \ReflectionFunction($handler); + + if (str_contains($r->name, '{closure}')) { + $this->name = 'Closure'; + } elseif (!$handler = $r->getClosureThis()) { + $class = $r->getClosureScopeClass(); + + $this->name = ($class ? $class->name.'::' : '').$r->name; + } else { + if ($handler instanceof BatchHandlerInterface) { + $this->batchHandler = $handler; + } + + $this->name = \get_class($handler).'::'.$r->name; + } } public function getHandler(): callable @@ -34,7 +56,7 @@ public function getHandler(): callable public function getName(): string { - $name = $this->callableName($this->handler); + $name = $this->name; $alias = $this->options['alias'] ?? null; if (null !== $alias) { @@ -44,37 +66,13 @@ public function getName(): string return $name; } - public function getOption(string $option) + public function getBatchHandler(): ?BatchHandlerInterface { - return $this->options[$option] ?? null; + return $this->batchHandler; } - private function callableName(callable $handler): string + public function getOption(string $option) { - if (\is_array($handler)) { - if (\is_object($handler[0])) { - return \get_class($handler[0]).'::'.$handler[1]; - } - - return $handler[0].'::'.$handler[1]; - } - - if (\is_string($handler)) { - return $handler; - } - - if ($handler instanceof \Closure) { - $r = new \ReflectionFunction($handler); - if (str_contains($r->name, '{closure}')) { - return 'Closure'; - } - if ($class = $r->getClosureScopeClass()) { - return $class->name.'::'.$r->name; - } - - return $r->name; - } - - return \get_class($handler).'::__invoke'; + return $this->options[$option] ?? null; } } diff --git a/src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php b/src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php index eaf6b9508017b..3daa659f7e86f 100644 --- a/src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php +++ b/src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php @@ -15,10 +15,15 @@ use Psr\Log\NullLogger; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\HandlerFailedException; +use Symfony\Component\Messenger\Exception\LogicException; use Symfony\Component\Messenger\Exception\NoHandlerForMessageException; +use Symfony\Component\Messenger\Handler\Acknowledger; use Symfony\Component\Messenger\Handler\HandlerDescriptor; use Symfony\Component\Messenger\Handler\HandlersLocatorInterface; +use Symfony\Component\Messenger\Stamp\AckStamp; +use Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp; use Symfony\Component\Messenger\Stamp\HandledStamp; +use Symfony\Component\Messenger\Stamp\NoAutoAckStamp; /** * @author Samuel Roze @@ -60,7 +65,38 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope try { $handler = $handlerDescriptor->getHandler(); - $handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $handler($message)); + $batchHandler = $handlerDescriptor->getBatchHandler(); + + /** @var AckStamp $ackStamp */ + if ($batchHandler && $ackStamp = $envelope->last(AckStamp::class)) { + $ack = new Acknowledger(get_debug_type($batchHandler), static function (\Throwable $e = null, $result = null) use ($envelope, $ackStamp, $handlerDescriptor) { + if (null !== $e) { + $e = new HandlerFailedException($envelope, [$e]); + } else { + $envelope = $envelope->with(HandledStamp::fromDescriptor($handlerDescriptor, $result)); + } + + $ackStamp->ack($envelope, $e); + }); + + $result = $handler($message, $ack); + + if (!\is_int($result) || 0 > $result) { + throw new LogicException(sprintf('A handler implementing BatchHandlerInterface must return the size of the current batch as a positive integer, "%s" returned from "%s".', \is_int($result) ? $result : get_debug_type($result), get_debug_type($batchHandler))); + } + + if (!$ack->isAcknowledged()) { + $envelope = $envelope->with(new NoAutoAckStamp($handlerDescriptor)); + } elseif ($ack->getError()) { + throw $ack->getError(); + } else { + $result = $ack->getResult(); + } + } else { + $result = $handler($message); + } + + $handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $result); $envelope = $envelope->with($handledStamp); $this->logger->info('Message {class} handled by {handler}', $context + ['handler' => $handledStamp->getHandlerName()]); } catch (\Throwable $e) { @@ -68,6 +104,19 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope } } + /** @var FlushBatchHandlersStamp $flushStamp */ + if ($flushStamp = $envelope->last(FlushBatchHandlersStamp::class)) { + /** @var NoAutoAckStamp $stamp */ + foreach ($envelope->all(NoAutoAckStamp::class) as $stamp) { + try { + $handler = $stamp->getHandlerDescriptor()->getBatchHandler(); + $handler->flush($flushStamp->force()); + } catch (\Throwable $e) { + $exceptions[] = $e; + } + } + } + if (null === $handler) { if (!$this->allowNoHandlers) { throw new NoHandlerForMessageException(sprintf('No handler for message "%s".', $context['class'])); @@ -85,11 +134,13 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope private function messageHasAlreadyBeenHandled(Envelope $envelope, HandlerDescriptor $handlerDescriptor): bool { - $some = array_filter($envelope - ->all(HandledStamp::class), function (HandledStamp $stamp) use ($handlerDescriptor) { - return $stamp->getHandlerName() === $handlerDescriptor->getName(); - }); + /** @var HandledStamp $stamp */ + foreach ($envelope->all(HandledStamp::class) as $stamp) { + if ($stamp->getHandlerName() === $handlerDescriptor->getName()) { + return true; + } + } - return \count($some) > 0; + return false; } } diff --git a/src/Symfony/Component/Messenger/Stamp/AckStamp.php b/src/Symfony/Component/Messenger/Stamp/AckStamp.php new file mode 100644 index 0000000000000..b94c2c98e395c --- /dev/null +++ b/src/Symfony/Component/Messenger/Stamp/AckStamp.php @@ -0,0 +1,35 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Stamp; + +use Symfony\Component\Messenger\Envelope; + +/** + * Marker stamp for messages that can be ack/nack'ed. + */ +final class AckStamp implements NonSendableStampInterface +{ + private $ack; + + /** + * @param \Closure(Envelope, \Throwable|null) $ack + */ + public function __construct(\Closure $ack) + { + $this->ack = $ack; + } + + public function ack(Envelope $envelope, \Throwable $e = null): void + { + ($this->ack)($envelope, $e); + } +} diff --git a/src/Symfony/Component/Messenger/Stamp/FlushBatchHandlersStamp.php b/src/Symfony/Component/Messenger/Stamp/FlushBatchHandlersStamp.php new file mode 100644 index 0000000000000..5dfbe2281efe3 --- /dev/null +++ b/src/Symfony/Component/Messenger/Stamp/FlushBatchHandlersStamp.php @@ -0,0 +1,30 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Stamp; + +/** + * Marker telling that any batch handlers bound to the envelope should be flushed. + */ +final class FlushBatchHandlersStamp implements NonSendableStampInterface +{ + private $force; + + public function __construct(bool $force) + { + $this->force = $force; + } + + public function force(): bool + { + return $this->force; + } +} diff --git a/src/Symfony/Component/Messenger/Stamp/NoAutoAckStamp.php b/src/Symfony/Component/Messenger/Stamp/NoAutoAckStamp.php new file mode 100644 index 0000000000000..15ba383b79c9c --- /dev/null +++ b/src/Symfony/Component/Messenger/Stamp/NoAutoAckStamp.php @@ -0,0 +1,32 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Stamp; + +use Symfony\Component\Messenger\Handler\HandlerDescriptor; + +/** + * Marker telling that ack should not be done automatically for this message. + */ +final class NoAutoAckStamp implements NonSendableStampInterface +{ + private $handlerDescriptor; + + public function __construct(HandlerDescriptor $handlerDescriptor) + { + $this->handlerDescriptor = $handlerDescriptor; + } + + public function getHandlerDescriptor(): HandlerDescriptor + { + return $this->handlerDescriptor; + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Handler/HandlersLocatorTest.php b/src/Symfony/Component/Messenger/Tests/Handler/HandlersLocatorTest.php index 1c00a751e9d06..7bb7347877b6c 100644 --- a/src/Symfony/Component/Messenger/Tests/Handler/HandlersLocatorTest.php +++ b/src/Symfony/Component/Messenger/Tests/Handler/HandlersLocatorTest.php @@ -27,7 +27,10 @@ public function testItYieldsHandlerDescriptors() DummyMessage::class => [$handler], ]); - $this->assertEquals([new HandlerDescriptor($handler)], iterator_to_array($locator->getHandlers(new Envelope(new DummyMessage('a'))))); + $descriptor = new HandlerDescriptor($handler); + $descriptor->getName(); + + $this->assertEquals([$descriptor], iterator_to_array($locator->getHandlers(new Envelope(new DummyMessage('a'))))); } public function testItReturnsOnlyHandlersMatchingTransport() @@ -43,6 +46,9 @@ public function testItReturnsOnlyHandlersMatchingTransport() ], ]); + $first->getName(); + $second->getName(); + $this->assertEquals([ $first, $second, diff --git a/src/Symfony/Component/Messenger/Tests/Middleware/HandleMessageMiddlewareTest.php b/src/Symfony/Component/Messenger/Tests/Middleware/HandleMessageMiddlewareTest.php index c33bad5137d8c..503800e1cd6e1 100644 --- a/src/Symfony/Component/Messenger/Tests/Middleware/HandleMessageMiddlewareTest.php +++ b/src/Symfony/Component/Messenger/Tests/Middleware/HandleMessageMiddlewareTest.php @@ -13,12 +13,18 @@ use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\HandlerFailedException; +use Symfony\Component\Messenger\Exception\LogicException; use Symfony\Component\Messenger\Exception\NoHandlerForMessageException; +use Symfony\Component\Messenger\Handler\Acknowledger; +use Symfony\Component\Messenger\Handler\BatchHandlerInterface; +use Symfony\Component\Messenger\Handler\BatchHandlerTrait; use Symfony\Component\Messenger\Handler\HandlerDescriptor; use Symfony\Component\Messenger\Handler\HandlersLocator; use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware; use Symfony\Component\Messenger\Middleware\StackMiddleware; +use Symfony\Component\Messenger\Stamp\AckStamp; use Symfony\Component\Messenger\Stamp\HandledStamp; +use Symfony\Component\Messenger\Stamp\NoAutoAckStamp; use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; @@ -129,6 +135,132 @@ public function testAllowNoHandlers() $this->assertInstanceOf(Envelope::class, $middleware->handle(new Envelope(new DummyMessage('Hey')), new StackMiddleware())); } + + public function testBatchHandler() + { + $handler = new class() implements BatchHandlerInterface { + public $processedMessages; + + use BatchHandlerTrait; + + public function __invoke(DummyMessage $message, Acknowledger $ack = null) + { + return $this->handle($message, $ack); + } + + private function shouldFlush() + { + return 2 <= \count($this->jobs); + } + + private function process(array $jobs): void + { + $this->processedMessages = array_column($jobs, 0); + + foreach ($jobs as [$job, $ack]) { + $ack->ack($job); + } + } + }; + + $middleware = new HandleMessageMiddleware(new HandlersLocator([ + DummyMessage::class => [new HandlerDescriptor($handler)], + ])); + + $ackedMessages = []; + $ack = static function (Envelope $envelope, \Throwable $e = null) use (&$ackedMessages) { + if (null !== $e) { + throw $e; + } + $ackedMessages[] = $envelope->last(HandledStamp::class)->getResult(); + }; + + $expectedMessages = [ + new DummyMessage('Hey'), + new DummyMessage('Bob'), + ]; + + $envelopes = []; + foreach ($expectedMessages as $message) { + $envelopes[] = $middleware->handle(new Envelope($message, [new AckStamp($ack)]), new StackMiddleware()); + } + + $this->assertSame($expectedMessages, $handler->processedMessages); + $this->assertSame($expectedMessages, $ackedMessages); + + $this->assertNotNull($envelopes[0]->last(NoAutoAckStamp::class)); + $this->assertNull($envelopes[1]->last(NoAutoAckStamp::class)); + } + + public function testBatchHandlerNoAck() + { + $handler = new class() implements BatchHandlerInterface { + use BatchHandlerTrait; + + public function __invoke(DummyMessage $message, Acknowledger $ack = null) + { + return $this->handle($message, $ack); + } + + private function shouldFlush() + { + return true; + } + + private function process(array $jobs): void + { + } + }; + + $middleware = new HandleMessageMiddleware(new HandlersLocator([ + DummyMessage::class => [new HandlerDescriptor($handler)], + ])); + + $error = null; + $ack = static function (Envelope $envelope, \Throwable $e = null) use (&$error) { + $error = $e; + }; + + $this->expectException(LogicException::class); + $this->expectExceptionMessage('The acknowledger was not called by the "Symfony\Component\Messenger\Handler\BatchHandlerInterface@anonymous" batch handler.'); + + $middleware->handle(new Envelope(new DummyMessage('Hey'), [new AckStamp($ack)]), new StackMiddleware()); + } + + public function testBatchHandlerNoBatch() + { + $handler = new class() implements BatchHandlerInterface { + public $processedMessages; + + use BatchHandlerTrait; + + public function __invoke(DummyMessage $message, Acknowledger $ack = null) + { + return $this->handle($message, $ack); + } + + private function shouldFlush() + { + return false; + } + + private function process(array $jobs): void + { + $this->processedMessages = array_column($jobs, 0); + [$job, $ack] = array_shift($jobs); + $ack->ack($job); + } + }; + + $middleware = new HandleMessageMiddleware(new HandlersLocator([ + DummyMessage::class => [new HandlerDescriptor($handler)], + ])); + + $message = new DummyMessage('Hey'); + $middleware->handle(new Envelope($message), new StackMiddleware()); + + $this->assertSame([$message], $handler->processedMessages); + } } class HandleMessageMiddlewareTestCallable diff --git a/src/Symfony/Component/Messenger/Tests/WorkerTest.php b/src/Symfony/Component/Messenger/Tests/WorkerTest.php index a5a3f902c669e..4cee64e3ac1ff 100644 --- a/src/Symfony/Component/Messenger/Tests/WorkerTest.php +++ b/src/Symfony/Component/Messenger/Tests/WorkerTest.php @@ -23,7 +23,14 @@ use Symfony\Component\Messenger\Event\WorkerStoppedEvent; use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener; use Symfony\Component\Messenger\Exception\RuntimeException; +use Symfony\Component\Messenger\Handler\Acknowledger; +use Symfony\Component\Messenger\Handler\BatchHandlerInterface; +use Symfony\Component\Messenger\Handler\BatchHandlerTrait; +use Symfony\Component\Messenger\Handler\HandlerDescriptor; +use Symfony\Component\Messenger\Handler\HandlersLocator; +use Symfony\Component\Messenger\MessageBus; use Symfony\Component\Messenger\MessageBusInterface; +use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware; use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp; use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Stamp\SentStamp; @@ -49,17 +56,13 @@ public function testWorkerDispatchTheReceivedMessage() ]); $bus = $this->createMock(MessageBusInterface::class); + $envelopes = []; $bus->expects($this->exactly(2)) ->method('dispatch') - ->withConsecutive( - [new Envelope($apiMessage, [new ReceivedStamp('transport'), new ConsumedByWorkerStamp()])], - [new Envelope($ipaMessage, [new ReceivedStamp('transport'), new ConsumedByWorkerStamp()])] - ) - ->willReturnOnConsecutiveCalls( - $this->returnArgument(0), - $this->returnArgument(0) - ); + ->willReturnCallback(function ($envelope) use (&$envelopes) { + return $envelopes[] = $envelope; + }); $dispatcher = new EventDispatcher(); $dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(2)); @@ -67,6 +70,12 @@ public function testWorkerDispatchTheReceivedMessage() $worker = new Worker(['transport' => $receiver], $bus, $dispatcher); $worker->run(); + $this->assertSame($apiMessage, $envelopes[0]->getMessage()); + $this->assertSame($ipaMessage, $envelopes[1]->getMessage()); + $this->assertCount(1, $envelopes[0]->all(ReceivedStamp::class)); + $this->assertCount(1, $envelopes[0]->all(ConsumedByWorkerStamp::class)); + $this->assertSame('transport', $envelopes[0]->last(ReceivedStamp::class)->getTransportName()); + $this->assertSame(2, $receiver->getAcknowledgeCount()); } @@ -340,6 +349,109 @@ public function testWorkerShouldLogOnStop() $worker->stop(); } + + public function testBatchProcessing() + { + $expectedMessages = [ + new DummyMessage('Hey'), + new DummyMessage('Bob'), + ]; + + $receiver = new DummyReceiver([ + [new Envelope($expectedMessages[0])], + [new Envelope($expectedMessages[1])], + ]); + + $handler = new DummyBatchHandler(); + + $middleware = new HandleMessageMiddleware(new HandlersLocator([ + DummyMessage::class => [new HandlerDescriptor($handler)], + ])); + + $bus = new MessageBus([$middleware]); + + $dispatcher = new EventDispatcher(); + $dispatcher->addListener(WorkerRunningEvent::class, function (WorkerRunningEvent $event) use ($receiver) { + static $i = 0; + if (1 < ++$i) { + $event->getWorker()->stop(); + $this->assertSame(2, $receiver->getAcknowledgeCount()); + } else { + $this->assertSame(0, $receiver->getAcknowledgeCount()); + } + }); + + $worker = new Worker([$receiver], $bus, $dispatcher); + $worker->run(); + + $this->assertSame($expectedMessages, $handler->processedMessages); + } + + public function testFlushBatchOnIdle() + { + $expectedMessages = [ + new DummyMessage('Hey'), + ]; + + $receiver = new DummyReceiver([ + [new Envelope($expectedMessages[0])], + [], + ]); + + $handler = new DummyBatchHandler(); + + $middleware = new HandleMessageMiddleware(new HandlersLocator([ + DummyMessage::class => [new HandlerDescriptor($handler)], + ])); + + $bus = new MessageBus([$middleware]); + + $dispatcher = new EventDispatcher(); + $dispatcher->addListener(WorkerRunningEvent::class, function (WorkerRunningEvent $event) use ($receiver) { + static $i = 0; + if (1 < ++$i) { + $event->getWorker()->stop(); + $this->assertSame(1, $receiver->getAcknowledgeCount()); + } else { + $this->assertSame(0, $receiver->getAcknowledgeCount()); + } + }); + + $worker = new Worker([$receiver], $bus, $dispatcher); + $worker->run(); + + $this->assertSame($expectedMessages, $handler->processedMessages); + } + + public function testFlushBatchOnStop() + { + $expectedMessages = [ + new DummyMessage('Hey'), + ]; + + $receiver = new DummyReceiver([ + [new Envelope($expectedMessages[0])], + ]); + + $handler = new DummyBatchHandler(); + + $middleware = new HandleMessageMiddleware(new HandlersLocator([ + DummyMessage::class => [new HandlerDescriptor($handler)], + ])); + + $bus = new MessageBus([$middleware]); + + $dispatcher = new EventDispatcher(); + $dispatcher->addListener(WorkerRunningEvent::class, function (WorkerRunningEvent $event) use ($receiver) { + $event->getWorker()->stop(); + $this->assertSame(0, $receiver->getAcknowledgeCount()); + }); + + $worker = new Worker([$receiver], $bus, $dispatcher); + $worker->run(); + + $this->assertSame($expectedMessages, $handler->processedMessages); + } } class DummyReceiver implements ReceiverInterface @@ -400,3 +512,29 @@ public function getFromQueues(array $queueNames): iterable return $this->get(); } } + +class DummyBatchHandler implements BatchHandlerInterface +{ + use BatchHandlerTrait; + + public $processedMessages; + + public function __invoke(DummyMessage $message, Acknowledger $ack = null) + { + return $this->handle($message, $ack); + } + + private function shouldFlush() + { + return 2 <= \count($this->jobs); + } + + private function process(array $jobs): void + { + $this->processedMessages = array_column($jobs, 0); + + foreach ($jobs as [$job, $ack]) { + $ack->ack($job); + } + } +} diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php index 183a48852dda2..754d1c1b1e75a 100644 --- a/src/Symfony/Component/Messenger/Worker.php +++ b/src/Symfony/Component/Messenger/Worker.php @@ -23,7 +23,10 @@ use Symfony\Component\Messenger\Exception\HandlerFailedException; use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException; use Symfony\Component\Messenger\Exception\RuntimeException; +use Symfony\Component\Messenger\Stamp\AckStamp; use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp; +use Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp; +use Symfony\Component\Messenger\Stamp\NoAutoAckStamp; use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; @@ -43,6 +46,8 @@ class Worker private $logger; private $shouldStop = false; private $metadata; + private $acks = []; + private $unacks; /** * @param ReceiverInterface[] $receivers Where the key is the transport name @@ -56,6 +61,7 @@ public function __construct(array $receivers, MessageBusInterface $bus, EventDis $this->metadata = new WorkerMetadata([ 'transportNames' => array_keys($receivers), ]); + $this->unacks = new \SplObjectStorage(); } /** @@ -85,7 +91,7 @@ public function run(array $options = []): void } } - while (false === $this->shouldStop) { + while (!$this->shouldStop) { $envelopeHandled = false; $envelopeHandledStart = microtime(true); foreach ($this->receivers as $transportName => $receiver) { @@ -98,7 +104,7 @@ public function run(array $options = []): void foreach ($envelopes as $envelope) { $envelopeHandled = true; - $this->handleMessage($envelope, $receiver, $transportName); + $this->handleMessage($envelope, $transportName); $this->dispatchEvent(new WorkerRunningEvent($this, false)); if ($this->shouldStop) { @@ -114,6 +120,10 @@ public function run(array $options = []): void } } + if (!$envelopeHandled && $this->flush(false)) { + continue; + } + if (!$envelopeHandled) { $this->dispatchEvent(new WorkerRunningEvent($this, true)); @@ -123,10 +133,11 @@ public function run(array $options = []): void } } + $this->flush(true); $this->dispatchEvent(new WorkerStoppedEvent($this)); } - private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $transportName): void + private function handleMessage(Envelope $envelope, string $transportName): void { $event = new WorkerMessageReceivedEvent($envelope, $transportName); $this->dispatchEvent($event); @@ -136,45 +147,101 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, return; } + $acked = false; + $ack = function (Envelope $envelope, \Throwable $e = null) use ($transportName, &$acked) { + $acked = true; + $this->acks[] = [$transportName, $envelope, $e]; + }; + try { - $envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName), new ConsumedByWorkerStamp())); - } catch (\Throwable $throwable) { - $rejectFirst = $throwable instanceof RejectRedeliveredMessageException; - if ($rejectFirst) { - // redelivered messages are rejected first so that continuous failures in an event listener or while - // publishing for retry does not cause infinite redelivery loops - $receiver->reject($envelope); + $e = null; + $envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName), new ConsumedByWorkerStamp(), new AckStamp($ack))); + } catch (\Throwable $e) { + } + + $noAutoAckStamp = $envelope->last(NoAutoAckStamp::class); + + if (!$acked && !$noAutoAckStamp) { + $this->acks[] = [$transportName, $envelope, $e]; + } elseif ($noAutoAckStamp) { + $this->unacks[$noAutoAckStamp->getHandlerDescriptor()->getBatchHandler()] = [$envelope->withoutAll(AckStamp::class), $transportName]; + } + + $this->ack(); + } + + private function ack(): bool + { + $acks = $this->acks; + $this->acks = []; + + foreach ($acks as [$transportName, $envelope, $e]) { + $receiver = $this->receivers[$transportName]; + + if (null !== $e) { + if ($rejectFirst = $e instanceof RejectRedeliveredMessageException) { + // redelivered messages are rejected first so that continuous failures in an event listener or while + // publishing for retry does not cause infinite redelivery loops + $receiver->reject($envelope); + } + + if ($e instanceof HandlerFailedException) { + $envelope = $e->getEnvelope(); + } + + $failedEvent = new WorkerMessageFailedEvent($envelope, $transportName, $e); + + $this->dispatchEvent($failedEvent); + $envelope = $failedEvent->getEnvelope(); + + if (!$rejectFirst) { + $receiver->reject($envelope); + } + + continue; } - if ($throwable instanceof HandlerFailedException) { - $envelope = $throwable->getEnvelope(); + $handledEvent = new WorkerMessageHandledEvent($envelope, $transportName); + $this->dispatchEvent($handledEvent); + $envelope = $handledEvent->getEnvelope(); + + if (null !== $this->logger) { + $message = $envelope->getMessage(); + $context = [ + 'message' => $message, + 'class' => \get_class($message), + ]; + $this->logger->info('{class} was handled successfully (acknowledging to transport).', $context); } - $failedEvent = new WorkerMessageFailedEvent($envelope, $transportName, $throwable); - $this->dispatchEvent($failedEvent); - $envelope = $failedEvent->getEnvelope(); + $receiver->ack($envelope); + } + + return (bool) $acks; + } - if (!$rejectFirst) { - $receiver->reject($envelope); - } + private function flush(bool $force): bool + { + $unacks = $this->unacks; - return; + if (!$unacks->count()) { + return false; } - $handledEvent = new WorkerMessageHandledEvent($envelope, $transportName); - $this->dispatchEvent($handledEvent); - $envelope = $handledEvent->getEnvelope(); + $this->unacks = new \SplObjectStorage(); - if (null !== $this->logger) { - $message = $envelope->getMessage(); - $context = [ - 'message' => $message, - 'class' => \get_class($message), - ]; - $this->logger->info('{class} was handled successfully (acknowledging to transport).', $context); + foreach ($unacks as $batchHandler) { + [$envelope, $transportName] = $unacks[$batchHandler]; + try { + $this->bus->dispatch($envelope->with(new FlushBatchHandlersStamp($force))); + $envelope = $envelope->withoutAll(NoAutoAckStamp::class); + unset($unacks[$batchHandler], $batchHandler); + } catch (\Throwable $e) { + $this->acks[] = [$transportName, $envelope, $e]; + } } - $receiver->ack($envelope); + return $this->ack(); } public function stop(): void