-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
[Messenger] Added support for auto trimming of redis streams #31825
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
So this is similar to TTL in rabbitmq: https://www.rabbitmq.com/ttl.html If we can implement it for redis and doctrine, we could add a generic |
There is no such thing as TTL in Redis Streams :) |
Redis stream's ID are timestamp, you may run a script to remove older items local t = redis.call('TIME')
local keys=redis.call('XRANGE', ARGV[1], '-', (t[1]-ARGV[2]) * 1000)
for i=1,#keys,1 do
redis.call('XDEL', ARGV[1], keys[i][1])
end
return #keys with note: I'm not an expert of redis, I don't know the performance impact of such script. |
There‘s a reason why TTL support is not built-in in Redis (yet). You can read more about it here: https://github.com/antirez/redis/issues/4450 |
I don't see the need to implement that as a core feature. If messages need to have a ttl then people should use the tools that support it, e.g. rabbitmq. If it's purely about local development where you might not run the workers, then it would be enough to have a script that deletes old items from redis that you can call manually or in a cron or whatever people prefer. |
@Tobion with redis implémentation, consumed message are not deleted. This PR is not about providing a feature to delete message. But about cleaning the server. 2 options:
@Toflar I'm aware of that issue. When using redis stream as a messages storage there is a trade off between performance and reliability In my experience I already had situation where consumers were out during few hours and queue grown 100 000 times bigger than usual. In such situation, capping with MAXLEN would have lost messages or would requires to set a extremely high MAXLEN (which will consume a lot of space even when the situation is ok) Maybe we can consider combining the 2 solutions: counting outdated elements with local t = redis.call('TIME')
local keys=redis.call('XRANGE', ARGV[1], '-', (t[1]-ARGV[2]) * 1000, 'COUNT', ARGV[3])
if #keys > 0 then
local l = redis.call('XLEN', ARGV[1])
redis.call('XTRIM', ARGV[1], 'MAXLEN', '~', l-#keys)
return #keys
else
return 0
end" what do you think? |
The use case is that I already have Redis running and Streams are pretty much explicitly built for message handling. Why would I want to manage yet another tool if Redis can serve me well? 😄
Hmmm, so I see why my PR does not seem practical under certain circumstances. These are the use cases I see:
And all of those optionally restricted to receivers. So I don't know but maybe something like
(I did not really think about naming and defaults, I just think aloud here.) Does this seem like a better approach? |
hmm. I'm not sure about a dedicated command, because: running a periodic small cleanup (like you did with the trimProbability) or a big one every X minutes, will, in both case, block the Redis thread and block every consumers. BUT, the more you call the trim algorithm, the smaller is the subset of messages to remove, and the shorter will be the blocking time window. FYI, I've made a small bench with very simple data (I don't know if the result will be the same with larger messages) it takes ~50ms to remove a small subset of ~10 000 (that's why I was thinking about removing the oldest 2 000 messages every 1 000 XADD) But thinking about that, I really wonder if such complexity worth it. Maybe, I'm just exposing an edge case, and your first idea is the good one. We may warned user in the documentation, and recommend to use a real messaging bus for high throughput cases. |
The 4 parameter of the xadd function is the maxlen e.g.: $redis->xadd("new3", "*", ["key" => "value"], 10); its just missing in the phpdocs (my fault 🙈 ). |
Ah 😄 Well, that can be easily changed. Anything you want to add to the discussion? |
@Toflar I think by default we should not autotrim because a default value is difficult to set here, but it would be good to have a way to configure this. Is the performance better when using |
Agreed.
I have absolutely no clue. I don't know which is better, |
A simple bench the xadd seems to be a better choice: Testvalues adding 100000 messages with a trim size of 100: normal xAdd: 4.6s So I think we should go with: $this->connection->xadd($this->stream, '*', ..., $this->maxlength, (bool) $this->maxlength); As its optional I think its ok to implement this for the redis transport? |
Thanks for benchmarking 👍 I'll update the PR accordingly but I'd like to wait for feedback by Sam and Robin. |
Thanks for raising the issue @Toflar. |
src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me 👍
1942fec
to
7fe06bc
Compare
Thank you @Toflar. |
…treams (Toflar) This PR was squashed before being merged into the 4.4 branch (closes #31825). Discussion ---------- [Messenger] Added support for auto trimming of redis streams | Q | A | ------------- | --- | Branch? | 4.4 | Bug fix? | no | New feature? | yes | BC breaks? | no | Deprecations? | no | Tests pass? | yes | Fixed tickets | | License | MIT | Doc PR | will submit if concept is okay Right now the Redis stream will just grow indefinitely. However, there are means to have it delete old entries from time to time. Note: I could not use the `XADD mystream MAXLEN ~ 1000 *` notation because the PHP redis extension does not support the `MAXLEN` option afaics so I went for the extra `XTRIM` command. I explicitly enabled the approximate flag because it makes absolutely no sense to hardcode the limit for us although we could even have this configurable too (but I don't think we should). The whole idea of this PR is to enable occasional trimming of the stream so it doesn't grow forever, so when you configure something like `20000` it may well happen that trimming only happens at `25000` depending on your settings. Ping @soyuka @alexander-schranz @chalasr :) Commits ------- 7fe06bc [Messenger] Added support for auto trimming of redis streams
… option (Toflar) This PR was submitted for the master branch but it was merged into the 4.4 branch instead (closes symfony#11870). Discussion ---------- Documented redis transport new stream_max_entries option Docs for symfony/symfony#31825 Commits ------- d1aa071 Documented redis transport new stream_max_entries option
Right now the Redis stream will just grow indefinitely. However, there are means to have it delete old entries from time to time.
Note: I could not use the
XADD mystream MAXLEN ~ 1000 *
notation because the PHP redis extension does not support theMAXLEN
option afaics so I went for the extraXTRIM
command.I explicitly enabled the approximate flag because it makes absolutely no sense to hardcode the limit for us although we could even have this configurable too (but I don't think we should).
The whole idea of this PR is to enable occasional trimming of the stream so it doesn't grow forever, so when you configure something like
20000
it may well happen that trimming only happens at25000
depending on your settings.Ping @soyuka @alexander-schranz @chalasr :)