Skip to content

[Messenger] Add BatchAsyncHandlerTrait for ParallelMessageBus #60080

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: 7.4
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

namespace Symfony\Bundle\FrameworkBundle\DependencyInjection;

use Amp\Parallel\Worker\Task;
use Composer\InstalledVersions;
use Http\Client\HttpAsyncClient;
use Http\Client\HttpClient;
Expand Down Expand Up @@ -113,6 +114,7 @@
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Middleware\RouterContextMiddleware;
use Symfony\Component\Messenger\ParallelMessageBus;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface as MessengerTransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
Expand Down Expand Up @@ -2090,6 +2092,10 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder

$loader->load('messenger.php');

if (!class_exists(ParallelMessageBus::class)) {
$container->removeDefinition('parallel_bus');
}

if (!interface_exists(DenormalizerInterface::class)) {
$container->removeDefinition('serializer.normalizer.flatten_exception');
}
Expand Down Expand Up @@ -2161,7 +2167,12 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder

if ($busId === $config['default_bus']) {
$container->setAlias('messenger.default_bus', $busId)->setPublic(true);
$container->setAlias(MessageBusInterface::class, $busId);

$messageBusAlias = $container->setAlias(MessageBusInterface::class, $busId);

if (class_exists(Task::class)) {
$messageBusAlias->setPublic(true);
}
} else {
$container->registerAliasForArgument($busId, MessageBusInterface::class);
}
Expand Down
11 changes: 11 additions & 0 deletions src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
use Symfony\Component\Messenger\Middleware\TraceableMiddleware;
use Symfony\Component\Messenger\Middleware\ValidationMiddleware;
use Symfony\Component\Messenger\ParallelMessageBus;
use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;
use Symfony\Component\Messenger\RoutableMessageBus;
use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransportFactory;
Expand All @@ -54,6 +55,7 @@
abstract_arg('per message senders map'),
abstract_arg('senders service locator'),
])

->set('messenger.middleware.send_message', SendMessageMiddleware::class)
->abstract()
->args([
Expand Down Expand Up @@ -134,6 +136,15 @@
])
->tag('messenger.transport_factory')

->set('parallel_bus', ParallelMessageBus::class)
->args([
[],
param('kernel.environment'),
param('kernel.debug'),
param('kernel.project_dir'),
])
->tag('messenger.bus')

->set('messenger.transport.in_memory.factory', InMemoryTransportFactory::class)
->args([
service('clock')->nullOnInvalid(),
Expand Down
6 changes: 6 additions & 0 deletions src/Symfony/Component/Messenger/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
CHANGELOG
=========

7.3
---

* Add `Symfony\Component\Messenger\Handler\BatchAsyncHandlerTrait` designed for parallel execution using ParallelMessageBus


7.2
---

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ protected function configure(): void
new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
new InputOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset container services after each message'),
new InputOption('all', null, InputOption::VALUE_NONE, 'Consume messages from all receivers'),
new InputOption('parallel-limit', 'p', InputOption::VALUE_REQUIRED, 'The number of concurrent processes', 10),
])
->setHelp(<<<'EOF'
The <info>%command.name%</info> command consumes messages and dispatches them to the message bus.
Expand Down Expand Up @@ -240,6 +241,8 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$options['queues'] = $queues;
}

$options['parallel-limit'] = $input->getOption('parallel-limit');

try {
$this->worker->run($options);
} finally {
Expand Down
88 changes: 88 additions & 0 deletions src/Symfony/Component/Messenger/DispatchTask.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger;

use Amp\Cache\LocalCache;
use Amp\Cancellation;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;
use App\Kernel;
use Psr\Container\ContainerInterface;
use Symfony\Component\Dotenv\Dotenv;
use Symfony\Component\Messenger\Stamp\AckStamp;

class DispatchTask implements Task
{
private static ?LocalCache $cache = null;

public function __construct(
private Envelope $envelope,
private array $stamps,
private readonly string $env,
private readonly bool $isDebug,
private readonly string $projectDir,
) {
if (!class_exists(LocalCache::class)) {
throw new \LogicException(\sprintf('Package "amp/cache" is required to use the "%s". Try running "composer require amphp/cache".', LocalCache::class));
}
}

public function run(Channel $channel, Cancellation $cancellation): mixed
{
$container = $this->getContainer();
$envelope = $this->dispatch($container, $channel);

return $envelope->withoutStampsOfType(AckStamp::class);
}

private function dispatch(ContainerInterface $container, $channel)
{
$messageBus = $container->get(MessageBusInterface::class);

return $messageBus->dispatch($this->envelope, $this->stamps);
}

private function getContainer()
{
$cache = self::$cache ??= new LocalCache();
$container = $cache->get('cache-container');

// if not in cache, create container
if (!$container) {
if (!method_exists(Dotenv::class, 'bootEnv')) {
throw new \LogicException(\sprintf("Method bootEnv de \"%s\" doesn't exist.", Dotenv::class));
}

(new Dotenv())->bootEnv($this->projectDir.'/.env');

if (!class_exists(Kernel::class) && !isset($_ENV['KERNEL_CLASS'])) {
throw new \LogicException('You must set the KERNEL_CLASS environment variable to the fully-qualified class name of your Kernel in .env or have "%s" class.', Kernel::class);
} elseif (isset($_ENV['KERNEL_CLASS'])) {
$kernel = new $_ENV['KERNEL_CLASS']($this->env, $this->isDebug);
} else {
$kernel = new Kernel($this->env, $this->isDebug);
}

$kernel->boot();

$container = $kernel->getContainer();
$cache->set('cache-container', $container);
}

return $container;
}

public function getEnvelope(): Envelope
{
return $this->envelope;
}
}
156 changes: 156 additions & 0 deletions src/Symfony/Component/Messenger/Handler/BatchAsyncHandlerTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Handler;

use Amp\Future;
use Symfony\Component\Messenger\Stamp\FutureStamp;
use Symfony\Component\Messenger\ParallelMessageBus;
use Symfony\Component\Messenger\Envelope;

/**
* A batch handler trait designed for parallel execution using ParallelMessageBus.
*
* This trait collects jobs in worker-specific batches and processes them
* in parallel by dispatching each job individually through ParallelMessageBus.
*/
trait BatchAsyncHandlerTrait
{
/** @var array<string,array> Map of worker IDs to their job batches */
private array $workerJobs = [];

/** @var ParallelMessageBus|null */
private ?ParallelMessageBus $parallelBus = null;

/**
* Set the parallel message bus to use for dispatching jobs.
*/
public function setParallelMessageBus(ParallelMessageBus $bus): void
{
$this->parallelBus = $bus;
}

public function flush(bool $force): void
{
$workerId = $this->getCurrentWorkerId();

if (isset($this->workerJobs[$workerId]) && $jobs = $this->workerJobs[$workerId]) {
$this->workerJobs[$workerId] = [];

if ($this->parallelBus) {
// Process each job in parallel using ParallelMessageBus
$futures = [];

foreach ($jobs as [$message, $ack]) {
// Dispatch each message individually
$envelope = $this->parallelBus->dispatch($message);

$futureStamp = $envelope->last(FutureStamp::class);
if ($futureStamp) {
/** @var Future $future */
$future = $futureStamp->getFuture();
$futures[] = [$future, $ack];
}
}

// If force is true, wait for all results
if ($force && $futures) {
foreach ($futures as [$future, $ack]) {
try {
$result = $future->await();
$ack->ack($result);
} catch (\Throwable $e) {
$ack->nack($e);
}
}
}
} else {
// Fallback to synchronous processing
$this->process($jobs);
}
}
}

/**
* @param Acknowledger|null $ack The function to call to ack/nack the $message.
*
* @return mixed The number of pending messages in the batch if $ack is not null,
* the result from handling the message otherwise
*/
private function handle(object $message, ?Acknowledger $ack): mixed
{
$workerId = $this->getCurrentWorkerId();

if (!isset($this->workerJobs[$workerId])) {
$this->workerJobs[$workerId] = [];
}

if (null === $ack) {
$ack = new Acknowledger(get_debug_type($this));
$this->workerJobs[$workerId][] = [$message, $ack];
$this->flush(true);

return $ack->getResult();
}

$this->workerJobs[$workerId][] = [$message, $ack];
if (!$this->shouldFlush()) {
return \count($this->workerJobs[$workerId]);
}

$this->flush(true);

return 0;
}

private function shouldFlush(): bool
{
$workerId = $this->getCurrentWorkerId();
return $this->getBatchSize() <= \count($this->workerJobs[$workerId] ?? []);
}

/**
* Generates a unique identifier for the current worker context.
*/
private function getCurrentWorkerId(): string
{
// In a worker pool, each worker has a unique ID
return getmypid() ?: 'default-worker';
}

/**
* Cleans up worker-specific resources when a worker completes its job.
*/
public function cleanupWorker(): void
{
$workerId = $this->getCurrentWorkerId();

// Flush any remaining jobs before cleaning up
if (isset($this->workerJobs[$workerId]) && !empty($this->workerJobs[$workerId])) {
$this->flush(true);
}

unset($this->workerJobs[$workerId]);
}

/**
* Completes the jobs in the list.
* This is used as a fallback when ParallelMessageBus is not available.
*
* @param list<array{0: object, 1: Acknowledger}> $jobs A list of pairs of messages and their corresponding acknowledgers
*/
abstract private function process(array $jobs): void;

private function getBatchSize(): int
{
return 10;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
use Symfony\Component\Messenger\Handler\HandlersLocatorInterface;
use Symfony\Component\Messenger\Stamp\AckStamp;
use Symfony\Component\Messenger\Stamp\BusNameStamp;
use Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp;
use Symfony\Component\Messenger\Stamp\HandledStamp;
use Symfony\Component\Messenger\Stamp\HandlerArgumentsStamp;
Expand Down Expand Up @@ -64,6 +65,10 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope

/** @var AckStamp $ackStamp */
if ($batchHandler && $ackStamp = $envelope->last(AckStamp::class)) {
if ($envelope->last(BusNameStamp::class) && 'parallel_bus' === $envelope->last(BusNameStamp::class)->getBusName()) {
throw new HandlerFailedException($envelope, [new LogicException("Parallel bus can't be used for batch messages")]);
}

$ack = new Acknowledger(get_debug_type($batchHandler), static function (?\Throwable $e = null, $result = null) use ($envelope, $ackStamp, $handlerDescriptor) {
if (null !== $e) {
$e = new HandlerFailedException($envelope, [$handlerDescriptor->getName() => $e]);
Expand All @@ -75,7 +80,6 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
});

$result = $this->callHandler($handler, $message, $ack, $envelope->last(HandlerArgumentsStamp::class));

if (!\is_int($result) || 0 > $result) {
throw new LogicException(\sprintf('A handler implementing BatchHandlerInterface must return the size of the current batch as a positive integer, "%s" returned from "%s".', \is_int($result) ? $result : get_debug_type($result), get_debug_type($batchHandler)));
}
Expand Down
Loading