diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index 8928a32f75ea3..5ec0648794019 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -172,7 +172,7 @@ jobs: env: REDIS_HOST: 'localhost:16379' REDIS_CLUSTER_HOSTS: 'localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005' - REDIS_SENTINEL_HOSTS: 'localhost:26379 localhost:26379 localhost:26379' + REDIS_SENTINEL_HOSTS: 'unreachable-host:26379 localhost:26379 localhost:26379' REDIS_SENTINEL_SERVICE: redis_sentinel MESSENGER_REDIS_DSN: redis://127.0.0.1:7006/messages MESSENGER_AMQP_DSN: amqp://localhost/%2f/messages diff --git a/src/Symfony/Component/Cache/Traits/RedisTrait.php b/src/Symfony/Component/Cache/Traits/RedisTrait.php index bf45048590059..c1c581680d7c5 100644 --- a/src/Symfony/Component/Cache/Traits/RedisTrait.php +++ b/src/Symfony/Component/Cache/Traits/RedisTrait.php @@ -225,18 +225,18 @@ public static function createConnection(#[\SensitiveParameter] string $dsn, arra if (\defined('Redis::OPT_NULL_MULTIBULK_AS_NULL') && isset($params['auth'])) { $extra = [$params['auth']]; } - $sentinel = new $sentinelClass($host, $port, $params['timeout'], (string) $params['persistent_id'], $params['retry_interval'], $params['read_timeout'], ...$extra); try { + $sentinel = new $sentinelClass($host, $port, $params['timeout'], (string) $params['persistent_id'], $params['retry_interval'], $params['read_timeout'], ...$extra); if ($address = $sentinel->getMasterAddrByName($params['redis_sentinel'])) { [$host, $port] = $address; } - } catch (\RedisException $e) { + } catch (\RedisException|\Relay\Exception $redisException) { } } while (++$hostIndex < \count($hosts) && !$address); if (isset($params['redis_sentinel']) && !$address) { - throw new InvalidArgumentException(sprintf('Failed to retrieve master information from sentinel "%s".', $params['redis_sentinel'])); + throw new InvalidArgumentException(sprintf('Failed to retrieve master information from sentinel "%s".', $params['redis_sentinel']), previous: $redisException ?? null); } try { diff --git a/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisExtIntegrationTest.php b/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisExtIntegrationTest.php index 254ee9edb8da5..a80aecd32ecb2 100644 --- a/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisExtIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisExtIntegrationTest.php @@ -220,6 +220,35 @@ public function testConnectionClaimAndRedeliver() $connection->ack($message['id']); } + public function testSentinel() + { + if (!$hosts = getenv('REDIS_SENTINEL_HOSTS')) { + $this->markTestSkipped('REDIS_SENTINEL_HOSTS env var is not defined.'); + } + + if (!getenv('MESSENGER_REDIS_SENTINEL_MASTER')) { + $this->markTestSkipped('MESSENGER_REDIS_SENTINEL_MASTER env var is not defined.'); + } + + $dsn = 'redis:?host['.str_replace(' ', ']&host[', $hosts).']'; + + $connection = Connection::fromDsn($dsn, + ['delete_after_ack' => true, + 'sentinel_master' => getenv('MESSENGER_REDIS_SENTINEL_MASTER') ?: null, + ], $this->redis); + + $connection->add('1', []); + $this->assertNotEmpty($message = $connection->get()); + $this->assertSame([ + 'message' => json_encode([ + 'body' => '1', + 'headers' => [], + ]), + ], $message['data']); + $connection->reject($message['id']); + $connection->cleanup(); + } + public function testLazySentinel() { $connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), diff --git a/src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php index 0704a9831a26a..63e44ae9f0ed7 100644 --- a/src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php @@ -84,26 +84,45 @@ public function __construct(array $options, \Redis|Relay|\RedisCluster $redis = throw new InvalidArgumentException('Redis Sentinel support requires ext-redis>=5.2, or ext-relay.'); } - if (null !== $sentinelMaster && ($redis instanceof \RedisCluster || \is_array($host))) { + if (null !== $sentinelMaster && $redis instanceof \RedisCluster) { throw new InvalidArgumentException('Cannot configure Redis Sentinel and Redis Cluster instance at the same time.'); } - if (\is_array($host) || $redis instanceof \RedisCluster) { + if ((\is_array($host) && null === $sentinelMaster) || $redis instanceof \RedisCluster) { $hosts = \is_string($host) ? [$host.':'.$port] : $host; // Always ensure we have an array $this->redis = static fn () => self::initializeRedisCluster($redis, $hosts, $auth, $options); } else { - if (null !== $sentinelMaster) { - $sentinelClass = \extension_loaded('redis') ? \RedisSentinel::class : Sentinel::class; - $sentinelClient = new $sentinelClass($host, $port, $options['timeout'], $options['persistent_id'], $options['retry_interval'], $options['read_timeout']); - - if (!$address = $sentinelClient->getMasterAddrByName($sentinelMaster)) { - throw new InvalidArgumentException(sprintf('Failed to retrieve master information from master name "%s" and address "%s:%d".', $sentinelMaster, $host, $port)); + $this->redis = static function () use ($redis, $sentinelMaster, $host, $port, $options, $auth) { + if (null !== $sentinelMaster) { + $sentinelClass = \extension_loaded('redis') ? \RedisSentinel::class : Sentinel::class; + $hostIndex = 0; + $hosts = \is_array($host) ? $host : [['scheme' => 'tcp', 'host' => $host, 'port' => $port]]; + do { + $host = $hosts[$hostIndex]['host']; + $port = $hosts[$hostIndex]['port'] ?? 0; + $tls = 'tls' === $hosts[$hostIndex]['scheme']; + $address = false; + + if (isset($hosts[$hostIndex]['host']) && $tls) { + $host = 'tls://'.$host; + } + + try { + $sentinel = new $sentinelClass($host, $port, $options['timeout'], $options['persistent_id'], $options['retry_interval'], $options['read_timeout']); + if ($address = $sentinel->getMasterAddrByName($sentinelMaster)) { + [$host, $port] = $address; + } + } catch (\RedisException|\Relay\Exception $redisException) { + } + } while (++$hostIndex < \count($hosts) && !$address); + + if (!$address) { + throw new InvalidArgumentException(sprintf('Failed to retrieve master information from sentinel "%s".', $sentinelMaster), previous: $redisException ?? null); + } } - [$host, $port] = $address; - } - - $this->redis = static fn () => self::initializeRedis($redis ?? (\extension_loaded('redis') ? new \Redis() : new Relay()), $host, $port, $auth, $options); + return self::initializeRedis($redis ?? (\extension_loaded('redis') ? new \Redis() : new Relay()), $host, $port, $auth, $options); + }; } if (!$options['lazy']) { @@ -207,6 +226,32 @@ public static function fromDsn(#[\SensitiveParameter] string $dsn, array $option $user = '' !== ($parsedUrl['user'] ?? '') ? urldecode($parsedUrl['user']) : null; $options['auth'] ??= null !== $pass && null !== $user ? [$user, $pass] : ($pass ?? $user); + if (isset($parsedUrl['query'])) { + parse_str($parsedUrl['query'], $query); + + if (isset($query['host'])) { + $tls = 'rediss' === $parsedUrl['scheme']; + $tcpScheme = $tls ? 'tls' : 'tcp'; + + if (!\is_array($hosts = $query['host'])) { + throw new InvalidArgumentException(sprintf('Invalid Redis DSN: "%s".', $dsn)); + } + foreach ($hosts as $host => $parameters) { + if (\is_string($parameters)) { + parse_str($parameters, $parameters); + } + if (false === $i = strrpos($host, ':')) { + $hosts[$host] = ['scheme' => $tcpScheme, 'host' => $host, 'port' => 6379] + $parameters; + } elseif ($port = (int) substr($host, 1 + $i)) { + $hosts[$host] = ['scheme' => $tcpScheme, 'host' => substr($host, 0, $i), 'port' => $port] + $parameters; + } else { + $hosts[$host] = ['scheme' => 'unix', 'host' => substr($host, 0, $i)] + $parameters; + } + } + $parsedUrl['host'] = array_values($hosts); + } + } + if (isset($parsedUrl['host'])) { $options['host'] = $parsedUrl['host'] ?? $options['host']; $options['port'] = $parsedUrl['port'] ?? $options['port']; diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index 890e14a0c25ed..72d056f367c9e 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -6,6 +6,7 @@ CHANGELOG * Deprecate `StopWorkerOnSignalsListener` in favor of using the `SignalableCommandInterface` * Add `HandlerDescriptor::getOptions` + * Add support for multiple Redis Sentinel hosts 6.3 ---