Skip to content

Commit 15e05b1

Browse files
committed
Added AMQP component
1 parent 667924c commit 15e05b1

39 files changed

+3188
-2
lines changed

.travis.yml

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ env:
1818
global:
1919
- MIN_PHP=7.1.3
2020
- SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php
21-
- MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages
21+
- MESSENGER_AMQP_DSN=amqp://localhost/
22+
- AMQP_DSN=amqp://localhost/
2223

2324
matrix:
2425
include:
@@ -199,7 +200,6 @@ install:
199200
- if [[ ! $skip ]]; then $COMPOSER_UP; fi
200201
- if [[ ! $skip ]]; then ./phpunit install; fi
201202
- php -i
202-
203203
- |
204204
run_tests () {
205205
set -e

composer.json

+1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
"symfony/polyfill-php72": "~1.5"
3333
},
3434
"replace": {
35+
"symfony/amqp": "self.version",
3536
"symfony/asset": "self.version",
3637
"symfony/browser-kit": "self.version",
3738
"symfony/cache": "self.version",

phpunit.xml.dist

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
<env name="LDAP_PORT" value="3389" />
2020
<env name="REDIS_HOST" value="localhost" />
2121
<env name="MEMCACHED_HOST" value="localhost" />
22+
<env name="AMQP_DSN" value="amqp://localhost/symfony_test_amqp" />
2223
</php>
2324

2425
<testsuites>

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php

+149
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ public function getConfigTreeBuilder()
104104
$this->addWebLinkSection($rootNode);
105105
$this->addLockSection($rootNode);
106106
$this->addMessengerSection($rootNode);
107+
$this->addAmqpSection($rootNode);
107108

108109
return $treeBuilder;
109110
}
@@ -1066,4 +1067,152 @@ function ($a) {
10661067
->end()
10671068
;
10681069
}
1070+
1071+
private function addAmqpSection($rootNode)
1072+
{
1073+
$rootNode
1074+
->children()
1075+
->arrayNode('amqp')
1076+
->fixXmlConfig('connection')
1077+
->children()
1078+
->arrayNode('connections')
1079+
->addDefaultChildrenIfNoneSet('default')
1080+
->useAttributeAsKey('name')
1081+
->prototype('array')
1082+
->fixXmlConfig('exchange')
1083+
->fixXmlConfig('queue')
1084+
->children()
1085+
->scalarNode('name')
1086+
->cannotBeEmpty()
1087+
->end()
1088+
->scalarNode('dsn')
1089+
->cannotBeEmpty()
1090+
->defaultValue('amqp://guest:guest@localhost:5672/symfony')
1091+
->end()
1092+
->arrayNode('exchanges')
1093+
->prototype('array')
1094+
->fixXmlConfig('argument')
1095+
->children()
1096+
->scalarNode('name')
1097+
->isRequired()
1098+
->cannotBeEmpty()
1099+
->end()
1100+
->variableNode('arguments')
1101+
->defaultValue(array())
1102+
// Deal with XML config
1103+
->beforeNormalization()
1104+
->always()
1105+
->then(function ($v) {
1106+
return $this->fixXmlArguments($v);
1107+
})
1108+
->end()
1109+
->validate()
1110+
->ifTrue(function ($v) {
1111+
return !is_array($v);
1112+
})
1113+
->thenInvalid('Arguments should be an array (got %s).')
1114+
->end()
1115+
->end()
1116+
->end()
1117+
->end()
1118+
->end()
1119+
->arrayNode('queues')
1120+
->prototype('array')
1121+
->fixXmlConfig('argument')
1122+
->children()
1123+
->scalarNode('name')
1124+
->isRequired()
1125+
->cannotBeEmpty()
1126+
->end()
1127+
->variableNode('arguments')
1128+
->defaultValue(array())
1129+
// Deal with XML config
1130+
->beforeNormalization()
1131+
->always()
1132+
->then(function ($v) {
1133+
return $this->fixXmlArguments($v);
1134+
})
1135+
->end()
1136+
->validate()
1137+
->ifTrue(function ($v) {
1138+
return !is_array($v);
1139+
})
1140+
->thenInvalid('Arguments should be an array (got %s).')
1141+
->end()
1142+
->end()
1143+
->enumNode('retry_strategy')
1144+
->values(array(null, 'constant', 'exponential'))
1145+
->defaultNull()
1146+
->end()
1147+
->variableNode('retry_strategy_options')
1148+
->validate()
1149+
->ifTrue(function ($v) {
1150+
return !is_array($v);
1151+
})
1152+
->thenInvalid('Arguments should be an array (got %s).')
1153+
->end()
1154+
->end()
1155+
->arrayNode('thresholds')
1156+
->addDefaultsIfNotSet()
1157+
->children()
1158+
->integerNode('warning')->defaultNull()->end()
1159+
->integerNode('critical')->defaultNull()->end()
1160+
->end()
1161+
->end()
1162+
->end()
1163+
->validate()
1164+
->ifTrue(function ($config) {
1165+
return 'constant' === $config['retry_strategy'] && !array_key_exists('max', $config['retry_strategy_options']);
1166+
})
1167+
->thenInvalid('"max" of "retry_strategy_options" should be set for constant retry strategy.')
1168+
->end()
1169+
->validate()
1170+
->ifTrue(function ($config) {
1171+
return 'constant' === $config['retry_strategy'] && !array_key_exists('time', $config['retry_strategy_options']);
1172+
})
1173+
->thenInvalid('"time" of "retry_strategy_options" should be set for constant retry strategy.')
1174+
->end()
1175+
->validate()
1176+
->ifTrue(function ($config) {
1177+
return 'exponential' === $config['retry_strategy'] && !array_key_exists('max', $config['retry_strategy_options']);
1178+
})
1179+
->thenInvalid('"max" of "retry_strategy_options" should be set for exponential retry strategy.')
1180+
->end()
1181+
->validate()
1182+
->ifTrue(function ($config) {
1183+
return 'exponential' === $config['retry_strategy'] && !array_key_exists('offset', $config['retry_strategy_options']);
1184+
})
1185+
->thenInvalid('"offset" of "retry_strategy_options" should be set for exponential retry strategy.')
1186+
->end()
1187+
->end()
1188+
->end()
1189+
->end()
1190+
->end()
1191+
->end()
1192+
->scalarNode('default_connection')
1193+
->cannotBeEmpty()
1194+
->end()
1195+
->end()
1196+
->end()
1197+
->end()
1198+
;
1199+
}
1200+
1201+
private function fixXmlArguments($v)
1202+
{
1203+
if (!is_array($v)) {
1204+
return $v;
1205+
}
1206+
1207+
$tmp = array();
1208+
1209+
foreach ($v as $key => $value) {
1210+
if (!isset($value['key']) && !isset($value['value'])) {
1211+
return $v;
1212+
}
1213+
$tmp[$value['key']] = $value['value'];
1214+
}
1215+
1216+
return $tmp;
1217+
}
10691218
}

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

+39
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
use Symfony\Bundle\FrameworkBundle\Routing\AnnotatedRouteControllerLoader;
2121
use Symfony\Bundle\FullStack;
2222
use Symfony\Component\Cache\Adapter\AbstractAdapter;
23+
use Symfony\Component\Amqp\Broker;
2324
use Symfony\Component\Cache\Adapter\AdapterInterface;
2425
use Symfony\Component\Cache\Adapter\ArrayAdapter;
2526
use Symfony\Component\Cache\ResettableInterface;
@@ -256,6 +257,9 @@ public function load(array $configs, ContainerBuilder $container)
256257
$this->registerRouterConfiguration($config['router'], $container, $loader);
257258
$this->registerAnnotationsConfiguration($config['annotations'], $container, $loader);
258259
$this->registerPropertyAccessConfiguration($config['property_access'], $container, $loader);
260+
if (isset($config['amqp'])) {
261+
$this->registerAmqpConfiguration($config['amqp'], $container, $loader);
262+
}
259263

260264
if ($this->isConfigEnabled($container, $config['serializer'])) {
261265
if (!class_exists('Symfony\Component\Serializer\Serializer')) {
@@ -1223,6 +1227,41 @@ private function registerPropertyAccessConfiguration(array $config, ContainerBui
12231227
;
12241228
}
12251229

1230+
private function registerAmqpConfiguration(array $config, ContainerBuilder $container, XmlFileLoader $loader)
1231+
{
1232+
$loader->load('amqp.xml');
1233+
1234+
$defaultConnectionName = $config['default_connection'] ?? null;
1235+
1236+
$match = false;
1237+
foreach ($config['connections'] as $name => $connection) {
1238+
$container
1239+
->register("amqp.broker.$name", Broker::class)
1240+
->setFactory(array(Broker::class, 'createWithDsn'))
1241+
->addArgument($connection['dsn'])
1242+
->addArgument($connection['queues'])
1243+
->addArgument($connection['exchanges'])
1244+
->setPublic(false)
1245+
;
1246+
1247+
if (!$defaultConnectionName) {
1248+
$defaultConnectionName = $name;
1249+
}
1250+
if ($defaultConnectionName === $name) {
1251+
$match = true;
1252+
}
1253+
}
1254+
1255+
if (!$match) {
1256+
throw new \InvalidArgumentException(sprintf('The default_connection "%s" does not exist.', $defaultConnectionName));
1257+
}
1258+
1259+
$container
1260+
->setAlias('amqp.broker', "amqp.broker.$defaultConnectionName")
1261+
->setPublic(true)
1262+
;
1263+
}
1264+
12261265
private function registerSecurityCsrfConfiguration(array $config, ContainerBuilder $container, XmlFileLoader $loader)
12271266
{
12281267
if (!$this->isConfigEnabled($container, $config)) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<?xml version="1.0" ?>
2+
3+
<container xmlns="http://symfony.com/schema/dic/services"
4+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
5+
xsi:schemaLocation="http://symfony.com/schema/dic/services http://symfony.com/schema/dic/services/services-1.0.xsd">
6+
7+
<services>
8+
<defaults public="false" />
9+
10+
<service id="amqp.broker_locator" class="Symfony\Component\DependencyInjection\ServiceLocator">
11+
<tag name="container.service_locator" />
12+
<argument type="collection">
13+
<argument key="Symfony\Component\Amqp\Broker" type="service" id="amqp.broker" />
14+
</argument>
15+
</service>
16+
17+
<service id="amqp.command.move" class="Symfony\Component\Amqp\Command\AmqpMoveCommand">
18+
<tag name="console.command" />
19+
<argument type="service" id="amqp.broker_locator" />
20+
<argument type="service" id="logger" on-invalid="null" />
21+
</service>
22+
23+
</services>
24+
</container>

src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd

+47
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
<xsd:element name="php-errors" type="php-errors" minOccurs="0" maxOccurs="1" />
3333
<xsd:element name="lock" type="lock" minOccurs="0" maxOccurs="1" />
3434
<xsd:element name="messenger" type="messenger" minOccurs="0" maxOccurs="1" />
35+
<xsd:element name="amqp" type="amqp" minOccurs="0" maxOccurs="1" />
3536
</xsd:choice>
3637

3738
<xsd:attribute name="http-method-override" type="xsd:boolean" />
@@ -401,4 +402,50 @@
401402
<xsd:attribute name="name" type="xsd:string" use="required"/>
402403
<xsd:attribute name="default-middlewares" type="xsd:boolean"/>
403404
</xsd:complexType>
405+
406+
<xsd:complexType name="amqp">
407+
<xsd:choice maxOccurs="unbounded">
408+
<xsd:element name="connection" type="amqp_connection" minOccurs="0" maxOccurs="unbounded" />
409+
</xsd:choice>
410+
411+
<xsd:attribute name="default_connection" type="xsd:string" />
412+
</xsd:complexType>
413+
414+
<xsd:complexType name="amqp_connection">
415+
<xsd:choice maxOccurs="unbounded">
416+
<xsd:element name="queue" type="amqp_queue" minOccurs="0" maxOccurs="unbounded" />
417+
<xsd:element name="exchange" type="amqp_exchange" minOccurs="0" maxOccurs="unbounded" />
418+
</xsd:choice>
419+
420+
<xsd:attribute name="dsn" type="xsd:string" />
421+
<xsd:attribute name="name" type="xsd:string" />
422+
</xsd:complexType>
423+
424+
<xsd:complexType name="amqp_queue">
425+
<xsd:choice maxOccurs="unbounded">
426+
<xsd:element name="argument" type="amqp_argument" minOccurs="0" maxOccurs="unbounded" />
427+
<xsd:element name="retry-strategy-options" type="amqp_queue_retry_strategy_options" minOccurs="0" maxOccurs="1" />
428+
</xsd:choice>
429+
<xsd:attribute name="name" type="xsd:string" />
430+
<xsd:attribute name="retry-strategy" type="xsd:string" />
431+
</xsd:complexType>
432+
433+
<xsd:complexType name="amqp_argument">
434+
<xsd:attribute name="key" type="xsd:string" />
435+
<xsd:attribute name="value" type="xsd:string" />
436+
</xsd:complexType>
437+
438+
<xsd:complexType name="amqp_queue_retry_strategy_options">
439+
<xsd:attribute name="key" type="xsd:string" />
440+
<xsd:attribute name="offset" type="xsd:string" />
441+
<xsd:attribute name="max" type="xsd:string" />
442+
<xsd:attribute name="time" type="xsd:string" />
443+
</xsd:complexType>
444+
445+
<xsd:complexType name="amqp_exchange">
446+
<xsd:choice maxOccurs="unbounded">
447+
<xsd:element name="argument" type="amqp_argument" minOccurs="0" maxOccurs="unbounded" />
448+
</xsd:choice>
449+
<xsd:attribute name="name" type="xsd:string" />
450+
</xsd:complexType>
404451
</xsd:schema>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
<?php
2+
3+
$container->loadFromExtension('framework', array(
4+
'amqp' => array(
5+
),
6+
));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
<?php
2+
3+
$container->loadFromExtension('framework', array(
4+
'amqp' => array(
5+
'connections' => array(
6+
'queue_staging' => array(
7+
'dsn' => 'amqp://foo:baz@rabbitmq:1234/staging',
8+
),
9+
'queue_prod' => array(
10+
'dsn' => 'amqp://foo:bar@rabbitmq:1234/prod',
11+
'queues' => array(
12+
array(
13+
'name' => 'retry_strategy_exponential',
14+
'retry_strategy' => 'exponential',
15+
'retry_strategy_options' => array('offset' => 1, 'max' => 3),
16+
),
17+
array(
18+
'name' => 'arguments',
19+
'arguments' => array(
20+
'routing_keys' => 'my_routing_key',
21+
'flags' => 2,
22+
),
23+
),
24+
),
25+
'exchanges' => array(
26+
array(
27+
'name' => 'headers',
28+
'arguments' => array(
29+
'type' => 'headers',
30+
),
31+
),
32+
),
33+
),
34+
),
35+
'default_connection' => 'queue_prod',
36+
),
37+
));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
<?xml version="1.0" ?>
2+
3+
<container xmlns="http://symfony.com/schema/dic/services"
4+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
5+
xmlns:framework="http://symfony.com/schema/dic/symfony"
6+
xsi:schemaLocation="http://symfony.com/schema/dic/services http://symfony.com/schema/dic/services/services-1.0.xsd
7+
http://symfony.com/schema/dic/symfony http://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
8+
9+
<framework:config>
10+
<framework:amqp>
11+
</framework:amqp>
12+
</framework:config>
13+
</container>

0 commit comments

Comments
 (0)