-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
[RFC][Messenger] Redis Delay Implementation #31711
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Comments
Ping @alexander-schranz! He and I had talked about this - I believe he also had a vision for how it should work, it just didn't get done in time. Could you share with us Alexander? |
I would implement it the following way to have both delayed message and still have the redis stream features available and I think @ragboyjr did describe it the same way. In case of no delay we just push the message directly to the stream. But in case when it is a delayed message we push the message into a redis sorted set where the key of the sorted set is the timestamp when it should be executed. // Connection::add($message, $delay = 0)
if ($delay) {
// push the message to a sorted set where the timestmap when to execute it is the sort key
$redis->zadd($this->stream . '_queue', time() + $delay, $message);
return;
}
// current implementation of the add ...
$redis->xadd(...); In the get function we need first check the queue and push them then to stream if messages exist between 0 and current timestamp: // Connection::get
// ...
// check if there are any messages which come from the queue
$messages = $redis->zrangebyscore($this->stream . '_queue', 0, time());
foreach ($messages as $message) {
// pop message to avoid race condition when having multiple consumers
$redis->...
if ($poppedCorreclty) { // if not pop correctly because message not longer exist because of maybe a parallel running consumers did add it to the stream so just ignore it
// push the messages to the stream so the message is available for all consumer and groups
$this->add($message, 0);
}
}
// following the current implementation with xreadgroup ...
$redis->xreadgroup(...)
// ... The only disadvantage is that the consumer need to handle the delayed queue so if your consumer is not a symfony application he need to implement the get logic with sorted set read and push to the stream itself aslong as redis does not support delayed messages itself. |
@alexander-schranz y, that's exactly what I was thinking. |
Someone want to open a PR? |
@weaverryan will try to have a look at it at the weekend, but if somebody else want to start work on it before just let me know here. |
Description
Currently, the Messenger Redis Implementation doesn't support the delay property of the DelayStamp. It's a bit tricky because redis streams doesn't support the delay natively.
I think we could probably implement something similar to Laravel where they basically put jobs with a delay into a sorted set sorted by the timestamp and when the consumer runs to pull a job, we first check the sorted set for jobs that are now available by the current timestamp, and if so, then we insert them into the queue.
Example
https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/LuaScripts.php#L109
https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/RedisQueue.php#L134
The text was updated successfully, but these errors were encountered: