From 8c990f0cda1704eefef36c3b7a47b2d9ad94b17b Mon Sep 17 00:00:00 2001 From: Fabien Potencier Date: Wed, 29 Apr 2020 23:01:17 +0200 Subject: [PATCH] [Messenger] Expose the Doctrine Schema to help with migrations --- .../Bridge/Doctrine/Transport/Connection.php | 33 ++------- .../Bridge/Doctrine/Transport/Migration.php | 68 +++++++++++++++++++ .../Transport/PostgreSqlConnection.php | 26 ------- 3 files changed, 73 insertions(+), 54 deletions(-) create mode 100644 src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Migration.php diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php index 50218e2bb74a1..2322224b5adc9 100644 --- a/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php @@ -244,7 +244,11 @@ public function setup(): void $this->driverConnection->getConfiguration()->setFilterSchemaAssetsExpression(null); } - $this->schemaSynchronizer->updateSchema($this->getSchema(), true); + $schema = new Schema([], [], $this->driverConnection->getSchemaManager()->createSchemaConfig()); + $migration = new Migration($this->configuration['table_name']); + $migration->up($schema, $this->driverConnection); + + $this->schemaSynchronizer->updateSchema($schema, true); if ($hasFilterCallback) { $this->driverConnection->getConfiguration()->setSchemaAssetsFilter($assetFilter); @@ -338,33 +342,6 @@ private function executeQuery(string $sql, array $parameters = [], array $types return $stmt; } - private function getSchema(): Schema - { - $schema = new Schema([], [], $this->driverConnection->getSchemaManager()->createSchemaConfig()); - $table = $schema->createTable($this->configuration['table_name']); - $table->addColumn('id', self::$useDeprecatedConstants ? Type::BIGINT : Types::BIGINT) - ->setAutoincrement(true) - ->setNotnull(true); - $table->addColumn('body', self::$useDeprecatedConstants ? Type::TEXT : Types::TEXT) - ->setNotnull(true); - $table->addColumn('headers', self::$useDeprecatedConstants ? Type::TEXT : Types::TEXT) - ->setNotnull(true); - $table->addColumn('queue_name', self::$useDeprecatedConstants ? Type::STRING : Types::STRING) - ->setNotnull(true); - $table->addColumn('created_at', self::$useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE) - ->setNotnull(true); - $table->addColumn('available_at', self::$useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE) - ->setNotnull(true); - $table->addColumn('delivered_at', self::$useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE) - ->setNotnull(false); - $table->setPrimaryKey(['id']); - $table->addIndex(['queue_name']); - $table->addIndex(['available_at']); - $table->addIndex(['delivered_at']); - - return $schema; - } - private function decodeEnvelopeHeaders(array $doctrineEnvelope): array { $doctrineEnvelope['headers'] = json_decode($doctrineEnvelope['headers'], true); diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Migration.php b/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Migration.php new file mode 100644 index 0000000000000..91c2a4c545ab7 --- /dev/null +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Migration.php @@ -0,0 +1,68 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport; + +use Doctrine\DBAL\Connection; +use Doctrine\DBAL\Schema\Schema; +use Doctrine\DBAL\Types\Type; +use Doctrine\DBAL\Types\Types; + +final class Migration +{ + private $tableName; + + public function __construct(string $tableName = 'messenger_messages') + { + $this->tableName = $tableName; + } + + public static function up(Schema $schema, Connection $connection): void + { + $useDeprecatedConstants = !class_exists(Types::class); + $table = $schema->createTable($this->tableName); + $table->addColumn('id', $useDeprecatedConstants ? Type::BIGINT : Types::BIGINT)->setAutoincrement(true)->setNotnull(true); + $table->addColumn('body', $useDeprecatedConstants ? Type::TEXT : Types::TEXT)->setNotnull(true); + $table->addColumn('headers', $useDeprecatedConstants ? Type::TEXT : Types::TEXT)->setNotnull(true); + $table->addColumn('queue_name', $useDeprecatedConstants ? Type::STRING : Types::STRING)->setNotnull(true); + $table->addColumn('created_at', $useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE)->setNotnull(true); + $table->addColumn('available_at', $useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE)->setNotnull(true); + $table->addColumn('delivered_at', $useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE)->setNotnull(false); + $table->setPrimaryKey(['id']); + $table->addIndex(['queue_name']); + $table->addIndex(['available_at']); + $table->addIndex(['delivered_at']); + + if ('postgresql' === $connection->getDatabasePlatform()->getName()) { + $sql = sprintf(<<<'SQL' +LOCK TABLE %1$s; +-- create trigger function +CREATE OR REPLACE FUNCTION notify_%1$s() RETURNS TRIGGER AS $$ + BEGIN + PERFORM pg_notify('%1$s', NEW.queue_name::text); + RETURN NEW; + END; +$$ LANGUAGE plpgsql; + +-- register trigger +DROP TRIGGER IF EXISTS notify_trigger ON %1$s; + +CREATE TRIGGER notify_trigger +AFTER INSERT +ON %1$s +FOR EACH ROW EXECUTE PROCEDURE notify_%1$s(); +SQL + , $this->tableName); + + $connection->exec($sql); + } + } +} diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/PostgreSqlConnection.php b/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/PostgreSqlConnection.php index ae15a40f4cd89..b35cc50836856 100644 --- a/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/PostgreSqlConnection.php +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/PostgreSqlConnection.php @@ -79,32 +79,6 @@ public function get(): ?array return parent::get(); } - public function setup(): void - { - parent::setup(); - - $sql = sprintf(<<<'SQL' -LOCK TABLE %1$s; --- create trigger function -CREATE OR REPLACE FUNCTION notify_%1$s() RETURNS TRIGGER AS $$ - BEGIN - PERFORM pg_notify('%1$s', NEW.queue_name::text); - RETURN NEW; - END; -$$ LANGUAGE plpgsql; - --- register trigger -DROP TRIGGER IF EXISTS notify_trigger ON %1$s; - -CREATE TRIGGER notify_trigger -AFTER INSERT -ON %1$s -FOR EACH ROW EXECUTE PROCEDURE notify_%1$s(); -SQL - , $this->configuration['table_name']); - $this->driverConnection->exec($sql); - } - private function unlisten() { if (!$this->listening) {