Skip to content

Commit 00976ee

Browse files
committed
[Messenger] Support Redis Cluster in DSN
1 parent 839528b commit 00976ee

File tree

4 files changed

+77
-19
lines changed

4 files changed

+77
-19
lines changed

src/Symfony/Component/Messenger/Bridge/Redis/CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ CHANGELOG
55
-----
66

77
* Added support for `\RedisCluster` instance in `Connection` constructor
8+
* Added support for Redis Cluster in DSN
89

910
5.2.0
1011
-----

src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/ConnectionTest.php

+30
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,20 @@ public function testFromDsn()
6868
);
6969
}
7070

71+
public function testFromDsnWithMultipleHosts()
72+
{
73+
$this->skipIfRedisClusterUnavailable();
74+
75+
$hosts = explode(' ', getenv('REDIS_CLUSTER_HOSTS'));
76+
77+
$dsn = array_map(function ($host) {
78+
return 'redis://' . $host;
79+
}, $hosts);
80+
$dsn = \implode(',', $dsn);
81+
82+
$this->assertInstanceOf(Connection::class, Connection::fromDsn($dsn));
83+
}
84+
7185
public function testFromDsnOnUnixSocket()
7286
{
7387
$this->assertEquals(
@@ -429,4 +443,20 @@ public function testLazy()
429443
$connection->reject($message['id']);
430444
$redis->del('messenger-lazy');
431445
}
446+
447+
public function testLazyCluster()
448+
{
449+
$this->skipIfRedisClusterUnavailable();
450+
451+
$connection = new Connection(
452+
['lazy' => true],
453+
['host' => explode(' ', getenv('REDIS_CLUSTER_HOSTS'))]
454+
);
455+
456+
$connection->add('1', []);
457+
$this->assertNotEmpty($message = $connection->get());
458+
$this->assertSame('1', $message['body']);
459+
$connection->reject($message['id']);
460+
$connection->cleanup();
461+
}
432462
}

src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php

+41-17
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,10 @@ public function __construct(array $configuration, array $connectionCredentials =
7575
}
7676

7777
$initializer = static function ($redis) use ($host, $port, $auth, $serializer, $dbIndex) {
78+
if (null === $redis) {
79+
$redis = !is_array($host) ? new \Redis() : new \RedisCluster(null, $host);
80+
}
81+
7882
if ($redis instanceof \Redis) {
7983
$redis->connect($host, $port);
8084
}
@@ -89,17 +93,13 @@ public function __construct(array $configuration, array $connectionCredentials =
8993
throw new InvalidArgumentException('Redis connection failed: '.$redis->getLastError());
9094
}
9195

92-
return true;
96+
return $redis;
9397
};
9498

95-
if (null === $redis) {
96-
$redis = new \Redis();
97-
}
98-
9999
if ($configuration['lazy'] ?? self::DEFAULT_OPTIONS['lazy']) {
100100
$redis = new RedisProxy($redis, $initializer);
101101
} else {
102-
$initializer($redis);
102+
$redis = $initializer($redis);
103103
}
104104

105105
$this->connection = $redis;
@@ -122,20 +122,25 @@ public function __construct(array $configuration, array $connectionCredentials =
122122
$this->claimInterval = $configuration['claim_interval'] ?? self::DEFAULT_OPTIONS['claim_interval'];
123123
}
124124

125-
public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $redis = null): self
125+
/**
126+
* @param \Redis|\RedisCluster|null $redis
127+
*/
128+
public static function fromDsn(string $dsn, array $redisOptions = [], $redis = null): self
126129
{
127-
$url = $dsn;
130+
if (false === strpos($dsn, ',')) {
131+
$parsedUrl = self::parseDsn($dsn, $redisOptions);
132+
} else {
133+
$parsedUrls = array_map(function ($dsn) use (&$redisOptions) {
134+
return self::parseDsn($dsn, $redisOptions);
135+
}, explode(',', $dsn));
128136

129-
if (preg_match('#^redis:///([^:@])+$#', $dsn)) {
130-
$url = str_replace('redis:', 'file:', $dsn);
131-
}
137+
// Merge all the URLs, the last one overrides the previous ones
138+
$parsedUrl = array_merge(...$parsedUrls);
132139

133-
if (false === $parsedUrl = parse_url($url)) {
134-
throw new InvalidArgumentException(sprintf('The given Redis DSN "%s" is invalid.', $dsn));
135-
}
136-
if (isset($parsedUrl['query'])) {
137-
parse_str($parsedUrl['query'], $dsnOptions);
138-
$redisOptions = array_merge($redisOptions, $dsnOptions);
140+
// Regroup all the hosts in an array interpretable by RedisCluster
141+
$parsedUrl['host'] = array_map(function ($parsedUrl) {
142+
return $parsedUrl['host'] . ':' . ($parsedUrl['port'] ?? 6379);
143+
}, $parsedUrls);
139144
}
140145

141146
self::validateOptions($redisOptions);
@@ -227,6 +232,25 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
227232
return new self($configuration, $connectionCredentials, $redisOptions, $redis);
228233
}
229234

235+
private static function parseDsn(string $dsn, array &$redisOptions): array
236+
{
237+
$url = $dsn;
238+
239+
if (preg_match('#^redis:///([^:@])+$#', $dsn)) {
240+
$url = str_replace('redis:', 'file:', $dsn);
241+
}
242+
243+
if (false === $parsedUrl = parse_url($url)) {
244+
throw new InvalidArgumentException(sprintf('The given Redis DSN "%s" is invalid.', $dsn));
245+
}
246+
if (isset($parsedUrl['query'])) {
247+
parse_str($parsedUrl['query'], $dsnOptions);
248+
$redisOptions = array_merge($redisOptions, $dsnOptions);
249+
}
250+
251+
return $parsedUrl;
252+
}
253+
230254
private static function validateOptions(array $options): void
231255
{
232256
$availableOptions = array_keys(self::DEFAULT_OPTIONS);

src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisProxy.php

+5-2
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ class RedisProxy
2626
private $ready = false;
2727

2828
/**
29-
* @param \Redis|\RedisCluster $redis
29+
* @param \Redis|\RedisCluster|null $redis
3030
*/
3131
public function __construct($redis, \Closure $initializer)
3232
{
@@ -36,7 +36,10 @@ public function __construct($redis, \Closure $initializer)
3636

3737
public function __call(string $method, array $args)
3838
{
39-
$this->ready ?: $this->ready = $this->initializer->__invoke($this->redis);
39+
if (!$this->ready) {
40+
$this->redis = $this->initializer->__invoke($this->redis);
41+
$this->ready = true;
42+
}
4043

4144
return $this->redis->{$method}(...$args);
4245
}

0 commit comments

Comments
 (0)