Skip to content

Commit 77ec2ad

Browse files
feature #44522 [Messenger] add TransportMessageIdStamp to RedisSender (GaryPEGEOT)
This PR was squashed before being merged into the 6.1 branch. Discussion ---------- [Messenger] add TransportMessageIdStamp to RedisSender | Q | A | ------------- | --- | Branch? | 6.1 | Bug fix? | no | New feature? | yes | Deprecations? | no | Tickets | | License | MIT | Doc PR | Commits ------- 5d385a1 [Messenger] add TransportMessageIdStamp to RedisSender
2 parents dd1f998 + 5d385a1 commit 77ec2ad

File tree

4 files changed

+40
-6
lines changed

4 files changed

+40
-6
lines changed

src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/ConnectionTest.php

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,4 +359,26 @@ public function testLastErrorGetsCleared()
359359

360360
$this->assertSame('xack error', $e->getMessage());
361361
}
362+
363+
/**
364+
* @dataProvider provideIdPatterns
365+
*/
366+
public function testAddReturnId(string $expected, \Redis $redis, int $delay = 0)
367+
{
368+
$id = Connection::fromDsn(dsn: 'redis://localhost/queue', redis: $redis)->add('body', [], $delay);
369+
370+
$this->assertMatchesRegularExpression($expected, $id);
371+
}
372+
373+
public function provideIdPatterns(): \Generator
374+
{
375+
$redis = $this->createMock(\Redis::class);
376+
$redis->expects($this->atLeastOnce())->method('xadd')->willReturn('THE_MESSAGE_ID');
377+
378+
yield 'No delay' => ['/^THE_MESSAGE_ID$/', $redis];
379+
380+
$redis = $this->createMock(\Redis::class);
381+
$redis->expects($this->atLeastOnce())->method('rawCommand')->willReturn('1');
382+
yield '100ms delay' => ['/^\w+\.\d+$/', $redis, 100];
383+
}
362384
}

src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisSenderTest.php

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
use Symfony\Component\Messenger\Bridge\Redis\Transport\Connection;
1717
use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisSender;
1818
use Symfony\Component\Messenger\Envelope;
19+
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
1920
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
2021

2122
class RedisSenderTest extends TestCase
@@ -26,12 +27,17 @@ public function testSend()
2627
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
2728

2829
$connection = $this->createMock(Connection::class);
29-
$connection->expects($this->once())->method('add')->with($encoded['body'], $encoded['headers']);
30+
$connection->expects($this->once())->method('add')->with($encoded['body'], $encoded['headers'])->willReturn('THE_MESSAGE_ID');
3031

3132
$serializer = $this->createMock(SerializerInterface::class);
3233
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
3334

3435
$sender = new RedisSender($connection, $serializer);
35-
$sender->send($envelope);
36+
37+
/** @var TransportMessageIdStamp $stamp */
38+
$stamp = $sender->send($envelope)->last(TransportMessageIdStamp::class);
39+
40+
$this->assertNotNull($stamp, sprintf('A "%s" stamp should be added', TransportMessageIdStamp::class));
41+
$this->assertSame('THE_MESSAGE_ID', $stamp->getId());
3642
}
3743
}

src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -439,19 +439,20 @@ public function reject(string $id): void
439439
}
440440
}
441441

442-
public function add(string $body, array $headers, int $delayInMs = 0): void
442+
public function add(string $body, array $headers, int $delayInMs = 0): string
443443
{
444444
if ($this->autoSetup) {
445445
$this->setup();
446446
}
447447

448448
try {
449449
if ($delayInMs > 0) { // the delay is <= 0 for queued messages
450+
$id = uniqid('', true);
450451
$message = json_encode([
451452
'body' => $body,
452453
'headers' => $headers,
453454
// Entry need to be unique in the sorted set else it would only be added once to the delayed messages queue
454-
'uniqid' => uniqid('', true),
455+
'uniqid' => $id,
455456
]);
456457

457458
if (false === $message) {
@@ -485,6 +486,8 @@ public function add(string $body, array $headers, int $delayInMs = 0): void
485486
} else {
486487
$added = $this->connection->xadd($this->stream, '*', ['message' => $message]);
487488
}
489+
490+
$id = $added;
488491
}
489492
} catch (\RedisException $e) {
490493
if ($error = $this->connection->getLastError() ?: null) {
@@ -499,6 +502,8 @@ public function add(string $body, array $headers, int $delayInMs = 0): void
499502
}
500503
throw new TransportException($error ?? 'Could not add a message to the redis stream.');
501504
}
505+
506+
return $id;
502507
}
503508

504509
public function setup(): void

src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisSender.php

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
use Symfony\Component\Messenger\Envelope;
1515
use Symfony\Component\Messenger\Stamp\DelayStamp;
16+
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
1617
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
1718
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
1819

@@ -42,9 +43,9 @@ public function send(Envelope $envelope): Envelope
4243
$delayStamp = $envelope->last(DelayStamp::class);
4344
$delayInMs = null !== $delayStamp ? $delayStamp->getDelay() : 0;
4445

45-
$this->connection->add($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delayInMs);
46+
$id = $this->connection->add($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delayInMs);
4647

47-
return $envelope;
48+
return $envelope->with(new TransportMessageIdStamp($id));
4849
}
4950
}
5051

0 commit comments

Comments
 (0)