Skip to content

Commit e9a44e6

Browse files
author
Robin Chalas
committed
[Messenger] Fix missing auto_setup for RedisTransport
1 parent 6aacfea commit e9a44e6

File tree

2 files changed

+33
-17
lines changed

2 files changed

+33
-17
lines changed

src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php

+2-10
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,13 @@ public function testFromDsn()
4242
public function testFromDsnWithOptions()
4343
{
4444
$this->assertEquals(
45-
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [
45+
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1', 'auto_setup' => false], [
4646
'host' => 'localhost',
4747
'port' => 6379,
4848
], [
4949
'serializer' => 2,
5050
]),
51-
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['serializer' => 2])
51+
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['serializer' => 2, 'auto_setup' => false])
5252
);
5353
}
5454

@@ -117,10 +117,6 @@ public function testGetAfterReject()
117117
{
118118
$redis = new \Redis();
119119
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget', [], $redis);
120-
try {
121-
$connection->setup();
122-
} catch (TransportException $e) {
123-
}
124120

125121
$connection->add('1', []);
126122
$connection->add('2', []);
@@ -139,10 +135,6 @@ public function testGetNonBlocking()
139135
$redis = new \Redis();
140136

141137
$connection = Connection::fromDsn('redis://localhost/messenger-getnonblocking', [], $redis);
142-
try {
143-
$connection->setup();
144-
} catch (TransportException $e) {
145-
}
146138

147139
$this->assertNull($connection->get()); // no message, should return null immediately
148140
$connection->add('1', []);

src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php

+31-7
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,13 @@
2727
*/
2828
class Connection
2929
{
30+
private const DEFAULT_OPTIONS = [
31+
'stream' => 'messages',
32+
'group' => 'symfony',
33+
'consumer' => 'consumer',
34+
'auto_setup' => true,
35+
];
36+
3037
private $connection;
3138
private $stream;
3239
private $group;
@@ -38,9 +45,10 @@ public function __construct(array $configuration, array $connectionCredentials =
3845
$this->connection = $redis ?: new \Redis();
3946
$this->connection->connect($connectionCredentials['host'] ?? '127.0.0.1', $connectionCredentials['port'] ?? 6379);
4047
$this->connection->setOption(\Redis::OPT_SERIALIZER, $redisOptions['serializer'] ?? \Redis::SERIALIZER_PHP);
41-
$this->stream = $configuration['stream'] ?? '' ?: 'messages';
42-
$this->group = $configuration['group'] ?? '' ?: 'symfony';
43-
$this->consumer = $configuration['consumer'] ?? '' ?: 'consumer';
48+
$this->stream = $configuration['stream'] ?? self::DEFAULT_OPTIONS['stream'];
49+
$this->group = $configuration['group'] ?? self::DEFAULT_OPTIONS['group'];
50+
$this->consumer = $configuration['consumer'] ?? self::DEFAULT_OPTIONS['consumer'];
51+
$this->autoSetup = $configuration['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup'];
4452
}
4553

4654
public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $redis = null): self
@@ -51,9 +59,9 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
5159

5260
$pathParts = explode('/', $parsedUrl['path'] ?? '');
5361

54-
$stream = $pathParts[1] ?? '';
55-
$group = $pathParts[2] ?? '';
56-
$consumer = $pathParts[3] ?? '';
62+
$stream = $pathParts[1] ?? null;
63+
$group = $pathParts[2] ?? null;
64+
$consumer = $pathParts[3] ?? null;
5765

5866
$connectionCredentials = [
5967
'host' => $parsedUrl['host'] ?? '127.0.0.1',
@@ -64,11 +72,21 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
6472
parse_str($parsedUrl['query'], $redisOptions);
6573
}
6674

67-
return new self(['stream' => $stream, 'group' => $group, 'consumer' => $consumer], $connectionCredentials, $redisOptions, $redis);
75+
$autoSetup = null;
76+
if (\array_key_exists('auto_setup', $redisOptions)) {
77+
$autoSetup = filter_var($redisOptions['auto_setup'], FILTER_VALIDATE_BOOLEAN);
78+
unset($redisOptions['auto_setup']);
79+
}
80+
81+
return new self(['stream' => $stream, 'group' => $group, 'consumer' => $consumer, 'auto_setup' => $autoSetup], $connectionCredentials, $redisOptions, $redis);
6882
}
6983

7084
public function get(): ?array
7185
{
86+
if ($this->autoSetup) {
87+
$this->setup();
88+
}
89+
7290
$messageId = '>'; // will receive new messages
7391

7492
if ($this->couldHavePendingMessages) {
@@ -141,6 +159,10 @@ public function reject(string $id): void
141159

142160
public function add(string $body, array $headers): void
143161
{
162+
if ($this->autoSetup) {
163+
$this->setup();
164+
}
165+
144166
$e = null;
145167
try {
146168
$added = $this->connection->xadd($this->stream, '*', ['message' => json_encode(
@@ -161,5 +183,7 @@ public function setup(): void
161183
} catch (\RedisException $e) {
162184
throw new TransportException($e->getMessage(), 0, $e);
163185
}
186+
187+
$this->autoSetup = false;
164188
}
165189
}

0 commit comments

Comments
 (0)