Skip to content

Commit a94ce84

Browse files
committed
Fixing a bug where a transport could receive a message and dispatch it to a different bus
1 parent 6ff1185 commit a94ce84

File tree

13 files changed

+272
-36
lines changed

13 files changed

+272
-36
lines changed

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

+12-2
Original file line numberDiff line numberDiff line change
@@ -1616,8 +1616,14 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
16161616
}
16171617

16181618
$defaultMiddleware = [
1619-
'before' => [['id' => 'dispatch_after_current_bus']],
1620-
'after' => [['id' => 'send_message'], ['id' => 'handle_message']],
1619+
'before' => [
1620+
['id' => 'add_bus_name_stamp_middleware'],
1621+
['id' => 'dispatch_after_current_bus'],
1622+
],
1623+
'after' => [
1624+
['id' => 'send_message'],
1625+
['id' => 'handle_message'],
1626+
],
16211627
];
16221628
foreach ($config['buses'] as $busId => $bus) {
16231629
$middleware = $bus['middleware'];
@@ -1628,6 +1634,10 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
16281634
} else {
16291635
unset($defaultMiddleware['after'][1]['arguments']);
16301636
}
1637+
1638+
// argument to add_bus_name_stamp_middleware
1639+
$defaultMiddleware['before'][0]['arguments'] = [$busId];
1640+
16311641
$middleware = array_merge($defaultMiddleware['before'], $middleware, $defaultMiddleware['after']);
16321642
}
16331643

src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml

-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@
8181
<argument type="service" id="messenger.receiver_locator" />
8282
<argument type="service" id="logger" on-invalid="null" />
8383
<argument type="collection" /> <!-- Receiver names -->
84-
<argument type="collection" /> <!-- Message bus names -->
8584
<argument type="service" id="messenger.retry_strategy_locator" />
8685
<argument type="service" id="event_dispatcher" />
8786

src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

+2
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
</call>
4040
</service>
4141

42+
<service id="messenger.middleware.add_bus_name_stamp_middleware" class="Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware" abstract="true" />
43+
4244
<service id="messenger.middleware.dispatch_after_current_bus" class="Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware" />
4345

4446
<service id="messenger.middleware.validation" class="Symfony\Component\Messenger\Middleware\ValidationMiddleware">

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php

+2
Original file line numberDiff line numberDiff line change
@@ -727,13 +727,15 @@ public function testMessengerWithMultipleBuses()
727727
$this->assertTrue($container->has('messenger.bus.commands'));
728728
$this->assertSame([], $container->getDefinition('messenger.bus.commands')->getArgument(0));
729729
$this->assertEquals([
730+
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']],
730731
['id' => 'dispatch_after_current_bus'],
731732
['id' => 'send_message'],
732733
['id' => 'handle_message'],
733734
], $container->getParameter('messenger.bus.commands.middleware'));
734735
$this->assertTrue($container->has('messenger.bus.events'));
735736
$this->assertSame([], $container->getDefinition('messenger.bus.events')->getArgument(0));
736737
$this->assertEquals([
738+
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']],
737739
['id' => 'dispatch_after_current_bus'],
738740
['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],
739741
['id' => 'send_message'],

src/Symfony/Component/Messenger/CHANGELOG.md

+6-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,12 @@ CHANGELOG
33

44
4.3.0
55
-----
6-
6+
7+
* New classes: `RoutableMessageBus`, `AddBusNameStampMiddleware`
8+
and `BusNameStamp` were added, which allow you to add a bus identifier
9+
to the `Envelope` then find the correct bus when receiving from
10+
the transport. See `ConsumeMessagesCommand`.
11+
* An optional `ConsumeMessagesCommand` constructor argument was removed.
712
* [BC BREAK] 2 new methods were added to `ReceiverInterface`:
813
`ack()` and `reject()`.
914
* [BC BREAK] Error handling was moved from the receivers into

src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

+16-29
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
use Symfony\Component\Console\Output\OutputInterface;
2222
use Symfony\Component\Console\Style\SymfonyStyle;
2323
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
24+
use Symfony\Component\Messenger\RoutableMessageBus;
2425
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver;
2526
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver;
2627
use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver;
@@ -39,17 +40,15 @@ class ConsumeMessagesCommand extends Command
3940
private $receiverLocator;
4041
private $logger;
4142
private $receiverNames;
42-
private $busNames;
4343
private $retryStrategyLocator;
4444
private $eventDispatcher;
4545

46-
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], array $busNames = [], ContainerInterface $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
46+
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], ContainerInterface $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
4747
{
4848
$this->busLocator = $busLocator;
4949
$this->receiverLocator = $receiverLocator;
5050
$this->logger = $logger;
5151
$this->receiverNames = $receiverNames;
52-
$this->busNames = $busNames;
5352
$this->retryStrategyLocator = $retryStrategyLocator;
5453
$this->eventDispatcher = $eventDispatcher;
5554

@@ -62,15 +61,14 @@ public function __construct(ContainerInterface $busLocator, ContainerInterface $
6261
protected function configure(): void
6362
{
6463
$defaultReceiverName = 1 === \count($this->receiverNames) ? current($this->receiverNames) : null;
65-
$defaultBusName = 1 === \count($this->busNames) ? current($this->busNames) : null;
6664

6765
$this
6866
->setDefinition([
6967
new InputArgument('receiver', $defaultReceiverName ? InputArgument::OPTIONAL : InputArgument::REQUIRED, 'Name of the receiver', $defaultReceiverName),
7068
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
7169
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
7270
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
73-
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched', $defaultBusName),
71+
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically.'),
7472
])
7573
->setDescription('Consumes messages')
7674
->setHelp(<<<'EOF'
@@ -89,6 +87,12 @@ protected function configure(): void
8987
Use the --time-limit option to stop the worker when the given time limit (in seconds) is reached:
9088
9189
<info>php %command.full_name% <receiver-name> --time-limit=3600</info>
90+
91+
Use the --bus option to specify the message bus to dispatch received messages
92+
to instead of trying to determine it automatically. This is required if the
93+
messages didn't originate from Messenger:
94+
95+
<info>php %command.full_name% <receiver-name> --bus=event_bus</info>
9296
EOF
9397
)
9498
;
@@ -112,24 +116,6 @@ protected function interact(InputInterface $input, OutputInterface $output)
112116
}
113117
}
114118
}
115-
116-
$busName = $input->getOption('bus');
117-
if ($this->busNames && !$this->busLocator->has($busName)) {
118-
if (null === $busName) {
119-
$io->block('Missing bus argument.', null, 'error', ' ', true);
120-
$input->setOption('bus', $io->choice('Select one of the available buses', $this->busNames));
121-
} elseif ($alternatives = $this->findAlternatives($busName, $this->busNames)) {
122-
$io->block(sprintf('Bus "%s" is not defined.', $busName), null, 'error', ' ', true);
123-
124-
if (1 === \count($alternatives)) {
125-
if ($io->confirm(sprintf('Do you want to dispatch to "%s" instead? ', $alternatives[0]), true)) {
126-
$input->setOption('bus', $alternatives[0]);
127-
}
128-
} else {
129-
$input->setOption('bus', $io->choice('Did you mean one of the following buses instead?', $alternatives, $alternatives[0]));
130-
}
131-
}
132-
}
133119
}
134120

135121
/**
@@ -147,18 +133,19 @@ protected function execute(InputInterface $input, OutputInterface $output): void
147133
throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
148134
}
149135

150-
if (!$this->busLocator->has($busName = $input->getOption('bus'))) {
151-
throw new RuntimeException(sprintf('Bus "%s" does not exist.', $busName));
152-
}
153-
154136
if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
155137
throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
156138
}
157139

158140
$receiver = $this->receiverLocator->get($receiverName);
159-
$bus = $this->busLocator->get($busName);
160141
$retryStrategy = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
161142

143+
if (null !== $input->getOption('bus')) {
144+
$bus = $this->busLocator->get($input->getOption('bus'));
145+
} else {
146+
$bus = new RoutableMessageBus($this->busLocator);
147+
}
148+
162149
$stopsWhen = [];
163150
if ($limit = $input->getOption('limit')) {
164151
$stopsWhen[] = "processed {$limit} messages";
@@ -176,7 +163,7 @@ protected function execute(InputInterface $input, OutputInterface $output): void
176163
}
177164

178165
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
179-
$io->success(sprintf('Consuming messages from transport "%s" on bus "%s".', $receiverName, $busName));
166+
$io->success(sprintf('Consuming messages from transport "%s".', $receiverName));
180167

181168
if ($stopsWhen) {
182169
$last = array_pop($stopsWhen);

src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php

+1-2
Original file line numberDiff line numberDiff line change
@@ -248,8 +248,7 @@ private function registerReceivers(ContainerBuilder $container, array $busIds)
248248

249249
$container->getDefinition('console.command.messenger_consume_messages')
250250
->replaceArgument(0, ServiceLocatorTagPass::register($container, $buses))
251-
->replaceArgument(3, array_values($receiverNames))
252-
->replaceArgument(4, $busIds);
251+
->replaceArgument(3, array_values($receiverNames));
253252
}
254253

255254
$container->getDefinition('messenger.receiver_locator')->replaceArgument(0, $receiverMapping);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Middleware;
13+
14+
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Stamp\BusNameStamp;
16+
17+
/**
18+
* Adds the BusNameStamp to the bus.
19+
*
20+
* @experimental in Symfony 4.2
21+
*
22+
* @author Ryan Weaver <ryan@symfonycasts.com>
23+
*/
24+
class AddBusNameStampMiddleware implements MiddlewareInterface
25+
{
26+
private $busName;
27+
28+
public function __construct(string $busName)
29+
{
30+
$this->busName = $busName;
31+
}
32+
33+
public function handle(Envelope $envelope, StackInterface $stack): Envelope
34+
{
35+
$envelope = $envelope->with(new BusNameStamp($this->busName));
36+
37+
return $stack->next()->handle($envelope, $stack);
38+
}
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger;
13+
14+
use Psr\Container\ContainerInterface;
15+
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
16+
use Symfony\Component\Messenger\Stamp\BusNameStamp;
17+
18+
/**
19+
* Bus of buses that is routable using a BusNameStamp.
20+
*
21+
* This is useful when passed to Worker: messages received
22+
* from the transport can be sent to the correct bus.
23+
*
24+
* @experimental in Symfony 4.2
25+
*
26+
* @author Ryan Weaver <ryan@symfonycasts.com>
27+
*/
28+
class RoutableMessageBus implements MessageBusInterface
29+
{
30+
private $busLocator;
31+
32+
/**
33+
* @param ContainerInterface $busLocator A locator full of MessageBusInterface objects
34+
*/
35+
public function __construct(ContainerInterface $busLocator)
36+
{
37+
$this->busLocator = $busLocator;
38+
}
39+
40+
public function dispatch($envelope): Envelope
41+
{
42+
if (!$envelope instanceof Envelope) {
43+
throw new InvalidArgumentException('Messages passed to RoutableMessageBus::dispatch() must be inside an Envelope');
44+
}
45+
46+
/** @var BusNameStamp $busNameStamp */
47+
$busNameStamp = $envelope->last(BusNameStamp::class);
48+
if (null === $busNameStamp) {
49+
throw new InvalidArgumentException('Envelope does not contain a BusNameStamp.');
50+
}
51+
52+
if (!$this->busLocator->has($busNameStamp->getBusName())) {
53+
throw new InvalidArgumentException(sprintf('Invalid bus name "%s" on BusNameStamp.', $busNameStamp->getBusName()));
54+
}
55+
56+
return $this->busLocator->get($busNameStamp->getBusName())->dispatch($envelope);
57+
}
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Stamp;
13+
14+
/**
15+
* Stamp used to identify which bus it was passed to.
16+
*
17+
* @experimental in Symfony 4.2
18+
*
19+
* @author Ryan Weaver <ryan@symfonycasts.com>
20+
*/
21+
class BusNameStamp implements StampInterface
22+
{
23+
private $busName;
24+
25+
public function __construct(string $busName)
26+
{
27+
$this->busName = $busName;
28+
}
29+
30+
public function getBusName(): string
31+
{
32+
return $this->busName;
33+
}
34+
}

src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php

-1
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,6 @@ public function testItRegistersMultipleReceiversAndSetsTheReceiverNamesOnTheComm
263263
(new MessengerPass())->process($container);
264264

265265
$this->assertSame(['amqp', 'dummy'], $container->getDefinition('console.command.messenger_consume_messages')->getArgument(3));
266-
$this->assertSame(['message_bus'], $container->getDefinition('console.command.messenger_consume_messages')->getArgument(4));
267266
}
268267

269268
public function testItShouldNotThrowIfGeneratorIsReturnedInsteadOfArray()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Middleware;
13+
14+
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware;
16+
use Symfony\Component\Messenger\Stamp\BusNameStamp;
17+
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
18+
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
19+
20+
class AddBusNameStampMiddlewareTest extends MiddlewareTestCase
21+
{
22+
public function testItSendsTheMessageToAssignedSender()
23+
{
24+
$middleware = new AddBusNameStampMiddleware('the_bus_name');
25+
$envelope = new Envelope(new DummyMessage('the message'));
26+
27+
$finalEnvelope = $middleware->handle($envelope, $this->getStackMock());
28+
/** @var BusNameStamp $busNameStamp */
29+
$busNameStamp = $finalEnvelope->last(BusNameStamp::class);
30+
$this->assertNotNull($busNameStamp);
31+
$this->assertSame('the_bus_name', $busNameStamp->getBusName());
32+
}
33+
}

0 commit comments

Comments
 (0)