Skip to content

Commit 75cce6d

Browse files
Steveb-pnicolas-grekas
authored andcommitted
[Messenger] Add option to prevent Redis from deleting messages on rejection
1 parent 174d91c commit 75cce6d

File tree

3 files changed

+36
-3
lines changed

3 files changed

+36
-3
lines changed

src/Symfony/Component/Messenger/Bridge/Redis/CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
CHANGELOG
22
=========
33

4+
5.2.0
5+
-----
6+
7+
* Added a `delete_after_reject` option to the DSN to allow control over message
8+
deletion, similar to `delete_after_ack`.
9+
410
5.1.0
511
-----
612

src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/ConnectionTest.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,21 @@ public function testDeleteAfterAck()
346346
$connection->ack('1');
347347
}
348348

349+
public function testDeleteAfterReject()
350+
{
351+
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
352+
353+
$redis->expects($this->exactly(1))->method('xack')
354+
->with('queue', 'symfony', ['1'])
355+
->willReturn(1);
356+
$redis->expects($this->exactly(1))->method('xdel')
357+
->with('queue', ['1'])
358+
->willReturn(1);
359+
360+
$connection = Connection::fromDsn('redis://localhost/queue?delete_after_reject=true', [], $redis); // 1 = always
361+
$connection->reject('1');
362+
}
363+
349364
public function testLastErrorGetsCleared()
350365
{
351366
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();

src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ class Connection
3333
'consumer' => 'consumer',
3434
'auto_setup' => true,
3535
'delete_after_ack' => false,
36+
'delete_after_reject' => true,
3637
'stream_max_entries' => 0, // any value higher than 0 defines an approximate maximum number of stream entries
3738
'dbindex' => 0,
3839
'tls' => false,
@@ -51,6 +52,7 @@ class Connection
5152
private $nextClaim = 0;
5253
private $claimInterval;
5354
private $deleteAfterAck;
55+
private $deleteAfterReject;
5456
private $couldHavePendingMessages = true;
5557

5658
public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null)
@@ -89,6 +91,7 @@ public function __construct(array $configuration, array $connectionCredentials =
8991
$this->autoSetup = $configuration['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup'];
9092
$this->maxEntries = $configuration['stream_max_entries'] ?? self::DEFAULT_OPTIONS['stream_max_entries'];
9193
$this->deleteAfterAck = $configuration['delete_after_ack'] ?? self::DEFAULT_OPTIONS['delete_after_ack'];
94+
$this->deleteAfterReject = $configuration['delete_after_reject'] ?? self::DEFAULT_OPTIONS['delete_after_reject'];
9295
$this->redeliverTimeout = ($configuration['redeliver_timeout'] ?? self::DEFAULT_OPTIONS['redeliver_timeout']) * 1000;
9396
$this->claimInterval = $configuration['claim_interval'] ?? self::DEFAULT_OPTIONS['claim_interval'];
9497
}
@@ -128,6 +131,12 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
128131
unset($redisOptions['delete_after_ack']);
129132
}
130133

134+
$deleteAfterReject = null;
135+
if (\array_key_exists('delete_after_reject', $redisOptions)) {
136+
$deleteAfterReject = filter_var($redisOptions['delete_after_reject'], FILTER_VALIDATE_BOOLEAN);
137+
unset($redisOptions['delete_after_reject']);
138+
}
139+
131140
$dbIndex = null;
132141
if (\array_key_exists('dbindex', $redisOptions)) {
133142
$dbIndex = filter_var($redisOptions['dbindex'], \FILTER_VALIDATE_INT);
@@ -159,6 +168,7 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
159168
'auto_setup' => $autoSetup,
160169
'stream_max_entries' => $maxEntries,
161170
'delete_after_ack' => $deleteAfterAck,
171+
'delete_after_reject' => $deleteAfterReject,
162172
'dbindex' => $dbIndex,
163173
'redeliver_timeout' => $redeliverTimeout,
164174
'claim_interval' => $claimInterval,
@@ -348,7 +358,9 @@ public function reject(string $id): void
348358
{
349359
try {
350360
$deleted = $this->connection->xack($this->stream, $this->group, [$id]);
351-
$deleted = $this->connection->xdel($this->stream, [$id]) && $deleted;
361+
if ($this->deleteAfterReject) {
362+
$deleted = $this->connection->xdel($this->stream, [$id]) && $deleted;
363+
}
352364
} catch (\RedisException $e) {
353365
throw new TransportException($e->getMessage(), 0, $e);
354366
}
@@ -426,15 +438,15 @@ public function setup(): void
426438
$this->connection->clearLastError();
427439
}
428440

429-
if ($this->deleteAfterAck) {
441+
if ($this->deleteAfterAck || $this->deleteAfterReject) {
430442
$groups = $this->connection->xinfo('GROUPS', $this->stream);
431443
if (
432444
// support for Redis extension version 5+
433445
(\is_array($groups) && 1 < \count($groups))
434446
// support for Redis extension version 4.x
435447
|| (\is_string($groups) && substr_count($groups, '"name"'))
436448
) {
437-
throw new LogicException(sprintf('More than one group exists for stream "%s", delete_after_ack can not be enabled as it risks deleting messages before all groups could consume them.', $this->stream));
449+
throw new LogicException(sprintf('More than one group exists for stream "%s", delete_after_ack and delete_after_reject can not be enabled as it risks deleting messages before all groups could consume them.', $this->stream));
438450
}
439451
}
440452

0 commit comments

Comments
 (0)