Skip to content

Commit 85889d3

Browse files
committed
[Messenger] Support Redis Cluster in DSN
1 parent d5debd4 commit 85889d3

File tree

5 files changed

+156
-37
lines changed

5 files changed

+156
-37
lines changed

src/Symfony/Component/Messenger/Bridge/Redis/CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ CHANGELOG
77
* Add `rediss://` DSN scheme support for TLS protocol
88
* Deprecate TLS option, use `rediss://127.0.0.1` instead of `redis://127.0.0.1?tls=1`
99
* Add support for `\RedisCluster` instance in `Connection` constructor
10+
* Add support for Redis Cluster in DSN
1011

1112
5.2.0
1213
-----

src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/ConnectionTest.php

+30
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,20 @@ public function testFromDsn()
6868
);
6969
}
7070

71+
public function testFromDsnWithMultipleHosts()
72+
{
73+
$this->skipIfRedisClusterUnavailable();
74+
75+
$hosts = explode(' ', getenv('REDIS_CLUSTER_HOSTS'));
76+
77+
$dsn = array_map(function ($host) {
78+
return 'redis://'.$host;
79+
}, $hosts);
80+
$dsn = implode(',', $dsn);
81+
82+
$this->assertInstanceOf(Connection::class, Connection::fromDsn($dsn));
83+
}
84+
7185
public function testFromDsnOnUnixSocket()
7286
{
7387
$this->assertEquals(
@@ -446,4 +460,20 @@ public function testLazy()
446460
$connection->reject($message['id']);
447461
$redis->del('messenger-lazy');
448462
}
463+
464+
public function testLazyCluster()
465+
{
466+
$this->skipIfRedisClusterUnavailable();
467+
468+
$connection = new Connection(
469+
['lazy' => true],
470+
['host' => explode(' ', getenv('REDIS_CLUSTER_HOSTS'))]
471+
);
472+
473+
$connection->add('1', []);
474+
$this->assertNotEmpty($message = $connection->get());
475+
$this->assertSame('1', $message['body']);
476+
$connection->reject($message['id']);
477+
$connection->cleanup();
478+
}
449479
}

src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php

+79-36
Original file line numberDiff line numberDiff line change
@@ -74,32 +74,19 @@ public function __construct(array $configuration, array $connectionCredentials =
7474
$auth = null;
7575
}
7676

77-
$initializer = static function ($redis) use ($host, $port, $auth, $serializer, $dbIndex) {
78-
if ($redis instanceof \Redis) {
79-
$redis->connect($host, $port);
80-
}
81-
82-
$redis->setOption(\Redis::OPT_SERIALIZER, $serializer);
83-
84-
if (null !== $auth && $redis instanceof \Redis && !$redis->auth($auth)) {
85-
throw new InvalidArgumentException('Redis connection failed: '.$redis->getLastError());
86-
}
87-
88-
if ($dbIndex && $redis instanceof \Redis && !$redis->select($dbIndex)) {
89-
throw new InvalidArgumentException('Redis connection failed: '.$redis->getLastError());
90-
}
91-
92-
return true;
93-
};
94-
95-
if (null === $redis) {
96-
$redis = new \Redis();
97-
}
98-
99-
if ($configuration['lazy'] ?? self::DEFAULT_OPTIONS['lazy']) {
100-
$redis = new RedisProxy($redis, $initializer);
77+
$lazy = $configuration['lazy'] ?? self::DEFAULT_OPTIONS['lazy'];
78+
if (\is_array($host) || $redis instanceof \RedisCluster) {
79+
$hosts = \is_string($host) ? [$host.':'.$port] : $host; // Always ensure we have an array
80+
$initializer = static function ($redis) use ($hosts, $auth, $serializer) {
81+
return self::initializeRedisCluster($redis, $hosts, $auth, $serializer);
82+
};
83+
$redis = $lazy ? new RedisClusterProxy($redis, $initializer) : $initializer($redis);
10184
} else {
102-
$initializer($redis);
85+
$redis = $redis ?? new \Redis();
86+
$initializer = static function ($redis) use ($host, $port, $auth, $serializer, $dbIndex) {
87+
return self::initializeRedis($redis, $host, $port, $auth, $serializer, $dbIndex);
88+
};
89+
$redis = $lazy ? new RedisProxy($redis, $initializer) : $initializer($redis);
10390
}
10491

10592
$this->connection = $redis;
@@ -122,21 +109,57 @@ public function __construct(array $configuration, array $connectionCredentials =
122109
$this->claimInterval = $configuration['claim_interval'] ?? self::DEFAULT_OPTIONS['claim_interval'];
123110
}
124111

125-
public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $redis = null): self
112+
private static function initializeRedis(\Redis $redis, string $host, int $port, ?string $auth, int $serializer, int $dbIndex): \Redis
126113
{
127-
$url = $dsn;
128-
$scheme = 0 === strpos($dsn, 'rediss:') ? 'rediss' : 'redis';
114+
$redis->connect($host, $port);
115+
$redis->setOption(\Redis::OPT_SERIALIZER, $serializer);
129116

130-
if (preg_match('#^'.$scheme.':///([^:@])+$#', $dsn)) {
131-
$url = str_replace($scheme.':', 'file:', $dsn);
117+
if (null !== $auth && !$redis->auth($auth)) {
118+
throw new InvalidArgumentException('Redis connection failed: '.$redis->getLastError());
132119
}
133120

134-
if (false === $parsedUrl = parse_url($url)) {
135-
throw new InvalidArgumentException(sprintf('The given Redis DSN "%s" is invalid.', $dsn));
121+
if ($dbIndex && !$redis->select($dbIndex)) {
122+
throw new InvalidArgumentException('Redis connection failed: '.$redis->getLastError());
136123
}
137-
if (isset($parsedUrl['query'])) {
138-
parse_str($parsedUrl['query'], $dsnOptions);
139-
$redisOptions = array_merge($redisOptions, $dsnOptions);
124+
125+
return $redis;
126+
}
127+
128+
private static function initializeRedisCluster(?\RedisCluster $redis, array $hosts, ?string $auth, int $serializer): \RedisCluster
129+
{
130+
if (null === $redis) {
131+
$redis = new \RedisCluster(null, $hosts, 0.0, 0.0, false, $auth);
132+
}
133+
134+
$redis->setOption(\Redis::OPT_SERIALIZER, $serializer);
135+
136+
return $redis;
137+
}
138+
139+
/**
140+
* @param \Redis|\RedisCluster|null $redis
141+
*/
142+
public static function fromDsn(string $dsn, array $redisOptions = [], $redis = null): self
143+
{
144+
if (false === strpos($dsn, ',')) {
145+
$parsedUrl = self::parseDsn($dsn, $redisOptions);
146+
} else {
147+
$dsns = explode(',', $dsn);
148+
$parsedUrls = array_map(function ($dsn) use (&$redisOptions) {
149+
return self::parseDsn($dsn, $redisOptions);
150+
}, $dsns);
151+
152+
// Merge all the URLs, the last one overrides the previous ones
153+
$parsedUrl = array_merge(...$parsedUrls);
154+
155+
// Regroup all the hosts in an array interpretable by RedisCluster
156+
$parsedUrl['host'] = array_map(function ($parsedUrl, $dsn) {
157+
if (!isset($parsedUrl['host'])) {
158+
throw new InvalidArgumentException(sprintf('Missing host in DSN part "%s", it must be defined when using Redis Cluster.', $dsn));
159+
}
160+
161+
return $parsedUrl['host'].':'.($parsedUrl['port'] ?? 6379);
162+
}, $parsedUrls, $dsns);
140163
}
141164

142165
self::validateOptions($redisOptions);
@@ -171,7 +194,7 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
171194
unset($redisOptions['dbindex']);
172195
}
173196

174-
$tls = 'rediss' === $scheme;
197+
$tls = 'rediss' === $parsedUrl['scheme'];
175198
if (\array_key_exists('tls', $redisOptions)) {
176199
trigger_deprecation('symfony/redis-messenger', '5.3', 'Providing "tls" parameter is deprecated, use "rediss://" DSN scheme instead');
177200
$tls = filter_var($redisOptions['tls'], \FILTER_VALIDATE_BOOLEAN);
@@ -229,6 +252,26 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
229252
return new self($configuration, $connectionCredentials, $redisOptions, $redis);
230253
}
231254

255+
private static function parseDsn(string $dsn, array &$redisOptions): array
256+
{
257+
$url = $dsn;
258+
$scheme = 0 === strpos($dsn, 'rediss:') ? 'rediss' : 'redis';
259+
260+
if (preg_match('#^'.$scheme.':///([^:@])+$#', $dsn)) {
261+
$url = str_replace($scheme.':', 'file:', $dsn);
262+
}
263+
264+
if (false === $parsedUrl = parse_url($url)) {
265+
throw new InvalidArgumentException(sprintf('The given Redis DSN "%s" is invalid.', $dsn));
266+
}
267+
if (isset($parsedUrl['query'])) {
268+
parse_str($parsedUrl['query'], $dsnOptions);
269+
$redisOptions = array_merge($redisOptions, $dsnOptions);
270+
}
271+
272+
return $parsedUrl;
273+
}
274+
232275
private static function validateOptions(array $options): void
233276
{
234277
$availableOptions = array_keys(self::DEFAULT_OPTIONS);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Bridge\Redis\Transport;
13+
14+
/**
15+
* Allow to delay connection to Redis Cluster.
16+
*
17+
* @author Johann Pardanaud <johann@pardanaud.com>
18+
*
19+
* @internal
20+
*/
21+
class RedisClusterProxy
22+
{
23+
private $redis;
24+
private $initializer;
25+
private $ready = false;
26+
27+
public function __construct(?\RedisCluster $redis, \Closure $initializer)
28+
{
29+
$this->redis = $redis;
30+
$this->initializer = $initializer;
31+
}
32+
33+
public function __call(string $method, array $args)
34+
{
35+
if (!$this->ready) {
36+
$this->redis = $this->initializer->__invoke($this->redis);
37+
$this->ready = true;
38+
}
39+
40+
return $this->redis->{$method}(...$args);
41+
}
42+
}

src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisProxy.php

+4-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,10 @@ public function __construct(\Redis $redis, \Closure $initializer)
3333

3434
public function __call(string $method, array $args)
3535
{
36-
$this->ready ?: $this->ready = $this->initializer->__invoke($this->redis);
36+
if (!$this->ready) {
37+
$this->redis = $this->initializer->__invoke($this->redis);
38+
$this->ready = true;
39+
}
3740

3841
return $this->redis->{$method}(...$args);
3942
}

0 commit comments

Comments
 (0)