Skip to content

Commit 4fae182

Browse files
committed
[Messenger] Add a Doctrine transport
1 parent edcd627 commit 4fae182

File tree

8 files changed

+347
-0
lines changed

8 files changed

+347
-0
lines changed

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

+1
Original file line numberDiff line numberDiff line change
@@ -1499,6 +1499,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
14991499
if (empty($config['transports'])) {
15001500
$container->removeDefinition('messenger.transport.symfony_serializer');
15011501
$container->removeDefinition('messenger.transport.amqp.factory');
1502+
$container->removeDefinition('messenger.transport.doctrine.factory');
15021503
} else {
15031504
if ('messenger.transport.symfony_serializer' === $config['serializer']['id']) {
15041505
if (!$this->isConfigEnabled($container, $serializerConfig)) {

src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

+8
Original file line numberDiff line numberDiff line change
@@ -61,5 +61,13 @@
6161

6262
<tag name="messenger.transport_factory" />
6363
</service>
64+
65+
<service id="messenger.transport.doctrine.factory" class="Symfony\Component\Messenger\Transport\Doctrine\DoctrineTransportFactory">
66+
<argument type="service" id="doctrine" on-invalid="ignore"/>
67+
<argument type="service" id="messenger.transport.serializer" />
68+
<argument>%kernel.debug%</argument>
69+
70+
<tag name="messenger.transport_factory" />
71+
</service>
6472
</services>
6573
</container>

src/Symfony/Component/Messenger/CHANGELOG.md

+5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
CHANGELOG
22
=========
33

4+
4.3.0
5+
-----
6+
7+
* Add a `Doctrine` transport
8+
49
4.2.0
510
-----
611

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Transport\Doctrine;
13+
14+
use Doctrine\DBAL\Connection as DriverConnection;
15+
use Doctrine\DBAL\Schema\Schema;
16+
use Doctrine\DBAL\Schema\Synchronizer\SingleDatabaseSynchronizer;
17+
use Doctrine\DBAL\Types\Type;
18+
19+
/**
20+
* @author Vincent Touzet <vincent.touzet@gmail.com>
21+
*/
22+
class Connection
23+
{
24+
/** @var string */
25+
private $tableName;
26+
/** @var \Doctrine\DBAL\Driver\Connection */
27+
private $driverConnection;
28+
/** @var bool */
29+
private $debug;
30+
31+
public function __construct(string $tableName, DriverConnection $driverConnection, bool $debug = false)
32+
{
33+
$this->tableName = $tableName;
34+
$this->driverConnection = $driverConnection;
35+
$this->debug = $debug;
36+
}
37+
38+
public function publish(string $body, array $headers): void
39+
{
40+
if (!$this->debug) {
41+
$this->setup();
42+
}
43+
$stmt = $this->driverConnection->prepare('INSERT INTO '.$this->tableName.' (`body`, `headers`) VALUES (:body, :headers)');
44+
$stmt->execute([
45+
':body' => $body,
46+
':headers' => $headers,
47+
]);
48+
}
49+
50+
public function get(): array
51+
{
52+
if (!$this->debug) {
53+
$this->setup();
54+
}
55+
$stmt = $this->driverConnection->prepare("SELECT * FROM {$this->tableName} WHERE status = 'pending' ORDER BY id ASC LIMIT 0,1");
56+
$stmt->execute();
57+
58+
$doctrineEnvelop = $stmt->fetch();
59+
60+
$this->changeStatus($doctrineEnvelop['id'], 'processing');
61+
62+
return $doctrineEnvelop;
63+
}
64+
65+
public function ack($id): bool
66+
{
67+
return $this->changeStatus($id, 'ack');
68+
}
69+
70+
public function nack($id): bool
71+
{
72+
return $this->changeStatus($id, 'pending');
73+
}
74+
75+
private function changeStatus($id, $status): bool
76+
{
77+
return $this->driverConnection->update($this->tableName, array('status' => $status), array('id' => $id)) > 0;
78+
}
79+
80+
private function setup(): void
81+
{
82+
$synchronizer = new SingleDatabaseSynchronizer($this->driverConnection);
83+
$synchronizer->updateSchema($this->getSchema());
84+
}
85+
86+
private function getSchema(): Schema
87+
{
88+
$schema = new Schema();
89+
$table = $schema->createTable($this->tableName);
90+
$table->addColumn('id', Type::BIGINT)
91+
->setAutoincrement(true)
92+
->setNotnull(true);
93+
$table->addColumn('body', Type::TEXT)
94+
->setNotnull(true);
95+
$table->addColumn('headers', Type::TARRAY)
96+
->setNotnull(true);
97+
$table->addColumn('status', Type::STRING)
98+
->setLength(32)
99+
->setDefault('pending');
100+
$table->setPrimaryKey(array('id'));
101+
102+
return $schema;
103+
}
104+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Transport\Doctrine;
13+
14+
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
15+
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
16+
17+
/**
18+
* @author Vincent Touzet <vincent.touzet@gmail.com>
19+
*/
20+
class DoctrineReceiver implements ReceiverInterface
21+
{
22+
/** @var \Symfony\Component\Messenger\Transport\Doctrine\Connection */
23+
private $connection;
24+
/** @var \Symfony\Component\Messenger\Transport\Serialization\Serializer */
25+
private $serializer;
26+
/** @var bool */
27+
private $shouldStop = false;
28+
29+
public function __construct(Connection $connection, Serializer $serializer = null)
30+
{
31+
$this->connection = $connection;
32+
$this->serializer = $serializer ?? Serializer::create();
33+
}
34+
35+
public function receive(callable $handler): void
36+
{
37+
while (!$this->shouldStop) {
38+
$doctrineEnvelop = $this->connection->get();
39+
40+
if (null === $doctrineEnvelop) {
41+
$handler(null);
42+
43+
usleep(200000);
44+
if (\function_exists('pcntl_signal_dispatch')) {
45+
pcntl_signal_dispatch();
46+
}
47+
48+
continue;
49+
}
50+
51+
try {
52+
$handler($this->serializer->decode(array(
53+
'body' => $doctrineEnvelop['body'],
54+
'headers' => $doctrineEnvelop['headers'],
55+
)));
56+
57+
$this->connection->ack($doctrineEnvelop['id']);
58+
} catch (\Throwable $e) {
59+
$this->connection->nack($doctrineEnvelop['id']);
60+
61+
throw $e;
62+
} finally {
63+
if (\function_exists('pcntl_signal_dispatch')) {
64+
pcntl_signal_dispatch();
65+
}
66+
}
67+
}
68+
}
69+
70+
public function stop(): void
71+
{
72+
$this->shouldStop = true;
73+
}
74+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Transport\Doctrine;
13+
14+
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
16+
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
17+
18+
/**
19+
* @author Vincent Touzet <vincent.touzet@gmail.com>
20+
*/
21+
class DoctrineSender implements SenderInterface
22+
{
23+
/** @var \Symfony\Component\Messenger\Transport\Doctrine\Connection */
24+
private $connection;
25+
/** @var \Symfony\Component\Messenger\Transport\Serialization\Serializer */
26+
private $serializer;
27+
28+
public function __construct(Connection $connection, Serializer $serializer = null)
29+
{
30+
$this->connection = $connection;
31+
$this->serializer = $serializer ?? Serializer::create();
32+
}
33+
34+
public function send(Envelope $envelope): Envelope
35+
{
36+
$encodedMessage = $this->serializer->encode($envelope);
37+
38+
$this->connection->publish($encodedMessage['body'], $encodedMessage['headers']);
39+
40+
return $envelope;
41+
}
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Transport\Doctrine;
13+
14+
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
16+
use Symfony\Component\Messenger\Transport\TransportInterface;
17+
18+
/**
19+
* @author Vincent Touzet <vincent.touzet@gmail.com>
20+
*/
21+
class DoctrineTransport implements TransportInterface
22+
{
23+
/** @var \Doctrine\DBAL\Connection */
24+
private $connection;
25+
/** @var \Symfony\Component\Messenger\Transport\Serialization\Serializer */
26+
private $serializer;
27+
/** @var \Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver */
28+
private $receiver;
29+
/** @var \Symfony\Component\Messenger\Transport\Doctrine\DoctrineSender */
30+
private $sender;
31+
32+
public function __construct(Connection $connection, Serializer $serializer)
33+
{
34+
$this->connection = $connection;
35+
$this->serializer = $serializer ?? Serializer::create();
36+
}
37+
38+
public function receive(callable $handler): void
39+
{
40+
($this->receiver ?? $this->getReceiver())->receive($handler);
41+
}
42+
43+
public function stop(): void
44+
{
45+
($this->receiver ?? $this->getReceiver())->stop();
46+
}
47+
48+
public function send(Envelope $envelope): Envelope
49+
{
50+
return ($this->sender ?? $this->getSender())->send($envelope);
51+
}
52+
53+
private function getReceiver(): DoctrineReceiver
54+
{
55+
return $this->receiver = new DoctrineReceiver($this->connection, $this->serializer);
56+
}
57+
58+
private function getSender(): DoctrineSender
59+
{
60+
return $this->sender = new DoctrineSender($this->connection, $this->serializer);
61+
}
62+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Transport\Doctrine;
13+
14+
use Symfony\Bridge\Doctrine\RegistryInterface;
15+
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
16+
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
17+
use Symfony\Component\Messenger\Transport\TransportInterface;
18+
19+
/**
20+
* @author Vincent Touzet <vincent.touzet@gmail.com>
21+
*/
22+
class DoctrineTransportFactory implements TransportFactoryInterface
23+
{
24+
/** @var \Symfony\Bridge\Doctrine\RegistryInterface */
25+
private $registry;
26+
/** @var \Symfony\Component\Messenger\Transport\Serialization\Serializer */
27+
private $serializer;
28+
/** @var bool */
29+
private $debug;
30+
31+
public function __construct(RegistryInterface $registry, Serializer $serializer = null, $debug = false)
32+
{
33+
$this->registry = $registry;
34+
$this->serializer = $serializer ?? Serializer::create();
35+
$this->debug = $debug;
36+
}
37+
38+
public function createTransport(string $dsn, array $options): TransportInterface
39+
{
40+
$manager = str_replace('doctrine://', '', $dsn);
41+
42+
$connection = new Connection($options['table_name'] ?? 'messenger_messages', $this->registry->getConnection($manager), $this->debug);
43+
44+
return new DoctrineTransport($connection, $this->serializer);
45+
}
46+
47+
public function supports(string $dsn, array $options): bool
48+
{
49+
return 0 === strpos($dsn, 'doctrine://');
50+
}
51+
}

0 commit comments

Comments
 (0)