-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
Add handling for delayed message to redis transport #31977
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add handling for delayed message to redis transport #31977
Conversation
e5c2dc5
to
2e54349
Compare
$queuedMessageCount = $this->connection->zcount($this->queue, 0, time()); | ||
|
||
if ($queuedMessageCount) { | ||
foreach ($this->connection->zpopmin($this->queue, $queuedMessageCount) as $queuedMessage => $time) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, if there are 100 delayed messages, and only 5 delayed messages that actually are available now, wouldn't this end up cycling through the 100 delayed messages?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ragboyjr no $queuedMessageCount
will be 5 then and so it only pop 5 messages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, understood.
Your technique for handling the race condition seems nice and simple, but one thing that concerns me would be loss of messages between the time that we pop it, then your redis instance has some failure, and then it fails when we try to re-add it (in the event of that race condition.
Part of me wonders if maybe, doing something where we pass in a LUA script to where it pops and pushes into the stream in one transaction would be preferable since the whole transaction would fail instead of part way through. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ragboyjr As redis has only features to move a message from list to another list and not from a list to a stream. we would need to create a transaction then. never used that but I think it should be something like this:
$this->connection->multi();
foreach ($this->connection->zpopmin($this->queue, $queuedMessageCount) as $queuedMessage => $time) {
$queuedMessage = json_encode($queuedMessage, true);
// if a futured placed message is actually popped because of a race condition with
// another running message consumer, the message is readded to the queue by add function
// else its just added stream and will be available for all stream consumers
$this->add($queuedMessage['body'], $queuedMessage['headers'], (time() - $time) * 1000);
}
$this->connection->exec();
we only need to start the transaction if there are really queued messages so its not blocking if there are none.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Y, I think a redis transaction would work as well, would like to see if that could be something we could cover in an integration test of adding items to the queue, then having redis fail and making sure we don't lose any messages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Transactions sadly don't work as we can't read inside a transaction.
So to only solution would be to park this messages in another list as every consumer need its own name we postfix the new list with stream + group + consumer names and so its a unique list only for this consumer to avoid a conflicts with other running consumers. If it crash the consumer need to check if he has still things in this list to push to the stream.
Ping @alexander-schranz! It looks like this is technically challenging. Is the path clear? Or are we stuck on those details? |
faeb7f0
to
9031d90
Compare
42a84cd
to
7407abd
Compare
@weaverryan theoretically there is a little time where a message could get lost when the consumer crashes. Between read from the queued/delayed message sorted set list and adding to the stream. symfony/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php Lines 120 to 128 in 7407abd
I did now change that only one message is read and added to the stream. |
@alexander-schranz do you think utilizing a lua script similar to laravel would be able to prevent this issue? |
@alexander-schranz mind if I take a stab at implementing the transactions part as a lua script? |
I have a bit of time this weekend, and I'd love to see this feature for the redis adapter. |
@ragboyjr sure. Basically its the |
Is there any progress on this PR? |
7407abd
to
ebb9ca2
Compare
@sroze do you want to have a look at this :)? |
@alexander-schranz Is this done & ready? Or are there still some issues? |
@weaverryan ready from my side :) |
I'm playing with this right now, will try to leave a review tonight. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please add changelog entry
src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php
Outdated
Show resolved
Hide resolved
df3840b
to
39057cf
Compare
39057cf
to
cfece10
Compare
Thank you @alexander-schranz. |
…lexander-schranz) This PR was merged into the 4.4 branch. Discussion ---------- Add handling for delayed message to redis transport | Q | A | ------------- | --- | Branch? | 4.4 | Bug fix? | no | New feature? | yes | BC breaks? | no | Deprecations? | no | Tests pass? | WIP | Fixed tickets | Fixes #31711 | License | MIT | Doc PR | symfony/symfony-docs#... TODO Still in WIP: This pull request implements delayed messages for redis transport. It will park the messages in an own sorted set and if the time comes it will push the messages to the stream to make them available for all consumers. Because of a race condition when having multiple consumers it need to be checked if not accidently a message from the future is popped by zpopmin so the add function is called and there is check if the delay is in the present/past and only then add the message to the stream. Commits ------- cfece10 Add handling for delayed message to redis transport
…ander-schranz) This PR was merged into the 4.4 branch. Discussion ---------- Remove hint that redis does not support DelayStamp The DelayStamp was added in 4.4 symfony/symfony#31977 so the hint can now be removed. fixes #12595 Commits ------- b98fa06 Remove hint that redis does not support DelayStamp
Holy Wow!!! so excited to see this merged in!!! |
Awesome! |
Still in WIP: This pull request implements delayed messages for redis transport. It will park the messages in an own sorted set and if the time comes it will push the messages to the stream to make them available for all consumers. Because of a race condition when having multiple consumers it need to be checked if not accidently a message from the future is popped by zpopmin so the add function is called and there is check if the delay is in the present/past and only then add the message to the stream.