Skip to content

Commit 9c24923

Browse files
committed
[Messenger] Expose the Doctrine Schema to help with migrations
1 parent 71b3912 commit 9c24923

File tree

3 files changed

+65
-54
lines changed

3 files changed

+65
-54
lines changed

src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php

+4-28
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,10 @@ public function setup(): void
244244
$this->driverConnection->getConfiguration()->setFilterSchemaAssetsExpression(null);
245245
}
246246

247-
$this->schemaSynchronizer->updateSchema($this->getSchema(), true);
247+
$schema = new Schema([], [], $this->driverConnection->getSchemaManager()->createSchemaConfig());
248+
Migration::up($schema, $this->configuration['table_name']);
249+
250+
$this->schemaSynchronizer->updateSchema($schema, true);
248251

249252
if ($hasFilterCallback) {
250253
$this->driverConnection->getConfiguration()->setSchemaAssetsFilter($assetFilter);
@@ -338,33 +341,6 @@ private function executeQuery(string $sql, array $parameters = [], array $types
338341
return $stmt;
339342
}
340343

341-
private function getSchema(): Schema
342-
{
343-
$schema = new Schema([], [], $this->driverConnection->getSchemaManager()->createSchemaConfig());
344-
$table = $schema->createTable($this->configuration['table_name']);
345-
$table->addColumn('id', self::$useDeprecatedConstants ? Type::BIGINT : Types::BIGINT)
346-
->setAutoincrement(true)
347-
->setNotnull(true);
348-
$table->addColumn('body', self::$useDeprecatedConstants ? Type::TEXT : Types::TEXT)
349-
->setNotnull(true);
350-
$table->addColumn('headers', self::$useDeprecatedConstants ? Type::TEXT : Types::TEXT)
351-
->setNotnull(true);
352-
$table->addColumn('queue_name', self::$useDeprecatedConstants ? Type::STRING : Types::STRING)
353-
->setNotnull(true);
354-
$table->addColumn('created_at', self::$useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE)
355-
->setNotnull(true);
356-
$table->addColumn('available_at', self::$useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE)
357-
->setNotnull(true);
358-
$table->addColumn('delivered_at', self::$useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE)
359-
->setNotnull(false);
360-
$table->setPrimaryKey(['id']);
361-
$table->addIndex(['queue_name']);
362-
$table->addIndex(['available_at']);
363-
$table->addIndex(['delivered_at']);
364-
365-
return $schema;
366-
}
367-
368344
private function decodeEnvelopeHeaders(array $doctrineEnvelope): array
369345
{
370346
$doctrineEnvelope['headers'] = json_decode($doctrineEnvelope['headers'], true);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport;
13+
14+
use Doctrine\DBAL\Connection;
15+
use Doctrine\DBAL\Schema\Schema;
16+
use Doctrine\DBAL\Types\Type;
17+
use Doctrine\DBAL\Types\Types;
18+
19+
final class Migration
20+
{
21+
public static function up(Schema $schema, Connection $connection, string $tableName = 'messenger_messages'): void
22+
{
23+
$useDeprecatedConstants = !class_exists(Types::class);
24+
$table = $schema->createTable($tableName);
25+
$table->addColumn('id', $useDeprecatedConstants ? Type::BIGINT : Types::BIGINT)->setAutoincrement(true)->setNotnull(true);
26+
$table->addColumn('body', $useDeprecatedConstants ? Type::TEXT : Types::TEXT)->setNotnull(true);
27+
$table->addColumn('headers', $useDeprecatedConstants ? Type::TEXT : Types::TEXT)->setNotnull(true);
28+
$table->addColumn('queue_name', $useDeprecatedConstants ? Type::STRING : Types::STRING)->setNotnull(true);
29+
$table->addColumn('created_at', $useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE)->setNotnull(true);
30+
$table->addColumn('available_at', $useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE)->setNotnull(true);
31+
$table->addColumn('delivered_at', $useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE)->setNotnull(false);
32+
$table->setPrimaryKey(['id']);
33+
$table->addIndex(['queue_name']);
34+
$table->addIndex(['available_at']);
35+
$table->addIndex(['delivered_at']);
36+
37+
if ('postgresql' === $connection->getDatabasePlatform()->getName()) {
38+
$sql = sprintf(<<<'SQL'
39+
LOCK TABLE %1$s;
40+
-- create trigger function
41+
CREATE OR REPLACE FUNCTION notify_%1$s() RETURNS TRIGGER AS $$
42+
BEGIN
43+
PERFORM pg_notify('%1$s', NEW.queue_name::text);
44+
RETURN NEW;
45+
END;
46+
$$ LANGUAGE plpgsql;
47+
48+
-- register trigger
49+
DROP TRIGGER IF EXISTS notify_trigger ON %1$s;
50+
51+
CREATE TRIGGER notify_trigger
52+
AFTER INSERT
53+
ON %1$s
54+
FOR EACH ROW EXECUTE PROCEDURE notify_%1$s();
55+
SQL
56+
, $tableName);
57+
58+
$connection->exec($sql);
59+
}
60+
}
61+
}

src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/PostgreSqlConnection.php

-26
Original file line numberDiff line numberDiff line change
@@ -79,32 +79,6 @@ public function get(): ?array
7979
return parent::get();
8080
}
8181

82-
public function setup(): void
83-
{
84-
parent::setup();
85-
86-
$sql = sprintf(<<<'SQL'
87-
LOCK TABLE %1$s;
88-
-- create trigger function
89-
CREATE OR REPLACE FUNCTION notify_%1$s() RETURNS TRIGGER AS $$
90-
BEGIN
91-
PERFORM pg_notify('%1$s', NEW.queue_name::text);
92-
RETURN NEW;
93-
END;
94-
$$ LANGUAGE plpgsql;
95-
96-
-- register trigger
97-
DROP TRIGGER IF EXISTS notify_trigger ON %1$s;
98-
99-
CREATE TRIGGER notify_trigger
100-
AFTER INSERT
101-
ON %1$s
102-
FOR EACH ROW EXECUTE PROCEDURE notify_%1$s();
103-
SQL
104-
, $this->configuration['table_name']);
105-
$this->driverConnection->exec($sql);
106-
}
107-
10882
private function unlisten()
10983
{
11084
if (!$this->listening) {

0 commit comments

Comments
 (0)