Skip to content

[Messenger] Add support for multiple Redis Sentinel hosts #51687

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ jobs:
env:
REDIS_HOST: 'localhost:16379'
REDIS_CLUSTER_HOSTS: 'localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005'
REDIS_SENTINEL_HOSTS: 'localhost:26379 localhost:26379 localhost:26379'
REDIS_SENTINEL_HOSTS: 'unreachable-host:26379 localhost:26379 localhost:26379'
REDIS_SENTINEL_SERVICE: redis_sentinel
MESSENGER_REDIS_DSN: redis://127.0.0.1:7006/messages
MESSENGER_AMQP_DSN: amqp://localhost/%2f/messages
Expand Down
6 changes: 3 additions & 3 deletions src/Symfony/Component/Cache/Traits/RedisTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -225,18 +225,18 @@ public static function createConnection(#[\SensitiveParameter] string $dsn, arra
if (\defined('Redis::OPT_NULL_MULTIBULK_AS_NULL') && isset($params['auth'])) {
$extra = [$params['auth']];
}
$sentinel = new $sentinelClass($host, $port, $params['timeout'], (string) $params['persistent_id'], $params['retry_interval'], $params['read_timeout'], ...$extra);

try {
$sentinel = new $sentinelClass($host, $port, $params['timeout'], (string) $params['persistent_id'], $params['retry_interval'], $params['read_timeout'], ...$extra);
if ($address = $sentinel->getMasterAddrByName($params['redis_sentinel'])) {
[$host, $port] = $address;
}
} catch (\RedisException $e) {
} catch (\RedisException|\Relay\Exception $redisException) {
}
} while (++$hostIndex < \count($hosts) && !$address);

if (isset($params['redis_sentinel']) && !$address) {
throw new InvalidArgumentException(sprintf('Failed to retrieve master information from sentinel "%s".', $params['redis_sentinel']));
throw new InvalidArgumentException(sprintf('Failed to retrieve master information from sentinel "%s".', $params['redis_sentinel']), previous: $redisException ?? null);
}

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,35 @@ public function testConnectionClaimAndRedeliver()
$connection->ack($message['id']);
}

public function testSentinel()
{
if (!$hosts = getenv('REDIS_SENTINEL_HOSTS')) {
$this->markTestSkipped('REDIS_SENTINEL_HOSTS env var is not defined.');
}

if (!getenv('MESSENGER_REDIS_SENTINEL_MASTER')) {
$this->markTestSkipped('MESSENGER_REDIS_SENTINEL_MASTER env var is not defined.');
}

$dsn = 'redis:?host['.str_replace(' ', ']&host[', $hosts).']';

$connection = Connection::fromDsn($dsn,
['delete_after_ack' => true,
'sentinel_master' => getenv('MESSENGER_REDIS_SENTINEL_MASTER') ?: null,
], $this->redis);

$connection->add('1', []);
$this->assertNotEmpty($message = $connection->get());
$this->assertSame([
'message' => json_encode([
'body' => '1',
'headers' => [],
]),
], $message['data']);
$connection->reject($message['id']);
$connection->cleanup();
}

public function testLazySentinel()
{
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,26 +84,45 @@ public function __construct(array $options, \Redis|Relay|\RedisCluster $redis =
throw new InvalidArgumentException('Redis Sentinel support requires ext-redis>=5.2, or ext-relay.');
}

if (null !== $sentinelMaster && ($redis instanceof \RedisCluster || \is_array($host))) {
if (null !== $sentinelMaster && $redis instanceof \RedisCluster) {
throw new InvalidArgumentException('Cannot configure Redis Sentinel and Redis Cluster instance at the same time.');
}

if (\is_array($host) || $redis instanceof \RedisCluster) {
if ((\is_array($host) && null === $sentinelMaster) || $redis instanceof \RedisCluster) {
$hosts = \is_string($host) ? [$host.':'.$port] : $host; // Always ensure we have an array
$this->redis = static fn () => self::initializeRedisCluster($redis, $hosts, $auth, $options);
} else {
if (null !== $sentinelMaster) {
$sentinelClass = \extension_loaded('redis') ? \RedisSentinel::class : Sentinel::class;
$sentinelClient = new $sentinelClass($host, $port, $options['timeout'], $options['persistent_id'], $options['retry_interval'], $options['read_timeout']);

if (!$address = $sentinelClient->getMasterAddrByName($sentinelMaster)) {
throw new InvalidArgumentException(sprintf('Failed to retrieve master information from master name "%s" and address "%s:%d".', $sentinelMaster, $host, $port));
$this->redis = static function () use ($redis, $sentinelMaster, $host, $port, $options, $auth) {
if (null !== $sentinelMaster) {
$sentinelClass = \extension_loaded('redis') ? \RedisSentinel::class : Sentinel::class;
$hostIndex = 0;
$hosts = \is_array($host) ? $host : [['scheme' => 'tcp', 'host' => $host, 'port' => $port]];
do {
$host = $hosts[$hostIndex]['host'];
$port = $hosts[$hostIndex]['port'] ?? 0;
$tls = 'tls' === $hosts[$hostIndex]['scheme'];
$address = false;

if (isset($hosts[$hostIndex]['host']) && $tls) {
$host = 'tls://'.$host;
}

try {
$sentinel = new $sentinelClass($host, $port, $options['timeout'], $options['persistent_id'], $options['retry_interval'], $options['read_timeout']);
if ($address = $sentinel->getMasterAddrByName($sentinelMaster)) {
[$host, $port] = $address;
}
} catch (\RedisException|\Relay\Exception $redisException) {
}
} while (++$hostIndex < \count($hosts) && !$address);

if (!$address) {
throw new InvalidArgumentException(sprintf('Failed to retrieve master information from sentinel "%s".', $sentinelMaster), previous: $redisException ?? null);
}
}

[$host, $port] = $address;
}

$this->redis = static fn () => self::initializeRedis($redis ?? (\extension_loaded('redis') ? new \Redis() : new Relay()), $host, $port, $auth, $options);
return self::initializeRedis($redis ?? (\extension_loaded('redis') ? new \Redis() : new Relay()), $host, $port, $auth, $options);
};
}

if (!$options['lazy']) {
Expand Down Expand Up @@ -207,6 +226,32 @@ public static function fromDsn(#[\SensitiveParameter] string $dsn, array $option
$user = '' !== ($parsedUrl['user'] ?? '') ? urldecode($parsedUrl['user']) : null;
$options['auth'] ??= null !== $pass && null !== $user ? [$user, $pass] : ($pass ?? $user);

if (isset($parsedUrl['query'])) {
parse_str($parsedUrl['query'], $query);

if (isset($query['host'])) {
$tls = 'rediss' === $parsedUrl['scheme'];
$tcpScheme = $tls ? 'tls' : 'tcp';

if (!\is_array($hosts = $query['host'])) {
throw new InvalidArgumentException(sprintf('Invalid Redis DSN: "%s".', $dsn));
}
foreach ($hosts as $host => $parameters) {
if (\is_string($parameters)) {
parse_str($parameters, $parameters);
}
if (false === $i = strrpos($host, ':')) {
$hosts[$host] = ['scheme' => $tcpScheme, 'host' => $host, 'port' => 6379] + $parameters;
} elseif ($port = (int) substr($host, 1 + $i)) {
$hosts[$host] = ['scheme' => $tcpScheme, 'host' => substr($host, 0, $i), 'port' => $port] + $parameters;
} else {
$hosts[$host] = ['scheme' => 'unix', 'host' => substr($host, 0, $i)] + $parameters;
}
}
$parsedUrl['host'] = array_values($hosts);
}
}

if (isset($parsedUrl['host'])) {
$options['host'] = $parsedUrl['host'] ?? $options['host'];
$options['port'] = $parsedUrl['port'] ?? $options['port'];
Expand Down
1 change: 1 addition & 0 deletions src/Symfony/Component/Messenger/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ CHANGELOG

* Deprecate `StopWorkerOnSignalsListener` in favor of using the `SignalableCommandInterface`
* Add `HandlerDescriptor::getOptions`
* Add support for multiple Redis Sentinel hosts

6.3
---
Expand Down