Skip to content

Commit aa04d06

Browse files
committed
feature #26632 [Messenger] Add AMQP adapter (sroze)
This PR was squashed before being merged into the 4.1-dev branch (closes #26632). Discussion ---------- [Messenger] Add AMQP adapter | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | BC breaks? | no | Deprecations? | no | Tests pass? | ø | License | MIT - [x] Depends on the Messenger component #24411 - [x] Add tests once we are all happy about the structure --- In order to give a great DX for simple needs such as sending messages through an AMQP broker such as RabbitMq, we should ship an AMQP adapter for the Messenger component within Symfony Core. It should be as simple as this proposal. We don't need to handle more specific use-cases nor brokers as other adapters such as the [enqueue adapter](https://github.com/sroze/enqueue-bridge) can also be used. Configuring the adapter is as simple as the following configuration: ```yaml # config/packages/messenger_adapters.yaml framework: messenger: adapter: "%env(MESSENGER_DSN)%" ``` With the given `.env` for example: ``` MESSENGER_DSN=amqp://guest:guest@localhost:5672/%2f/messages ``` Keep in mind that after having configured the adapter, developers have to route their messages to the given adapter. ```yaml # config/packages/messenger_routes.yaml framework: messenger: routing: producer). 'App\Message\Command\CreateNumber': messenger.default_sender ``` --- Additionally, multiple adapters can be created and messages routed to these ones. ```yaml # config/packages/messenger_routes.yaml framework: messenger: adapters: commands: "amqp://guest:guest@localhost:5672/%2f/commands" maintenance: "amqp://guest:guest@localhost:5672/%2f/maintenance" routing: producer). 'App\Message\Command\CreateNumber': messenger.commands_sender 'App\Message\Command\MaintenanceSpecificCommand': messenger.maintenance_sender ``` Commits ------- 798c230 [Messenger] Add AMQP adapter
2 parents 9a99955 + 798c230 commit aa04d06

33 files changed

+1322
-86
lines changed

.travis.yml

+9
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@ addons:
1212
- language-pack-fr-base
1313
- ldap-utils
1414
- slapd
15+
- librabbitmq-dev
1516

1617
env:
1718
global:
1819
- MIN_PHP=7.1.3
1920
- SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php
21+
- MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages
2022

2123
matrix:
2224
include:
@@ -38,6 +40,7 @@ services:
3840
- memcached
3941
- mongodb
4042
- redis-server
43+
- rabbitmq
4144

4245
before_install:
4346
- |
@@ -134,6 +137,11 @@ before_install:
134137
- |
135138
# Install extra PHP extensions
136139
if [[ ! $skip ]]; then
140+
# Install librabbitmq
141+
wget http://ftp.debian.org/debian/pool/main/libr/librabbitmq/librabbitmq-dev_0.5.2-2_amd64.deb
142+
wget http://ftp.debian.org/debian/pool/main/libr/librabbitmq/librabbitmq1_0.5.2-2_amd64.deb
143+
sudo dpkg -i librabbitmq1_0.5.2-2_amd64.deb librabbitmq-dev_0.5.2-2_amd64.deb
144+
137145
# install libsodium
138146
sudo add-apt-repository ppa:ondrej/php -y
139147
sudo apt-get update -q
@@ -142,6 +150,7 @@ before_install:
142150
tfold ext.apcu tpecl apcu-5.1.6 apcu.so $INI
143151
tfold ext.libsodium tpecl libsodium sodium.so $INI
144152
tfold ext.mongodb tpecl mongodb-1.4.0RC1 mongodb.so $INI
153+
tfold ext.amqp tpecl amqp-1.9.3 amqp.so $INI
145154
fi
146155
147156
- |

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php

+27
Original file line numberDiff line numberDiff line change
@@ -971,12 +971,17 @@ private function addMessengerSection(ArrayNodeDefinition $rootNode)
971971
->arrayNode('messenger')
972972
->info('Messenger configuration')
973973
->{!class_exists(FullStack::class) && class_exists(MessageBusInterface::class) ? 'canBeDisabled' : 'canBeEnabled'}()
974+
->fixXmlConfig('adapter')
974975
->children()
975976
->arrayNode('routing')
976977
->useAttributeAsKey('message_class')
977978
->beforeNormalization()
978979
->always()
979980
->then(function ($config) {
981+
if (!is_array($config)) {
982+
return array();
983+
}
984+
980985
$newConfig = array();
981986
foreach ($config as $k => $v) {
982987
if (!is_int($k)) {
@@ -1011,6 +1016,28 @@ function ($a) {
10111016
->end()
10121017
->end()
10131018
->end()
1019+
->arrayNode('adapters')
1020+
->useAttributeAsKey('name')
1021+
->arrayPrototype()
1022+
->beforeNormalization()
1023+
->ifString()
1024+
->then(function (string $dsn) {
1025+
return array('dsn' => $dsn);
1026+
})
1027+
->end()
1028+
->fixXmlConfig('option')
1029+
->children()
1030+
->scalarNode('dsn')->end()
1031+
->arrayNode('options')
1032+
->normalizeKeys(false)
1033+
->useAttributeAsKey('name')
1034+
->defaultValue(array())
1035+
->prototype('variable')
1036+
->end()
1037+
->end()
1038+
->end()
1039+
->end()
1040+
->end()
10141041
->end()
10151042
->end()
10161043
->end()

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

+18
Original file line numberDiff line numberDiff line change
@@ -1468,6 +1468,24 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
14681468
} else {
14691469
$container->removeDefinition('messenger.middleware.validator');
14701470
}
1471+
1472+
foreach ($config['adapters'] as $name => $adapter) {
1473+
$container->setDefinition('messenger.sender.'.$name, (new Definition(SenderInterface::class))->setFactory(array(
1474+
new Reference('messenger.adapter_factory'),
1475+
'createSender',
1476+
))->setArguments(array(
1477+
$adapter['dsn'],
1478+
$adapter['options'],
1479+
))->addTag('messenger.sender'));
1480+
1481+
$container->setDefinition('messenger.receiver.'.$name, (new Definition(ReceiverInterface::class))->setFactory(array(
1482+
new Reference('messenger.adapter_factory'),
1483+
'createReceiver',
1484+
))->setArguments(array(
1485+
$adapter['dsn'],
1486+
$adapter['options'],
1487+
))->addTag('messenger.receiver'));
1488+
}
14711489
}
14721490

14731491
private function registerCacheConfiguration(array $config, ContainerBuilder $container)

src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

+13
Original file line numberDiff line numberDiff line change
@@ -72,5 +72,18 @@
7272
<tag name="container.service_locator" />
7373
<argument type="collection" />
7474
</service>
75+
76+
<!-- Adapters -->
77+
<service id="messenger.adapter_factory" class="Symfony\Component\Messenger\Adapter\Factory\ChainAdapterFactory">
78+
<argument type="tagged" tag="messenger.adapter_factory" />
79+
</service>
80+
81+
<service id="messenger.adapter.amqp.factory" class="Symfony\Component\Messenger\Adapter\AmqpExt\AmqpAdapterFactory">
82+
<argument type="service" id="messenger.transport.default_encoder" />
83+
<argument type="service" id="messenger.transport.default_decoder" />
84+
<argument>%kernel.debug%</argument>
85+
86+
<tag name="messenger.adapter_factory" />
87+
</service>
7588
</services>
7689
</container>

src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd

+14
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,7 @@
354354
<xsd:sequence>
355355
<xsd:element name="routing" type="messenger_routing" minOccurs="0" maxOccurs="unbounded" />
356356
<xsd:element name="middlewares" type="messenger_middleware" minOccurs="0" maxOccurs="unbounded" />
357+
<xsd:element name="adapter" type="messenger_adapter" minOccurs="0" maxOccurs="unbounded" />
357358
</xsd:sequence>
358359
</xsd:complexType>
359360

@@ -368,6 +369,19 @@
368369
<xsd:attribute name="service" type="xsd:string" use="required"/>
369370
</xsd:complexType>
370371

372+
<xsd:complexType name="messenger_adapter">
373+
<xsd:sequence>
374+
<xsd:element name="option" type="messenger_adapter_option" minOccurs="0" maxOccurs="unbounded" />
375+
</xsd:sequence>
376+
<xsd:attribute name="name" type="xsd:string" />
377+
<xsd:attribute name="dsn" type="xsd:string" />
378+
</xsd:complexType>
379+
380+
<xsd:complexType name="messenger_adapter_option">
381+
<xsd:attribute name="name" type="xsd:string" />
382+
<xsd:attribute name="value" type="xsd:string" />
383+
</xsd:complexType>
384+
371385
<xsd:complexType name="messenger_middleware">
372386
<xsd:sequence>
373387
<xsd:element name="validation" type="messenger_validation" minOccurs="0" maxOccurs="1" />

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/ConfigurationTest.php

+1
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,7 @@ class_exists(SemaphoreStore::class) && SemaphoreStore::isSupported() ? 'semaphor
258258
'enabled' => !class_exists(FullStack::class),
259259
),
260260
),
261+
'adapters' => array(),
261262
),
262263
);
263264
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
<?php
2+
3+
$container->loadFromExtension('framework', array(
4+
'messenger' => array(
5+
'adapters' => array(
6+
'default' => 'amqp://localhost/%2f/messages',
7+
'customised' => array(
8+
'dsn' => 'amqp://localhost/%2f/messages?exchange_name=exchange_name',
9+
'options' => array('queue_name' => 'Queue'),
10+
),
11+
),
12+
),
13+
));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
<?xml version="1.0" encoding="utf-8" ?>
2+
<container xmlns="http://symfony.com/schema/dic/services"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xmlns:framework="http://symfony.com/schema/dic/symfony"
5+
xsi:schemaLocation="http://symfony.com/schema/dic/services http://symfony.com/schema/dic/services/services-1.0.xsd
6+
http://symfony.com/schema/dic/symfony http://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
7+
8+
<framework:config>
9+
<framework:messenger>
10+
<framework:adapter name="default" dsn="amqp://localhost/%2f/messages" />
11+
<framework:adapter name="customised" dsn="amqp://localhost/%2f/messages?exchange_name=exchange_name">
12+
<framework:option name="queue_name" value="Queue" />
13+
</framework:adapter>
14+
</framework:messenger>
15+
</framework:config>
16+
</container>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
framework:
2+
messenger:
3+
adapters:
4+
default: 'amqp://localhost/%2f/messages'
5+
customised:
6+
dsn: 'amqp://localhost/%2f/messages?exchange_name=exchange_name'
7+
options:
8+
queue_name: Queue

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php

+28
Original file line numberDiff line numberDiff line change
@@ -523,6 +523,7 @@ public function testWebLink()
523523
public function testMessenger()
524524
{
525525
$container = $this->createContainerFromFile('messenger');
526+
$this->assertTrue($container->hasDefinition('message_bus'));
526527
$this->assertFalse($container->hasDefinition('messenger.middleware.doctrine_transaction'));
527528
}
528529

@@ -538,6 +539,33 @@ public function testMessengerValidationDisabled()
538539
$this->assertFalse($container->hasDefinition('messenger.middleware.validator'));
539540
}
540541

542+
public function testMessengerAdapter()
543+
{
544+
$container = $this->createContainerFromFile('messenger_adapter');
545+
$this->assertTrue($container->hasDefinition('messenger.sender.default'));
546+
$this->assertTrue($container->getDefinition('messenger.sender.default')->hasTag('messenger.sender'));
547+
$this->assertTrue($container->hasDefinition('messenger.receiver.default'));
548+
$this->assertTrue($container->getDefinition('messenger.receiver.default')->hasTag('messenger.receiver'));
549+
550+
$this->assertTrue($container->hasDefinition('messenger.sender.customised'));
551+
$senderFactory = $container->getDefinition('messenger.sender.customised')->getFactory();
552+
$senderArguments = $container->getDefinition('messenger.sender.customised')->getArguments();
553+
554+
$this->assertEquals(array(new Reference('messenger.adapter_factory'), 'createSender'), $senderFactory);
555+
$this->assertCount(2, $senderArguments);
556+
$this->assertEquals('amqp://localhost/%2f/messages?exchange_name=exchange_name', $senderArguments[0]);
557+
$this->assertEquals(array('queue_name' => 'Queue'), $senderArguments[1]);
558+
559+
$this->assertTrue($container->hasDefinition('messenger.receiver.customised'));
560+
$receiverFactory = $container->getDefinition('messenger.receiver.customised')->getFactory();
561+
$receiverArguments = $container->getDefinition('messenger.receiver.customised')->getArguments();
562+
563+
$this->assertEquals(array(new Reference('messenger.adapter_factory'), 'createReceiver'), $receiverFactory);
564+
$this->assertCount(2, $receiverArguments);
565+
$this->assertEquals('amqp://localhost/%2f/messages?exchange_name=exchange_name', $receiverArguments[0]);
566+
$this->assertEquals(array('queue_name' => 'Queue'), $receiverArguments[1]);
567+
}
568+
541569
public function testTranslator()
542570
{
543571
$container = $this->createContainerFromFile('full');
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Adapter\AmqpExt;
13+
14+
use Symfony\Component\Messenger\Adapter\Factory\AdapterFactoryInterface;
15+
use Symfony\Component\Messenger\Transport\ReceiverInterface;
16+
use Symfony\Component\Messenger\Transport\SenderInterface;
17+
use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
18+
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
19+
20+
/**
21+
* @author Samuel Roze <samuel.roze@gmail.com>
22+
*/
23+
class AmqpAdapterFactory implements AdapterFactoryInterface
24+
{
25+
private $encoder;
26+
private $decoder;
27+
private $debug;
28+
29+
public function __construct(EncoderInterface $encoder, DecoderInterface $decoder, bool $debug)
30+
{
31+
$this->encoder = $encoder;
32+
$this->decoder = $decoder;
33+
$this->debug = $debug;
34+
}
35+
36+
public function createReceiver(string $dsn, array $options): ReceiverInterface
37+
{
38+
return new AmqpReceiver($this->decoder, Connection::fromDsn($dsn, $options, $this->debug));
39+
}
40+
41+
public function createSender(string $dsn, array $options): SenderInterface
42+
{
43+
return new AmqpSender($this->encoder, Connection::fromDsn($dsn, $options, $this->debug));
44+
}
45+
46+
public function supports(string $dsn, array $options): bool
47+
{
48+
return 0 === strpos($dsn, 'amqp://');
49+
}
50+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Adapter\AmqpExt;
13+
14+
class AmqpFactory
15+
{
16+
public function createConnection(array $credentials): \AMQPConnection
17+
{
18+
return new \AMQPConnection($credentials);
19+
}
20+
21+
public function createChannel(\AMQPConnection $connection): \AMQPChannel
22+
{
23+
return new \AMQPChannel($connection);
24+
}
25+
26+
public function createQueue(\AMQPChannel $channel): \AMQPQueue
27+
{
28+
return new \AMQPQueue($channel);
29+
}
30+
31+
public function createExchange(\AMQPChannel $channel): \AMQPExchange
32+
{
33+
return new \AMQPExchange($channel);
34+
}
35+
}

0 commit comments

Comments
 (0)