From 38b67e7d7f88f1ba441b3a1099c3812c2a024fb1 Mon Sep 17 00:00:00 2001 From: "Jonathan H. Wage" Date: Wed, 28 Feb 2024 10:17:05 -0700 Subject: [PATCH] [Messenger] Improve deadlock handling on `ack()` and `reject()` --- .../Tests/Transport/ConnectionTest.php | 83 +++++++- .../DoctrinePostgreSqlIntegrationTest.php | 13 ++ ...ctrinePostgreSqlRegularIntegrationTest.php | 14 ++ .../Tests/Transport/DoctrineReceiverTest.php | 182 ++++++++++++++++++ .../Bridge/Doctrine/Transport/Connection.php | 12 +- .../Doctrine/Transport/DoctrineReceiver.php | 40 +++- 6 files changed, 331 insertions(+), 13 deletions(-) diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/ConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/ConnectionTest.php index 02203af1a4a1..fac7a6e34d27 100644 --- a/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/ConnectionTest.php @@ -21,6 +21,7 @@ use Doctrine\DBAL\Platforms\OraclePlatform; use Doctrine\DBAL\Platforms\SQLServer2012Platform; use Doctrine\DBAL\Platforms\SQLServerPlatform; +use Doctrine\DBAL\Query\ForUpdate\ConflictResolutionMode; use Doctrine\DBAL\Query\QueryBuilder; use Doctrine\DBAL\Result; use Doctrine\DBAL\Schema\AbstractSchemaManager; @@ -99,6 +100,82 @@ public function testGetWithNoPendingMessageWillReturnNull() $this->assertNull($doctrineEnvelope); } + public function testGetWithSkipLockedWithForUpdateMethod() + { + if (!method_exists(QueryBuilder::class, 'forUpdate')) { + $this->markTestSkipped('This test is for when forUpdate method exists.'); + } + + $queryBuilder = $this->getQueryBuilderMock(); + $driverConnection = $this->getDBALConnectionMock(); + $stmt = $this->getResultMock(false); + + $queryBuilder + ->method('getParameters') + ->willReturn([]); + $queryBuilder + ->method('getParameterTypes') + ->willReturn([]); + $queryBuilder + ->method('forUpdate') + ->with(ConflictResolutionMode::SKIP_LOCKED) + ->willReturn($queryBuilder); + $queryBuilder + ->method('getSQL') + ->willReturn('SELECT FOR UPDATE SKIP LOCKED'); + $driverConnection->expects($this->once()) + ->method('createQueryBuilder') + ->willReturn($queryBuilder); + $driverConnection->expects($this->never()) + ->method('update'); + $driverConnection + ->method('executeQuery') + ->with($this->callback(function ($sql) { + return str_contains($sql, 'SKIP LOCKED'); + })) + ->willReturn($stmt); + + $connection = new Connection(['skip_locked' => true], $driverConnection); + $doctrineEnvelope = $connection->get(); + $this->assertNull($doctrineEnvelope); + } + + public function testGetWithSkipLockedWithoutForUpdateMethod() + { + if (method_exists(QueryBuilder::class, 'forUpdate')) { + $this->markTestSkipped('This test is for when forUpdate method does not exist.'); + } + + $queryBuilder = $this->getQueryBuilderMock(); + $driverConnection = $this->getDBALConnectionMock(); + $stmt = $this->getResultMock(false); + + $queryBuilder + ->method('getParameters') + ->willReturn([]); + $queryBuilder + ->method('getParameterTypes') + ->willReturn([]); + $queryBuilder + ->method('getSQL') + ->willReturn('SELECT'); + $driverConnection->expects($this->once()) + ->method('createQueryBuilder') + ->willReturn($queryBuilder); + $driverConnection->expects($this->never()) + ->method('update'); + $driverConnection + ->method('executeQuery') + ->with($this->callback(function ($sql) { + return str_contains($sql, 'SKIP LOCKED'); + })) + ->willReturn($stmt); + + $connection = new Connection(['skip_locked' => true], $driverConnection); + $doctrineEnvelope = $connection->get(); + $this->assertNull($doctrineEnvelope); + } + public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage() { $this->expectException(TransportException::class); @@ -507,20 +584,20 @@ class_exists(MySQLPlatform::class) ? new MySQLPlatform() : new MySQL57Platform() yield 'SQL Server' => [ class_exists(SQLServerPlatform::class) && !class_exists(SQLServer2012Platform::class) ? new SQLServerPlatform() : new SQLServer2012Platform(), - 'SELECT m.* FROM messenger_messages m WITH (UPDLOCK, ROWLOCK) WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY ', + 'SELECT m.* FROM messenger_messages m WITH (UPDLOCK, ROWLOCK, READPAST) WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY ', ]; if (!class_exists(MySQL57Platform::class)) { // DBAL >= 4 yield 'Oracle' => [ new OraclePlatform(), - 'SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT m.id FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC FETCH NEXT 1 ROWS ONLY) FOR UPDATE', + 'SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT m.id FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC FETCH NEXT 1 ROWS ONLY) FOR UPDATE SKIP LOCKED', ]; } else { // DBAL < 4 yield 'Oracle' => [ new OraclePlatform(), - 'SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT a.id FROM (SELECT m.id FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC) a WHERE ROWNUM <= 1) FOR UPDATE', + 'SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT a.id FROM (SELECT m.id FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC) a WHERE ROWNUM <= 1) FOR UPDATE SKIP LOCKED', ]; } } diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrinePostgreSqlIntegrationTest.php b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrinePostgreSqlIntegrationTest.php index 8f9fb31499ce..89b932143a1e 100644 --- a/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrinePostgreSqlIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrinePostgreSqlIntegrationTest.php @@ -66,6 +66,19 @@ public function testPostgreSqlConnectionSendAndGet() $this->assertNull($this->connection->get()); } + public function testSkipLocked() + { + $connection = new PostgreSqlConnection(['table_name' => 'queue_table', 'skip_locked' => true], $this->driverConnection); + + $connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]); + + $encoded = $connection->get(); + $this->assertEquals('{"message": "Hi"}', $encoded['body']); + $this->assertEquals(['type' => DummyMessage::class], $encoded['headers']); + + $this->assertNull($connection->get()); + } + private function createSchemaManager(): AbstractSchemaManager { return method_exists($this->driverConnection, 'createSchemaManager') diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrinePostgreSqlRegularIntegrationTest.php b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrinePostgreSqlRegularIntegrationTest.php index c8abedee48f6..f462d4599a0b 100644 --- a/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrinePostgreSqlRegularIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrinePostgreSqlRegularIntegrationTest.php @@ -57,6 +57,20 @@ public function testSendAndGetWithAutoSetupEnabledAndSetupAlready() $this->assertNull($this->connection->get()); } + public function testSendAndGetWithSkipLockedEnabled() + { + $connection = new Connection(['table_name' => 'queue_table', 'skip_locked' => true], $this->driverConnection); + $connection->setup(); + + $connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]); + + $encoded = $connection->get(); + $this->assertSame('{"message": "Hi"}', $encoded['body']); + $this->assertSame(['type' => DummyMessage::class], $encoded['headers']); + + $this->assertNull($this->connection->get()); + } + protected function setUp(): void { if (!$host = getenv('POSTGRES_HOST')) { diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineReceiverTest.php b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineReceiverTest.php index 43a0772371a9..36ee1454703a 100644 --- a/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineReceiverTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineReceiverTest.php @@ -128,6 +128,188 @@ public function testFind() $this->assertEquals(new DummyMessage('Hi'), $actualEnvelope->getMessage()); } + public function testAck() + { + $serializer = $this->createSerializer(); + $connection = $this->createMock(Connection::class); + + $envelope = new Envelope(new \stdClass(), [new DoctrineReceivedStamp('1')]); + $receiver = new DoctrineReceiver($connection, $serializer); + + $connection + ->expects($this->once()) + ->method('ack') + ->with('1') + ->willReturn(true); + + $receiver->ack($envelope); + } + + public function testAckThrowsRetryableException() + { + $serializer = $this->createSerializer(); + $connection = $this->createMock(Connection::class); + + $envelope = new Envelope(new \stdClass(), [new DoctrineReceivedStamp('1')]); + $receiver = new DoctrineReceiver($connection, $serializer); + + $driverException = class_exists(Exception::class) ? Exception::new(new \PDOException('Deadlock', 40001)) : new PDOException(new \PDOException('Deadlock', 40001)); + if (!class_exists(Version::class)) { + // This is doctrine/dbal 3.x + $deadlockException = new DeadlockException($driverException, null); + } else { + $deadlockException = new DeadlockException('Deadlock', $driverException); + } + + $connection + ->expects($this->exactly(2)) + ->method('ack') + ->with('1') + ->willReturnOnConsecutiveCalls( + $this->throwException($deadlockException), + true, + ); + + $receiver->ack($envelope); + } + + public function testAckThrowsRetryableExceptionAndRetriesFail() + { + $serializer = $this->createSerializer(); + $connection = $this->createMock(Connection::class); + + $envelope = new Envelope(new \stdClass(), [new DoctrineReceivedStamp('1')]); + $receiver = new DoctrineReceiver($connection, $serializer); + + $driverException = class_exists(Exception::class) ? Exception::new(new \PDOException('Deadlock', 40001)) : new PDOException(new \PDOException('Deadlock', 40001)); + if (!class_exists(Version::class)) { + // This is doctrine/dbal 3.x + $deadlockException = new DeadlockException($driverException, null); + } else { + $deadlockException = new DeadlockException('Deadlock', $driverException); + } + + $connection + ->expects($this->exactly(4)) + ->method('ack') + ->with('1') + ->willThrowException($deadlockException); + + self::expectException(TransportException::class); + $receiver->ack($envelope); + } + + public function testAckThrowsException() + { + $serializer = $this->createSerializer(); + $connection = $this->createMock(Connection::class); + + $envelope = new Envelope(new \stdClass(), [new DoctrineReceivedStamp('1')]); + $receiver = new DoctrineReceiver($connection, $serializer); + + $exception = new \RuntimeException(); + + $connection + ->expects($this->once()) + ->method('ack') + ->with('1') + ->willThrowException($exception); + + self::expectException($exception::class); + $receiver->ack($envelope); + } + + public function testReject() + { + $serializer = $this->createSerializer(); + $connection = $this->createMock(Connection::class); + + $envelope = new Envelope(new \stdClass(), [new DoctrineReceivedStamp('1')]); + $receiver = new DoctrineReceiver($connection, $serializer); + + $connection + ->expects($this->once()) + ->method('reject') + ->with('1') + ->willReturn(true); + + $receiver->reject($envelope); + } + + public function testRejectThrowsRetryableException() + { + $serializer = $this->createSerializer(); + $connection = $this->createMock(Connection::class); + + $envelope = new Envelope(new \stdClass(), [new DoctrineReceivedStamp('1')]); + $receiver = new DoctrineReceiver($connection, $serializer); + + $driverException = class_exists(Exception::class) ? Exception::new(new \PDOException('Deadlock', 40001)) : new PDOException(new \PDOException('Deadlock', 40001)); + if (!class_exists(Version::class)) { + // This is doctrine/dbal 3.x + $deadlockException = new DeadlockException($driverException, null); + } else { + $deadlockException = new DeadlockException('Deadlock', $driverException); + } + + $connection + ->expects($this->exactly(2)) + ->method('reject') + ->with('1') + ->willReturnOnConsecutiveCalls( + $this->throwException($deadlockException), + true, + ); + + $receiver->reject($envelope); + } + + public function testRejectThrowsRetryableExceptionAndRetriesFail() + { + $serializer = $this->createSerializer(); + $connection = $this->createMock(Connection::class); + + $envelope = new Envelope(new \stdClass(), [new DoctrineReceivedStamp('1')]); + $receiver = new DoctrineReceiver($connection, $serializer); + + $driverException = class_exists(Exception::class) ? Exception::new(new \PDOException('Deadlock', 40001)) : new PDOException(new \PDOException('Deadlock', 40001)); + if (!class_exists(Version::class)) { + // This is doctrine/dbal 3.x + $deadlockException = new DeadlockException($driverException, null); + } else { + $deadlockException = new DeadlockException('Deadlock', $driverException); + } + + $connection + ->expects($this->exactly(4)) + ->method('reject') + ->with('1') + ->willThrowException($deadlockException); + + self::expectException(TransportException::class); + $receiver->reject($envelope); + } + + public function testRejectThrowsException() + { + $serializer = $this->createSerializer(); + $connection = $this->createMock(Connection::class); + + $envelope = new Envelope(new \stdClass(), [new DoctrineReceivedStamp('1')]); + $receiver = new DoctrineReceiver($connection, $serializer); + + $exception = new \RuntimeException(); + + $connection + ->expects($this->once()) + ->method('reject') + ->with('1') + ->willThrowException($exception); + + self::expectException($exception::class); + $receiver->reject($envelope); + } + private function createDoctrineEnvelope(): array { return [ diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php index e3030d3a1d55..f7cf63a9b830 100644 --- a/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php @@ -21,6 +21,7 @@ use Doctrine\DBAL\Platforms\MySQLPlatform; use Doctrine\DBAL\Platforms\OraclePlatform; use Doctrine\DBAL\Platforms\PostgreSQLPlatform; +use Doctrine\DBAL\Query\ForUpdate\ConflictResolutionMode; use Doctrine\DBAL\Query\QueryBuilder; use Doctrine\DBAL\Result; use Doctrine\DBAL\Schema\AbstractSchemaManager; @@ -186,15 +187,22 @@ public function get(): ?array ->setParameters($query->getParameters(), $query->getParameterTypes()); if (method_exists(QueryBuilder::class, 'forUpdate')) { - $query->forUpdate(); + $query->forUpdate(ConflictResolutionMode::SKIP_LOCKED); } $sql = $query->getSQL(); } elseif (method_exists(QueryBuilder::class, 'forUpdate')) { - $query->forUpdate(); + $query->forUpdate(ConflictResolutionMode::SKIP_LOCKED); try { $sql = $query->getSQL(); } catch (DBALException $e) { + // If SKIP_LOCKED is not supported, fallback to without SKIP_LOCKED + $query->forUpdate(); + + try { + $sql = $query->getSQL(); + } catch (DBALException $e) { + } } } elseif (preg_match('/FROM (.+) WHERE/', (string) $sql, $matches)) { $fromClause = $matches[1]; diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineReceiver.php b/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineReceiver.php index 2f6e4a5a823a..20bd61151c44 100644 --- a/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineReceiver.php +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineReceiver.php @@ -67,20 +67,16 @@ public function get(): iterable public function ack(Envelope $envelope): void { - try { + $this->withRetryableExceptionRetry(function() use ($envelope) { $this->connection->ack($this->findDoctrineReceivedStamp($envelope)->getId()); - } catch (DBALException $exception) { - throw new TransportException($exception->getMessage(), 0, $exception); - } + }); } public function reject(Envelope $envelope): void { - try { + $this->withRetryableExceptionRetry(function() use ($envelope) { $this->connection->reject($this->findDoctrineReceivedStamp($envelope)->getId()); - } catch (DBALException $exception) { - throw new TransportException($exception->getMessage(), 0, $exception); - } + }); } public function getMessageCount(): int @@ -150,4 +146,32 @@ private function createEnvelopeFromData(array $data): Envelope new TransportMessageIdStamp($data['id']) ); } + + private function withRetryableExceptionRetry(callable $callable): void + { + $delay = 100; + $multiplier = 2; + $jitter = 0.1; + $retries = 0; + + retry: + try { + $callable(); + } catch (RetryableException $exception) { + if (++$retries <= self::MAX_RETRIES) { + $delay *= $multiplier; + + $randomness = (int) ($delay * $jitter); + $delay += random_int(-$randomness, +$randomness); + + usleep($delay * 1000); + + goto retry; + } + + throw new TransportException($exception->getMessage(), 0, $exception); + } catch (DBALException $exception) { + throw new TransportException($exception->getMessage(), 0, $exception); + } + } }