Skip to content

Commit 29b4e97

Browse files
committed
Implement BatchAsyncHandlerTrait for parallel message processing
1 parent 522a316 commit 29b4e97

File tree

3 files changed

+397
-0
lines changed

3 files changed

+397
-0
lines changed

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
CHANGELOG
22
=========
33

4+
7.3
5+
---
6+
7+
* Add `Symfony\Component\Messenger\Handler\BatchAsyncHandlerTrait` designed for parallel execution using ParallelMessageBus
8+
9+
410
7.2
511
---
612

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Handler;
13+
14+
use Amp\Future;
15+
use Symfony\Component\Messenger\Stamp\FutureStamp;
16+
use Symfony\Component\Messenger\ParallelMessageBus;
17+
use Symfony\Component\Messenger\Envelope;
18+
19+
/**
20+
* A batch handler trait designed for parallel execution using ParallelMessageBus.
21+
*
22+
* This trait collects jobs in worker-specific batches and processes them
23+
* in parallel by dispatching each job individually through ParallelMessageBus.
24+
*/
25+
trait BatchAsyncHandlerTrait
26+
{
27+
/** @var array<string,array> Map of worker IDs to their job batches */
28+
private array $workerJobs = [];
29+
30+
/** @var ParallelMessageBus|null */
31+
private ?ParallelMessageBus $parallelBus = null;
32+
33+
/**
34+
* Set the parallel message bus to use for dispatching jobs.
35+
*/
36+
public function setParallelMessageBus(ParallelMessageBus $bus): void
37+
{
38+
$this->parallelBus = $bus;
39+
}
40+
41+
public function flush(bool $force): void
42+
{
43+
$workerId = $this->getCurrentWorkerId();
44+
45+
if (isset($this->workerJobs[$workerId]) && $jobs = $this->workerJobs[$workerId]) {
46+
$this->workerJobs[$workerId] = [];
47+
48+
if ($this->parallelBus) {
49+
// Process each job in parallel using ParallelMessageBus
50+
$futures = [];
51+
52+
foreach ($jobs as [$message, $ack]) {
53+
// Dispatch each message individually
54+
$envelope = $this->parallelBus->dispatch($message);
55+
56+
$futureStamp = $envelope->last(FutureStamp::class);
57+
if ($futureStamp) {
58+
/** @var Future $future */
59+
$future = $futureStamp->getFuture();
60+
$futures[] = [$future, $ack];
61+
}
62+
}
63+
64+
// If force is true, wait for all results
65+
if ($force && $futures) {
66+
foreach ($futures as [$future, $ack]) {
67+
try {
68+
$result = $future->await();
69+
$ack->ack($result);
70+
} catch (\Throwable $e) {
71+
$ack->nack($e);
72+
}
73+
}
74+
}
75+
} else {
76+
// Fallback to synchronous processing
77+
$this->process($jobs);
78+
}
79+
}
80+
}
81+
82+
/**
83+
* @param Acknowledger|null $ack The function to call to ack/nack the $message.
84+
*
85+
* @return mixed The number of pending messages in the batch if $ack is not null,
86+
* the result from handling the message otherwise
87+
*/
88+
private function handle(object $message, ?Acknowledger $ack): mixed
89+
{
90+
$workerId = $this->getCurrentWorkerId();
91+
92+
if (!isset($this->workerJobs[$workerId])) {
93+
$this->workerJobs[$workerId] = [];
94+
}
95+
96+
if (null === $ack) {
97+
$ack = new Acknowledger(get_debug_type($this));
98+
$this->workerJobs[$workerId][] = [$message, $ack];
99+
$this->flush(true);
100+
101+
return $ack->getResult();
102+
}
103+
104+
$this->workerJobs[$workerId][] = [$message, $ack];
105+
if (!$this->shouldFlush()) {
106+
return \count($this->workerJobs[$workerId]);
107+
}
108+
109+
$this->flush(true);
110+
111+
return 0;
112+
}
113+
114+
private function shouldFlush(): bool
115+
{
116+
$workerId = $this->getCurrentWorkerId();
117+
return $this->getBatchSize() <= \count($this->workerJobs[$workerId] ?? []);
118+
}
119+
120+
/**
121+
* Generates a unique identifier for the current worker context.
122+
*/
123+
private function getCurrentWorkerId(): string
124+
{
125+
// In a worker pool, each worker has a unique ID
126+
return getmypid() ?: 'default-worker';
127+
}
128+
129+
/**
130+
* Cleans up worker-specific resources when a worker completes its job.
131+
*/
132+
public function cleanupWorker(): void
133+
{
134+
$workerId = $this->getCurrentWorkerId();
135+
136+
// Flush any remaining jobs before cleaning up
137+
if (isset($this->workerJobs[$workerId]) && !empty($this->workerJobs[$workerId])) {
138+
$this->flush(true);
139+
}
140+
141+
unset($this->workerJobs[$workerId]);
142+
}
143+
144+
/**
145+
* Completes the jobs in the list.
146+
* This is used as a fallback when ParallelMessageBus is not available.
147+
*
148+
* @param list<array{0: object, 1: Acknowledger}> $jobs A list of pairs of messages and their corresponding acknowledgers
149+
*/
150+
abstract private function process(array $jobs): void;
151+
152+
private function getBatchSize(): int
153+
{
154+
return 10;
155+
}
156+
}

0 commit comments

Comments
 (0)