@@ -33,6 +33,7 @@ class Connection
33
33
'consumer ' => 'consumer ' ,
34
34
'auto_setup ' => true ,
35
35
'delete_after_ack ' => false ,
36
+ 'delete_after_reject ' => true ,
36
37
'stream_max_entries ' => 0 , // any value higher than 0 defines an approximate maximum number of stream entries
37
38
'dbindex ' => 0 ,
38
39
'tls ' => false ,
@@ -51,6 +52,7 @@ class Connection
51
52
private $ nextClaim = 0 ;
52
53
private $ claimInterval ;
53
54
private $ deleteAfterAck ;
55
+ private $ deleteAfterReject ;
54
56
private $ couldHavePendingMessages = true ;
55
57
56
58
public function __construct (array $ configuration , array $ connectionCredentials = [], array $ redisOptions = [], \Redis $ redis = null )
@@ -89,6 +91,7 @@ public function __construct(array $configuration, array $connectionCredentials =
89
91
$ this ->autoSetup = $ configuration ['auto_setup ' ] ?? self ::DEFAULT_OPTIONS ['auto_setup ' ];
90
92
$ this ->maxEntries = $ configuration ['stream_max_entries ' ] ?? self ::DEFAULT_OPTIONS ['stream_max_entries ' ];
91
93
$ this ->deleteAfterAck = $ configuration ['delete_after_ack ' ] ?? self ::DEFAULT_OPTIONS ['delete_after_ack ' ];
94
+ $ this ->deleteAfterReject = $ configuration ['delete_after_reject ' ] ?? self ::DEFAULT_OPTIONS ['delete_after_reject ' ];
92
95
$ this ->redeliverTimeout = ($ configuration ['redeliver_timeout ' ] ?? self ::DEFAULT_OPTIONS ['redeliver_timeout ' ]) * 1000 ;
93
96
$ this ->claimInterval = $ configuration ['claim_interval ' ] ?? self ::DEFAULT_OPTIONS ['claim_interval ' ];
94
97
}
@@ -128,6 +131,12 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
128
131
unset($ redisOptions ['delete_after_ack ' ]);
129
132
}
130
133
134
+ $ deleteAfterReject = null ;
135
+ if (\array_key_exists ('delete_after_reject ' , $ redisOptions )) {
136
+ $ deleteAfterReject = filter_var ($ redisOptions ['delete_after_reject ' ], FILTER_VALIDATE_BOOLEAN );
137
+ unset($ redisOptions ['delete_after_reject ' ]);
138
+ }
139
+
131
140
$ dbIndex = null ;
132
141
if (\array_key_exists ('dbindex ' , $ redisOptions )) {
133
142
$ dbIndex = filter_var ($ redisOptions ['dbindex ' ], \FILTER_VALIDATE_INT );
@@ -159,6 +168,7 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
159
168
'auto_setup ' => $ autoSetup ,
160
169
'stream_max_entries ' => $ maxEntries ,
161
170
'delete_after_ack ' => $ deleteAfterAck ,
171
+ 'delete_after_reject ' => $ deleteAfterReject ,
162
172
'dbindex ' => $ dbIndex ,
163
173
'redeliver_timeout ' => $ redeliverTimeout ,
164
174
'claim_interval ' => $ claimInterval ,
@@ -348,7 +358,9 @@ public function reject(string $id): void
348
358
{
349
359
try {
350
360
$ deleted = $ this ->connection ->xack ($ this ->stream , $ this ->group , [$ id ]);
351
- $ deleted = $ this ->connection ->xdel ($ this ->stream , [$ id ]) && $ deleted ;
361
+ if ($ this ->deleteAfterReject ) {
362
+ $ deleted = $ this ->connection ->xdel ($ this ->stream , [$ id ]) && $ deleted ;
363
+ }
352
364
} catch (\RedisException $ e ) {
353
365
throw new TransportException ($ e ->getMessage (), 0 , $ e );
354
366
}
@@ -426,15 +438,15 @@ public function setup(): void
426
438
$ this ->connection ->clearLastError ();
427
439
}
428
440
429
- if ($ this ->deleteAfterAck ) {
441
+ if ($ this ->deleteAfterAck || $ this -> deleteAfterReject ) {
430
442
$ groups = $ this ->connection ->xinfo ('GROUPS ' , $ this ->stream );
431
443
if (
432
444
// support for Redis extension version 5+
433
445
(\is_array ($ groups ) && 1 < \count ($ groups ))
434
446
// support for Redis extension version 4.x
435
447
|| (\is_string ($ groups ) && substr_count ($ groups , '"name" ' ))
436
448
) {
437
- throw new LogicException (sprintf ('More than one group exists for stream "%s", delete_after_ack can not be enabled as it risks deleting messages before all groups could consume them. ' , $ this ->stream ));
449
+ throw new LogicException (sprintf ('More than one group exists for stream "%s", delete_after_ack and delete_after_reject can not be enabled as it risks deleting messages before all groups could consume them. ' , $ this ->stream ));
438
450
}
439
451
}
440
452
0 commit comments