Skip to content

Commit a4f9bf3

Browse files
committed
Fixing a bug where a transport could receive a message and dispatch it to a different bus
1 parent 522594a commit a4f9bf3

File tree

11 files changed

+269
-34
lines changed

11 files changed

+269
-34
lines changed

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

+12-2
Original file line numberDiff line numberDiff line change
@@ -1616,8 +1616,14 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
16161616
}
16171617

16181618
$defaultMiddleware = [
1619-
'before' => [['id' => 'dispatch_after_current_bus']],
1620-
'after' => [['id' => 'send_message'], ['id' => 'handle_message']],
1619+
'before' => [
1620+
['id' => 'add_bus_name_stamp_middleware'],
1621+
['id' => 'dispatch_after_current_bus'],
1622+
],
1623+
'after' => [
1624+
['id' => 'send_message'],
1625+
['id' => 'handle_message'],
1626+
],
16211627
];
16221628
foreach ($config['buses'] as $busId => $bus) {
16231629
$middleware = $bus['middleware'];
@@ -1628,6 +1634,10 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
16281634
} else {
16291635
unset($defaultMiddleware['after'][1]['arguments']);
16301636
}
1637+
1638+
// argument to add_bus_name_stamp_middleware
1639+
$defaultMiddleware['before'][0]['arguments'] = [$busId];
1640+
16311641
$middleware = array_merge($defaultMiddleware['before'], $middleware, $defaultMiddleware['after']);
16321642
}
16331643

src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

+2
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
</call>
4040
</service>
4141

42+
<service id="messenger.middleware.add_bus_name_stamp_middleware" class="Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware" abstract="true" />
43+
4244
<service id="messenger.middleware.dispatch_after_current_bus" class="Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware" />
4345

4446
<service id="messenger.middleware.validation" class="Symfony\Component\Messenger\Middleware\ValidationMiddleware">

src/Symfony/Component/Messenger/CHANGELOG.md

+5
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@ CHANGELOG
44
4.3.0
55
-----
66

7+
* New classes: `RoutableMessageBus`, `AddBusNameStampMiddleware`
8+
and `BusNameStamp` were added, which allow you to add a bus identifier
9+
to the `Envelope` then find the correct bus when receiving from
10+
the transport. See `ConsumeMessagesCommand`.
11+
* An optional `ConsumeMessagesCommand` constructor argument was removed.
712
* Added `PhpSerializer` which uses PHP's native `serialize()` and
813
`unserialize()` to serialize messages to a transport
914
* [BC BREAK] If no serializer were passed, the default serializer

src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

+16-29
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
use Symfony\Component\Console\Input\InputOption;
2121
use Symfony\Component\Console\Output\OutputInterface;
2222
use Symfony\Component\Console\Style\SymfonyStyle;
23+
use Symfony\Component\Messenger\RoutableMessageBus;
2324
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver;
2425
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver;
2526
use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver;
@@ -38,15 +39,13 @@ class ConsumeMessagesCommand extends Command
3839
private $receiverLocator;
3940
private $logger;
4041
private $receiverNames;
41-
private $busNames;
4242

43-
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], array $busNames = [])
43+
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [])
4444
{
4545
$this->busLocator = $busLocator;
4646
$this->receiverLocator = $receiverLocator;
4747
$this->logger = $logger;
4848
$this->receiverNames = $receiverNames;
49-
$this->busNames = $busNames;
5049

5150
parent::__construct();
5251
}
@@ -57,15 +56,14 @@ public function __construct(ContainerInterface $busLocator, ContainerInterface $
5756
protected function configure(): void
5857
{
5958
$defaultReceiverName = 1 === \count($this->receiverNames) ? current($this->receiverNames) : null;
60-
$defaultBusName = 1 === \count($this->busNames) ? current($this->busNames) : null;
6159

6260
$this
6361
->setDefinition([
6462
new InputArgument('receiver', $defaultReceiverName ? InputArgument::OPTIONAL : InputArgument::REQUIRED, 'Name of the receiver', $defaultReceiverName),
6563
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
6664
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
6765
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
68-
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched', $defaultBusName),
66+
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically.'),
6967
])
7068
->setDescription('Consumes messages')
7169
->setHelp(<<<'EOF'
@@ -84,6 +82,12 @@ protected function configure(): void
8482
Use the --time-limit option to stop the worker when the given time limit (in seconds) is reached:
8583
8684
<info>php %command.full_name% <receiver-name> --time-limit=3600</info>
85+
86+
Use the --bus option to specify the message bus to dispatch received messages
87+
to instead of trying to determine it automatically. This is required if the
88+
messages didn't originate from Messenger:
89+
90+
<info>php %command.full_name% <receiver-name> --bus=event_bus</info>
8791
EOF
8892
)
8993
;
@@ -107,24 +111,6 @@ protected function interact(InputInterface $input, OutputInterface $output)
107111
}
108112
}
109113
}
110-
111-
$busName = $input->getOption('bus');
112-
if ($this->busNames && !$this->busLocator->has($busName)) {
113-
if (null === $busName) {
114-
$io->block('Missing bus argument.', null, 'error', ' ', true);
115-
$input->setOption('bus', $io->choice('Select one of the available buses', $this->busNames));
116-
} elseif ($alternatives = $this->findAlternatives($busName, $this->busNames)) {
117-
$io->block(sprintf('Bus "%s" is not defined.', $busName), null, 'error', ' ', true);
118-
119-
if (1 === \count($alternatives)) {
120-
if ($io->confirm(sprintf('Do you want to dispatch to "%s" instead? ', $alternatives[0]), true)) {
121-
$input->setOption('bus', $alternatives[0]);
122-
}
123-
} else {
124-
$input->setOption('bus', $io->choice('Did you mean one of the following buses instead?', $alternatives, $alternatives[0]));
125-
}
126-
}
127-
}
128114
}
129115

130116
/**
@@ -136,12 +122,13 @@ protected function execute(InputInterface $input, OutputInterface $output): void
136122
throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
137123
}
138124

139-
if (!$this->busLocator->has($busName = $input->getOption('bus'))) {
140-
throw new RuntimeException(sprintf('Bus "%s" does not exist.', $busName));
141-
}
142-
143125
$receiver = $this->receiverLocator->get($receiverName);
144-
$bus = $this->busLocator->get($busName);
126+
127+
if (null !== $input->getOption('bus')) {
128+
$bus = $this->busLocator->get($input->getOption('bus'));
129+
} else {
130+
$bus = new RoutableMessageBus($this->busLocator);
131+
}
145132

146133
$stopsWhen = [];
147134
if ($limit = $input->getOption('limit')) {
@@ -160,7 +147,7 @@ protected function execute(InputInterface $input, OutputInterface $output): void
160147
}
161148

162149
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
163-
$io->success(sprintf('Consuming messages from transport "%s" on bus "%s".', $receiverName, $busName));
150+
$io->success(sprintf('Consuming messages from transport "%s".', $receiverName));
164151

165152
if ($stopsWhen) {
166153
$last = array_pop($stopsWhen);

src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php

+1-2
Original file line numberDiff line numberDiff line change
@@ -248,8 +248,7 @@ private function registerReceivers(ContainerBuilder $container, array $busIds)
248248

249249
$container->getDefinition('console.command.messenger_consume_messages')
250250
->replaceArgument(0, ServiceLocatorTagPass::register($container, $buses))
251-
->replaceArgument(3, array_values($receiverNames))
252-
->replaceArgument(4, $busIds);
251+
->replaceArgument(3, array_values($receiverNames));
253252
}
254253

255254
$container->getDefinition('messenger.receiver_locator')->replaceArgument(0, $receiverMapping);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Middleware;
13+
14+
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Stamp\BusNameStamp;
16+
17+
/**
18+
* Adds the BusNameStamp to the bus.
19+
*
20+
* @experimental in Symfony 4.2
21+
*
22+
* @author Ryan Weaver <ryan@symfonycasts.com>
23+
*/
24+
class AddBusNameStampMiddleware implements MiddlewareInterface
25+
{
26+
private $busName;
27+
28+
public function __construct(string $busName)
29+
{
30+
$this->busName = $busName;
31+
}
32+
33+
public function handle(Envelope $envelope, StackInterface $stack): Envelope
34+
{
35+
$envelope = $envelope->with(new BusNameStamp($this->busName));
36+
37+
return $stack->next()->handle($envelope, $stack);
38+
}
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger;
13+
14+
use Psr\Container\ContainerInterface;
15+
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
16+
use Symfony\Component\Messenger\Stamp\BusNameStamp;
17+
18+
/**
19+
* Bus of buses that is routable using a BusNameStamp.
20+
*
21+
* This is useful when passed to Worker: messages received
22+
* from the transport can be sent to the correct bus.
23+
*
24+
* @experimental in Symfony 4.2
25+
*
26+
* @author Ryan Weaver <ryan@symfonycasts.com>
27+
*/
28+
class RoutableMessageBus implements MessageBusInterface
29+
{
30+
private $busLocator;
31+
32+
/**
33+
* @param ContainerInterface $busLocator A locator full of MessageBusInterface objects
34+
*/
35+
public function __construct(ContainerInterface $busLocator)
36+
{
37+
$this->busLocator = $busLocator;
38+
}
39+
40+
public function dispatch($envelope): Envelope
41+
{
42+
if (!$envelope instanceof Envelope) {
43+
throw new InvalidArgumentException('Messages passed to RoutableMessageBus::dispatch() must be inside an Envelope');
44+
}
45+
46+
/** @var BusNameStamp $busNameStamp */
47+
$busNameStamp = $envelope->last(BusNameStamp::class);
48+
if (null === $busNameStamp) {
49+
throw new InvalidArgumentException('Envelope does not contain a BusNameStamp.');
50+
}
51+
52+
if (!$this->busLocator->has($busNameStamp->getBusName())) {
53+
throw new InvalidArgumentException(sprintf('Invalid bus name "%s" on BusNameStamp.', $busNameStamp->getBusName()));
54+
}
55+
56+
return $this->busLocator->get($busNameStamp->getBusName())->dispatch($envelope);
57+
}
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Stamp;
13+
14+
/**
15+
* Stamp used to identify which bus it was passed to.
16+
*
17+
* @experimental in Symfony 4.2
18+
*
19+
* @author Ryan Weaver <ryan@symfonycasts.com>
20+
*/
21+
class BusNameStamp implements StampInterface
22+
{
23+
private $busName;
24+
25+
public function __construct(string $busName)
26+
{
27+
$this->busName = $busName;
28+
}
29+
30+
public function getBusName(): string
31+
{
32+
return $this->busName;
33+
}
34+
}

src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php

-1
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,6 @@ public function testItRegistersMultipleReceiversAndSetsTheReceiverNamesOnTheComm
262262
(new MessengerPass())->process($container);
263263

264264
$this->assertSame(['amqp', 'dummy'], $container->getDefinition('console.command.messenger_consume_messages')->getArgument(3));
265-
$this->assertSame(['message_bus'], $container->getDefinition('console.command.messenger_consume_messages')->getArgument(4));
266265
}
267266

268267
public function testItShouldNotThrowIfGeneratorIsReturnedInsteadOfArray()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Middleware;
13+
14+
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware;
16+
use Symfony\Component\Messenger\Stamp\BusNameStamp;
17+
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
18+
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
19+
20+
class AddBusNameStampMiddlewareTest extends MiddlewareTestCase
21+
{
22+
public function testItSendsTheMessageToAssignedSender()
23+
{
24+
$middleware = new AddBusNameStampMiddleware('the_bus_name');
25+
$envelope = new Envelope(new DummyMessage('the message'));
26+
27+
$finalEnvelope = $middleware->handle($envelope, $this->getStackMock());
28+
/** @var BusNameStamp $busNameStamp */
29+
$busNameStamp = $finalEnvelope->last(BusNameStamp::class);
30+
$this->assertNotNull($busNameStamp);
31+
$this->assertSame('the_bus_name', $busNameStamp->getBusName());
32+
}
33+
}

0 commit comments

Comments
 (0)