Skip to content

[Lock] Split PdoStore into DoctrineDbalStore #43332

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions UPGRADE-5.4.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ HttpFoundation

* Mark `Request::get()` internal, use explicit input sources instead

Lock
----

* Deprecate usage of `PdoStore` with a `Doctrine\DBAL\Connection` or a DBAL url, use the new `DoctrineDbalStore` instead
* Deprecate usage of `PostgreSqlStore` with a `Doctrine\DBAL\Connection` or a DBAL url, use the new `DoctrineDbalPostgreSqlStore` instead

Messenger
---------

Expand Down
2 changes: 2 additions & 0 deletions UPGRADE-6.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ Lock

* Removed the `NotSupportedException`. It shouldn't be thrown anymore.
* Removed the `RetryTillSaveStore`. Logic has been moved in `Lock` and is not needed anymore.
* Removed usage of `PdoStore` with a `Doctrine\DBAL\Connection` or a DBAL url, use the new `DoctrineDbalStore` instead
* Removed usage of `PostgreSqlStore` with a `Doctrine\DBAL\Connection` or a DBAL url, use the new `DoctrineDbalPostgreSqlStore` instead

Mailer
------
Expand Down
8 changes: 8 additions & 0 deletions src/Symfony/Component/Lock/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
CHANGELOG
=========

5.4.0
-----

* added `DoctrineDbalStore` identical to `PdoStore` for `Doctrine\DBAL\Connection` or DBAL url
* deprecated usage of `PdoStore` with `Doctrine\DBAL\Connection` or DBAL url
* added `DoctrineDbalPostgreSqlStore` identical to `PdoPostgreSqlStore` for `Doctrine\DBAL\Connection` or DBAL url
* deprecated usage of `PdoPostgreSqlStore` with `Doctrine\DBAL\Connection` or DBAL url

5.2.0
-----

Expand Down
75 changes: 75 additions & 0 deletions src/Symfony/Component/Lock/Store/DatabaseTableTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Lock\Store;

use Symfony\Component\Lock\Exception\InvalidArgumentException;
use Symfony\Component\Lock\Exception\InvalidTtlException;
use Symfony\Component\Lock\Key;

/**
* @internal
*/
trait DatabaseTableTrait
{
private $table = 'lock_keys';
private $idCol = 'key_id';
private $tokenCol = 'key_token';
private $expirationCol = 'key_expiration';
private $gcProbability;
private $initialTtl;

private function init(array $options, float $gcProbability, int $initialTtl)
{
if ($gcProbability < 0 || $gcProbability > 1) {
throw new InvalidArgumentException(sprintf('"%s" requires gcProbability between 0 and 1, "%f" given.', __METHOD__, $gcProbability));
}
if ($initialTtl < 1) {
throw new InvalidTtlException(sprintf('"%s()" expects a strictly positive TTL, "%d" given.', __METHOD__, $initialTtl));
}

$this->table = $options['db_table'] ?? $this->table;
$this->idCol = $options['db_id_col'] ?? $this->idCol;
$this->tokenCol = $options['db_token_col'] ?? $this->tokenCol;
$this->expirationCol = $options['db_expiration_col'] ?? $this->expirationCol;

$this->gcProbability = $gcProbability;
$this->initialTtl = $initialTtl;
}

/**
* Returns a hashed version of the key.
*/
private function getHashedKey(Key $key): string
{
return hash('sha256', (string) $key);
}

private function getUniqueToken(Key $key): string
{
if (!$key->hasState(__CLASS__)) {
$token = base64_encode(random_bytes(32));
$key->setState(__CLASS__, $token);
}

return $key->getState(__CLASS__);
}

/**
* Prune the table randomly, based on GC probability.
*/
private function randomlyPrune(): void
{
if ($this->gcProbability > 0 && (1.0 === $this->gcProbability || (random_int(0, \PHP_INT_MAX) / \PHP_INT_MAX) <= $this->gcProbability)) {
$this->prune();
}
}
}
235 changes: 235 additions & 0 deletions src/Symfony/Component/Lock/Store/DoctrineDbalPostgreSqlStore.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Lock\Store;

use Doctrine\DBAL\Connection;
use Doctrine\DBAL\DriverManager;
use Doctrine\DBAL\Platforms\PostgreSQLPlatform;
use Symfony\Component\Lock\BlockingSharedLockStoreInterface;
use Symfony\Component\Lock\BlockingStoreInterface;
use Symfony\Component\Lock\Exception\InvalidArgumentException;
use Symfony\Component\Lock\Exception\LockConflictedException;
use Symfony\Component\Lock\Key;
use Symfony\Component\Lock\SharedLockStoreInterface;

/**
* DoctrineDbalPostgreSqlStore is a PersistingStoreInterface implementation using
* PostgreSql advisory locks with a Doctrine DBAL Connection.
*
* @author Jérémy Derussé <jeremy@derusse.com>
*/
class DoctrineDbalPostgreSqlStore implements BlockingSharedLockStoreInterface, BlockingStoreInterface
{
private $conn;
private static $storeRegistry = [];

/**
* You can either pass an existing database connection a Doctrine DBAL Connection
* or a URL that will be used to connect to the database.
*
* @param Connection|string $connOrUrl A Connection instance or Doctrine URL
*
* @throws InvalidArgumentException When first argument is not Connection nor string
*/
public function __construct($connOrUrl)
{
if ($connOrUrl instanceof Connection) {
if (!$connOrUrl->getDatabasePlatform() instanceof PostgreSQLPlatform) {
throw new InvalidArgumentException(sprintf('The adapter "%s" does not support the "%s" platform.', __CLASS__, \get_class($connOrUrl->getDatabasePlatform())));
}
$this->conn = $connOrUrl;
} elseif (\is_string($connOrUrl)) {
if (!class_exists(DriverManager::class)) {
throw new InvalidArgumentException(sprintf('Failed to parse the DSN "%s". Try running "composer require doctrine/dbal".', $connOrUrl));
}
$this->conn = DriverManager::getConnection(['url' => $this->filterDsn($connOrUrl)]);
} else {
throw new \TypeError(sprintf('Argument 1 passed to "%s()" must be "%s" or string, "%s" given.', Connection::class, __METHOD__, get_debug_type($connOrUrl)));
}
}

public function save(Key $key)
{
// prevent concurrency within the same connection
$this->getInternalStore()->save($key);

$sql = 'SELECT pg_try_advisory_lock(:key)';
$result = $this->conn->executeQuery($sql, [
'key' => $this->getHashedKey($key),
]);

// Check if lock is acquired
if (true === $result->fetchOne()) {
$key->markUnserializable();
// release sharedLock in case of promotion
$this->unlockShared($key);

return;
}

throw new LockConflictedException();
}

public function saveRead(Key $key)
{
// prevent concurrency within the same connection
$this->getInternalStore()->saveRead($key);

$sql = 'SELECT pg_try_advisory_lock_shared(:key)';
$result = $this->conn->executeQuery($sql, [
'key' => $this->getHashedKey($key),
]);

// Check if lock is acquired
if (true === $result->fetchOne()) {
$key->markUnserializable();
// release lock in case of demotion
$this->unlock($key);

return;
}

throw new LockConflictedException();
}

public function putOffExpiration(Key $key, float $ttl)
{
// postgresql locks forever.
// check if lock still exists
if (!$this->exists($key)) {
throw new LockConflictedException();
}
}

public function delete(Key $key)
{
// Prevent deleting locks own by an other key in the same connection
if (!$this->exists($key)) {
return;
}

$this->unlock($key);

// Prevent deleting Readlocks own by current key AND an other key in the same connection
$store = $this->getInternalStore();
try {
// If lock acquired = there is no other ReadLock
$store->save($key);
$this->unlockShared($key);
} catch (LockConflictedException $e) {
// an other key exists in this ReadLock
}

$store->delete($key);
}

public function exists(Key $key)
{
$sql = "SELECT count(*) FROM pg_locks WHERE locktype='advisory' AND objid=:key AND pid=pg_backend_pid()";
$result = $this->conn->executeQuery($sql, [
'key' => $this->getHashedKey($key),
]);

if ($result->fetchOne() > 0) {
// connection is locked, check for lock in internal store
return $this->getInternalStore()->exists($key);
}

return false;
}

public function waitAndSave(Key $key)
{
// prevent concurrency within the same connection
// Internal store does not allow blocking mode, because there is no way to acquire one in a single process
$this->getInternalStore()->save($key);

$sql = 'SELECT pg_advisory_lock(:key)';
$this->conn->executeStatement($sql, [
'key' => $this->getHashedKey($key),
]);

// release lock in case of promotion
$this->unlockShared($key);
}

public function waitAndSaveRead(Key $key)
{
// prevent concurrency within the same connection
// Internal store does not allow blocking mode, because there is no way to acquire one in a single process
$this->getInternalStore()->saveRead($key);

$sql = 'SELECT pg_advisory_lock_shared(:key)';
$this->conn->executeStatement($sql, [
'key' => $this->getHashedKey($key),
]);

// release lock in case of demotion
$this->unlock($key);
}

/**
* Returns a hashed version of the key.
*/
private function getHashedKey(Key $key): int
{
return crc32((string) $key);
}

private function unlock(Key $key): void
{
do {
$sql = "SELECT pg_advisory_unlock(objid::bigint) FROM pg_locks WHERE locktype='advisory' AND mode='ExclusiveLock' AND objid=:key AND pid=pg_backend_pid()";
$result = $this->conn->executeQuery($sql, [
'key' => $this->getHashedKey($key),
]);
} while (0 !== $result->rowCount());
}

private function unlockShared(Key $key): void
{
do {
$sql = "SELECT pg_advisory_unlock_shared(objid::bigint) FROM pg_locks WHERE locktype='advisory' AND mode='ShareLock' AND objid=:key AND pid=pg_backend_pid()";
$result = $this->conn->executeQuery($sql, [
'key' => $this->getHashedKey($key),
]);
} while (0 !== $result->rowCount());
}

/**
* Check driver and remove scheme extension from DSN.
* From pgsql+advisory://server/ to pgsql://server/.
*
* @throws InvalidArgumentException when driver is not supported
*/
private function filterDsn(string $dsn): string
{
if (!str_contains($dsn, '://')) {
throw new InvalidArgumentException(sprintf('String "%" is not a valid DSN for Doctrine DBAL.', $dsn));
}

[$scheme, $rest] = explode(':', $dsn, 2);
$driver = strtok($scheme, '+');
if (!\in_array($driver, ['pgsql', 'postgres', 'postgresql'])) {
throw new InvalidArgumentException(sprintf('The adapter "%s" does not support the "%s" driver.', __CLASS__, $driver));
}

return sprintf('%s:%s', $driver, $rest);
}

private function getInternalStore(): SharedLockStoreInterface
{
$namespace = spl_object_hash($this->conn);

return self::$storeRegistry[$namespace] ?? self::$storeRegistry[$namespace] = new InMemoryStore();
}
}
Loading