Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -359,4 +359,26 @@ public function testLastErrorGetsCleared()

$this->assertSame('xack error', $e->getMessage());
}

/**
* @dataProvider provideIdPatterns
*/
public function testAddReturnId(string $expected, \Redis $redis, int $delay = 0)
{
$id = Connection::fromDsn(dsn: 'redis://localhost/queue', redis: $redis)->add('body', [], $delay);

$this->assertMatchesRegularExpression($expected, $id);
}

public function provideIdPatterns(): \Generator
{
$redis = $this->createMock(\Redis::class);
$redis->expects($this->atLeastOnce())->method('xadd')->willReturn('THE_MESSAGE_ID');

yield 'No delay' => ['/^THE_MESSAGE_ID$/', $redis];

$redis = $this->createMock(\Redis::class);
$redis->expects($this->atLeastOnce())->method('rawCommand')->willReturn('1');
yield '100ms delay' => ['/^\w+\.\d+$/', $redis, 100];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use Symfony\Component\Messenger\Bridge\Redis\Transport\Connection;
use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisSender;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

class RedisSenderTest extends TestCase
Expand All @@ -26,12 +27,17 @@ public function testSend()
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];

$connection = $this->createMock(Connection::class);
$connection->expects($this->once())->method('add')->with($encoded['body'], $encoded['headers']);
$connection->expects($this->once())->method('add')->with($encoded['body'], $encoded['headers'])->willReturn('THE_MESSAGE_ID');

$serializer = $this->createMock(SerializerInterface::class);
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);

$sender = new RedisSender($connection, $serializer);
$sender->send($envelope);

/** @var TransportMessageIdStamp $stamp */
$stamp = $sender->send($envelope)->last(TransportMessageIdStamp::class);

$this->assertNotNull($stamp, sprintf('A "%s" stamp should be added', TransportMessageIdStamp::class));
$this->assertSame('THE_MESSAGE_ID', $stamp->getId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -437,19 +437,20 @@ public function reject(string $id): void
}
}

public function add(string $body, array $headers, int $delayInMs = 0): void
public function add(string $body, array $headers, int $delayInMs = 0): string
{
if ($this->autoSetup) {
$this->setup();
}

try {
if ($delayInMs > 0) { // the delay is <= 0 for queued messages
$id = uniqid('', true);
$message = json_encode([
'body' => $body,
'headers' => $headers,
// Entry need to be unique in the sorted set else it would only be added once to the delayed messages queue
'uniqid' => uniqid('', true),
'uniqid' => $id,
]);

if (false === $message) {
Expand Down Expand Up @@ -483,6 +484,8 @@ public function add(string $body, array $headers, int $delayInMs = 0): void
} else {
$added = $this->connection->xadd($this->stream, '*', ['message' => $message]);
}

$id = $added;
}
} catch (\RedisException $e) {
if ($error = $this->connection->getLastError() ?: null) {
Expand All @@ -497,6 +500,8 @@ public function add(string $body, array $headers, int $delayInMs = 0): void
}
throw new TransportException($error ?? 'Could not add a message to the redis stream.');
}

return $id;
}

public function setup(): void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

Expand Down Expand Up @@ -42,9 +43,9 @@ public function send(Envelope $envelope): Envelope
$delayStamp = $envelope->last(DelayStamp::class);
$delayInMs = null !== $delayStamp ? $delayStamp->getDelay() : 0;

$this->connection->add($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delayInMs);
$id = $this->connection->add($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delayInMs);

return $envelope;
return $envelope->with(new TransportMessageIdStamp($id));
}
}

Expand Down