Skip to content

Commit 88fce4e

Browse files
hgracaqshurick
andcommitted
Add "SKIP LOCKED" to the query that retrieves messages
The current `SELECT ... FOR UPDATE` retrieves rows, using the index to find and lock the retrieved rows. This means that when we have more than one consumer, while one consumer is locking those rows, the whole index is locked thus other queries (consumers) will be put on hold. While with a small table this might not be noticeable, as the table grows the meddling with the index becomes slower and the other consumers have to wait more time, eventually making the MQ inoperable. The `SKIP LOCKED` addition will allow other consumers to query the table and get messages immediately, ignoring the rows that other consumers are locking. Co-authored-by: Alexander Malyk <shu.rick.ifmo@gmail.com>
1 parent b885361 commit 88fce4e

File tree

2 files changed

+37
-43
lines changed

2 files changed

+37
-43
lines changed

src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/ConnectionTest.php

+8-6
Original file line numberDiff line numberDiff line change
@@ -419,10 +419,12 @@ class_exists(MySQLPlatform::class) ? new MySQLPlatform() : new MySQL57Platform()
419419
'SELECT m.* FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC LIMIT 1 FOR UPDATE',
420420
];
421421

422-
yield 'Postgres10+' => [
423-
new PostgreSQL100Platform(),
424-
'SELECT m.* FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED',
425-
];
422+
if (class_exists(PostgreSQL100Platform::class)) {
423+
yield 'Postgres10' => [
424+
new PostgreSQL100Platform(),
425+
'SELECT m.* FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED',
426+
];
427+
}
426428

427429
yield 'SQL Server' => [
428430
class_exists(SQLServerPlatform::class) && !class_exists(SQLServer2012Platform::class) ? new SQLServerPlatform() : new SQLServer2012Platform(),
@@ -433,13 +435,13 @@ class_exists(SQLServerPlatform::class) && !class_exists(SQLServer2012Platform::c
433435
// DBAL >= 4
434436
yield 'Oracle' => [
435437
new OraclePlatform(),
436-
'SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT m.id FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC FETCH NEXT 1 ROWS ONLY) FOR UPDATE',
438+
'SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT m.id FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC FETCH NEXT 1 ROWS ONLY) FOR UPDATE SKIP LOCKED',
437439
];
438440
} else {
439441
// DBAL < 4
440442
yield 'Oracle' => [
441443
new OraclePlatform(),
442-
'SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT a.id FROM (SELECT m.id FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC) a WHERE ROWNUM <= 1) FOR UPDATE',
444+
'SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT a.id FROM (SELECT m.id FROM messenger_messages m WHERE (m.queue_name = ?) AND (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) ORDER BY available_at ASC) a WHERE ROWNUM <= 1) FOR UPDATE SKIP LOCKED',
443445
];
444446
}
445447
}

src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php

+29-37
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,8 @@
1616
use Doctrine\DBAL\Exception as DBALException;
1717
use Doctrine\DBAL\Exception\TableNotFoundException;
1818
use Doctrine\DBAL\LockMode;
19-
use Doctrine\DBAL\Platforms\MariaDb1060Platform;
20-
use Doctrine\DBAL\Platforms\MariaDBPlatform;
21-
use Doctrine\DBAL\Platforms\MySQL57Platform;
22-
use Doctrine\DBAL\Platforms\MySQL80Platform;
2319
use Doctrine\DBAL\Platforms\AbstractMySQLPlatform;
24-
use Doctrine\DBAL\Platforms\MySQLPlatform;
2520
use Doctrine\DBAL\Platforms\OraclePlatform;
26-
use Doctrine\DBAL\Platforms\PostgreSQL100Platform;
27-
use Doctrine\DBAL\Platforms\PostgreSQLPlatform;
28-
use Doctrine\DBAL\Platforms\SQLServerPlatform;
2921
use Doctrine\DBAL\Query\ForUpdate\ConflictResolutionMode;
3022
use Doctrine\DBAL\Query\QueryBuilder;
3123
use Doctrine\DBAL\Result;
@@ -189,28 +181,22 @@ public function get(): ?array
189181
->where('w.id IN ('.str_replace('SELECT a.* FROM', 'SELECT a.id FROM', $sql).')')
190182
->setParameters($query->getParameters());
191183

192-
if (method_exists(QueryBuilder::class, 'forUpdate')) {
193-
$query->forUpdate();
194-
}
195-
196184
$sql = $query->getSQL();
197-
} elseif (method_exists(QueryBuilder::class, 'forUpdate')) {
198-
$query->forUpdate(ConflictResolutionMode::SKIP_LOCKED);
199-
try {
200-
$sql = $query->getSQL();
201-
} catch (DBALException $e) {
202-
}
203-
} elseif (preg_match('/FROM (.+) WHERE/', (string) $sql, $matches)) {
204-
$fromClause = $matches[1];
205-
$sql = str_replace(
206-
sprintf('FROM %s WHERE', $fromClause),
207-
sprintf('FROM %s WHERE', $this->driverConnection->getDatabasePlatform()->appendLockHint($fromClause, LockMode::PESSIMISTIC_WRITE)),
208-
$sql
209-
);
210185
}
211186

212-
// use SELECT ... FOR UPDATE to lock table
213-
if (!method_exists(QueryBuilder::class, 'forUpdate') || $this->isDbal38OrHigherAndOldPlatform()) {
187+
if (method_exists(QueryBuilder::class, 'forUpdate')) {
188+
$sql = $this->addLockMode($query, $sql);
189+
} else {
190+
if (preg_match('/FROM (.+) WHERE/', (string) $sql, $matches)) {
191+
$fromClause = $matches[1];
192+
$sql = str_replace(
193+
sprintf('FROM %s WHERE', $fromClause),
194+
sprintf('FROM %s WHERE', $this->driverConnection->getDatabasePlatform()->appendLockHint($fromClause, LockMode::PESSIMISTIC_WRITE)),
195+
$sql
196+
);
197+
}
198+
199+
// use SELECT ... FOR UPDATE to lock table
214200
$sql .= ' '.$this->driverConnection->getDatabasePlatform()->getWriteLockSQL();
215201
}
216202

@@ -505,17 +491,23 @@ private function updateSchema(): void
505491
}
506492
}
507493

508-
/**
509-
* @throws DBALException
510-
*/
511-
public function isDbal38OrHigherAndOldPlatform(): bool
494+
private function addLockMode(QueryBuilder $query, string $sql): string
512495
{
513-
$platform = $this->driverConnection->getDatabasePlatform();
496+
$query->forUpdate(ConflictResolutionMode::SKIP_LOCKED);
497+
try {
498+
return $query->getSQL();
499+
} catch (DBALException) {
500+
return $this->fallBackToForUpdate($query, $sql);
501+
}
502+
}
514503

515-
return method_exists(QueryBuilder::class, 'forUpdate')
516-
&& method_exists($platform::class, 'getWriteLockSQL')
517-
&& (($platform instanceof MySQLPlatform && !$platform instanceof MySQL80Platform && !$platform instanceof MariaDBPlatform)
518-
|| ($platform instanceof MariaDBPlatform && !$platform instanceof MariaDb1060Platform)
519-
|| ($platform instanceof PostgreSQLPlatform && !$platform instanceof PostgreSQL100Platform));
504+
private function fallBackToForUpdate(QueryBuilder $query, string $sql): string
505+
{
506+
$query->forUpdate();
507+
try {
508+
return $query->getSQL();
509+
} catch (DBALException) {
510+
return $sql;
511+
}
520512
}
521513
}

0 commit comments

Comments
 (0)