-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
[Messenger] Add a Doctrine transport #29007
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Messenger] Add a Doctrine transport #29007
Conversation
ef46d12
to
536ed49
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, but overall missing typehints / return hints as it's a feature for 4.3+, so php 7.1+. I'm guessing the missing tests are because it's a wip :}
src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineTransport.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineTransport.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineReceiver.php
Outdated
Show resolved
Hide resolved
536ed49
to
4fae182
Compare
src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php
Outdated
Show resolved
Hide resolved
src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php
Outdated
Show resolved
Hide resolved
f0b0993
to
fc59186
Compare
fc59186
to
f998853
Compare
src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineReceiver.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineTransport.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineTransportFactory.php
Outdated
Show resolved
Hide resolved
d174c1b
to
4023eaa
Compare
src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineTransportFactory.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineTransportFactory.php
Outdated
Show resolved
Hide resolved
4023eaa
to
9014d1b
Compare
cdcd762
to
2a31b47
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add @experimental in 4.3
on all these classes please? Then, a few remaining comments. After this, it should be ready to go!
'body' => '{"message": "Hi handled"}', | ||
'headers' => json_encode(['type' => DummyMessage::class]), | ||
'queue' => 'default', | ||
'created_at' => '2019-03-15 12:00:00', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's do this then. It's going to be YYYY-MM-DDTHH:mm:ss.sssZ
, with the DateTime
as UTC. I suggest to use a function rather than having the ->format(...)
call copy/pasted.
src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineReceiverTest.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineReceiverTest.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php
Outdated
Show resolved
Hide resolved
@vincenttouzet mind the conflict with |
b2bccd7
to
f4f4bf4
Compare
@sroze I've rebase and squash my commits :) |
@vincenttouzet but you kept the |
f4f4bf4
to
3991088
Compare
That was just the doc in the configuration (part of another commit). |
return null; | ||
} | ||
|
||
$doctrineEnvelope['headers'] = Type::getType(Type::JSON)->convertToPHPValue($doctrineEnvelope['headers'], $this->driverConnection->getDatabasePlatform()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Type::JSON
does not exists on early Doctrine versions as the low
test suite is highlighting. Therefore, I suggest to move it back to STRING
(like the body
) and use json_encode
. Whoever wants to have the table with a JSON
field can create it manually. (FYI @weaverryan)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes i saw that. I was trying to require a minimum version for Doctrine but the Json type is available since v2.6.0 (tagged 2017-07-23 01:02) ... I've put back the json_encode / json_decode with the string type then
3991088
to
88d008c
Compare
Thank you @vincenttouzet. |
This PR was merged into the 4.3-dev branch. Discussion ---------- [Messenger] Add a Doctrine transport | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | BC breaks? | no | Deprecations? | no | Tests pass? | yes | Fixed tickets | | License | MIT | Doc PR | symfony/symfony-docs#10616 | DoctrineBundle PR | doctrine/DoctrineBundle#868 As discussed with @sroze at PHPForum in Paris I've worked on adding a Doctrine transport to the Messenger component. Actually `AMQP` is the only supported transport and it could be a good thing to support multiple transports. Having a Doctrine transport could help users to start using the component IMHO (Almost all projects use a database). # How it works The code is splitted betwwen this PR and the one on the DoctrineBundle : doctrine/DoctrineBundle#868 ## Configuration To configure a Doctrine transport the dsn MUST have the format `doctrine://<entity_manager_name>` where `<entity_manager_name>` is the name of the entity manager (usually `default`) ```yml # config/packages/messenger.yaml framework: messenger: transports: my_transport: "doctrine://default?queue=important" ``` ## Table schema Dispatched messages are stored into a database table with the following schema: | Column | Type | Options | Description | |--------------|----------|--------------------------|-------------------------------------------------------------------| | id | bigint | AUTO_INCREMENT, NOT NULL | Primary key | | body | text | NOT NULL | Body of the message | | headers | text | NOT NULL | Headers of the message | | queue | varchar(32) | NOT NULL | Headers of the message | | created_at | datetime | NOT NULL | When the message was inserted onto the table. (automatically set) | | available_at | datetime | NOT NULL | When the message is available to be handled | | delivered_at | datetime | NULL | When the message was delivered to a worker | ## Message dispatching When dispatching a message a new row is inserted into the table. See `Symfony\Component\Messenger\Transport\Doctrine::publish` ## Message consuming The message is retrieved by the `Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver`. It calls the `Symfony\Component\Messenger\Transport\Doctrine::get` method to get the next message to handle. ### Getting the next message * Start a transaction * Lock the table to get the first message to handle (The lock is done with the `SELECT ... FOR UPDATE` query) * Update the message in database to update the delivered_at columns * Commit the transaction ### Handling the message The retrieved message is then passed to the handler. If the message is correctly handled the receiver call the `Symfony\Component\Messenger\Transport\Doctrine::ack` which delete the message from the table. If an error occured the receiver call the `Symfony\Component\Messenger\Transport\Doctrine::nack` method which update the message to set the delivered_at column to `null`. ## Message requeueing It may happen that a message is stuck in `delivered` state but the handler does not really handle the message (Database connection error, server crash, ...). To requeue messages the `DoctrineReceiver` call the `Symfony\Component\Messenger\Transport\Doctrine::requeueMessages`. This method update all the message with a `delivered_at` not null since more than the "redeliver timeout" (default to 3600 seconds) # TODO - [x] Add tests - [x] Create DOC PR - [x] PR on doctrine-bundle for transport factory - [x] Add a `available_at` column - [x] Add a `queue` column - [x] Implement the retry functionnality : See #30557 - [x] Rebase after #29476 Commits ------- 88d008c [Messenger] Add a Doctrine transport
@sroze while updating the Doc PR I realized that I've keep the |
@vincenttouzet sure. |
@sroze Hum there is also a problem with the DoctrineTransportFactory: Fatal error: Declaration of Symfony\Component\Messenger\Transport\Doctrine\DoctrineTransportFactory::createTransport(string $dsn, array $options): Symfony\Component\Messenger\Transport\TransportInterface must be compatible with Symfony\Component\Messenger\Transport\TransportFactoryInterface::createTransport(string $dsn, array $options, Symfony\Component\Messenger\Transport\Serialization\SerializerInterface $serializer): Symfony\Component\Messenger\Transport\TransportInterface in /var/www/sources/symfony/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineTransportFactory.php on line 26 it worked before merging 🤔 . I can fix this in the same MR if you want |
I fixed it in #30799. |
…terface (sroze) This PR was merged into the 4.3-dev branch. Discussion ---------- [Messenger] Fix the Doctrine transport to use the new interface | Q | A | ------------- | --- | Branch? | master | Bug fix? | yes | New feature? | no | BC breaks? | no | Deprecations? | no | Tests pass? | yes | Fixed tickets | #29007 | License | MIT | Doc PR | ø Commits ------- 75e3355 Fix the Doctrine transport to use the new interface
…uzet) This PR was submitted for the master branch but it was merged into the 4.3 branch instead (closes #10616). Discussion ---------- [Messenger] Describe the doctrine transport Add documentation relative to the PR symfony/symfony#29007 TODO - [ ] Document the max granularity in seconds for delay (doctrine/dbal#2873) Commits ------- 27c0f9d [Messenger] Describe the doctrine tranport
As discussed with @sroze at PHPForum in Paris I've worked on adding a Doctrine transport to the Messenger component.
Actually
AMQP
is the only supported transport and it could be a good thing to support multiple transports. Having a Doctrine transport could help users to start using the component IMHO (Almost all projects use a database).How it works
The code is splitted betwwen this PR and the one on the DoctrineBundle : doctrine/DoctrineBundle#868
Configuration
To configure a Doctrine transport the dsn MUST have the format
doctrine://<entity_manager_name>
where<entity_manager_name>
is the name of the entity manager (usuallydefault
)Table schema
Dispatched messages are stored into a database table with the following schema:
Message dispatching
When dispatching a message a new row is inserted into the table. See
Symfony\Component\Messenger\Transport\Doctrine::publish
Message consuming
The message is retrieved by the
Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver
. It calls theSymfony\Component\Messenger\Transport\Doctrine::get
method to get the next message to handle.Getting the next message
SELECT ... FOR UPDATE
query)Handling the message
The retrieved message is then passed to the handler. If the message is correctly handled the receiver call the
Symfony\Component\Messenger\Transport\Doctrine::ack
which delete the message from the table.If an error occured the receiver call the
Symfony\Component\Messenger\Transport\Doctrine::nack
method which update the message to set the delivered_at column tonull
.Message requeueing
It may happen that a message is stuck in
delivered
state but the handler does not really handle the message (Database connection error, server crash, ...). To requeue messages theDoctrineReceiver
call theSymfony\Component\Messenger\Transport\Doctrine::requeueMessages
. This method update all the message with adelivered_at
not null since more than the "redeliver timeout" (default to 3600 seconds)TODO
available_at
columnqueue
column