Skip to content

Commit 167c1fe

Browse files
committed
chore: add exclude-receivers consume parameters
1 parent 79cd71d commit 167c1fe

File tree

3 files changed

+180
-1
lines changed

3 files changed

+180
-1
lines changed

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ CHANGELOG
44
7.3
55
---
66

7+
* Add `--exclude-receivers` option to the `messenger:consume command`
78
* Add `CloseableTransportInterface` to allow closing the transport
89
* Add `SentForRetryStamp` that identifies whether a failed message was sent for retry
910
* Add `Symfony\Component\Messenger\Middleware\DeduplicateMiddleware` and `Symfony\Component\Messenger\Stamp\DeduplicateStamp`

src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ protected function configure(): void
7777
new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
7878
new InputOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset container services after each message'),
7979
new InputOption('all', null, InputOption::VALUE_NONE, 'Consume messages from all receivers'),
80+
new InputOption('exclude-receivers', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Exclude specific receivers/transports from consumption (can only be used with --all)'),
8081
new InputOption('keepalive', null, InputOption::VALUE_OPTIONAL, 'Whether to use the transport\'s keepalive mechanism if implemented', self::DEFAULT_KEEPALIVE_INTERVAL),
8182
])
8283
->setHelp(<<<'EOF'
@@ -122,6 +123,10 @@ protected function configure(): void
122123
Use the --all option to consume from all receivers:
123124
124125
<info>php %command.full_name% --all</info>
126+
127+
Use the --exclude-receivers option to exclude specific receivers/transports from consumption (can only be used with --all):
128+
129+
<info>php %command.full_name% --all --exclude-receivers=<receiver-name></info>
125130
EOF
126131
)
127132
;
@@ -132,6 +137,10 @@ protected function initialize(InputInterface $input, OutputInterface $output): v
132137
if ($input->hasParameterOption('--keepalive')) {
133138
$this->getApplication()->setAlarmInterval((int) ($input->getOption('keepalive') ?? self::DEFAULT_KEEPALIVE_INTERVAL));
134139
}
140+
141+
if ($input->getOption('exclude-receivers') && !$input->getOption('all')) {
142+
throw new InvalidOptionException('The --exclude-receivers option can only be used with the --all option.');
143+
}
135144
}
136145

137146
protected function interact(InputInterface $input, OutputInterface $output): void
@@ -169,9 +178,22 @@ protected function interact(InputInterface $input, OutputInterface $output): voi
169178

170179
protected function execute(InputInterface $input, OutputInterface $output): int
171180
{
181+
if ($input->getOption('exclude-receivers') && !$input->getOption('all')) {
182+
throw new InvalidOptionException('The --exclude-receivers option can only be used with the --all option.');
183+
}
184+
172185
$receivers = [];
173186
$rateLimiters = [];
174187
$receiverNames = $input->getOption('all') ? $this->receiverNames : $input->getArgument('receivers');
188+
189+
if ($input->getOption('all') && $excludedTransports = $input->getOption('exclude-receivers')) {
190+
$receiverNames = array_diff($receiverNames, $excludedTransports);
191+
192+
if (!$receiverNames) {
193+
throw new RuntimeException('All transports/receivers have been excluded. Please specify at least one to consume from.');
194+
}
195+
}
196+
175197
foreach ($receiverNames as $receiverName) {
176198
if (!$this->receiverLocator->has($receiverName)) {
177199
$message = \sprintf('The receiver "%s" does not exist.', $receiverName);
@@ -276,6 +298,10 @@ public function complete(CompletionInput $input, CompletionSuggestions $suggesti
276298
if ($input->mustSuggestOptionValuesFor('bus')) {
277299
$suggestions->suggestValues($this->busIds);
278300
}
301+
302+
if ($input->mustSuggestOptionValuesFor('exclude-receivers')) {
303+
$suggestions->suggestValues($this->receiverNames);
304+
}
279305
}
280306

281307
public function getSubscribedSignals(): array

src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php

Lines changed: 153 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ public function testRunWithMemoryLimit()
242242
$busLocator = new Container();
243243
$busLocator->set('dummy-bus', $bus);
244244

245-
$logger = new class() implements LoggerInterface {
245+
$logger = new class implements LoggerInterface {
246246
use LoggerTrait;
247247

248248
public array $logs = [];
@@ -334,4 +334,156 @@ public static function provideCompletionSuggestions()
334334
yield 'receiver (no repeat)' => [['async', ''], ['async_high', 'failed']];
335335
yield 'option --bus' => [['--bus', ''], ['messenger.bus.default']];
336336
}
337+
338+
public function testRunWithExcludeReceiversOption()
339+
{
340+
$envelope1 = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
341+
$envelope2 = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
342+
$envelope3 = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
343+
344+
$receiver1 = $this->createMock(ReceiverInterface::class);
345+
$receiver1->method('get')->willReturn([$envelope1]);
346+
$receiver2 = $this->createMock(ReceiverInterface::class);
347+
$receiver2->method('get')->willReturn([$envelope2]);
348+
$receiver3 = $this->createMock(ReceiverInterface::class);
349+
$receiver3->method('get')->willReturn([$envelope3]);
350+
351+
$receiverLocator = new Container();
352+
$receiverLocator->set('dummy-receiver1', $receiver1);
353+
$receiverLocator->set('dummy-receiver2', $receiver2);
354+
$receiverLocator->set('dummy-receiver3', $receiver3);
355+
356+
$bus = $this->createMock(MessageBusInterface::class);
357+
// Only 2 messages should be dispatched (receiver1 and receiver3, receiver2 is excluded)
358+
$bus->expects($this->exactly(2))->method('dispatch');
359+
360+
$busLocator = new Container();
361+
$busLocator->set('dummy-bus', $bus);
362+
363+
$command = new ConsumeMessagesCommand(
364+
new RoutableMessageBus($busLocator),
365+
$receiverLocator, new EventDispatcher(),
366+
receiverNames: ['dummy-receiver1', 'dummy-receiver2', 'dummy-receiver3']
367+
);
368+
369+
$application = new Application();
370+
if (method_exists($application, 'addCommand')) {
371+
$application->addCommand($command);
372+
} else {
373+
$application->add($command);
374+
}
375+
$tester = new CommandTester($application->get('messenger:consume'));
376+
$tester->execute([
377+
'--all' => true,
378+
'--exclude-receivers' => ['dummy-receiver2'],
379+
'--limit' => 2,
380+
]);
381+
382+
$tester->assertCommandIsSuccessful();
383+
$this->assertStringContainsString('[OK] Consuming messages from transports "dummy-receiver1, dummy-receiver3"', $tester->getDisplay());
384+
}
385+
386+
public function testRunWithExcludeReceiversMultipleQueues()
387+
{
388+
$envelope1 = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
389+
$envelope2 = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
390+
$envelope3 = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
391+
$envelope4 = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
392+
393+
$receiver1 = $this->createMock(ReceiverInterface::class);
394+
$receiver1->method('get')->willReturn([$envelope1]);
395+
$receiver2 = $this->createMock(ReceiverInterface::class);
396+
$receiver2->method('get')->willReturn([$envelope2]);
397+
$receiver3 = $this->createMock(ReceiverInterface::class);
398+
$receiver3->method('get')->willReturn([$envelope3]);
399+
$receiver4 = $this->createMock(ReceiverInterface::class);
400+
$receiver4->method('get')->willReturn([$envelope4]);
401+
402+
$receiverLocator = new Container();
403+
$receiverLocator->set('dummy-receiver1', $receiver1);
404+
$receiverLocator->set('dummy-receiver2', $receiver2);
405+
$receiverLocator->set('dummy-receiver3', $receiver3);
406+
$receiverLocator->set('dummy-receiver4', $receiver4);
407+
408+
$bus = $this->createMock(MessageBusInterface::class);
409+
// Only 2 messages should be dispatched (receiver1 and receiver4, receiver2 and receiver3 are excluded)
410+
$bus->expects($this->exactly(2))->method('dispatch');
411+
412+
$busLocator = new Container();
413+
$busLocator->set('dummy-bus', $bus);
414+
415+
$command = new ConsumeMessagesCommand(
416+
new RoutableMessageBus($busLocator),
417+
$receiverLocator, new EventDispatcher(),
418+
receiverNames: ['dummy-receiver1', 'dummy-receiver2', 'dummy-receiver3', 'dummy-receiver4']
419+
);
420+
421+
$application = new Application();
422+
if (method_exists($application, 'addCommand')) {
423+
$application->addCommand($command);
424+
} else {
425+
$application->add($command);
426+
}
427+
$tester = new CommandTester($application->get('messenger:consume'));
428+
$tester->execute([
429+
'--all' => true,
430+
'--exclude-receivers' => ['dummy-receiver2', 'dummy-receiver3'],
431+
'--limit' => 2,
432+
]);
433+
434+
$tester->assertCommandIsSuccessful();
435+
$this->assertStringContainsString('[OK] Consuming messages from transports "dummy-receiver1, dummy-receiver4"', $tester->getDisplay());
436+
}
437+
438+
public function testExcludeReceiverssWithoutAllOptionThrowsException()
439+
{
440+
$receiverLocator = new Container();
441+
$receiverLocator->set('dummy-receiver', new \stdClass());
442+
443+
$command = new ConsumeMessagesCommand(new RoutableMessageBus(new Container()), $receiverLocator, new EventDispatcher());
444+
445+
$application = new Application();
446+
if (method_exists($application, 'addCommand')) {
447+
$application->addCommand($command);
448+
} else {
449+
$application->add($command);
450+
}
451+
$tester = new CommandTester($application->get('messenger:consume'));
452+
453+
$this->expectException(InvalidOptionException::class);
454+
$this->expectExceptionMessage('The --exclude-receivers option can only be used with the --all option.');
455+
$tester->execute([
456+
'receivers' => ['dummy-receiver'],
457+
'--exclude-receivers' => ['dummy-receiver'],
458+
]);
459+
}
460+
461+
public function testExcludeReceiversWithAllQueuesExcludedThrowsException()
462+
{
463+
$receiverLocator = new Container();
464+
$receiverLocator->set('dummy-receiver1', new \stdClass());
465+
$receiverLocator->set('dummy-receiver2', new \stdClass());
466+
467+
$command = new ConsumeMessagesCommand(
468+
new RoutableMessageBus(new Container()),
469+
$receiverLocator,
470+
new EventDispatcher(),
471+
receiverNames: ['dummy-receiver1', 'dummy-receiver2']
472+
);
473+
474+
$application = new Application();
475+
if (method_exists($application, 'addCommand')) {
476+
$application->addCommand($command);
477+
} else {
478+
$application->add($command);
479+
}
480+
$tester = new CommandTester($application->get('messenger:consume'));
481+
482+
$this->expectException(\Symfony\Component\Console\Exception\RuntimeException::class);
483+
$this->expectExceptionMessage('All transports/receivers have been excluded. Please specify at least one to consume from.');
484+
$tester->execute([
485+
'--all' => true,
486+
'--exclude-receivers' => ['dummy-receiver1', 'dummy-receiver2'],
487+
]);
488+
}
337489
}

0 commit comments

Comments
 (0)