Skip to content

Commit 3ca4340

Browse files
committed
chore: add exclude-receivers consume parameters
1 parent 79cd71d commit 3ca4340

File tree

2 files changed

+175
-14
lines changed

2 files changed

+175
-14
lines changed

src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ protected function configure(): void
7777
new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
7878
new InputOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset container services after each message'),
7979
new InputOption('all', null, InputOption::VALUE_NONE, 'Consume messages from all receivers'),
80+
new InputOption('exclude-queues', 'eq', InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Exclude specific queues from consumption (can only be used with --all)'),
8081
new InputOption('keepalive', null, InputOption::VALUE_OPTIONAL, 'Whether to use the transport\'s keepalive mechanism if implemented', self::DEFAULT_KEEPALIVE_INTERVAL),
8182
])
8283
->setHelp(<<<'EOF'
@@ -122,6 +123,10 @@ protected function configure(): void
122123
Use the --all option to consume from all receivers:
123124
124125
<info>php %command.full_name% --all</info>
126+
127+
Use the --exclude-queues option to exclude specific queues from consumption (can only be used with --all):
128+
129+
<info>php %command.full_name% --all --exclude-queues=queue1</info>
125130
EOF
126131
)
127132
;
@@ -132,6 +137,11 @@ protected function initialize(InputInterface $input, OutputInterface $output): v
132137
if ($input->hasParameterOption('--keepalive')) {
133138
$this->getApplication()->setAlarmInterval((int) ($input->getOption('keepalive') ?? self::DEFAULT_KEEPALIVE_INTERVAL));
134139
}
140+
141+
// Validate that exclude-queues is only used with --all
142+
if ($input->getOption('exclude-queues') && !$input->getOption('all')) {
143+
throw new InvalidOptionException('The --exclude-queues option can only be used with the --all option.');
144+
}
135145
}
136146

137147
protected function interact(InputInterface $input, OutputInterface $output): void
@@ -169,9 +179,24 @@ protected function interact(InputInterface $input, OutputInterface $output): voi
169179

170180
protected function execute(InputInterface $input, OutputInterface $output): int
171181
{
182+
// Validate that exclude-queues is only used with --all
183+
if ($input->getOption('exclude-queues') && !$input->getOption('all')) {
184+
throw new InvalidOptionException('The --exclude-queues option can only be used with the --all option.');
185+
}
186+
172187
$receivers = [];
173188
$rateLimiters = [];
174189
$receiverNames = $input->getOption('all') ? $this->receiverNames : $input->getArgument('receivers');
190+
191+
// Filter out excluded queues when using --all
192+
if ($input->getOption('all') && $excludedQueues = $input->getOption('exclude-queues')) {
193+
$receiverNames = array_diff($receiverNames, $excludedQueues);
194+
195+
if (empty($receiverNames)) {
196+
throw new RuntimeException('All queues have been excluded. Please specify at least one queue to consume from.');
197+
}
198+
}
199+
175200
foreach ($receiverNames as $receiverName) {
176201
if (!$this->receiverLocator->has($receiverName)) {
177202
$message = \sprintf('The receiver "%s" does not exist.', $receiverName);
@@ -276,6 +301,10 @@ public function complete(CompletionInput $input, CompletionSuggestions $suggesti
276301
if ($input->mustSuggestOptionValuesFor('bus')) {
277302
$suggestions->suggestValues($this->busIds);
278303
}
304+
305+
if ($input->mustSuggestOptionValuesFor('exclude-queues')) {
306+
$suggestions->suggestValues($this->receiverNames);
307+
}
279308
}
280309

281310
public function getSubscribedSignals(): array

src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php

Lines changed: 146 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -315,23 +315,155 @@ public function testRunWithAllOption()
315315
$this->assertStringContainsString('[OK] Consuming messages from transports "dummy-receiver1, dummy-receiver2"', $tester->getDisplay());
316316
}
317317

318-
/**
319-
* @dataProvider provideCompletionSuggestions
320-
*/
321-
public function testComplete(array $input, array $expectedSuggestions)
318+
public function testRunWithExcludeQueuesOption()
322319
{
323-
$bus = $this->createMock(RoutableMessageBus::class);
324-
$command = new ConsumeMessagesCommand($bus, new Container(), new EventDispatcher(), null, ['async', 'async_high', 'failed'], null, ['messenger.bus.default']);
325-
$tester = new CommandCompletionTester($command);
326-
$suggestions = $tester->complete($input);
327-
$this->assertSame($expectedSuggestions, $suggestions);
320+
$envelope1 = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
321+
$envelope2 = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
322+
$envelope3 = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
323+
324+
$receiver1 = $this->createMock(ReceiverInterface::class);
325+
$receiver1->method('get')->willReturn([$envelope1]);
326+
$receiver2 = $this->createMock(ReceiverInterface::class);
327+
$receiver2->method('get')->willReturn([$envelope2]);
328+
$receiver3 = $this->createMock(ReceiverInterface::class);
329+
$receiver3->method('get')->willReturn([$envelope3]);
330+
331+
$receiverLocator = new Container();
332+
$receiverLocator->set('dummy-receiver1', $receiver1);
333+
$receiverLocator->set('dummy-receiver2', $receiver2);
334+
$receiverLocator->set('dummy-receiver3', $receiver3);
335+
336+
$bus = $this->createMock(MessageBusInterface::class);
337+
// Only 2 messages should be dispatched (receiver1 and receiver3, receiver2 is excluded)
338+
$bus->expects($this->exactly(2))->method('dispatch');
339+
340+
$busLocator = new Container();
341+
$busLocator->set('dummy-bus', $bus);
342+
343+
$command = new ConsumeMessagesCommand(
344+
new RoutableMessageBus($busLocator),
345+
$receiverLocator, new EventDispatcher(),
346+
receiverNames: ['dummy-receiver1', 'dummy-receiver2', 'dummy-receiver3']
347+
);
348+
349+
$application = new Application();
350+
if (method_exists($application, 'addCommand')) {
351+
$application->addCommand($command);
352+
} else {
353+
$application->add($command);
354+
}
355+
$tester = new CommandTester($application->get('messenger:consume'));
356+
$tester->execute([
357+
'--all' => true,
358+
'--exclude-queues' => ['dummy-receiver2'],
359+
'--limit' => 2,
360+
]);
361+
362+
$tester->assertCommandIsSuccessful();
363+
$this->assertStringContainsString('[OK] Consuming messages from transports "dummy-receiver1, dummy-receiver3"', $tester->getDisplay());
328364
}
329365

330-
public static function provideCompletionSuggestions()
366+
public function testRunWithExcludeQueuesMultipleQueues()
331367
{
332-
yield 'receiver' => [[''], ['async', 'async_high', 'failed']];
333-
yield 'receiver (value)' => [['async'], ['async', 'async_high', 'failed']];
334-
yield 'receiver (no repeat)' => [['async', ''], ['async_high', 'failed']];
335-
yield 'option --bus' => [['--bus', ''], ['messenger.bus.default']];
368+
$envelope1 = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
369+
$envelope2 = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
370+
$envelope3 = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
371+
$envelope4 = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
372+
373+
$receiver1 = $this->createMock(ReceiverInterface::class);
374+
$receiver1->method('get')->willReturn([$envelope1]);
375+
$receiver2 = $this->createMock(ReceiverInterface::class);
376+
$receiver2->method('get')->willReturn([$envelope2]);
377+
$receiver3 = $this->createMock(ReceiverInterface::class);
378+
$receiver3->method('get')->willReturn([$envelope3]);
379+
$receiver4 = $this->createMock(ReceiverInterface::class);
380+
$receiver4->method('get')->willReturn([$envelope4]);
381+
382+
$receiverLocator = new Container();
383+
$receiverLocator->set('dummy-receiver1', $receiver1);
384+
$receiverLocator->set('dummy-receiver2', $receiver2);
385+
$receiverLocator->set('dummy-receiver3', $receiver3);
386+
$receiverLocator->set('dummy-receiver4', $receiver4);
387+
388+
$bus = $this->createMock(MessageBusInterface::class);
389+
// Only 2 messages should be dispatched (receiver1 and receiver4, receiver2 and receiver3 are excluded)
390+
$bus->expects($this->exactly(2))->method('dispatch');
391+
392+
$busLocator = new Container();
393+
$busLocator->set('dummy-bus', $bus);
394+
395+
$command = new ConsumeMessagesCommand(
396+
new RoutableMessageBus($busLocator),
397+
$receiverLocator, new EventDispatcher(),
398+
receiverNames: ['dummy-receiver1', 'dummy-receiver2', 'dummy-receiver3', 'dummy-receiver4']
399+
);
400+
401+
$application = new Application();
402+
if (method_exists($application, 'addCommand')) {
403+
$application->addCommand($command);
404+
} else {
405+
$application->add($command);
406+
}
407+
$tester = new CommandTester($application->get('messenger:consume'));
408+
$tester->execute([
409+
'--all' => true,
410+
'--exclude-queues' => ['dummy-receiver2', 'dummy-receiver3'],
411+
'--limit' => 2,
412+
]);
413+
414+
$tester->assertCommandIsSuccessful();
415+
$this->assertStringContainsString('[OK] Consuming messages from transports "dummy-receiver1, dummy-receiver4"', $tester->getDisplay());
416+
}
417+
418+
public function testExcludeQueuesWithoutAllOptionThrowsException()
419+
{
420+
$receiverLocator = new Container();
421+
$receiverLocator->set('dummy-receiver', new \stdClass());
422+
423+
$command = new ConsumeMessagesCommand(new RoutableMessageBus(new Container()), $receiverLocator, new EventDispatcher());
424+
425+
$application = new Application();
426+
if (method_exists($application, 'addCommand')) {
427+
$application->addCommand($command);
428+
} else {
429+
$application->add($command);
430+
}
431+
$tester = new CommandTester($application->get('messenger:consume'));
432+
433+
$this->expectException(InvalidOptionException::class);
434+
$this->expectExceptionMessage('The --exclude-queues option can only be used with the --all option.');
435+
$tester->execute([
436+
'receivers' => ['dummy-receiver'],
437+
'--exclude-queues' => ['dummy-receiver'],
438+
]);
439+
}
440+
441+
public function testExcludeQueuesWithAllQueuesExcludedThrowsException()
442+
{
443+
$receiverLocator = new Container();
444+
$receiverLocator->set('dummy-receiver1', new \stdClass());
445+
$receiverLocator->set('dummy-receiver2', new \stdClass());
446+
447+
$command = new ConsumeMessagesCommand(
448+
new RoutableMessageBus(new Container()),
449+
$receiverLocator,
450+
new EventDispatcher(),
451+
receiverNames: ['dummy-receiver1', 'dummy-receiver2']
452+
);
453+
454+
$application = new Application();
455+
if (method_exists($application, 'addCommand')) {
456+
$application->addCommand($command);
457+
} else {
458+
$application->add($command);
459+
}
460+
$tester = new CommandTester($application->get('messenger:consume'));
461+
462+
$this->expectException(\Symfony\Component\Console\Exception\RuntimeException::class);
463+
$this->expectExceptionMessage('All queues have been excluded. Please specify at least one queue to consume from.');
464+
$tester->execute([
465+
'--all' => true,
466+
'--exclude-queues' => ['dummy-receiver1', 'dummy-receiver2'],
467+
]);
336468
}
337469
}

0 commit comments

Comments
 (0)