1
+ <?php
2
+
3
+ /*
4
+ * This file is part of the Symfony package.
5
+ *
6
+ * (c) Fabien Potencier <fabien@symfony.com>
7
+ *
8
+ * For the full copyright and license information, please view the LICENSE
9
+ * file that was distributed with this source code.
10
+ */
11
+
12
+ namespace Symfony \Component \Messenger \Handler ;
13
+
14
+ use Amp \Future ;
15
+ use Symfony \Component \Messenger \Stamp \FutureStamp ;
16
+ use Symfony \Component \Messenger \ParallelMessageBus ;
17
+ use Symfony \Component \Messenger \Envelope ;
18
+
19
+ /**
20
+ * A batch handler trait designed for parallel execution using ParallelMessageBus.
21
+ *
22
+ * This trait collects jobs in worker-specific batches and processes them
23
+ * in parallel by dispatching each job individually through ParallelMessageBus.
24
+ */
25
+ trait BatchAsyncHandlerTrait
26
+ {
27
+ /** @var array<string,array> Map of worker IDs to their job batches */
28
+ private array $ workerJobs = [];
29
+
30
+ /** @var ParallelMessageBus|null */
31
+ private ?ParallelMessageBus $ parallelBus = null ;
32
+
33
+ /**
34
+ * Set the parallel message bus to use for dispatching jobs.
35
+ */
36
+ public function setParallelMessageBus (ParallelMessageBus $ bus ): void
37
+ {
38
+ $ this ->parallelBus = $ bus ;
39
+ }
40
+
41
+ public function flush (bool $ force ): void
42
+ {
43
+ $ workerId = $ this ->getCurrentWorkerId ();
44
+
45
+ if (isset ($ this ->workerJobs [$ workerId ]) && $ jobs = $ this ->workerJobs [$ workerId ]) {
46
+ $ this ->workerJobs [$ workerId ] = [];
47
+
48
+ if ($ this ->parallelBus ) {
49
+ // Process each job in parallel using ParallelMessageBus
50
+ $ futures = [];
51
+
52
+ foreach ($ jobs as [$ message , $ ack ]) {
53
+ // Dispatch each message individually
54
+ $ envelope = $ this ->parallelBus ->dispatch ($ message );
55
+
56
+ $ futureStamp = $ envelope ->last (FutureStamp::class);
57
+ if ($ futureStamp ) {
58
+ /** @var Future $future */
59
+ $ future = $ futureStamp ->getFuture ();
60
+ $ futures [] = [$ future , $ ack ];
61
+ }
62
+ }
63
+
64
+ // If force is true, wait for all results
65
+ if ($ force && $ futures ) {
66
+ foreach ($ futures as [$ future , $ ack ]) {
67
+ try {
68
+ $ result = $ future ->await ();
69
+ $ ack ->ack ($ result );
70
+ } catch (\Throwable $ e ) {
71
+ $ ack ->nack ($ e );
72
+ }
73
+ }
74
+ }
75
+ } else {
76
+ // Fallback to synchronous processing
77
+ $ this ->process ($ jobs );
78
+ }
79
+ }
80
+ }
81
+
82
+ /**
83
+ * @param Acknowledger|null $ack The function to call to ack/nack the $message.
84
+ *
85
+ * @return mixed The number of pending messages in the batch if $ack is not null,
86
+ * the result from handling the message otherwise
87
+ */
88
+ private function handle (object $ message , ?Acknowledger $ ack ): mixed
89
+ {
90
+ $ workerId = $ this ->getCurrentWorkerId ();
91
+
92
+ if (!isset ($ this ->workerJobs [$ workerId ])) {
93
+ $ this ->workerJobs [$ workerId ] = [];
94
+ }
95
+
96
+ if (null === $ ack ) {
97
+ $ ack = new Acknowledger (get_debug_type ($ this ));
98
+ $ this ->workerJobs [$ workerId ][] = [$ message , $ ack ];
99
+ $ this ->flush (true );
100
+
101
+ return $ ack ->getResult ();
102
+ }
103
+
104
+ $ this ->workerJobs [$ workerId ][] = [$ message , $ ack ];
105
+ if (!$ this ->shouldFlush ()) {
106
+ return \count ($ this ->workerJobs [$ workerId ]);
107
+ }
108
+
109
+ $ this ->flush (true );
110
+
111
+ return 0 ;
112
+ }
113
+
114
+ private function shouldFlush (): bool
115
+ {
116
+ $ workerId = $ this ->getCurrentWorkerId ();
117
+ return $ this ->getBatchSize () <= \count ($ this ->workerJobs [$ workerId ] ?? []);
118
+ }
119
+
120
+ /**
121
+ * Generates a unique identifier for the current worker context.
122
+ */
123
+ private function getCurrentWorkerId (): string
124
+ {
125
+ // In a worker pool, each worker has a unique ID
126
+ return getmypid () ?: 'default-worker ' ;
127
+ }
128
+
129
+ /**
130
+ * Cleans up worker-specific resources when a worker completes its job.
131
+ */
132
+ public function cleanupWorker (): void
133
+ {
134
+ $ workerId = $ this ->getCurrentWorkerId ();
135
+
136
+ // Flush any remaining jobs before cleaning up
137
+ if (isset ($ this ->workerJobs [$ workerId ]) && !empty ($ this ->workerJobs [$ workerId ])) {
138
+ $ this ->flush (true );
139
+ }
140
+
141
+ unset($ this ->workerJobs [$ workerId ]);
142
+ }
143
+
144
+ /**
145
+ * Completes the jobs in the list.
146
+ * This is used as a fallback when ParallelMessageBus is not available.
147
+ *
148
+ * @param list<array{0: object, 1: Acknowledger}> $jobs A list of pairs of messages and their corresponding acknowledgers
149
+ */
150
+ abstract private function process (array $ jobs ): void ;
151
+
152
+ private function getBatchSize (): int
153
+ {
154
+ return 10 ;
155
+ }
156
+ }
0 commit comments