Skip to content

Commit d642f76

Browse files
committed
feature #42163 [Messenger] [Redis] Prepare turning delete_after_ack to true in 6.0 (chalasr)
This PR was merged into the 5.4 branch. Discussion ---------- [Messenger] [Redis] Prepare turning `delete_after_ack` to `true` in 6.0 | Q | A | ------------- | --- | Branch? | 5.4 | Bug fix? | no | New feature? | no | Deprecations? | yes | Tickets | Fix #42122 | License | MIT | Docs PR | todo Having this option turned on makes redis-messenger much more consistent with other transports, more adapted to the 80% use case and more importantly, prevents having to deal with OOM issues. Commits ------- b439a11 [Messenger][Redis] Prepare turning `delete_after_ack` to `true` in 6.0
2 parents 3955d39 + b439a11 commit d642f76

File tree

7 files changed

+70
-44
lines changed

7 files changed

+70
-44
lines changed

UPGRADE-5.4.md

+6
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@ HttpKernel
2323

2424
* Deprecate `AbstractTestSessionListener::getSession` inject a session in the request instead
2525

26+
Messenger
27+
---------
28+
29+
* Deprecate not setting the `delete_after_ack` config option (or DSN parameter) using the Redis transport,
30+
its default value will change to `true` in 6.0
31+
2632
SecurityBundle
2733
--------------
2834

UPGRADE-6.0.md

+1
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ Messenger
152152
* The signature of method `RetryStrategyInterface::getWaitingTime()` has been updated to `RetryStrategyInterface::getWaitingTime(Envelope $message, \Throwable $throwable = null)`.
153153
* Removed the `prefetch_count` parameter in the AMQP bridge.
154154
* Removed the use of TLS option for Redis Bridge, use `rediss://127.0.0.1` instead of `redis://127.0.0.1?tls=1`
155+
* The `delete_after_ack` config option of the Redis transport now defaults to `true`
155156

156157
Mime
157158
----

src/Symfony/Component/Messenger/Bridge/Redis/CHANGELOG.md

+6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
CHANGELOG
22
=========
33

4+
5.4
5+
---
6+
7+
* Deprecate not setting the `delete_after_ack` config option (or DSN parameter),
8+
its default value will change to `true` in 6.0
9+
410
5.3
511
---
612

src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/ConnectionTest.php

+50-39
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class ConnectionTest extends TestCase
2828
public static function setUpBeforeClass(): void
2929
{
3030
try {
31-
$redis = Connection::fromDsn('redis://localhost/queue');
31+
$redis = Connection::fromDsn('redis://localhost/queue?delete_after_ack=true');
3232
$redis->get();
3333
} catch (TransportException $e) {
3434
if (0 === strpos($e->getMessage(), 'ERR unknown command \'X')) {
@@ -61,11 +61,11 @@ public function testFromInvalidDsn()
6161
public function testFromDsn()
6262
{
6363
$this->assertEquals(
64-
new Connection(['stream' => 'queue'], [
64+
new Connection(['stream' => 'queue', 'delete_after_ack' => true], [
6565
'host' => 'localhost',
6666
'port' => 6379,
6767
]),
68-
Connection::fromDsn('redis://localhost/queue')
68+
Connection::fromDsn('redis://localhost/queue?delete_after_ack=1')
6969
);
7070
}
7171

@@ -80,33 +80,33 @@ public function testFromDsnWithMultipleHosts()
8080
}, $hosts);
8181
$dsn = implode(',', $dsn);
8282

83-
$this->assertInstanceOf(Connection::class, Connection::fromDsn($dsn));
83+
$this->assertInstanceOf(Connection::class, Connection::fromDsn($dsn, ['delete_after_ack' => true]));
8484
}
8585

8686
public function testFromDsnOnUnixSocket()
8787
{
8888
$this->assertEquals(
89-
new Connection(['stream' => 'queue'], [
89+
new Connection(['stream' => 'queue', 'delete_after_ack' => true], [
9090
'host' => '/var/run/redis/redis.sock',
9191
'port' => 0,
9292
], [], $redis = $this->createMock(\Redis::class)),
93-
Connection::fromDsn('redis:///var/run/redis/redis.sock', ['stream' => 'queue'], $redis)
93+
Connection::fromDsn('redis:///var/run/redis/redis.sock', ['stream' => 'queue', 'delete_after_ack' => true], $redis)
9494
);
9595
}
9696

9797
public function testFromDsnWithOptions()
9898
{
9999
$this->assertEquals(
100-
Connection::fromDsn('redis://localhost', ['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1', 'auto_setup' => false, 'serializer' => 2]),
101-
Connection::fromDsn('redis://localhost/queue/group1/consumer1?serializer=2&auto_setup=0')
100+
Connection::fromDsn('redis://localhost', ['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1', 'auto_setup' => false, 'serializer' => 2, 'delete_after_ack' => true]),
101+
Connection::fromDsn('redis://localhost/queue/group1/consumer1?serializer=2&auto_setup=0&delete_after_ack=1')
102102
);
103103
}
104104

105105
public function testFromDsnWithOptionsAndTrailingSlash()
106106
{
107107
$this->assertEquals(
108-
Connection::fromDsn('redis://localhost/', ['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1', 'auto_setup' => false, 'serializer' => 2]),
109-
Connection::fromDsn('redis://localhost/queue/group1/consumer1?serializer=2&auto_setup=0')
108+
Connection::fromDsn('redis://localhost/', ['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1', 'auto_setup' => false, 'serializer' => 2, 'delete_after_ack' => true]),
109+
Connection::fromDsn('redis://localhost/queue/group1/consumer1?serializer=2&auto_setup=0&delete_after_ack=1')
110110
);
111111
}
112112

@@ -146,32 +146,32 @@ public function testFromDsnWithRedissScheme()
146146
->with('tls://127.0.0.1', 6379)
147147
->willReturn(null);
148148

149-
Connection::fromDsn('rediss://127.0.0.1', [], $redis);
149+
Connection::fromDsn('rediss://127.0.0.1?delete_after_ack=true', [], $redis);
150150
}
151151

152152
public function testFromDsnWithQueryOptions()
153153
{
154154
$this->assertEquals(
155-
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [
155+
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1', 'delete_after_ack' => true], [
156156
'host' => 'localhost',
157157
'port' => 6379,
158158
], [
159159
'serializer' => 2,
160160
]),
161-
Connection::fromDsn('redis://localhost/queue/group1/consumer1?serializer=2')
161+
Connection::fromDsn('redis://localhost/queue/group1/consumer1?serializer=2&delete_after_ack=1')
162162
);
163163
}
164164

165165
public function testFromDsnWithMixDsnQueryOptions()
166166
{
167167
$this->assertEquals(
168-
Connection::fromDsn('redis://localhost/queue/group1?serializer=2', ['consumer' => 'specific-consumer']),
169-
Connection::fromDsn('redis://localhost/queue/group1/specific-consumer?serializer=2')
168+
Connection::fromDsn('redis://localhost/queue/group1?serializer=2', ['consumer' => 'specific-consumer', 'delete_after_ack' => true]),
169+
Connection::fromDsn('redis://localhost/queue/group1/specific-consumer?serializer=2&delete_after_ack=1')
170170
);
171171

172172
$this->assertEquals(
173-
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['consumer' => 'specific-consumer']),
174-
Connection::fromDsn('redis://localhost/queue/group1/consumer1')
173+
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['consumer' => 'specific-consumer', 'delete_after_ack' => true]),
174+
Connection::fromDsn('redis://localhost/queue/group1/consumer1?delete_after_ack=1')
175175
);
176176
}
177177

@@ -200,7 +200,7 @@ public function testKeepGettingPendingMessages()
200200
->with('symfony', 'consumer', ['queue' => 0], 1, null)
201201
->willReturn(['queue' => [['message' => '{"body":"Test","headers":[]}']]]);
202202

203-
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
203+
$connection = Connection::fromDsn('redis://localhost/queue', ['delete_after_ack' => true], $redis);
204204
$this->assertNotNull($connection->get());
205205
$this->assertNotNull($connection->get());
206206
$this->assertNotNull($connection->get());
@@ -214,7 +214,7 @@ public function testAuth()
214214
->with('password')
215215
->willReturn(true);
216216

217-
Connection::fromDsn('redis://password@localhost/queue', [], $redis);
217+
Connection::fromDsn('redis://password@localhost/queue', ['delete_after_ack' => true], $redis);
218218
}
219219

220220
public function testAuthFromOptions()
@@ -225,7 +225,7 @@ public function testAuthFromOptions()
225225
->with('password')
226226
->willReturn(true);
227227

228-
Connection::fromDsn('redis://localhost/queue', ['auth' => 'password'], $redis);
228+
Connection::fromDsn('redis://localhost/queue', ['auth' => 'password', 'delete_after_ack' => true], $redis);
229229
}
230230

231231
public function testAuthFromOptionsAndDsn()
@@ -236,7 +236,7 @@ public function testAuthFromOptionsAndDsn()
236236
->with('password2')
237237
->willReturn(true);
238238

239-
Connection::fromDsn('redis://password1@localhost/queue', ['auth' => 'password2'], $redis);
239+
Connection::fromDsn('redis://password1@localhost/queue', ['auth' => 'password2', 'delete_after_ack' => true], $redis);
240240
}
241241

242242
public function testNoAuthWithEmptyPassword()
@@ -247,7 +247,7 @@ public function testNoAuthWithEmptyPassword()
247247
->with('')
248248
->willThrowException(new \RuntimeException());
249249

250-
Connection::fromDsn('redis://@localhost/queue', [], $redis);
250+
Connection::fromDsn('redis://@localhost/queue', ['delete_after_ack' => true], $redis);
251251
}
252252

253253
public function testAuthZeroPassword()
@@ -258,7 +258,7 @@ public function testAuthZeroPassword()
258258
->with('0')
259259
->willReturn(true);
260260

261-
Connection::fromDsn('redis://0@localhost/queue', [], $redis);
261+
Connection::fromDsn('redis://0@localhost/queue', ['delete_after_ack' => true], $redis);
262262
}
263263

264264
public function testFailedAuth()
@@ -271,14 +271,14 @@ public function testFailedAuth()
271271
->with('password')
272272
->willReturn(false);
273273

274-
Connection::fromDsn('redis://password@localhost/queue', [], $redis);
274+
Connection::fromDsn('redis://password@localhost/queue', ['delete_after_ack' => true], $redis);
275275
}
276276

277277
public function testDbIndex()
278278
{
279279
$redis = new \Redis();
280280

281-
Connection::fromDsn('redis://localhost/queue?dbindex=2', [], $redis);
281+
Connection::fromDsn('redis://localhost/queue?dbindex=2', ['delete_after_ack' => true], $redis);
282282

283283
$this->assertSame(2, $redis->getDbNum());
284284
}
@@ -291,7 +291,7 @@ public function testGetPendingMessageFirst()
291291
->with('symfony', 'consumer', ['queue' => '0'], 1, null)
292292
->willReturn(['queue' => [['message' => '{"body":"1","headers":[]}']]]);
293293

294-
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
294+
$connection = Connection::fromDsn('redis://localhost/queue', ['delete_after_ack' => true], $redis);
295295
$connection->get();
296296
}
297297

@@ -317,7 +317,7 @@ public function testClaimAbandonedMessageWithRaceCondition()
317317
->with('queue', 'symfony', 'consumer', 3600000, ['redisid-123'], ['JUSTID'])
318318
->willReturn([]);
319319

320-
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
320+
$connection = Connection::fromDsn('redis://localhost/queue', ['delete_after_ack' => true], $redis);
321321
$connection->get();
322322
}
323323

@@ -345,7 +345,7 @@ public function testClaimAbandonedMessage()
345345
->with('queue', 'symfony', 'consumer', 3600000, ['redisid-123'], ['JUSTID'])
346346
->willReturn([]);
347347

348-
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
348+
$connection = Connection::fromDsn('redis://localhost/queue', ['delete_after_ack' => true], $redis);
349349
$connection->get();
350350
}
351351

@@ -357,22 +357,22 @@ public function testUnexpectedRedisError()
357357
$redis->expects($this->once())->method('xreadgroup')->willReturn(false);
358358
$redis->expects($this->once())->method('getLastError')->willReturn('Redis error happens');
359359

360-
$connection = Connection::fromDsn('redis://localhost/queue', ['auto_setup' => false], $redis);
360+
$connection = Connection::fromDsn('redis://localhost/queue', ['auto_setup' => false, 'delete_after_ack' => true], $redis);
361361
$connection->get();
362362
}
363363

364364
public function testGetAfterReject()
365365
{
366366
$redis = new \Redis();
367-
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget', [], $redis);
367+
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget', ['delete_after_ack' => true], $redis);
368368

369369
$connection->add('1', []);
370370
$connection->add('2', []);
371371

372372
$failing = $connection->get();
373373
$connection->reject($failing['id']);
374374

375-
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget');
375+
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget', ['delete_after_ack' => true]);
376376
$this->assertNotNull($connection->get());
377377

378378
$redis->del('messenger-rejectthenget');
@@ -382,7 +382,7 @@ public function testGetNonBlocking()
382382
{
383383
$redis = new \Redis();
384384

385-
$connection = Connection::fromDsn('redis://localhost/messenger-getnonblocking', [], $redis);
385+
$connection = Connection::fromDsn('redis://localhost/messenger-getnonblocking', ['delete_after_ack' => true], $redis);
386386

387387
$this->assertNull($connection->get()); // no message, should return null immediately
388388
$connection->add('1', []);
@@ -394,7 +394,7 @@ public function testGetNonBlocking()
394394
public function testJsonError()
395395
{
396396
$redis = new \Redis();
397-
$connection = Connection::fromDsn('redis://localhost/json-error', [], $redis);
397+
$connection = Connection::fromDsn('redis://localhost/json-error', ['delete_after_ack' => true], $redis);
398398
try {
399399
$connection->add("\xB1\x31", []);
400400
} catch (TransportException $e) {
@@ -411,7 +411,7 @@ public function testMaxEntries()
411411
->with('queue', '*', ['message' => '{"body":"1","headers":[]}'], 20000, true)
412412
->willReturn(1);
413413

414-
$connection = Connection::fromDsn('redis://localhost/queue?stream_max_entries=20000', [], $redis); // 1 = always
414+
$connection = Connection::fromDsn('redis://localhost/queue?stream_max_entries=20000', ['delete_after_ack' => true], $redis);
415415
$connection->add('1', []);
416416
}
417417

@@ -426,10 +426,20 @@ public function testDeleteAfterAck()
426426
->with('queue', ['1'])
427427
->willReturn(1);
428428

429-
$connection = Connection::fromDsn('redis://localhost/queue?delete_after_ack=true', [], $redis); // 1 = always
429+
$connection = Connection::fromDsn('redis://localhost/queue?delete_after_ack=true', [], $redis);
430430
$connection->ack('1');
431431
}
432432

433+
/**
434+
* @group legacy
435+
*/
436+
public function testLegacyOmitDeleteAfterAck()
437+
{
438+
$this->expectDeprecation('Since symfony/redis-messenger 5.4: Not setting the "delete_after_ack" boolean option explicitly is deprecated, its default value will change to true in 6.0.');
439+
440+
Connection::fromDsn('redis://localhost/queue');
441+
}
442+
433443
public function testDeleteAfterReject()
434444
{
435445
$redis = $this->createMock(\Redis::class);
@@ -441,7 +451,7 @@ public function testDeleteAfterReject()
441451
->with('queue', ['1'])
442452
->willReturn(1);
443453

444-
$connection = Connection::fromDsn('redis://localhost/queue?delete_after_reject=true', [], $redis); // 1 = always
454+
$connection = Connection::fromDsn('redis://localhost/queue?delete_after_reject=true', ['delete_after_ack' => true], $redis);
445455
$connection->reject('1');
446456
}
447457

@@ -455,7 +465,7 @@ public function testLastErrorGetsCleared()
455465
$redis->method('getLastError')->willReturnOnConsecutiveCalls('xadd error', 'xack error');
456466
$redis->expects($this->exactly(2))->method('clearLastError');
457467

458-
$connection = Connection::fromDsn('redis://localhost/messenger-clearlasterror', ['auto_setup' => false], $redis);
468+
$connection = Connection::fromDsn('redis://localhost/messenger-clearlasterror', ['auto_setup' => false, 'delete_after_ack' => true], $redis);
459469

460470
try {
461471
$connection->add('message', []);
@@ -475,7 +485,7 @@ public function testLastErrorGetsCleared()
475485
public function testLazy()
476486
{
477487
$redis = new \Redis();
478-
$connection = Connection::fromDsn('redis://localhost/messenger-lazy?lazy=1', [], $redis);
488+
$connection = Connection::fromDsn('redis://localhost/messenger-lazy?lazy=1', ['delete_after_ack' => true], $redis);
479489

480490
$connection->add('1', []);
481491
$this->assertNotEmpty($message = $connection->get());
@@ -490,7 +500,8 @@ public function testLazyCluster()
490500

491501
$connection = new Connection(
492502
['lazy' => true],
493-
['host' => explode(' ', getenv('REDIS_CLUSTER_HOSTS'))]
503+
['host' => explode(' ', getenv('REDIS_CLUSTER_HOSTS'))],
504+
['delete_after_ack' => true]
494505
);
495506

496507
$connection->add('1', []);

src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisExtIntegrationTest.php

+3-3
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ protected function setUp(): void
3333

3434
try {
3535
$this->redis = new \Redis();
36-
$this->connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), [], $this->redis);
36+
$this->connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), ['delete_after_ack' => true], $this->redis);
3737
$this->connection->cleanup();
3838
$this->connection->setup();
3939
} catch (\Exception $e) {
@@ -109,7 +109,7 @@ public function testConnectionSendDelayedMessagesWithSameContent()
109109
public function testConnectionBelowRedeliverTimeout()
110110
{
111111
// lower redeliver timeout and claim interval
112-
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), [], $this->redis);
112+
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), ['delete_after_ack' => true], $this->redis);
113113

114114
$connection->cleanup();
115115
$connection->setup();
@@ -137,7 +137,7 @@ public function testConnectionClaimAndRedeliver()
137137
// lower redeliver timeout and claim interval
138138
$connection = Connection::fromDsn(
139139
getenv('MESSENGER_REDIS_DSN'),
140-
['redeliver_timeout' => 0, 'claim_interval' => 500],
140+
['redeliver_timeout' => 0, 'claim_interval' => 500, 'delete_after_ack' => true],
141141
$this->redis
142142
);
143143

src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisTransportFactoryTest.php

+2-2
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ public function testCreateTransport()
4040

4141
$factory = new RedisTransportFactory();
4242
$serializer = $this->createMock(SerializerInterface::class);
43-
$expectedTransport = new RedisTransport(Connection::fromDsn('redis://'.getenv('REDIS_HOST'), ['stream' => 'bar']), $serializer);
43+
$expectedTransport = new RedisTransport(Connection::fromDsn('redis://'.getenv('REDIS_HOST'), ['stream' => 'bar', 'delete_after_ack' => true]), $serializer);
4444

45-
$this->assertEquals($expectedTransport, $factory->createTransport('redis://'.getenv('REDIS_HOST'), ['stream' => 'bar'], $serializer));
45+
$this->assertEquals($expectedTransport, $factory->createTransport('redis://'.getenv('REDIS_HOST'), ['stream' => 'bar', 'delete_after_ack' => true], $serializer));
4646
}
4747

4848
private function skipIfRedisUnavailable()

src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php

+2
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,8 @@ public static function fromDsn(string $dsn, array $redisOptions = [], $redis = n
182182
if (\array_key_exists('delete_after_ack', $redisOptions)) {
183183
$deleteAfterAck = filter_var($redisOptions['delete_after_ack'], \FILTER_VALIDATE_BOOLEAN);
184184
unset($redisOptions['delete_after_ack']);
185+
} else {
186+
trigger_deprecation('symfony/redis-messenger', '5.4', 'Not setting the "delete_after_ack" boolean option explicitly is deprecated, its default value will change to true in 6.0.');
185187
}
186188

187189
$deleteAfterReject = null;

0 commit comments

Comments
 (0)