Skip to content

Commit 786b573

Browse files
committed
feature #60754 [Messenger] Allow SQS to handle its own retry/DLQ (maxbaldanza)
This PR was merged into the 7.4 branch. Discussion ---------- [Messenger] Allow SQS to handle its own retry/DLQ | Q | A | ------------- | --- | Branch? | 7.4 | Bug fix? | no | New feature? | yes | Deprecations? | no | Issues | Fix #45104 | License | MIT As mentioned on the linked issue this has a number of benefits but mainly * The consumer no longer needs to be able to send messages into the queue. * Less chance of message loss Allow SQS to handle retries rather then handling this by Symfony. This allows applications to use the retry strategy from SQS rather then Symfony. The default is for the message to be deleted from SQS at which point Symfony will handle the retry by then adding back in to the queue. If `delete_on_rejection` is set to `false` instead it will change the message visibility of the message on SQS and thus SQS to handle the retry mechanism https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html Commits ------- e9b36d9 [Messenger] Allow SQS to handle it's own retry/DLQ
2 parents d0118a9 + e9b36d9 commit 786b573

File tree

8 files changed

+93
-4
lines changed

8 files changed

+93
-4
lines changed

src/Symfony/Component/Messenger/Bridge/AmazonSqs/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
CHANGELOG
22
=========
33

4+
7.4
5+
---
6+
7+
* Allow SQS to handle it's own retry/DLQ
8+
49
7.3
510
---
611

src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsReceiverTest.php

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
5050
$sqsEnvelop = $this->createSqsEnvelope();
5151
$connection = $this->createMock(Connection::class);
5252
$connection->method('get')->willReturn($sqsEnvelop);
53-
$connection->expects($this->once())->method('delete');
53+
$connection->expects($this->once())->method('reject');
5454

5555
$receiver = new AmazonSqsReceiver($connection, $serializer);
5656
iterator_to_array($receiver->get());
@@ -67,6 +67,17 @@ public function testKeepalive()
6767
$receiver->keepalive(new Envelope(new DummyMessage('foo'), [new AmazonSqsReceivedStamp('123')]), 10);
6868
}
6969

70+
public function testReject()
71+
{
72+
$serializer = $this->createSerializer();
73+
74+
$connection = $this->createMock(Connection::class);
75+
$connection->expects($this->once())->method('reject')->with('123');
76+
77+
$receiver = new AmazonSqsReceiver($connection, $serializer);
78+
$receiver->reject(new Envelope(new DummyMessage('foo'), [new AmazonSqsReceivedStamp('123')]));
79+
}
80+
7081
private function createSqsEnvelope()
7182
{
7283
return [

src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsTransportTest.php

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
2323
use Symfony\Component\Messenger\Envelope;
2424
use Symfony\Component\Messenger\Exception\TransportException;
25+
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
2526
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
2627
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
2728
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
@@ -116,6 +117,22 @@ public function testItCanSendAMessageViaTheSender()
116117
$this->assertSame($envelope, $this->transport->send($envelope));
117118
}
118119

120+
public function testItSendsAMessageViaTheSenderWithRedeliveryStamp()
121+
{
122+
$envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(1)]);
123+
$this->sender->expects($this->once())->method('send')->with($envelope)->willReturn($envelope);
124+
$this->assertSame($envelope, $this->transport->send($envelope));
125+
}
126+
127+
public function testItDoesNotSendRedeliveredMessageWhenNotHandlingRetries()
128+
{
129+
$transport = new AmazonSqsTransport($this->connection, null, $this->receiver, $this->sender, false);
130+
131+
$envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(1)]);
132+
$this->sender->expects($this->never())->method('send')->with($envelope)->willReturn($envelope);
133+
$this->assertSame($envelope, $transport->send($envelope));
134+
}
135+
119136
public function testItCanSetUpTheConnection()
120137
{
121138
$this->connection->expects($this->once())->method('setup');

src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/ConnectionTest.php

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,35 @@ public function testKeepalive()
372372
$connection->keepalive($id);
373373
}
374374

375+
public function testDeleteOnReject()
376+
{
377+
$expectedParams = [
378+
'QueueUrl' => $queueUrl = 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue',
379+
'ReceiptHandle' => $id = 'abc',
380+
];
381+
382+
$client = $this->createMock(SqsClient::class);
383+
$client->expects($this->once())->method('deleteMessage')->with($expectedParams);
384+
385+
$connection = new Connection([], $client, $queueUrl);
386+
$connection->reject($id);
387+
}
388+
389+
public function testDoNotDeleteOnRejection()
390+
{
391+
$expectedParams = [
392+
'QueueUrl' => $queueUrl = 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue',
393+
'ReceiptHandle' => $id = 'abc',
394+
'VisibilityTimeout' => $visibilityTimeout = 10,
395+
];
396+
397+
$client = $this->createMock(SqsClient::class);
398+
$client->expects($this->once())->method('changeMessageVisibility')->with($expectedParams);
399+
400+
$connection = new Connection(['delete_on_rejection' => false, 'visibility_timeout' => $visibilityTimeout], $client, $queueUrl);
401+
$connection->reject($id);
402+
}
403+
375404
public function testKeepaliveWithTooSmallTtl()
376405
{
377406
$client = $this->createMock(SqsClient::class);

src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsReceiver.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public function get(): iterable
5252
'headers' => $sqsEnvelope['headers'],
5353
]);
5454
} catch (MessageDecodingFailedException $exception) {
55-
$this->connection->delete($sqsEnvelope['id']);
55+
$this->connection->reject($sqsEnvelope['id']);
5656

5757
throw $exception;
5858
}
@@ -72,7 +72,7 @@ public function ack(Envelope $envelope): void
7272
public function reject(Envelope $envelope): void
7373
{
7474
try {
75-
$this->connection->delete($this->findSqsReceivedStamp($envelope)->getId());
75+
$this->connection->reject($this->findSqsReceivedStamp($envelope)->getId());
7676
} catch (HttpException $e) {
7777
throw new TransportException($e->getMessage(), 0, $e);
7878
}

src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsTransport.php

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use AsyncAws\Core\Exception\Http\HttpException;
1515
use Symfony\Component\Messenger\Envelope;
1616
use Symfony\Component\Messenger\Exception\TransportException;
17+
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
1718
use Symfony\Component\Messenger\Transport\CloseableTransportInterface;
1819
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
1920
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
@@ -37,6 +38,7 @@ public function __construct(
3738
?SerializerInterface $serializer = null,
3839
private (ReceiverInterface&MessageCountAwareInterface)|null $receiver = null,
3940
private ?SenderInterface $sender = null,
41+
private bool $handleRetries = true,
4042
) {
4143
$this->serializer = $serializer ?? new PhpSerializer();
4244
}
@@ -71,6 +73,10 @@ public function getMessageCount(): int
7173

7274
public function send(Envelope $envelope): Envelope
7375
{
76+
if (false === $this->handleRetries && $this->isRedelivered($envelope)) {
77+
return $envelope;
78+
}
79+
7480
return $this->getSender()->send($envelope);
7581
}
7682

@@ -106,4 +112,9 @@ private function getSender(): SenderInterface
106112
{
107113
return $this->sender ??= new AmazonSqsSender($this->connection, $this->serializer);
108114
}
115+
116+
private function isRedelivered(Envelope $envelope): bool
117+
{
118+
return null !== $envelope->last(RedeliveryStamp::class);
119+
}
109120
}

src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsTransportFactory.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public function createTransport(#[\SensitiveParameter] string $dsn, array $optio
3232
{
3333
unset($options['transport_name']);
3434

35-
return new AmazonSqsTransport(Connection::fromDsn($dsn, $options, null, $this->logger), $serializer);
35+
return new AmazonSqsTransport(Connection::fromDsn($dsn, $options, null, $this->logger), $serializer, null, null, !($options['delete_on_rejection'] ?? false));
3636
}
3737

3838
public function supports(#[\SensitiveParameter] string $dsn, array $options): bool

src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ class Connection
3939
'wait_time' => 20,
4040
'poll_timeout' => 0.1,
4141
'visibility_timeout' => null,
42+
'delete_on_rejection' => true,
4243
'auto_setup' => true,
4344
'access_key' => null,
4445
'secret_key' => null,
@@ -101,6 +102,7 @@ public function __destruct()
101102
* * wait_time: long polling duration in seconds (Default: 20)
102103
* * poll_timeout: amount of seconds the transport should wait for new message
103104
* * visibility_timeout: amount of seconds the message won't be visible
105+
* * delete_on_rejection: Whether to delete message on rejection or allow SQS to handle retries. (Default: true).
104106
* * sslmode: Can be "disable" to use http for a custom endpoint
105107
* * auto_setup: Whether the queue should be created automatically during send / get (Default: true)
106108
* * debug: Log all HTTP requests and responses as LoggerInterface::DEBUG (Default: false)
@@ -134,6 +136,7 @@ public static function fromDsn(#[\SensitiveParameter] string $dsn, array $option
134136
'wait_time' => (int) $options['wait_time'],
135137
'poll_timeout' => $options['poll_timeout'],
136138
'visibility_timeout' => null !== $options['visibility_timeout'] ? (int) $options['visibility_timeout'] : null,
139+
'delete_on_rejection' => filter_var($options['delete_on_rejection'], \FILTER_VALIDATE_BOOL),
137140
'auto_setup' => filter_var($options['auto_setup'], \FILTER_VALIDATE_BOOL),
138141
'queue_name' => (string) $options['queue_name'],
139142
'queue_attributes' => $options['queue_attributes'],
@@ -312,6 +315,19 @@ public function delete(string $id): void
312315
]);
313316
}
314317

318+
public function reject(string $id): void
319+
{
320+
if ($this->configuration['delete_on_rejection']) {
321+
$this->delete($id);
322+
} else {
323+
$this->client->changeMessageVisibility([
324+
'QueueUrl' => $this->getQueueUrl(),
325+
'ReceiptHandle' => $id,
326+
'VisibilityTimeout' => $this->configuration['visibility_timeout'] ?? 30,
327+
]);
328+
}
329+
}
330+
315331
/**
316332
* @param int|null $seconds the minimum duration the message should be kept alive
317333
*/

0 commit comments

Comments
 (0)