Skip to content

Commit 3935548

Browse files
committed
[Messenger] implement several senders using a ChainSender
1 parent 5824aab commit 3935548

File tree

10 files changed

+131
-65
lines changed

10 files changed

+131
-65
lines changed

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1006,6 +1006,7 @@ function ($a) {
10061006
->requiresAtLeastOneElement()
10071007
->prototype('scalar')->end()
10081008
->end()
1009+
->booleanNode('send_and_handle')->defaultFalse()->end()
10091010
->end()
10101011
->end()
10111012
->end()

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
6464
use Symfony\Component\Messenger\MessageBus;
6565
use Symfony\Component\Messenger\MessageBusInterface;
66+
use Symfony\Component\Messenger\Transport\ChainSender;
6667
use Symfony\Component\Messenger\Transport\ReceiverInterface;
6768
use Symfony\Component\Messenger\Transport\SenderInterface;
6869
use Symfony\Component\PropertyAccess\PropertyAccessor;
@@ -1494,16 +1495,30 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
14941495
throw new LogicException(sprintf('The default bus named "%s" is not defined. Define it or change the default bus name.', $config['default_bus']));
14951496
}
14961497

1497-
$messageToSenderIdsMapping = array();
1498+
$messageToSenderIdMapping = array();
1499+
$messageToSendAndHandleMapping = array();
14981500
foreach ($config['routing'] as $message => $messageConfiguration) {
14991501
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
15001502
throw new LogicException(sprintf('Messenger routing configuration contains a mistake: message "%s" does not exist. It needs to match an existing class or interface.', $message));
15011503
}
15021504

1503-
$messageToSenderIdsMapping[$message] = $messageConfiguration['senders'];
1505+
if (1 < count($messageConfiguration['senders'])) {
1506+
$senders = array_map(function ($sender) { return new Reference($sender); }, $messageConfiguration['senders']);
1507+
$chainSenderDefinition = new Definition(ChainSender::class, array($senders));
1508+
$chainSenderId = 'messenger.chain_sender.'.$message;
1509+
$container->setDefinition($chainSenderId, $chainSenderDefinition);
1510+
$messageToSenderIdMapping[$message] = $chainSenderId;
1511+
} else {
1512+
$messageToSenderIdMapping[$message] = $messageConfiguration['senders'][0];
1513+
}
1514+
1515+
$messageToSendAndHandleMapping[$message] = $messageConfiguration['send_and_handle'];
15041516
}
15051517

1506-
$container->getDefinition('messenger.asynchronous.routing.sender_locator')->replaceArgument(1, $messageToSenderIdsMapping);
1518+
$container->getDefinition('messenger.asynchronous.routing.sender_locator')
1519+
->replaceArgument(1, $messageToSenderIdMapping)
1520+
->replaceArgument(2, $messageToSendAndHandleMapping)
1521+
;
15071522

15081523
foreach ($config['transports'] as $name => $transport) {
15091524
$senderDefinition = (new Definition(SenderInterface::class))

src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
<!-- Asynchronous -->
1616
<service id="messenger.asynchronous.routing.sender_locator" class="Symfony\Component\Messenger\Asynchronous\Routing\SenderLocator">
1717
<argument type="service" id="messenger.sender_locator" />
18-
<argument /> <!-- Message to sender ID mapping -->
18+
<argument type="collection" /> <!-- Message to sender ID mapping -->
19+
<argument type="collection" /> <!-- Message to send and handle mapping -->
1920
</service>
2021
<service id="messenger.middleware.route_messages" class="Symfony\Component\Messenger\Asynchronous\Middleware\SendMessageMiddleware">
2122
<argument type="service" id="messenger.asynchronous.routing.sender_locator" />

src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,7 @@
375375
<xsd:element name="sender" type="messenger_routing_sender" />
376376
</xsd:choice>
377377
<xsd:attribute name="message-class" type="xsd:string" use="required"/>
378+
<xsd:attribute name="send-and-handle" type="xsd:boolean" default="false"/>
378379
</xsd:complexType>
379380

380381
<xsd:complexType name="messenger_routing_sender">

src/Symfony/Component/Messenger/Asynchronous/Middleware/SendMessageMiddleware.php

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,21 +36,16 @@ public function handle($message, callable $next)
3636
return $next($message->getMessage());
3737
}
3838

39-
$hasSender = false;
40-
$forwardToSenderAndHandler = false;
41-
foreach ($this->senderLocator->getSendersForMessage($message) as $sender) {
42-
$hasSender = true;
43-
44-
if (null === $sender) {
45-
$forwardToSenderAndHandler = true;
46-
continue;
47-
}
39+
$sender = $this->senderLocator->getSenderForMessage($message);
4840

41+
if ($sender) {
4942
$sender->send($message);
50-
}
5143

52-
if (!$hasSender || $forwardToSenderAndHandler) {
53-
return $next($message);
44+
if (!$this->senderLocator->forwardToSenderAndHandler($message)) {
45+
return;
46+
}
5447
}
48+
49+
return $next($message);
5550
}
5651
}

src/Symfony/Component/Messenger/Asynchronous/Routing/SenderLocator.php

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,30 +12,39 @@
1212
namespace Symfony\Component\Messenger\Asynchronous\Routing;
1313

1414
use Psr\Container\ContainerInterface;
15+
use Symfony\Component\Messenger\Transport\SenderInterface;
1516

1617
/**
1718
* @author Samuel Roze <samuel.roze@gmail.com>
1819
*/
1920
class SenderLocator implements SenderLocatorInterface
2021
{
2122
private $senderServiceLocator;
22-
private $messageToSenderIdsMapping;
23+
private $messageToSenderIdMapping;
24+
private $messagesToSendAndHandleMapping;
2325

24-
public function __construct(ContainerInterface $senderServiceLocator, array $messageToSenderIdsMapping)
26+
public function __construct(ContainerInterface $senderServiceLocator, array $messageToSenderIdMapping, array $messagesToSendAndHandleMapping = array())
2527
{
2628
$this->senderServiceLocator = $senderServiceLocator;
27-
$this->messageToSenderIdsMapping = $messageToSenderIdsMapping;
29+
$this->messageToSenderIdMapping = $messageToSenderIdMapping;
30+
$this->messagesToSendAndHandleMapping = $messagesToSendAndHandleMapping;
2831
}
2932

3033
/**
3134
* {@inheritdoc}
3235
*/
33-
public function getSendersForMessage($message): iterable
36+
public function getSenderForMessage($message): ?SenderInterface
3437
{
35-
$senderIds = $this->messageToSenderIdsMapping[\get_class($message)] ?? $this->messageToSenderIdsMapping['*'] ?? array();
38+
$senderId = $this->messageToSenderIdMapping[\get_class($message)] ?? $this->messageToSenderIdMapping['*'] ?? null;
3639

37-
foreach ($senderIds as $senderId) {
38-
yield $this->senderServiceLocator->get($senderId);
39-
}
40+
return $senderId ? $this->senderServiceLocator->get($senderId) : null;
41+
}
42+
43+
/**
44+
* {@inheritdoc}
45+
*/
46+
public function forwardToSenderAndHandler($message): bool
47+
{
48+
return $this->messagesToSendAndHandleMapping[\get_class($message)] ?? $this->messagesToSendAndHandleMapping['*'] ?? false;
4049
}
4150
}

src/Symfony/Component/Messenger/Asynchronous/Routing/SenderLocatorInterface.php

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,27 @@
1515

1616
/**
1717
* @author Samuel Roze <samuel.roze@gmail.com>
18+
* @author Tobias Schultze <http://tobion.de>
1819
*
1920
* @experimental in 4.1
2021
*/
2122
interface SenderLocatorInterface
2223
{
2324
/**
24-
* Gets the senders (if applicable) for the given message object.
25+
* Gets the sender (if applicable) for the given message object.
2526
*
2627
* @param object $message
2728
*
28-
* @return SenderInterface[]
29+
* @return SenderInterface|null
2930
*/
30-
public function getSendersForMessage($message): iterable;
31+
public function getSenderForMessage($message): ?SenderInterface;
32+
33+
/**
34+
* Whether to route the message to a sender and still have it passed to its respective handler.
35+
*
36+
* @param object $message
37+
*
38+
* @return bool
39+
*/
40+
public function forwardToSenderAndHandler($message): bool;
3141
}

src/Symfony/Component/Messenger/Tests/Asynchronous/Middleware/SendMessageMiddlewareTest.php

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,26 +26,21 @@ public function testItSendsTheMessageToAssignedSender()
2626
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
2727
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
2828

29-
$middleware = new SendMessageMiddleware(new InMemorySenderLocator(array(
30-
$sender,
31-
)));
29+
$middleware = new SendMessageMiddleware(new InMemorySenderLocator($sender));
3230

3331
$sender->expects($this->once())->method('send')->with($message);
3432
$next->expects($this->never())->method($this->anything());
3533

3634
$middleware->handle($message, $next);
3735
}
3836

39-
public function testItAlsoCallsTheNextMiddlewareIfASenderIsNull()
37+
public function testItAlsoCallsTheNextMiddlewareIfForwardToSenderAndHandler()
4038
{
4139
$message = new DummyMessage('Hey');
4240
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
4341
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
4442

45-
$middleware = new SendMessageMiddleware(new InMemorySenderLocator(array(
46-
$sender,
47-
null,
48-
)));
43+
$middleware = new SendMessageMiddleware(new InMemorySenderLocator($sender, true));
4944

5045
$sender->expects($this->once())->method('send')->with($message);
5146
$next->expects($this->once())->method($this->anything());
@@ -58,7 +53,7 @@ public function testItCallsTheNextMiddlewareWhenNoSenderForThisMessage()
5853
$message = new DummyMessage('Hey');
5954
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
6055

61-
$middleware = new SendMessageMiddleware(new InMemorySenderLocator(array()));
56+
$middleware = new SendMessageMiddleware(new InMemorySenderLocator(null));
6257

6358
$next->expects($this->once())->method($this->anything());
6459

@@ -73,9 +68,7 @@ public function testItSkipsReceivedMessages()
7368
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
7469
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
7570

76-
$middleware = new SendMessageMiddleware(new InMemorySenderLocator(array(
77-
$sender,
78-
)));
71+
$middleware = new SendMessageMiddleware(new InMemorySenderLocator($sender));
7972

8073
$sender->expects($this->never())->method('send');
8174
$next->expects($this->once())->method('__invoke')->with($innerMessage);
@@ -86,15 +79,22 @@ public function testItSkipsReceivedMessages()
8679

8780
class InMemorySenderLocator implements SenderLocatorInterface
8881
{
89-
private $senders;
82+
private $sender;
83+
private $sendAndHandle;
9084

91-
public function __construct(iterable $senders)
85+
public function __construct(?SenderInterface $sender, bool $sendAndHandle = false)
9286
{
93-
$this->senders = $senders;
87+
$this->sender = $sender;
88+
$this->sendAndHandle = $sendAndHandle;
9489
}
9590

96-
public function getSendersForMessage($message): iterable
91+
public function getSenderForMessage($message): ?SenderInterface
9792
{
98-
return $this->senders;
93+
return $this->sender;
94+
}
95+
96+
public function forwardToSenderAndHandler($message): bool
97+
{
98+
return $this->sendAndHandle;
9999
}
100100
}

src/Symfony/Component/Messenger/Tests/Asynchronous/Routing/SenderLocatorTest.php

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Symfony\Component\Messenger\Tests\Asynchronous\Routing;
1313

1414
use PHPUnit\Framework\TestCase;
15+
use Psr\Container\ContainerInterface;
1516
use Symfony\Component\DependencyInjection\Container;
1617
use Symfony\Component\Messenger\Asynchronous\Routing\SenderLocator;
1718
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
@@ -27,13 +28,11 @@ public function testItReturnsTheSenderBasedOnTheMessageClass()
2728
$container->set('my_amqp_sender', $sender);
2829

2930
$locator = new SenderLocator($container, array(
30-
DummyMessage::class => array(
31-
'my_amqp_sender',
32-
),
31+
DummyMessage::class => 'my_amqp_sender',
3332
));
3433

35-
$this->assertIterableEquals(array($sender), $locator->getSendersForMessage(new DummyMessage('Hello')));
36-
$this->assertIterableEquals(array(), $locator->getSendersForMessage(new SecondMessage()));
34+
$this->assertSame($sender, $locator->getSenderForMessage(new DummyMessage('Hello')));
35+
$this->assertNull($locator->getSenderForMessage(new SecondMessage()));
3736
}
3837

3938
public function testItSupportsAWildcardInsteadOfTheMessageClass()
@@ -47,27 +46,24 @@ public function testItSupportsAWildcardInsteadOfTheMessageClass()
4746
$container->set('my_api_sender', $apiSender);
4847

4948
$locator = new SenderLocator($container, array(
50-
DummyMessage::class => array(
51-
'my_amqp_sender',
52-
),
53-
'*' => array(
54-
'my_api_sender',
55-
),
49+
DummyMessage::class => 'my_amqp_sender',
50+
'*' => 'my_api_sender',
5651
));
5752

58-
$this->assertIterableEquals(array($sender), $locator->getSendersForMessage(new DummyMessage('Hello')));
59-
$this->assertIterableEquals(array($apiSender), $locator->getSendersForMessage(new SecondMessage()));
53+
$this->assertSame($sender, $locator->getSenderForMessage(new DummyMessage('Hello')));
54+
$this->assertSame($apiSender, $locator->getSenderForMessage(new SecondMessage()));
6055
}
6156

62-
private function assertIterableEquals(iterable $expected, iterable $actual)
57+
public function testForwardToSenderAndHandler()
6358
{
64-
if ($actual instanceof \Traversable) {
65-
$actual = iterator_to_array($actual, false);
66-
}
67-
if ($expected instanceof \Traversable) {
68-
$expected = iterator_to_array($expected, false);
69-
}
59+
$container = $this->getMockBuilder(ContainerInterface::class)->getMock();
7060

71-
$this->assertEquals($expected, $actual);
61+
$locator = new SenderLocator($container, array(), array(
62+
DummyMessage::class => false,
63+
'*' => true,
64+
));
65+
66+
$this->assertFalse($locator->forwardToSenderAndHandler(new DummyMessage('Hello')));
67+
$this->assertTrue($locator->forwardToSenderAndHandler(new SecondMessage()));
7268
}
7369
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Transport;
13+
14+
/**
15+
* @author Tobias Schultze <http://tobion.de>
16+
*/
17+
class ChainSender implements SenderInterface
18+
{
19+
private $senders;
20+
21+
/**
22+
* @param SenderInterface[] $senders
23+
*/
24+
public function __construct(iterable $senders)
25+
{
26+
$this->senders = $senders;
27+
}
28+
29+
/**
30+
* {@inheritdoc}
31+
*/
32+
public function send($message)
33+
{
34+
foreach ($this->senders as $sender) {
35+
$sender->send($message);
36+
}
37+
}
38+
}

0 commit comments

Comments
 (0)