Skip to content

[Messenger] return empty envelopes when RetryableException occurs #32979

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@

namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;

use Doctrine\DBAL\Driver\PDOException;
use Doctrine\DBAL\Exception\DeadlockException;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
Expand Down Expand Up @@ -68,6 +71,26 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
$receiver->get();
}

public function testOccursRetryableExceptionFromConnection()
{
$serializer = $this->createSerializer();
$connection = $this->createMock(Connection::class);
$driverException = new PDOException(new \PDOException('Deadlock', 40001));
$connection->method('get')->willThrowException(new DeadlockException('Deadlock', $driverException));
$receiver = new DoctrineReceiver($connection, $serializer);
$this->assertSame([], $receiver->get());
$this->assertSame([], $receiver->get());
try {
$receiver->get();
} catch (TransportException $exception) {
// skip, and retry
}
$this->assertSame([], $receiver->get());
$this->assertSame([], $receiver->get());
$this->expectException(TransportException::class);
$receiver->get();
}

public function testAll()
{
$serializer = $this->createSerializer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Transport\Doctrine;

use Doctrine\DBAL\DBALException;
use Doctrine\DBAL\Exception\RetryableException;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
Expand All @@ -30,6 +31,8 @@
*/
class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface, ListableReceiverInterface
{
private const MAX_RETRIES = 3;
private $retryingSafetyCounter = 0;
private $connection;
private $serializer;

Expand All @@ -46,6 +49,17 @@ public function get(): iterable
{
try {
$doctrineEnvelope = $this->connection->get();
$this->retryingSafetyCounter = 0; // reset counter
} catch (RetryableException $exception) {
// Do nothing when RetryableException occurs less than "MAX_RETRIES"
// as it will likely be resolved on the next call to get()
// Problem with concurrent consumers and database deadlocks
if (++$this->retryingSafetyCounter >= self::MAX_RETRIES) {
$this->retryingSafetyCounter = 0; // reset counter
throw new TransportException($exception->getMessage(), 0, $exception);
}

return [];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be better to retry "internally" (i.e. return $this->get()) instead of returning this empty array? Otherwise, in the event of using multiple transports, the worker might start consuming lower priority messages, which would make no sense. Similar with whatever might decorate this receiver... it would "see" an empty array while actually, it was an error.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does not seem better to me. This way we can use the delay that is added after the get which is what we want with a deadlock/timeout problem.

} catch (DBALException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Symfony/Component/Messenger/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"psr/log": "~1.0"
},
"require-dev": {
"doctrine/dbal": "^2.5",
"doctrine/dbal": "^2.6",
"psr/cache": "~1.0",
"symfony/console": "~3.4|~4.0",
"symfony/debug": "~4.1",
Expand Down