-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
[Messenger] Batch delivery #51034
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: 7.3
Are you sure you want to change the base?
[Messenger] Batch delivery #51034
Conversation
src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php
Outdated
Show resolved
Hide resolved
8ecf8bb
to
84e4797
Compare
src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineTransportFactory.php
Outdated
Show resolved
Hide resolved
84e4797
to
18d2270
Compare
308f750
to
5572863
Compare
I've rebased this PR on branch 6.4, and removed all the commits that were merely small improvements (I will open PRs for them later). I see that the pipeline is failing but it seems to be unrelated to this PR. I am, however, afraid that this PR might be getting stale since it is open for a few weeks now. Is this normal or is there something else that I should do? (I'm new to contributing to Symfony so I really don't know) btw, there are checks failing but I believe they are unrelated to this PR. |
bcd4960
to
c727bb9
Compare
Rebased again and put the changelog in the correct file |
e6bc6fe
to
5501be1
Compare
be723f3
to
b12ed43
Compare
b12ed43
to
32105fb
Compare
Currently, the doctrine transport only delivers one message at a time, resulting in low performance and a very high amount of hits to the DB. (one hit per message) With batch delivery, the transport will retrieve several messages in a single query. Co-authored-by: Alexander Malyk <shu.rick.ifmo@gmail.com>
32105fb
to
d2bae3b
Compare
public function __destruct() | ||
{ | ||
// The worker using this transport might have pulled 50 msgs out of the mq, marking them as "in process", | ||
// but because of its options (ie --limit, --failure-limit, --memory-limit, --time-limit) it might terminate |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then I think we would increase complexity and it wouldn't scale.
For --limit
we could do some calculations... would be doable despite the increase in complexity..
But for -failure-limit
we would need to somehow keep track of how many failures happened, probably replicating logic that is already built somewhere. For --memory-limit
and --time-limit
the same.
Then, if/when in the future we would add more options, someone would need to also adjust this code, to be able to handle the new option.
With the implementation in this PR however, we don't need to do anything special to adjust to a particular option, neither now nor in the future if/when we add new options.
*/ | ||
public function extractDoctrineEnvelopeListIds(array $doctrineEnvelopeList): array | ||
{ | ||
return array_map( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return array_column($doctrineEnvelopeList, 'id')
will do the same and might be a bit faster by keeping all the logic in the C layer.
@@ -151,7 +155,7 @@ public function send(string $body, array $headers, int $delay = 0): string | |||
return $this->driverConnection->lastInsertId(); | |||
} | |||
|
|||
public function get(): ?array | |||
public function get(): array |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest document the type of the returned array.
], | ||
[ | ||
Types::DATETIME_IMMUTABLE, | ||
class_exists(ArrayParameterType::class) ? ArrayParameterType::INTEGER : Types::SIMPLE_ARRAY, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is wrong. Types::SIMPLE_ARRAY
is something totally different. The fallback for older DBAL versions is Connection::PARAM_INT_ARRAY
.
And the code below does not check for older versions, so it is inconsistent.
@@ -58,11 +60,16 @@ public function get(): iterable | |||
throw new TransportException($exception->getMessage(), 0, $exception); | |||
} | |||
|
|||
if (null === $doctrineEnvelope) { | |||
if (null === $doctrineEnvelopeList) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This won't ever be true. Connection::get
cannot return null
anymore.
@@ -83,6 +90,11 @@ public function reject(Envelope $envelope): void | |||
} | |||
} | |||
|
|||
public function undeliverNotHandled(): void |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should it be marked as @internal
?
Currently, the doctrine transport only delivers one message at a time,
resulting in low performance and a very high amount of hits to the DB.
(one hit per message)
With batch delivery, the transport will retrieve several messages in a single query.