Skip to content

Commit 8c0e3c6

Browse files
committed
snsqs client driver
1 parent 159de19 commit 8c0e3c6

File tree

4 files changed

+105
-0
lines changed

4 files changed

+105
-0
lines changed

phpunit.xml.dist

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@
6565
<directory>pkg/sns/Tests</directory>
6666
</testsuite>
6767

68+
<testsuite name="snsqs transport">
69+
<directory>pkg/snsqs/Tests</directory>
70+
</testsuite>
71+
6872
<testsuite name="pheanstalk transport">
6973
<directory>pkg/pheanstalk/Tests</directory>
7074
</testsuite>
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
<?php
2+
3+
namespace Enqueue\Client\Driver;
4+
5+
use Enqueue\SnsQs\SnsQsContext;
6+
use Enqueue\SnsQs\SnsQsTopic;
7+
use Interop\Queue\Destination;
8+
use Psr\Log\LoggerInterface;
9+
use Psr\Log\NullLogger;
10+
11+
/**
12+
* @method SnsQsContext getContext()
13+
* @method SnsQsTopic createRouterTopic()
14+
*/
15+
class SnsQsDriver extends GenericDriver
16+
{
17+
public function __construct(SnsQsContext $context, ...$args)
18+
{
19+
parent::__construct($context, ...$args);
20+
}
21+
22+
public function setupBroker(LoggerInterface $logger = null): void
23+
{
24+
$logger = $logger ?: new NullLogger();
25+
$log = function ($text, ...$args) use ($logger) {
26+
$logger->debug(sprintf('[SqsQsDriver] '.$text, ...$args));
27+
};
28+
29+
// setup router
30+
$routerTopic = $this->createRouterTopic();
31+
$log('Declare router topic: %s', $routerTopic->getTopicName());
32+
$this->getContext()->declareTopic($routerTopic);
33+
34+
$routerQueue = $this->createQueue($this->getConfig()->getRouterQueue());
35+
$log('Declare router queue: %s', $routerQueue->getQueueName());
36+
$this->getContext()->declareQueue($routerQueue);
37+
38+
$log('Bind router queue to topic: %s -> %s', $routerQueue->getQueueName(), $routerTopic->getTopicName());
39+
$this->getContext()->bind($routerTopic, $routerQueue);
40+
41+
// setup queues
42+
$declaredQueues = [];
43+
$declaredTopics = [];
44+
foreach ($this->getRouteCollection()->all() as $route) {
45+
$queue = $this->createRouteQueue($route);
46+
if (false === array_key_exists($queue->getQueueName(), $declaredQueues)) {
47+
$log('Declare processor queue: %s', $queue->getQueueName());
48+
$this->getContext()->declareQueue($queue);
49+
50+
$declaredQueues[$queue->getQueueName()] = true;
51+
}
52+
53+
if ($route->isCommand()) {
54+
continue;
55+
}
56+
57+
$topic = $this->doCreateTopic($this->createTransportQueueName($route->getSource(), true));
58+
if (false === array_key_exists($topic->getTopicName(), $declaredTopics)) {
59+
$log('Declare processor topic: %s', $topic->getTopicName());
60+
$this->getContext()->declareTopic($topic);
61+
62+
$declaredTopics[$topic->getTopicName()] = true;
63+
}
64+
65+
$log('Bind processor queue to topic: %s -> %s', $queue->getQueueName(), $topic->getTopicName());
66+
$this->getContext()->bind($topic, $queue);
67+
}
68+
}
69+
70+
protected function createRouterTopic(): Destination
71+
{
72+
return $this->doCreateTopic(
73+
$this->createTransportRouterTopicName($this->getConfig()->getRouterTopic(), true)
74+
);
75+
}
76+
77+
protected function createTransportRouterTopicName(string $name, bool $prefix): string
78+
{
79+
$name = parent::createTransportRouterTopicName($name, $prefix);
80+
81+
return str_replace('.', '_dot_', $name);
82+
}
83+
84+
protected function createTransportQueueName(string $name, bool $prefix): string
85+
{
86+
$name = parent::createTransportQueueName($name, $prefix);
87+
88+
return str_replace('.', '_dot_', $name);
89+
}
90+
}

pkg/enqueue/Client/Resources.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
use Enqueue\Client\Driver\RabbitMqStompDriver;
1313
use Enqueue\Client\Driver\RdKafkaDriver;
1414
use Enqueue\Client\Driver\RedisDriver;
15+
use Enqueue\Client\Driver\SnsQsDriver;
1516
use Enqueue\Client\Driver\SqsDriver;
1617
use Enqueue\Client\Driver\StompDriver;
1718

@@ -92,6 +93,12 @@ public static function getKnownDrivers(): array
9293
'requiredSchemeExtensions' => [],
9394
'packages' => ['enqueue/enqueue', 'enqueue/sqs'],
9495
];
96+
$map[] = [
97+
'schemes' => ['snsqs'],
98+
'driverClass' => SnsQsDriver::class,
99+
'requiredSchemeExtensions' => [],
100+
'packages' => ['enqueue/enqueue', 'enqueue/sqs', 'enqueue/sns', 'enqueue/snsqs'],
101+
];
95102
$map[] = [
96103
'schemes' => ['stomp'],
97104
'driverClass' => StompDriver::class,

pkg/enqueue/Resources.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,10 @@ public static function getKnownConnections(): array
155155
'schemes' => ['sqs'],
156156
'supportedSchemeExtensions' => [],
157157
'package' => 'enqueue/sqs', ];
158+
$map[SqsConnectionFactory::class] = [
159+
'schemes' => ['snsqs'],
160+
'supportedSchemeExtensions' => [],
161+
'package' => 'enqueue/snsqs', ];
158162
$map[GpsConnectionFactory::class] = [
159163
'schemes' => ['gps'],
160164
'supportedSchemeExtensions' => [],

0 commit comments

Comments
 (0)