Skip to content

Commit a10725b

Browse files
committed
Add standalone setup broker feature.
1 parent dc0d34d commit a10725b

File tree

8 files changed

+187
-0
lines changed

8 files changed

+187
-0
lines changed

pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ public function load(array $configs, ContainerBuilder $container): void
5959
$transportFactory->buildContext($container, []);
6060
$transportFactory->buildQueueConsumer($container, $modules['consumption']);
6161
$transportFactory->buildRpcClient($container, []);
62+
$transportFactory->buildSetupBroker($container, []);
6263

6364
// client
6465
if (isset($modules['client'])) {

pkg/enqueue-bundle/EnqueueBundle.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Enqueue\Symfony\Client\DependencyInjection\BuildTopicSubscriberRoutesPass as BuildClientTopicSubscriberRoutesPass;
1515
use Enqueue\Symfony\DependencyInjection\BuildConsumptionExtensionsPass;
1616
use Enqueue\Symfony\DependencyInjection\BuildProcessorRegistryPass;
17+
use Enqueue\Symfony\DependencyInjection\BuildSetupBrokerPass;
1718
use Symfony\Component\DependencyInjection\Compiler\PassConfig;
1819
use Symfony\Component\DependencyInjection\ContainerBuilder;
1920
use Symfony\Component\HttpKernel\Bundle\Bundle;
@@ -25,6 +26,7 @@ public function build(ContainerBuilder $container): void
2526
//transport passes
2627
$container->addCompilerPass(new BuildConsumptionExtensionsPass());
2728
$container->addCompilerPass(new BuildProcessorRegistryPass());
29+
$container->addCompilerPass(new BuildSetupBrokerPass());
2830

2931
//client passes
3032
$container->addCompilerPass(new BuildClientConsumptionExtensionsPass());

pkg/enqueue-bundle/Resources/config/services.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,15 @@ services:
1515
tags:
1616
- { name: 'console.command' }
1717

18+
enqueue.transport.setup_broker:
19+
class: 'Enqueue\Symfony\SetupBrokerCommand'
20+
arguments:
21+
- '@enqueue.locator'
22+
- '%enqueue.default_transport%'
23+
- 'enqueue.transport.%s.setup_broker'
24+
tags:
25+
- { name: 'console.command' }
26+
1827
enqueue.client.consume_command:
1928
class: 'Enqueue\Symfony\Client\ConsumeCommand'
2029
arguments:
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
<?php
2+
namespace Enqueue\SetupBroker;
3+
4+
use Psr\Log\LoggerInterface;
5+
6+
class ChainSetupBroker implements SetupBrokerInterface
7+
{
8+
/**
9+
* @var SetupBrokerInterface[]
10+
*/
11+
private $setupBrokers;
12+
13+
/**
14+
* @param SetupBrokerInterface[] $setupBrokers
15+
*/
16+
public function __construct(array $setupBrokers)
17+
{
18+
$this->setupBrokers = [];
19+
20+
array_walk($setupBrokers, function (SetupBrokerInterface $setupBroker) {
21+
$this->setupBrokers[] = $setupBroker;
22+
});
23+
}
24+
25+
public function setupBroker(LoggerInterface $logger = null): void
26+
{
27+
foreach ($this->setupBrokers as $setupBroker) {
28+
$logger->info(get_class($setupBroker));
29+
30+
$setupBroker->setupBroker($logger);
31+
}
32+
}
33+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
<?php
2+
namespace Enqueue\SetupBroker;
3+
4+
use Psr\Log\LoggerInterface;
5+
6+
interface SetupBrokerInterface
7+
{
8+
public function setupBroker(LoggerInterface $logger = null): void;
9+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
<?php
2+
3+
namespace Enqueue\Symfony\DependencyInjection;
4+
5+
use Enqueue\Symfony\DiUtils;
6+
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
7+
use Symfony\Component\DependencyInjection\ContainerBuilder;
8+
use Symfony\Component\DependencyInjection\Reference;
9+
10+
final class BuildSetupBrokerPass implements CompilerPassInterface
11+
{
12+
public function process(ContainerBuilder $container): void
13+
{
14+
if (false == $container->hasParameter('enqueue.transports')) {
15+
throw new \LogicException('The "enqueue.transports" parameter must be set.');
16+
}
17+
18+
$names = $container->getParameter('enqueue.transports');
19+
$defaultName = $container->getParameter('enqueue.default_transport');
20+
21+
foreach ($names as $name) {
22+
$diUtils = DiUtils::create(TransportFactory::MODULE, $name);
23+
24+
$setupBrokerId = $diUtils->format('setup_broker');
25+
if (false == $container->hasDefinition($setupBrokerId)) {
26+
throw new \LogicException(sprintf('Service "%s" not found', $setupBrokerId));
27+
}
28+
29+
$tag = 'enqueue.transport.setup_broker';
30+
$map = [];
31+
foreach ($container->findTaggedServiceIds($tag) as $serviceId => $tagAttributes) {
32+
foreach ($tagAttributes as $tagAttribute) {
33+
$transport = $tagAttribute['transport'] ?? $defaultName;
34+
35+
if ($transport !== $name && 'all' !== $transport) {
36+
continue;
37+
}
38+
39+
$map[] = new Reference($serviceId);
40+
}
41+
}
42+
43+
$setupBroker = $container->getDefinition($setupBrokerId);
44+
$setupBroker->setArgument(0, $map);
45+
}
46+
}
47+
}

pkg/enqueue/Symfony/DependencyInjection/TransportFactory.php

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use Enqueue\Resources;
1212
use Enqueue\Rpc\RpcClient;
1313
use Enqueue\Rpc\RpcFactory;
14+
use Enqueue\SetupBroker\ChainSetupBroker;
1415
use Enqueue\Symfony\ContainerProcessorRegistry;
1516
use Enqueue\Symfony\DiUtils;
1617
use Interop\Queue\ConnectionFactory;
@@ -243,6 +244,15 @@ public function buildRpcClient(ContainerBuilder $container, array $config): void
243244
}
244245
}
245246

247+
public function buildSetupBroker(ContainerBuilder $container, array $config): void
248+
{
249+
$container->register($this->diUtils->format('setup_broker'), ChainSetupBroker::class)
250+
->addArgument([])
251+
;
252+
253+
$this->addServiceToLocator($container, 'setup_broker');
254+
}
255+
246256
private function assertServiceExists(ContainerBuilder $container, string $serviceId): void
247257
{
248258
if (false == $container->hasDefinition($serviceId)) {
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
<?php
2+
3+
namespace Enqueue\Symfony;
4+
5+
use Enqueue\Client\DriverInterface;
6+
use Enqueue\SetupBroker\SetupBrokerInterface;
7+
use Psr\Container\ContainerInterface;
8+
use Psr\Container\NotFoundExceptionInterface;
9+
use Symfony\Component\Console\Command\Command;
10+
use Symfony\Component\Console\Input\InputInterface;
11+
use Symfony\Component\Console\Input\InputOption;
12+
use Symfony\Component\Console\Logger\ConsoleLogger;
13+
use Symfony\Component\Console\Output\OutputInterface;
14+
15+
class SetupBrokerCommand extends Command
16+
{
17+
protected static $defaultName = 'enqueue:transport:setup-broker';
18+
19+
/**
20+
* @var ContainerInterface
21+
*/
22+
private $container;
23+
24+
/**
25+
* @var string
26+
*/
27+
private $defaultTransport;
28+
29+
/**
30+
* @var string
31+
*/
32+
private $setupBrokerIdPattern;
33+
34+
public function __construct(ContainerInterface $container, string $defaultTransport, string $setupBrokerIdPattern = 'enqueue.transport.%s.setup_broker')
35+
{
36+
$this->container = $container;
37+
$this->defaultTransport = $defaultTransport;
38+
$this->setupBrokerIdPattern = $setupBrokerIdPattern;
39+
40+
parent::__construct(static::$defaultName);
41+
}
42+
43+
protected function configure(): void
44+
{
45+
$this
46+
->setDescription('Setup broker. Configure the broker, creates queues, topics and so on.')
47+
->addOption('transport', 't', InputOption::VALUE_OPTIONAL, 'The transport to consume messages from.', $this->defaultTransport)
48+
;
49+
}
50+
51+
protected function execute(InputInterface $input, OutputInterface $output): ?int
52+
{
53+
$transport = $input->getOption('transport');
54+
55+
$logger = new ConsoleLogger($output);
56+
57+
try {
58+
$setupBroker = $this->getSetupBroker($transport);
59+
60+
$logger->info(get_class($setupBroker));
61+
62+
$setupBroker->setupBroker();
63+
} catch (NotFoundExceptionInterface $e) {
64+
throw new \LogicException(sprintf('Transport "%s" is not supported.', $transport), null, $e);
65+
}
66+
67+
$output->writeln('Broker set up');
68+
69+
return null;
70+
}
71+
72+
private function getSetupBroker(string $transport): SetupBrokerInterface
73+
{
74+
return $this->container->get(sprintf($this->setupBrokerIdPattern, $transport));
75+
}
76+
}

0 commit comments

Comments
 (0)