Skip to content

[Messenger] Improve deadlock handling on ack() and reject() #54105

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
use Doctrine\DBAL\Platforms\OraclePlatform;
use Doctrine\DBAL\Platforms\SQLServer2012Platform;
use Doctrine\DBAL\Platforms\SQLServerPlatform;
use Doctrine\DBAL\Query\ForUpdate\ConflictResolutionMode;
use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Result;
use Doctrine\DBAL\Schema\AbstractSchemaManager;
Expand Down Expand Up @@ -99,6 +100,82 @@ public function testGetWithNoPendingMessageWillReturnNull()
$this->assertNull($doctrineEnvelope);
}

public function testGetWithSkipLockedWithForUpdateMethod()
{
if (!method_exists(QueryBuilder::class, 'forUpdate')) {
$this->markTestSkipped('This test is for when forUpdate method exists.');
}

$queryBuilder = $this->getQueryBuilderMock();
$driverConnection = $this->getDBALConnectionMock();
$stmt = $this->getResultMock(false);

$queryBuilder
->method('getParameters')
->willReturn([]);
$queryBuilder
->method('getParameterTypes')
->willReturn([]);
$queryBuilder
->method('forUpdate')
->with(ConflictResolutionMode::SKIP_LOCKED)
->willReturn($queryBuilder);
$queryBuilder
->method('getSQL')
->willReturn('SELECT FOR UPDATE SKIP LOCKED');
$driverConnection->expects($this->once())
->method('createQueryBuilder')
->willReturn($queryBuilder);
$driverConnection->expects($this->never())
->method('update');
$driverConnection
->method('executeQuery')
->with($this->callback(function ($sql) {
return str_contains($sql, 'SKIP LOCKED');
}))
->willReturn($stmt);

$connection = new Connection(['skip_locked' => true], $driverConnection);
$doctrineEnvelope = $connection->get();
$this->assertNull($doctrineEnvelope);
}

public function testGetWithSkipLockedWithoutForUpdateMethod()
{
if (method_exists(QueryBuilder::class, 'forUpdate')) {
$this->markTestSkipped('This test is for when forUpdate method does not exist.');
}

$queryBuilder = $this->getQueryBuilderMock();
$driverConnection = $this->getDBALConnectionMock();
$stmt = $this->getResultMock(false);

$queryBuilder
->method('getParameters')
->willReturn([]);
$queryBuilder
->method('getParameterTypes')
->willReturn([]);
$queryBuilder
->method('getSQL')
->willReturn('SELECT');
$driverConnection->expects($this->once())
->method('createQueryBuilder')
->willReturn($queryBuilder);
$driverConnection->expects($this->never())
->method('update');
$driverConnection
->method('executeQuery')
->with($this->callback(function ($sql) {
return str_contains($sql, 'SKIP LOCKED');
}))
->willReturn($stmt);

$connection = new Connection(['skip_locked' => true], $driverConnection);
$doctrineEnvelope = $connection->get();
$this->assertNull($doctrineEnvelope);
}

public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
{
$this->expectException(TransportException::class);
Expand Down Expand Up @@ -507,20 +584,20 @@ class_exists(MySQLPlatform::class) ? new MySQLPlatform() : new MySQL57Platform()

yield 'SQL Server' => [
class_exists(SQLServerPlatform::class) && !class_exists(SQLServer2012Platform::class) ? new SQLServerPlatform() : new SQLServer2012Platform(),
'SELECT m.* FROM messenger_messages m WITH (UPDLOCK, ROWLOCK) WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY ',
'SELECT m.* FROM messenger_messages m WITH (UPDLOCK, ROWLOCK, READPAST) WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY ',
];

if (!class_exists(MySQL57Platform::class)) {
// DBAL >= 4
yield 'Oracle' => [
new OraclePlatform(),
'SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT m.id FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC FETCH NEXT 1 ROWS ONLY) FOR UPDATE',
'SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT m.id FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC FETCH NEXT 1 ROWS ONLY) FOR UPDATE SKIP LOCKED',
];
} else {
// DBAL < 4
yield 'Oracle' => [
new OraclePlatform(),
'SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT a.id FROM (SELECT m.id FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC) a WHERE ROWNUM <= 1) FOR UPDATE',
'SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT a.id FROM (SELECT m.id FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC) a WHERE ROWNUM <= 1) FOR UPDATE SKIP LOCKED',
];
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,19 @@ public function testPostgreSqlConnectionSendAndGet()
$this->assertNull($this->connection->get());
}

public function testSkipLocked()
{
$connection = new PostgreSqlConnection(['table_name' => 'queue_table', 'skip_locked' => true], $this->driverConnection);

$connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);

$encoded = $connection->get();
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);

$this->assertNull($connection->get());
}

private function createSchemaManager(): AbstractSchemaManager
{
return method_exists($this->driverConnection, 'createSchemaManager')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,20 @@ public function testSendAndGetWithAutoSetupEnabledAndSetupAlready()
$this->assertNull($this->connection->get());
}

public function testSendAndGetWithSkipLockedEnabled()
{
$connection = new Connection(['table_name' => 'queue_table', 'skip_locked' => true], $this->driverConnection);
$connection->setup();

$connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);

$encoded = $connection->get();
$this->assertSame('{"message": "Hi"}', $encoded['body']);
$this->assertSame(['type' => DummyMessage::class], $encoded['headers']);

$this->assertNull($this->connection->get());
}

protected function setUp(): void
{
if (!$host = getenv('POSTGRES_HOST')) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,188 @@ public function testFind()
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelope->getMessage());
}

public function testAck()
{
$serializer = $this->createSerializer();
$connection = $this->createMock(Connection::class);

$envelope = new Envelope(new \stdClass(), [new DoctrineReceivedStamp('1')]);
$receiver = new DoctrineReceiver($connection, $serializer);

$connection
->expects($this->once())
->method('ack')
->with('1')
->willReturn(true);

$receiver->ack($envelope);
}

public function testAckThrowsRetryableException()
{
$serializer = $this->createSerializer();
$connection = $this->createMock(Connection::class);

$envelope = new Envelope(new \stdClass(), [new DoctrineReceivedStamp('1')]);
$receiver = new DoctrineReceiver($connection, $serializer);

$driverException = class_exists(Exception::class) ? Exception::new(new \PDOException('Deadlock', 40001)) : new PDOException(new \PDOException('Deadlock', 40001));
if (!class_exists(Version::class)) {
// This is doctrine/dbal 3.x
$deadlockException = new DeadlockException($driverException, null);
} else {
$deadlockException = new DeadlockException('Deadlock', $driverException);
}

$connection
->expects($this->exactly(2))
->method('ack')
->with('1')
->willReturnOnConsecutiveCalls(
$this->throwException($deadlockException),
true,
);

$receiver->ack($envelope);
}

public function testAckThrowsRetryableExceptionAndRetriesFail()
{
$serializer = $this->createSerializer();
$connection = $this->createMock(Connection::class);

$envelope = new Envelope(new \stdClass(), [new DoctrineReceivedStamp('1')]);
$receiver = new DoctrineReceiver($connection, $serializer);

$driverException = class_exists(Exception::class) ? Exception::new(new \PDOException('Deadlock', 40001)) : new PDOException(new \PDOException('Deadlock', 40001));
if (!class_exists(Version::class)) {
// This is doctrine/dbal 3.x
$deadlockException = new DeadlockException($driverException, null);
} else {
$deadlockException = new DeadlockException('Deadlock', $driverException);
}

$connection
->expects($this->exactly(4))
->method('ack')
->with('1')
->willThrowException($deadlockException);

self::expectException(TransportException::class);
$receiver->ack($envelope);
}

public function testAckThrowsException()
{
$serializer = $this->createSerializer();
$connection = $this->createMock(Connection::class);

$envelope = new Envelope(new \stdClass(), [new DoctrineReceivedStamp('1')]);
$receiver = new DoctrineReceiver($connection, $serializer);

$exception = new \RuntimeException();

$connection
->expects($this->once())
->method('ack')
->with('1')
->willThrowException($exception);

self::expectException($exception::class);
$receiver->ack($envelope);
}

public function testReject()
{
$serializer = $this->createSerializer();
$connection = $this->createMock(Connection::class);

$envelope = new Envelope(new \stdClass(), [new DoctrineReceivedStamp('1')]);
$receiver = new DoctrineReceiver($connection, $serializer);

$connection
->expects($this->once())
->method('reject')
->with('1')
->willReturn(true);

$receiver->reject($envelope);
}

public function testRejectThrowsRetryableException()
{
$serializer = $this->createSerializer();
$connection = $this->createMock(Connection::class);

$envelope = new Envelope(new \stdClass(), [new DoctrineReceivedStamp('1')]);
$receiver = new DoctrineReceiver($connection, $serializer);

$driverException = class_exists(Exception::class) ? Exception::new(new \PDOException('Deadlock', 40001)) : new PDOException(new \PDOException('Deadlock', 40001));
if (!class_exists(Version::class)) {
// This is doctrine/dbal 3.x
$deadlockException = new DeadlockException($driverException, null);
} else {
$deadlockException = new DeadlockException('Deadlock', $driverException);
}

$connection
->expects($this->exactly(2))
->method('reject')
->with('1')
->willReturnOnConsecutiveCalls(
$this->throwException($deadlockException),
true,
);

$receiver->reject($envelope);
}

public function testRejectThrowsRetryableExceptionAndRetriesFail()
{
$serializer = $this->createSerializer();
$connection = $this->createMock(Connection::class);

$envelope = new Envelope(new \stdClass(), [new DoctrineReceivedStamp('1')]);
$receiver = new DoctrineReceiver($connection, $serializer);

$driverException = class_exists(Exception::class) ? Exception::new(new \PDOException('Deadlock', 40001)) : new PDOException(new \PDOException('Deadlock', 40001));
if (!class_exists(Version::class)) {
// This is doctrine/dbal 3.x
$deadlockException = new DeadlockException($driverException, null);
} else {
$deadlockException = new DeadlockException('Deadlock', $driverException);
}

$connection
->expects($this->exactly(4))
->method('reject')
->with('1')
->willThrowException($deadlockException);

self::expectException(TransportException::class);
$receiver->reject($envelope);
}

public function testRejectThrowsException()
{
$serializer = $this->createSerializer();
$connection = $this->createMock(Connection::class);

$envelope = new Envelope(new \stdClass(), [new DoctrineReceivedStamp('1')]);
$receiver = new DoctrineReceiver($connection, $serializer);

$exception = new \RuntimeException();

$connection
->expects($this->once())
->method('reject')
->with('1')
->willThrowException($exception);

self::expectException($exception::class);
$receiver->reject($envelope);
}

private function createDoctrineEnvelope(): array
{
return [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
use Doctrine\DBAL\Platforms\MySQLPlatform;
use Doctrine\DBAL\Platforms\OraclePlatform;
use Doctrine\DBAL\Platforms\PostgreSQLPlatform;
use Doctrine\DBAL\Query\ForUpdate\ConflictResolutionMode;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as far as i can see, this was added in dbal 3.8, but we support versions prior to that
https://github.com/symfony/doctrine-messenger/blob/6.4/composer.json#L20C10-L20C23

Copy link
Contributor

@bendavies bendavies Mar 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may have to manually construct the sql, as is done below with the method_exists(QueryBuilder::class, 'forUpdate') checks?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will see. The sql for each platform is not as simple as just appending SKIP LOCKED. And in fact now that you point it out, I suspect this code might be broken for other platforms when you are using older versions of doctrine/dbal where the forUpdate() abstraction does not exist yet.

use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Result;
use Doctrine\DBAL\Schema\AbstractSchemaManager;
Expand Down Expand Up @@ -186,15 +187,22 @@ public function get(): ?array
->setParameters($query->getParameters(), $query->getParameterTypes());

if (method_exists(QueryBuilder::class, 'forUpdate')) {
$query->forUpdate();
$query->forUpdate(ConflictResolutionMode::SKIP_LOCKED);
}

$sql = $query->getSQL();
} elseif (method_exists(QueryBuilder::class, 'forUpdate')) {
$query->forUpdate();
$query->forUpdate(ConflictResolutionMode::SKIP_LOCKED);
try {
$sql = $query->getSQL();
} catch (DBALException $e) {
// If SKIP_LOCKED is not supported, fallback to without SKIP_LOCKED
$query->forUpdate();

try {
$sql = $query->getSQL();
} catch (DBALException $e) {
}
}
} elseif (preg_match('/FROM (.+) WHERE/', (string) $sql, $matches)) {
$fromClause = $matches[1];
Expand Down
Loading
Loading