Skip to content

Commit 4543328

Browse files
committed
Factories rework.
1 parent f5bcb3e commit 4543328

File tree

11 files changed

+689
-318
lines changed

11 files changed

+689
-318
lines changed

pkg/enqueue-bundle/DependencyInjection/Configuration.php

Lines changed: 64 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,22 @@
33
namespace Enqueue\Bundle\DependencyInjection;
44

55
use Enqueue\Client\Config;
6-
use Enqueue\Symfony\TransportFactoryInterface;
76
use Symfony\Component\Config\Definition\Builder\TreeBuilder;
87
use Symfony\Component\Config\Definition\ConfigurationInterface;
98

109
class Configuration implements ConfigurationInterface
1110
{
1211
/**
13-
* @var TransportFactoryInterface[]
12+
* @var string[]
1413
*/
15-
private $factories;
14+
private $factoriesNames;
1615

1716
/**
18-
* @param TransportFactoryInterface[] $factories
17+
* @param string[] $factoriesNames
1918
*/
20-
public function __construct(array $factories)
19+
public function __construct(array $factoriesNames)
2120
{
22-
$this->factories = $factories;
21+
$this->factoriesNames = $factoriesNames;
2322
}
2423

2524
/**
@@ -30,17 +29,69 @@ public function getConfigTreeBuilder()
3029
$tb = new TreeBuilder();
3130
$rootNode = $tb->root('enqueue');
3231

33-
$transportChildren = $rootNode->children()
34-
->arrayNode('transport')->isRequired()->children();
32+
$rootNode
33+
->beforeNormalization()
34+
->always(function ($v) {
35+
if (empty($v['transport'])) {
36+
$v['transport'] = [
37+
'default' => ['dsn' => 'null://'],
38+
];
39+
}
3540

36-
foreach ($this->factories as $factory) {
37-
$factory->addConfiguration(
38-
$transportChildren->arrayNode($factory->getName())
39-
);
40-
}
41+
if (is_string($v['transport'])) {
42+
$v['transport'] = [
43+
'default' => ['dsn' => $v['transport']],
44+
];
45+
}
46+
47+
if (is_array($v['transport'])) {
48+
foreach ($v['transport'] as $name => $config) {
49+
if (empty($config)) {
50+
$config = ['dsn' => 'null://'];
51+
}
52+
53+
if (is_string($config)) {
54+
$config = ['dsn' => $config];
55+
}
56+
57+
if (empty($config['dsn']) && empty($config['config'])) {
58+
throw new \LogicException(sprintf('The transport "%s" is incorrectly configured. Either "dsn" or "config" must be set.', $name));
59+
}
60+
61+
$v['transport'][$name] = $config;
62+
}
63+
}
64+
65+
return $v;
66+
})
67+
->end()
68+
->children()
69+
->arrayNode('transport')
70+
->prototype('array')
71+
->beforeNormalization()
72+
->ifString()->then(function ($v) {
73+
return ['dsn' => $v];
74+
})
75+
->ifEmpty()->then(function ($v) {
76+
return ['dsn' => 'null://'];
77+
})
78+
->end()
79+
->children()
80+
->scalarNode('dsn')->end()
81+
->enumNode('factory')->values($this->factoriesNames)->end()
82+
->variableNode('config')
83+
->treatNullLike([])
84+
->info('The place for factory specific options')
85+
->end()
86+
->end()
87+
->end()
88+
->end()
89+
->end()
90+
;
4191

4292
$rootNode->children()
4393
->arrayNode('client')->children()
94+
->scalarNode('transport')->defaultValue('default')->end()
4495
->booleanNode('traceable_producer')->defaultFalse()->end()
4596
->scalarNode('prefix')->defaultValue('enqueue')->end()
4697
->scalarNode('app_name')->defaultValue('app')->end()

pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
use Enqueue\Client\TraceableProducer;
66
use Enqueue\JobQueue\Job;
77
use Enqueue\Null\Symfony\NullTransportFactory;
8+
use Enqueue\Psr\PsrConnectionFactory;
9+
use Enqueue\Psr\PsrContext;
810
use Enqueue\Symfony\DefaultTransportFactory;
911
use Enqueue\Symfony\TransportFactoryInterface;
1012
use Symfony\Component\Config\FileLocator;
@@ -44,25 +46,63 @@ public function addTransportFactory(TransportFactoryInterface $transportFactory)
4446
throw new \LogicException(sprintf('Transport factory with such name already added. Name %s', $name));
4547
}
4648

47-
$this->factories[$name] = $transportFactory;
49+
// $this->factories[$name] = $transportFactory;
50+
}
51+
52+
/**
53+
* @param string $name
54+
* @param string $factoryClass
55+
*/
56+
public function addFactoryClass($name, $factoryClass)
57+
{
58+
if (array_key_exists($name, $this->factories)) {
59+
throw new \LogicException(sprintf('The factory with such name has already been added. Name "%s"', $name));
60+
}
61+
62+
$this->factories[$name] = $factoryClass;
4863
}
4964

5065
/**
5166
* {@inheritdoc}
5267
*/
5368
public function load(array $configs, ContainerBuilder $container)
5469
{
55-
$config = $this->processConfiguration(new Configuration($this->factories), $configs);
70+
$config = $this->processConfiguration(new Configuration(array_keys($this->factories)), $configs);
5671

5772
$loader = new YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config'));
5873
$loader->load('services.yml');
5974

75+
$container->getDefinition('enqueue.connection_factory_factory')
76+
->replaceArgument(0, $this->factories);
77+
6078
foreach ($config['transport'] as $name => $transportConfig) {
61-
$this->factories[$name]->createConnectionFactory($container, $transportConfig);
62-
$this->factories[$name]->createContext($container, $transportConfig);
79+
$factoryId = sprintf('enqueue.transport.%s.connection_factory', $name);
80+
$contextId = sprintf('enqueue.transport.%s.context', $name);
81+
82+
if (isset($transportConfig['dsn'])) {
83+
$transportConfig = $transportConfig['dsn'];
84+
}
85+
86+
$container->register($factoryId, PsrConnectionFactory::class)
87+
->addArgument($transportConfig)
88+
->setFactory([new Reference('enqueue.connection_factory_factory'), 'createFactory'])
89+
;
90+
91+
$container->register($contextId, PsrContext::class)
92+
->setFactory([new Reference($factoryId), 'createContext'])
93+
;
6394
}
6495

6596
if (isset($config['client'])) {
97+
$container->setAlias(
98+
'enqueue.client.transport.connection_factory',
99+
sprintf('enqueue.transport.%s.connection_factory', $config['client']['transport'])
100+
);
101+
$container->setAlias(
102+
'enqueue.client.transport.context',
103+
sprintf('enqueue.transport.%s.context', $config['client']['transport'])
104+
);
105+
66106
$loader->load('client.yml');
67107
$loader->load('extensions/flush_spool_producer_extension.yml');
68108

pkg/enqueue-bundle/EnqueueBundle.php

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
use Enqueue\AmqpExt\AmqpContext;
66
use Enqueue\AmqpExt\Symfony\AmqpTransportFactory;
7-
use Enqueue\AmqpExt\Symfony\RabbitMqAmqpTransportFactory;
87
use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientExtensionsPass;
98
use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientRoutingPass;
109
use Enqueue\Bundle\DependencyInjection\Compiler\BuildConsumptionExtensionsPass;
@@ -15,15 +14,14 @@
1514
use Enqueue\Bundle\Events\DependencyInjection\AsyncEventsPass;
1615
use Enqueue\Bundle\Events\DependencyInjection\AsyncTransformersPass;
1716
use Enqueue\Dbal\DbalContext;
17+
use Enqueue\Dbal\ManagerRegistryConnectionFactory;
1818
use Enqueue\Dbal\Symfony\DbalTransportFactory;
1919
use Enqueue\Fs\FsContext;
2020
use Enqueue\Fs\Symfony\FsTransportFactory;
2121
use Enqueue\Redis\RedisContext;
2222
use Enqueue\Redis\Symfony\RedisTransportFactory;
2323
use Enqueue\Sqs\SqsContext;
24-
use Enqueue\Sqs\Symfony\SqsTransportFactory;
2524
use Enqueue\Stomp\StompContext;
26-
use Enqueue\Stomp\Symfony\RabbitMqStompTransportFactory;
2725
use Enqueue\Stomp\Symfony\StompTransportFactory;
2826
use Symfony\Component\DependencyInjection\Compiler\PassConfig;
2927
use Symfony\Component\DependencyInjection\ContainerBuilder;
@@ -47,29 +45,28 @@ public function build(ContainerBuilder $container)
4745
$extension = $container->getExtension('enqueue');
4846

4947
if (class_exists(StompContext::class)) {
50-
$extension->addTransportFactory(new StompTransportFactory());
51-
$extension->addTransportFactory(new RabbitMqStompTransportFactory());
48+
$extension->addFactoryClass('stomp', StompTransportFactory::class);
5249
}
5350

5451
if (class_exists(AmqpContext::class)) {
55-
$extension->addTransportFactory(new AmqpTransportFactory());
56-
$extension->addTransportFactory(new RabbitMqAmqpTransportFactory());
52+
$extension->addFactoryClass('amqp', AmqpTransportFactory::class);
5753
}
5854

5955
if (class_exists(FsContext::class)) {
60-
$extension->addTransportFactory(new FsTransportFactory());
56+
$extension->addFactoryClass('file', FsTransportFactory::class);
6157
}
6258

6359
if (class_exists(RedisContext::class)) {
64-
$extension->addTransportFactory(new RedisTransportFactory());
60+
$extension->addFactoryClass('redis', RedisTransportFactory::class);
6561
}
6662

6763
if (class_exists(DbalContext::class)) {
68-
$extension->addTransportFactory(new DbalTransportFactory());
64+
$extension->addFactoryClass('dbal', DbalTransportFactory::class);
65+
$extension->addFactoryClass('doctrine', ManagerRegistryConnectionFactory::class);
6966
}
7067

7168
if (class_exists(SqsContext::class)) {
72-
$extension->addTransportFactory(new SqsTransportFactory());
69+
$extension->addFactoryClass('amazon_sqs', DbalTransportFactory::class);
7370
}
7471

7572
$container->addCompilerPass(new AsyncEventsPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);

pkg/enqueue-bundle/Resources/config/client.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ services:
3030
class: 'Enqueue\Client\RpcClient'
3131
arguments:
3232
- '@enqueue.client.producer'
33-
- '@enqueue.transport.context'
33+
- '@enqueue.client.transport.context'
3434

3535
enqueue.client.router_processor:
3636
class: 'Enqueue\Client\RouterProcessor'
@@ -78,7 +78,7 @@ services:
7878
class: 'Enqueue\Consumption\QueueConsumer'
7979
public: false
8080
arguments:
81-
- '@enqueue.transport.context'
81+
- '@enqueue.client.transport.context'
8282
- '@enqueue.consumption.extensions'
8383

8484
enqueue.client.consume_messages_command:

pkg/enqueue-bundle/Resources/config/services.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
11
services:
2+
enqueue.connection_factory_factory:
3+
class: 'Enqueue\ConnectionFactoryFactory'
4+
public: false
5+
arguments:
6+
- []
7+
28
enqueue.consumption.extensions:
39
class: 'Enqueue\Consumption\ChainExtension'
410
public: false

pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,12 @@ public function provideEnqueueConfigs()
4949
],
5050
]];
5151

52+
yield 'default_dsn_as_env' => [[
53+
'transport' => [
54+
'default' => '%env(AMQP_DSN)%',
55+
],
56+
]];
57+
5258
yield 'default_dbal_as_dsn' => [[
5359
'transport' => [
5460
'default' => getenv('DOCTINE_DSN'),

0 commit comments

Comments
 (0)