diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php index b0853536ea93f..b276f3a14bf53 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php @@ -2183,6 +2183,11 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder $sendersServiceLocator = ServiceLocatorTagPass::register($container, $senderReferences); + $workerExecutionStrategyRegistry = $container->getDefinition('messenger.worker_execution_strategy.registry'); + foreach ($container->findTaggedServiceIds('messenger.worker_execution_strategy', true) as $serviceId => $unused) { + $workerExecutionStrategyRegistry->addMethodCall('registerStrategy', [$container->findDefinition($serviceId)->getClass()]); + } + $container->getDefinition('messenger.senders_locator') ->replaceArgument(0, $messageToSendersMapping) ->replaceArgument(1, $sendersServiceLocator) diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.php b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.php index d64cd058e61f8..c192e552f4260 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.php +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.php @@ -156,6 +156,7 @@ abstract_arg('Routable message bus'), service('messenger.receiver_locator'), service('event_dispatcher'), + service('messenger.worker_execution_strategy.registry'), service('logger')->nullOnInvalid(), [], // Receiver names service('messenger.listener.reset_services')->nullOnInvalid(), diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php index 0a67d48474d03..7aeddfa5bb28e 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php @@ -43,6 +43,9 @@ use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; use Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory; use Symfony\Component\Messenger\Transport\TransportFactory; +use Symfony\Component\Messenger\WorkerExecution\DefaultWorkerExecutionStrategy; +use Symfony\Component\Messenger\WorkerExecution\RankedWorkerExecutionStrategy; +use Symfony\Component\Messenger\WorkerExecution\WorkerExecutionStrategyRegistry; return static function (ContainerConfigurator $container) { $container->services() @@ -216,5 +219,13 @@ abstract_arg('message bus locator'), service('messenger.default_bus'), ]) + + ->set('messenger.worker_execution_strategy.registry', WorkerExecutionStrategyRegistry::class) + + ->set('messenger.worker_execution_strategy.default', DefaultWorkerExecutionStrategy::class) + ->tag('messenger.worker_execution_strategy') + + ->set('messenger.worker_execution_strategy.ranked', RankedWorkerExecutionStrategy::class) + ->tag('messenger.worker_execution_strategy') ; }; diff --git a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php index f1872c3514555..61fc866267c32 100644 --- a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php +++ b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php @@ -33,6 +33,8 @@ use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener; use Symfony\Component\Messenger\RoutableMessageBus; use Symfony\Component\Messenger\Worker; +use Symfony\Component\Messenger\WorkerExecution\DefaultWorkerExecutionStrategy; +use Symfony\Component\Messenger\WorkerExecution\WorkerExecutionStrategyRegistry; /** * @author Samuel Roze @@ -48,8 +50,9 @@ class ConsumeMessagesCommand extends Command private ?ResetServicesListener $resetServicesListener; private array $busIds; private ?ContainerInterface $rateLimiterLocator; + private WorkerExecutionStrategyRegistry $strategyRegistry; - public function __construct(RoutableMessageBus $routableBus, ContainerInterface $receiverLocator, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null, array $receiverNames = [], ResetServicesListener $resetServicesListener = null, array $busIds = [], ContainerInterface $rateLimiterLocator = null) + public function __construct(RoutableMessageBus $routableBus, ContainerInterface $receiverLocator, EventDispatcherInterface $eventDispatcher, WorkerExecutionStrategyRegistry $strategyRegistry, LoggerInterface $logger = null, array $receiverNames = [], ResetServicesListener $resetServicesListener = null, array $busIds = [], ContainerInterface $rateLimiterLocator = null) { $this->routableBus = $routableBus; $this->receiverLocator = $receiverLocator; @@ -59,6 +62,7 @@ public function __construct(RoutableMessageBus $routableBus, ContainerInterface $this->resetServicesListener = $resetServicesListener; $this->busIds = $busIds; $this->rateLimiterLocator = $rateLimiterLocator; + $this->strategyRegistry = $strategyRegistry; parent::__construct(); } @@ -78,6 +82,8 @@ protected function configure(): void new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically)'), new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'), new InputOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset container services after each message'), + new InputOption('strategy', null, InputOption::VALUE_REQUIRED, 'Execution strategy', DefaultWorkerExecutionStrategy::getAlias()), + new InputOption('strategy-config', null, InputOption::VALUE_REQUIRED, 'Json-encoded custom config for the strategy. See the chosen strategy for info', '{}'), ]) ->setHelp(<<<'EOF' The %command.name% command consumes messages and dispatches them to the message bus. @@ -210,7 +216,14 @@ protected function execute(InputInterface $input, OutputInterface $output): int $bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus; - $worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger, $rateLimiters); + $strategyAlias = $input->getOption('strategy'); + $strategyConfig = json_decode($input->getOption('strategy-config'), true); + if ($strategyConfig === null) { + throw new RuntimeException('Could not json-decode the value of the --strategy-config parameter'); + } + $strategy = $this->strategyRegistry->createStrategy($strategyAlias, $strategyConfig); + + $worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger, $rateLimiters, $strategy); $options = [ 'sleep' => $input->getOption('sleep') * 1000000, ]; diff --git a/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php b/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php index 22a50ec1e89aa..2087e82f267fa 100644 --- a/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php +++ b/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php @@ -295,9 +295,9 @@ private function registerReceivers(ContainerBuilder $container, array $busIds) $consumeCommandDefinition->replaceArgument(0, new Reference('messenger.routable_message_bus')); } - $consumeCommandDefinition->replaceArgument(4, array_values($receiverNames)); + $consumeCommandDefinition->replaceArgument(5, array_values($receiverNames)); try { - $consumeCommandDefinition->replaceArgument(6, $busIds); + $consumeCommandDefinition->replaceArgument(7, $busIds); } catch (OutOfBoundsException) { // ignore to preserve compatibility with symfony/framework-bundle < 5.4 } diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php index 1608644f9d6bd..5c934c3503218 100644 --- a/src/Symfony/Component/Messenger/Worker.php +++ b/src/Symfony/Component/Messenger/Worker.php @@ -31,6 +31,8 @@ use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\RateLimiter\LimiterInterface; +use Symfony\Component\Messenger\WorkerExecution\WorkerExecutionStrategyContext; +use Symfony\Component\Messenger\WorkerExecution\WorkerExecutionStrategyInterface; /** * @author Samuel Roze @@ -49,11 +51,12 @@ class Worker private array $acks = []; private \SplObjectStorage $unacks; private ?array $rateLimiters; + private WorkerExecutionStrategyInterface $executionStrategy; /** * @param ReceiverInterface[] $receivers Where the key is the transport name */ - public function __construct(array $receivers, MessageBusInterface $bus, EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null, array $rateLimiters = null) + public function __construct(array $receivers, MessageBusInterface $bus, EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null, array $rateLimiters = null, WorkerExecutionStrategyInterface $executionStrategy) { $this->receivers = $receivers; $this->bus = $bus; @@ -64,6 +67,7 @@ public function __construct(array $receivers, MessageBusInterface $bus, EventDis ]); $this->unacks = new \SplObjectStorage(); $this->rateLimiters = $rateLimiters; + $this->executionStrategy = $executionStrategy; } /** @@ -93,35 +97,13 @@ public function run(array $options = []): void } } + $workerExecutionContext = new WorkerExecutionStrategyContext($this, $queueNames ?: []); + while (!$this->shouldStop) { - $envelopeHandled = false; $envelopeHandledStart = microtime(true); - foreach ($this->receivers as $transportName => $receiver) { - if ($queueNames) { - $envelopes = $receiver->getFromQueues($queueNames); - } else { - $envelopes = $receiver->get(); - } - - foreach ($envelopes as $envelope) { - $envelopeHandled = true; - - $this->rateLimit($transportName); - $this->handleMessage($envelope, $transportName); - $this->eventDispatcher?->dispatch(new WorkerRunningEvent($this, false)); - - if ($this->shouldStop) { - break 2; - } - } - // after handling a single receiver, quit and start the loop again - // this should prevent multiple lower priority receivers from - // blocking too long before the higher priority are checked - if ($envelopeHandled) { - break; - } - } + $result = $this->executionStrategy->processQueueTasks($workerExecutionContext); + $envelopeHandled = $result->wereEnvelopesHandled; if (!$envelopeHandled && $this->flush(false)) { continue; @@ -140,7 +122,10 @@ public function run(array $options = []): void $this->eventDispatcher?->dispatch(new WorkerStoppedEvent($this)); } - private function handleMessage(Envelope $envelope, string $transportName): void + /** + * @internal + */ + public function handleMessage(Envelope $envelope, string $transportName): void { $event = new WorkerMessageReceivedEvent($envelope, $transportName); $this->eventDispatcher?->dispatch($event); @@ -222,7 +207,10 @@ private function ack(): bool return (bool) $acks; } - private function rateLimit(string $transportName): void + /** + * @internal + */ + public function rateLimit(string $transportName): void { if (!$this->rateLimiters) { return; @@ -279,4 +267,22 @@ public function getMetadata(): WorkerMetadata { return $this->metadata; } + + public function getEventDispatcher(): ?EventDispatcherInterface + { + return $this->eventDispatcher; + } + + public function getShouldStop(): bool + { + return $this->shouldStop; + } + + /** + * @return ReceiverInterface[] + */ + public function getReceivers(): array + { + return $this->receivers; + } } diff --git a/src/Symfony/Component/Messenger/WorkerExecution/DefaultWorkerExecutionStrategy.php b/src/Symfony/Component/Messenger/WorkerExecution/DefaultWorkerExecutionStrategy.php new file mode 100644 index 0000000000000..858bdc6457fde --- /dev/null +++ b/src/Symfony/Component/Messenger/WorkerExecution/DefaultWorkerExecutionStrategy.php @@ -0,0 +1,47 @@ +getReceivers() as $transportName => $receiver) { + if ($context->getQueueNames()) { + $envelopes = $receiver->getFromQueues($context->getQueueNames()); + } else { + $envelopes = $receiver->get(); + } + + foreach ($envelopes as $envelope) { + $envelopeHandled = true; + + $result = $context->handleMessage($envelope, $transportName); + + if ($result->shouldStop) { + break 2; + } + } + + // after handling a single receiver, quit and start the loop again + // this should prevent multiple lower priority receivers from + // blocking too long before the higher priority are checked + if ($envelopeHandled) { + break; + } + } + + return new WorkerExecutionStrategyResult($envelopeHandled); + } +} diff --git a/src/Symfony/Component/Messenger/WorkerExecution/RankedWorkerExecutionStrategy.php b/src/Symfony/Component/Messenger/WorkerExecution/RankedWorkerExecutionStrategy.php new file mode 100644 index 0000000000000..084eb4e932bae --- /dev/null +++ b/src/Symfony/Component/Messenger/WorkerExecution/RankedWorkerExecutionStrategy.php @@ -0,0 +1,116 @@ +options = $options; + + $this->validateOptions($options); + } + + private function validateOptions(array $options): void + { + if (!array_key_exists('ranks', $options) || !is_array($options['ranks'])) { + throw new RuntimeException('Invalid worker ranked strategy options. Missing key "ranks" or it is not an array'); + } + + foreach ($options['ranks'] as $rankNumber) { + if (!is_int($rankNumber)) { + throw new RuntimeException('Invalid worker ranked strategy options. One of the ranks is not an integer'); + } + } + } + + public static function getAlias(): string + { + return 'com.symfony.ranked'; + } + + public function processQueueTasks(WorkerExecutionStrategyContext $context): WorkerExecutionStrategyResult + { + $envelopeHandled = false; + + foreach ($this->groupReceiversByRanks($context->getReceivers()) as $receiversGroup) { + foreach ($receiversGroup as $transportName => $receiver) { + if ($context->getQueueNames()) { + $envelopes = $receiver->getFromQueues($context->getQueueNames()); + } else { + $envelopes = $receiver->get(); + } + + foreach ($envelopes as $envelope) { + $envelopeHandled = true; + + $result = $context->handleMessage($envelope, $transportName); + + if ($result->shouldStop) { + break 3; + } + } + } + + // after handling a single grouped rank of receivers, quit and start the loop again + // this should prevent multiple lower priority receivers from + // blocking too long before the higher priority are checked + if ($envelopeHandled) { + break; + } + } + + return new WorkerExecutionStrategyResult($envelopeHandled); + } + + /** + * @param ReceiverInterface[] $receivers Where the key is the transport name + * @return array> Ordered groups of receivers by ranks number + */ + private function groupReceiversByRanks(array $receivers): array + { + $receiversRanks = $this->options['ranks']; + $receiversValues = array_values($receivers); + $receiversKeys = array_keys($receivers); + + if (count($receiversRanks) !== count($receivers)) { + throw new RuntimeException('Worker ranked strategy: The count of queue receivers does not match the count of their ranks'); + } + + /** + * @var array> $receiversGroupedByRanks + */ + $receiversGroupedByRanks = []; + foreach ($receiversValues as $index => $receiver) { + $receiversGroupedByRanks[(int) $receiversRanks[$index]][$receiversKeys[$index]] = $receiver; + } + + uksort($receiversGroupedByRanks, static function ($rankA, $rankB) { + return $rankA <=> $rankB; + }); + + return $receiversGroupedByRanks; + } +} diff --git a/src/Symfony/Component/Messenger/WorkerExecution/WorkerExecutionStrategyContext.php b/src/Symfony/Component/Messenger/WorkerExecution/WorkerExecutionStrategyContext.php new file mode 100644 index 0000000000000..08d0d2435af4f --- /dev/null +++ b/src/Symfony/Component/Messenger/WorkerExecution/WorkerExecutionStrategyContext.php @@ -0,0 +1,50 @@ +worker = $worker; + $this->queueNames = $queueNames; + } + + /** + * @return ReceiverInterface[] Where the key is the transport name + */ + public function getReceivers(): array + { + return $this->worker->getReceivers(); + } + + /** + * @return string[] + */ + public function getQueueNames(): array + { + return $this->queueNames; + } + + /** + * The strategy *must* stop executing and return immediately if the result of this function + * requests so. + */ + public function handleMessage(mixed $envelope, int|string $transportName): WorkerMessageHandlingResult + { + $this->worker->rateLimit($transportName); + $this->worker->handleMessage($envelope, $transportName); + $this->worker->getEventDispatcher()?->dispatch(new WorkerRunningEvent($this->worker, false)); + + return new WorkerMessageHandlingResult($this->worker->getShouldStop()); + } +} diff --git a/src/Symfony/Component/Messenger/WorkerExecution/WorkerExecutionStrategyInterface.php b/src/Symfony/Component/Messenger/WorkerExecution/WorkerExecutionStrategyInterface.php new file mode 100644 index 0000000000000..48678e8cc20d2 --- /dev/null +++ b/src/Symfony/Component/Messenger/WorkerExecution/WorkerExecutionStrategyInterface.php @@ -0,0 +1,17 @@ +> */ + private array $strategies = []; + + /** + * @param class-string $strategyClass + */ + public function registerStrategy(string $strategyClass): void + { + $this->strategies[$strategyClass::getAlias()] = $strategyClass; + } + + public function createStrategy(mixed $strategyAlias, mixed $strategyConfig): WorkerExecutionStrategyInterface + { + $strategyClass = $this->strategies[$strategyAlias] ?? null; + if (!$strategyClass) { + throw new RuntimeException("Unknown strategy alias: '{$strategyAlias}'"); + } + + return new $strategyClass($strategyConfig); + } +} diff --git a/src/Symfony/Component/Messenger/WorkerExecution/WorkerExecutionStrategyResult.php b/src/Symfony/Component/Messenger/WorkerExecution/WorkerExecutionStrategyResult.php new file mode 100644 index 0000000000000..a1422e031c599 --- /dev/null +++ b/src/Symfony/Component/Messenger/WorkerExecution/WorkerExecutionStrategyResult.php @@ -0,0 +1,13 @@ +wereEnvelopesHandled = $wereEnvelopesHandled; + } +} diff --git a/src/Symfony/Component/Messenger/WorkerExecution/WorkerMessageHandlingResult.php b/src/Symfony/Component/Messenger/WorkerExecution/WorkerMessageHandlingResult.php new file mode 100644 index 0000000000000..2e1c6e7846e10 --- /dev/null +++ b/src/Symfony/Component/Messenger/WorkerExecution/WorkerMessageHandlingResult.php @@ -0,0 +1,13 @@ +shouldStop = $shouldStop; + } +}