Skip to content

[RFC][Messenger] Redis Delay Implementation #31711

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
ragboyjr opened this issue May 29, 2019 · 5 comments · Fixed by #31977
Closed

[RFC][Messenger] Redis Delay Implementation #31711

ragboyjr opened this issue May 29, 2019 · 5 comments · Fixed by #31977
Labels
Feature Messenger RFC RFC = Request For Comments (proposals about features that you want to be discussed)

Comments

@ragboyjr
Copy link
Contributor

Description

Currently, the Messenger Redis Implementation doesn't support the delay property of the DelayStamp. It's a bit tricky because redis streams doesn't support the delay natively.

I think we could probably implement something similar to Laravel where they basically put jobs with a delay into a sorted set sorted by the timestamp and when the consumer runs to pull a job, we first check the sorted set for jobs that are now available by the current timestamp, and if so, then we insert them into the queue.

Example

https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/LuaScripts.php#L109
https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/RedisQueue.php#L134

@ragboyjr ragboyjr changed the title [Messenger] Redis Delay Implementation [RFC][Messenger] Redis Delay Implementation May 29, 2019
@chalasr chalasr added Feature Messenger RFC RFC = Request For Comments (proposals about features that you want to be discussed) labels May 29, 2019
@weaverryan
Copy link
Member

Ping @alexander-schranz!

He and I had talked about this - I believe he also had a vision for how it should work, it just didn't get done in time. Could you share with us Alexander?

@alexander-schranz
Copy link
Contributor

alexander-schranz commented May 30, 2019

I would implement it the following way to have both delayed message and still have the redis stream features available and I think @ragboyjr did describe it the same way.

In case of no delay we just push the message directly to the stream. But in case when it is a delayed message we push the message into a redis sorted set where the key of the sorted set is the timestamp when it should be executed.

// Connection::add($message, $delay = 0)
if ($delay) {
     // push the message to a sorted set where the timestmap when to execute it is the sort key
     $redis->zadd($this->stream . '_queue', time() + $delay, $message);

     return;
}

// current implementation of the add ...
$redis->xadd(...);

In the get function we need first check the queue and push them then to stream if messages exist between 0 and current timestamp:

// Connection::get

// ...

// check if there are any messages which come from the queue
$messages = $redis->zrangebyscore($this->stream . '_queue', 0, time());

foreach ($messages as $message) {
    // pop message to avoid race condition when having multiple consumers
   $redis->...
    if ($poppedCorreclty) { // if not pop correctly because message not longer exist because of maybe a parallel running consumers did add it to the stream so just ignore it
        // push the messages to the stream so the message is available for all consumer and groups
        $this->add($message, 0);
    }
}

// following the current implementation with xreadgroup ...
$redis->xreadgroup(...)
// ...

The only disadvantage is that the consumer need to handle the delayed queue so if your consumer is not a symfony application he need to implement the get logic with sorted set read and push to the stream itself aslong as redis does not support delayed messages itself.

@ragboyjr
Copy link
Contributor Author

@alexander-schranz y, that's exactly what I was thinking.

@weaverryan
Copy link
Member

Someone want to open a PR?

@alexander-schranz
Copy link
Contributor

@weaverryan will try to have a look at it at the weekend, but if somebody else want to start work on it before just let me know here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Feature Messenger RFC RFC = Request For Comments (proposals about features that you want to be discussed)
Projects
None yet
Development

Successfully merging a pull request may close this issue.

4 participants