Skip to content

Commit 04d44be

Browse files
committed
chore: add exclude-receivers consume parameters
1 parent 79cd71d commit 04d44be

File tree

2 files changed

+123
-30
lines changed

2 files changed

+123
-30
lines changed

src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ protected function configure(): void
7777
new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
7878
new InputOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset container services after each message'),
7979
new InputOption('all', null, InputOption::VALUE_NONE, 'Consume messages from all receivers'),
80+
new InputOption('exclude-queues', 'eq', InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Exclude specific queues from consumption (can only be used with --all)'),
8081
new InputOption('keepalive', null, InputOption::VALUE_OPTIONAL, 'Whether to use the transport\'s keepalive mechanism if implemented', self::DEFAULT_KEEPALIVE_INTERVAL),
8182
])
8283
->setHelp(<<<'EOF'
@@ -122,6 +123,10 @@ protected function configure(): void
122123
Use the --all option to consume from all receivers:
123124
124125
<info>php %command.full_name% --all</info>
126+
127+
Use the --exclude-queues option to exclude specific queues from consumption (can only be used with --all):
128+
129+
<info>php %command.full_name% --all --exclude-queues=queue1</info>
125130
EOF
126131
)
127132
;
@@ -132,6 +137,11 @@ protected function initialize(InputInterface $input, OutputInterface $output): v
132137
if ($input->hasParameterOption('--keepalive')) {
133138
$this->getApplication()->setAlarmInterval((int) ($input->getOption('keepalive') ?? self::DEFAULT_KEEPALIVE_INTERVAL));
134139
}
140+
141+
// Validate that exclude-queues is only used with --all
142+
if ($input->getOption('exclude-queues') && !$input->getOption('all')) {
143+
throw new InvalidOptionException('The --exclude-queues option can only be used with the --all option.');
144+
}
135145
}
136146

137147
protected function interact(InputInterface $input, OutputInterface $output): void
@@ -169,9 +179,24 @@ protected function interact(InputInterface $input, OutputInterface $output): voi
169179

170180
protected function execute(InputInterface $input, OutputInterface $output): int
171181
{
182+
// Validate that exclude-queues is only used with --all
183+
if ($input->getOption('exclude-queues') && !$input->getOption('all')) {
184+
throw new InvalidOptionException('The --exclude-queues option can only be used with the --all option.');
185+
}
186+
172187
$receivers = [];
173188
$rateLimiters = [];
174189
$receiverNames = $input->getOption('all') ? $this->receiverNames : $input->getArgument('receivers');
190+
191+
// Filter out excluded queues when using --all
192+
if ($input->getOption('all') && $excludedQueues = $input->getOption('exclude-queues')) {
193+
$receiverNames = array_diff($receiverNames, $excludedQueues);
194+
195+
if (empty($receiverNames)) {
196+
throw new RuntimeException('All queues have been excluded. Please specify at least one queue to consume from.');
197+
}
198+
}
199+
175200
foreach ($receiverNames as $receiverName) {
176201
if (!$this->receiverLocator->has($receiverName)) {
177202
$message = \sprintf('The receiver "%s" does not exist.', $receiverName);
@@ -276,6 +301,10 @@ public function complete(CompletionInput $input, CompletionSuggestions $suggesti
276301
if ($input->mustSuggestOptionValuesFor('bus')) {
277302
$suggestions->suggestValues($this->busIds);
278303
}
304+
305+
if ($input->mustSuggestOptionValuesFor('exclude-queues')) {
306+
$suggestions->suggestValues($this->receiverNames);
307+
}
279308
}
280309

281310
public function getSubscribedSignals(): array

src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php

Lines changed: 94 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,7 @@ public function testRunWithBusOption()
9595
$command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator, new EventDispatcher());
9696

9797
$application = new Application();
98-
if (method_exists($application, 'addCommand')) {
99-
$application->addCommand($command);
100-
} else {
101-
$application->add($command);
102-
}
98+
$application->add($command);
10399
$tester = new CommandTester($application->get('messenger:consume'));
104100
$tester->execute([
105101
'receivers' => ['dummy-receiver'],
@@ -142,11 +138,7 @@ public function testRunWithResetServicesOption(bool $shouldReset)
142138
$command = new ConsumeMessagesCommand($bus, $receiverLocator, new EventDispatcher(), null, [], new ResetServicesListener($servicesResetter));
143139

144140
$application = new Application();
145-
if (method_exists($application, 'addCommand')) {
146-
$application->addCommand($command);
147-
} else {
148-
$application->add($command);
149-
}
141+
$application->add($command);
150142
$tester = new CommandTester($application->get('messenger:consume'));
151143
$tester->execute(array_merge([
152144
'receivers' => ['dummy-receiver'],
@@ -170,11 +162,7 @@ public function testRunWithInvalidOption(string $option, string $value, string $
170162
$command = new ConsumeMessagesCommand(new RoutableMessageBus(new Container()), $receiverLocator, new EventDispatcher());
171163

172164
$application = new Application();
173-
if (method_exists($application, 'addCommand')) {
174-
$application->addCommand($command);
175-
} else {
176-
$application->add($command);
177-
}
165+
$application->add($command);
178166
$tester = new CommandTester($application->get('messenger:consume'));
179167

180168
$this->expectException(InvalidOptionException::class);
@@ -212,11 +200,7 @@ public function testRunWithTimeLimit()
212200
$command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator, new EventDispatcher());
213201

214202
$application = new Application();
215-
if (method_exists($application, 'addCommand')) {
216-
$application->addCommand($command);
217-
} else {
218-
$application->add($command);
219-
}
203+
$application->add($command);
220204
$tester = new CommandTester($application->get('messenger:consume'));
221205
$tester->execute([
222206
'receivers' => ['dummy-receiver'],
@@ -255,11 +239,7 @@ public function log(...$args): void
255239
$command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator, new EventDispatcher(), $logger);
256240

257241
$application = new Application();
258-
if (method_exists($application, 'addCommand')) {
259-
$application->addCommand($command);
260-
} else {
261-
$application->add($command);
262-
}
242+
$application->add($command);
263243
$tester = new CommandTester($application->get('messenger:consume'));
264244
$tester->execute([
265245
'receivers' => ['dummy-receiver'],
@@ -300,11 +280,7 @@ public function testRunWithAllOption()
300280
);
301281

302282
$application = new Application();
303-
if (method_exists($application, 'addCommand')) {
304-
$application->addCommand($command);
305-
} else {
306-
$application->add($command);
307-
}
283+
$application->add($command);
308284
$tester = new CommandTester($application->get('messenger:consume'));
309285
$tester->execute([
310286
'--all' => true,
@@ -315,6 +291,92 @@ public function testRunWithAllOption()
315291
$this->assertStringContainsString('[OK] Consuming messages from transports "dummy-receiver1, dummy-receiver2"', $tester->getDisplay());
316292
}
317293

294+
public function testRunWithAllAndExcludeQueuesOption()
295+
{
296+
$envelope1 = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
297+
$envelope2 = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
298+
$envelope3 = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
299+
300+
$receiver1 = $this->createMock(ReceiverInterface::class);
301+
$receiver1->method('get')->willReturn([$envelope1]);
302+
$receiver2 = $this->createMock(ReceiverInterface::class);
303+
$receiver2->method('get')->willReturn([$envelope2]);
304+
$receiver3 = $this->createMock(ReceiverInterface::class);
305+
$receiver3->method('get')->willReturn([$envelope3]);
306+
307+
$receiverLocator = new Container();
308+
$receiverLocator->set('dummy-receiver1', $receiver1);
309+
$receiverLocator->set('dummy-receiver2', $receiver2);
310+
$receiverLocator->set('dummy-receiver3', $receiver3);
311+
312+
$bus = $this->createMock(MessageBusInterface::class);
313+
$bus->expects($this->exactly(2))->method('dispatch');
314+
315+
$busLocator = new Container();
316+
$busLocator->set('dummy-bus', $bus);
317+
318+
$command = new ConsumeMessagesCommand(
319+
new RoutableMessageBus($busLocator),
320+
$receiverLocator, new EventDispatcher(),
321+
receiverNames: ['dummy-receiver1', 'dummy-receiver2', 'dummy-receiver3']
322+
);
323+
324+
$application = new Application();
325+
$application->add($command);
326+
$tester = new CommandTester($application->get('messenger:consume'));
327+
$tester->execute([
328+
'--all' => true,
329+
'--exclude-queues' => ['dummy-receiver2'],
330+
'--limit' => 2,
331+
]);
332+
333+
$tester->assertCommandIsSuccessful();
334+
$this->assertStringContainsString('[OK] Consuming messages from transports "dummy-receiver1, dummy-receiver3"', $tester->getDisplay());
335+
}
336+
337+
public function testRunWithExcludeQueuesWithoutAllOption()
338+
{
339+
$receiverLocator = new Container();
340+
$receiverLocator->set('dummy-receiver', new \stdClass());
341+
342+
$command = new ConsumeMessagesCommand(new RoutableMessageBus(new Container()), $receiverLocator, new EventDispatcher());
343+
344+
$application = new Application();
345+
$application->add($command);
346+
$tester = new CommandTester($application->get('messenger:consume'));
347+
348+
$this->expectException(InvalidOptionException::class);
349+
$this->expectExceptionMessage('The --exclude-queues option can only be used with the --all option.');
350+
$tester->execute([
351+
'receivers' => ['dummy-receiver'],
352+
'--exclude-queues' => ['dummy-receiver'],
353+
]);
354+
}
355+
356+
public function testRunWithAllAndExcludeAllQueues()
357+
{
358+
$receiverLocator = new Container();
359+
$receiverLocator->set('dummy-receiver1', new \stdClass());
360+
$receiverLocator->set('dummy-receiver2', new \stdClass());
361+
362+
$command = new ConsumeMessagesCommand(
363+
new RoutableMessageBus(new Container()),
364+
$receiverLocator, new EventDispatcher(),
365+
receiverNames: ['dummy-receiver1', 'dummy-receiver2']
366+
);
367+
368+
$application = new Application();
369+
$application->add($command);
370+
$tester = new CommandTester($application->get('messenger:consume'));
371+
372+
$this->expectException(\Symfony\Component\Console\Exception\RuntimeException::class);
373+
$this->expectExceptionMessage('All queues have been excluded. Please specify at least one queue to consume from.');
374+
$tester->execute([
375+
'--all' => true,
376+
'--exclude-queues' => ['dummy-receiver1', 'dummy-receiver2'],
377+
]);
378+
}
379+
318380
/**
319381
* @dataProvider provideCompletionSuggestions
320382
*/
@@ -333,5 +395,7 @@ public static function provideCompletionSuggestions()
333395
yield 'receiver (value)' => [['async'], ['async', 'async_high', 'failed']];
334396
yield 'receiver (no repeat)' => [['async', ''], ['async_high', 'failed']];
335397
yield 'option --bus' => [['--bus', ''], ['messenger.bus.default']];
398+
yield 'option --exclude-queues' => [['--exclude-queues', ''], ['async', 'async_high', 'failed']];
399+
yield 'option --exclude-queues (value)' => [['--exclude-queues', 'async'], ['async', 'async_high', 'failed']];
336400
}
337401
}

0 commit comments

Comments
 (0)