diff --git a/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/ConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/ConnectionTest.php index 6e1d74edeedc9..f66214f3d4e73 100644 --- a/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/ConnectionTest.php @@ -359,4 +359,26 @@ public function testLastErrorGetsCleared() $this->assertSame('xack error', $e->getMessage()); } + + /** + * @dataProvider provideIdPatterns + */ + public function testAddReturnId(string $expected, \Redis $redis, int $delay = 0) + { + $id = Connection::fromDsn(dsn: 'redis://localhost/queue', redis: $redis)->add('body', [], $delay); + + $this->assertMatchesRegularExpression($expected, $id); + } + + public function provideIdPatterns(): \Generator + { + $redis = $this->createMock(\Redis::class); + $redis->expects($this->atLeastOnce())->method('xadd')->willReturn('THE_MESSAGE_ID'); + + yield 'No delay' => ['/^THE_MESSAGE_ID$/', $redis]; + + $redis = $this->createMock(\Redis::class); + $redis->expects($this->atLeastOnce())->method('rawCommand')->willReturn('1'); + yield '100ms delay' => ['/^\w+\.\d+$/', $redis, 100]; + } } diff --git a/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisSenderTest.php b/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisSenderTest.php index 3a4d817acc140..133cedef0798a 100644 --- a/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisSenderTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisSenderTest.php @@ -16,6 +16,7 @@ use Symfony\Component\Messenger\Bridge\Redis\Transport\Connection; use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisSender; use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; class RedisSenderTest extends TestCase @@ -26,12 +27,17 @@ public function testSend() $encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]]; $connection = $this->createMock(Connection::class); - $connection->expects($this->once())->method('add')->with($encoded['body'], $encoded['headers']); + $connection->expects($this->once())->method('add')->with($encoded['body'], $encoded['headers'])->willReturn('THE_MESSAGE_ID'); $serializer = $this->createMock(SerializerInterface::class); $serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded); $sender = new RedisSender($connection, $serializer); - $sender->send($envelope); + + /** @var TransportMessageIdStamp $stamp */ + $stamp = $sender->send($envelope)->last(TransportMessageIdStamp::class); + + $this->assertNotNull($stamp, sprintf('A "%s" stamp should be added', TransportMessageIdStamp::class)); + $this->assertSame('THE_MESSAGE_ID', $stamp->getId()); } } diff --git a/src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php index 1a38315925b41..6473a258a0292 100644 --- a/src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php @@ -437,7 +437,7 @@ public function reject(string $id): void } } - public function add(string $body, array $headers, int $delayInMs = 0): void + public function add(string $body, array $headers, int $delayInMs = 0): string { if ($this->autoSetup) { $this->setup(); @@ -445,11 +445,12 @@ public function add(string $body, array $headers, int $delayInMs = 0): void try { if ($delayInMs > 0) { // the delay is <= 0 for queued messages + $id = uniqid('', true); $message = json_encode([ 'body' => $body, 'headers' => $headers, // Entry need to be unique in the sorted set else it would only be added once to the delayed messages queue - 'uniqid' => uniqid('', true), + 'uniqid' => $id, ]); if (false === $message) { @@ -483,6 +484,8 @@ public function add(string $body, array $headers, int $delayInMs = 0): void } else { $added = $this->connection->xadd($this->stream, '*', ['message' => $message]); } + + $id = $added; } } catch (\RedisException $e) { if ($error = $this->connection->getLastError() ?: null) { @@ -497,6 +500,8 @@ public function add(string $body, array $headers, int $delayInMs = 0): void } throw new TransportException($error ?? 'Could not add a message to the redis stream.'); } + + return $id; } public function setup(): void diff --git a/src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisSender.php b/src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisSender.php index 79212a88e69c8..c81979335edea 100644 --- a/src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisSender.php +++ b/src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisSender.php @@ -13,6 +13,7 @@ use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Stamp\DelayStamp; +use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; use Symfony\Component\Messenger\Transport\Sender\SenderInterface; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; @@ -42,9 +43,9 @@ public function send(Envelope $envelope): Envelope $delayStamp = $envelope->last(DelayStamp::class); $delayInMs = null !== $delayStamp ? $delayStamp->getDelay() : 0; - $this->connection->add($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delayInMs); + $id = $this->connection->add($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delayInMs); - return $envelope; + return $envelope->with(new TransportMessageIdStamp($id)); } }