[Messenger] Adding message de-duplication to transports that do not have it (doctrine transport being inspiration) #58634
Comments
I'd like to see a solution at the abstraction level, aka TransportInterface (ref nats-io/nats-server#5142).
TL;DR: the problem to solve is "don't send while queued".
IF we want to dedup per msg body:
see |
in our codebase, we only provide a |
I have seen the #54141 PR. It is based on the Lock component, and the Lock component has major limitations with different stores: https://symfony.com/doc/current/components/lock.html#available-stores I understand that making it as universal as possible is the target, but I've run into my share of problems with Messenger under heavy traffic, including deadlocks (thankfully there are a bunch of fixes in 7.1, but I have yet to upgrade to it). That's beside the point, though. Why would you use a slow external deduplication method when Amazon SQS has built-in deduplication that you can just enable in the service settings (or by sending an Amazon-specific stamp)? The same goes for RabbitMQ: it's a server plugin that deduplicates server side, not client side. The same goes for the Doctrine transport: client-side deduplication would be horribly slow due to either filesystem I/O or network delays (in the case of the Redis/Doctrine/PostgreSQL Lock component stores). If you mean that we should have an abstract |
Yes, the idea is to reuse as much of the existing infrastructure as we can and to add as little overhead as possible :) That's why this whole deduplication thing doesn't work that well with just custom stamps, as done in https://github.com/ByteSpin/MessengerDedupeBundle. I just see this a bit differently because I have a high-load use case, for which I'm putting effort into making deduplication in the Messenger component a reality, and I'm willing and able to spend dev time on it. I know for a fact that adding filesystem or network I/O into the mix is going to make things a lot worse in an already very hot code path, one that had lots of issues with deadlocks and had to be updated multiple times with DB-platform-specific locking: #54105, #52639. I don't want to do just a naive implementation; I also want to make sure we implement efficient methods right out of the gate, because quite a lot of people have asked for it. As you saw on the Symfony Slack, there is a pretty constant stream of questions about deduplication, and I've seen a very varied list of needs that a "one size fits all" approach like the Lock component is not going to cover.
I realize solving this in an abstract manner is not as easy; it's mostly an implementation detail per transport/queue. That said, we should strive for generic, reusable stamps.
Thank you for this suggestion.
Description
I want to add message queue de-duplication based on a message content hash to the Doctrine transport, since SQL databases have a natural way to do it: the `INSERT INTO ... ON DUPLICATE KEY UPDATE` statement (and analogues where supported).
While using the Lock component to lock and release, either with a custom middleware or inside the handlers themselves, is something you could do, it has significant performance costs for Doctrine transports: it requires adding at least one table and running additional queries, and it has race conditions that are hard to overcome, especially in high-load scenarios with many workers running in parallel across many servers.
RabbitMQ and Amazon SQS have built-in queue de-duplication that can be used (I didn't find any mention of it for Beanstalkd).
Redis has a big warning that you can't run more than one consumer per specific combo of stream, group and consumer, so parallel processing of messages seems to not be an issue due to how Redis works for queues - consumers always run in sequence.
My idea for Doctrine-based transports is pretty simple: we add a nullable hash field to the `messenger_messages` table with a unique index on it, with the default hash implementation being: `$hash = hash('whatever we pick, md5?', sprintf('%s-%s-%s', $queueName, get_class($messageObject), json_encode($messageObject)))`. On a duplicate, since we are trying to insert the same body for the same queue and the same message type, we just do `ON DUPLICATE KEY UPDATE body = body`: the body is identical, so we don't need to change it. This definitely works for MySQL, MariaDB and PostgreSQL (PostgreSQL uses the equivalent `INSERT ... ON CONFLICT` syntax). I don't really have any experience with Oracle or MSSQL, so for those I would like to ask people in the know to chip in.
Why is this good?
`NULL` makes it optional, so you can configure deduplication per queue, or even per message type if we want to implement it like that.
As long as the message sits in the queue table, it gets deduplicated automatically until it's processed, whatever its run time is, without relying on any third-party systems.
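To make the proposal concrete, here is a minimal sketch of the nullable unique hash column. It is not Symfony's actual Doctrine transport: the schema is simplified, `deduplicationHash()`, `enqueue()` and `createQueueTable()` are hypothetical helpers, `sha256` and `json_encode()` are placeholder choices, and an in-memory SQLite database stands in for MySQL/PostgreSQL so the example runs on its own (assuming the `pdo_sqlite` extension and SQLite 3.24 or newer). It uses `ON CONFLICT (hash) DO NOTHING`, which has the same effect as the no-op `ON DUPLICATE KEY UPDATE body = body`.

```php
<?php
declare(strict_types=1);

// Simplified stand-in for the messenger_messages table (assumption: the real
// table has more columns). The UNIQUE hash column allows many NULLs.
function createQueueTable(PDO $pdo): void
{
    $pdo->exec('CREATE TABLE messenger_messages (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        queue_name TEXT NOT NULL,
        body TEXT NOT NULL,
        hash TEXT UNIQUE
    )');
}

// Hypothetical helper: key built from queue name, message class and body.
function deduplicationHash(string $queueName, object $message): string
{
    // json_encode() stands in for whatever serializer the transport uses.
    $body = json_encode($message, JSON_THROW_ON_ERROR);

    return hash('sha256', sprintf('%s-%s-%s', $queueName, get_class($message), $body));
}

// Hypothetical enqueue: returns true if a row was inserted, false if an
// identical message was already waiting in the queue.
function enqueue(PDO $pdo, string $queueName, object $message, bool $deduplicate = true): bool
{
    $stmt = $pdo->prepare(
        'INSERT INTO messenger_messages (queue_name, body, hash)
         VALUES (:queue, :body, :hash)
         ON CONFLICT (hash) DO NOTHING'
    );
    $stmt->execute([
        ':queue' => $queueName,
        ':body' => json_encode($message, JSON_THROW_ON_ERROR),
        // A NULL hash opts the message out of deduplication.
        ':hash' => $deduplicate ? deduplicationHash($queueName, $message) : null,
    ]);

    return $stmt->rowCount() === 1;
}

// Example message class, made up for this sketch.
final class ReindexProduct
{
    public function __construct(public int $productId) {}
}

$pdo = new PDO('sqlite::memory:');
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
createQueueTable($pdo);

$first = enqueue($pdo, 'default', new ReindexProduct(42));  // inserted
$second = enqueue($pdo, 'default', new ReindexProduct(42)); // skipped as duplicate
```

Once a worker deletes the row after handling it, the same message can be queued again, which matches the "deduplicated while it sits in the queue" behaviour described above.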
And we can provide a `DeduplicationKeyStamp` stamp for people to use their own logic for key combos: if they want message-type-based deduplication instead of message type + body + queue name, that's easy to implement. It could even just be a callback.
I was told that this would probably end up being a more general implementation in an abstract layer. I'm fine if it goes that way, but I see a problem: some transports afford far more efficient ways to do it than others. SQS and RabbitMQ have deduplication in the software itself, and the Doctrine transport can leverage SQL features, but things like Redis and Beanstalkd will definitely have to rely on external locks (the Lock component). I'm not sure there's a good way to make it non-transport-specific; at the least, each transport will have its own implementation under the hood behind some generic deduplication stamp. I was also thinking of making it a configuration-level thing, either on the message type declaration or as a queue-based setting (maybe just apply a stamp automatically via config options so you don't have to add it manually on every dispatch in the code).
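As a rough illustration of the stamp idea, the sketch below shows how a custom key could override the default queue + type + body hash. Everything in it is hypothetical: `DeduplicationKeyStamp` does not exist in Symfony today, `resolveDeduplicationKey()` is an invented helper, and `sha256` is a placeholder algorithm. In Symfony the stamp would implement `Symfony\Component\Messenger\Stamp\StampInterface`; that is left out here so the example runs standalone.

```php
<?php
declare(strict_types=1);

// Hypothetical stamp carrying a caller-chosen deduplication key.
// (In Symfony it would implement StampInterface; omitted to stay standalone.)
final class DeduplicationKeyStamp
{
    public function __construct(private string $key) {}

    public function getKey(): string
    {
        return $this->key;
    }
}

// Hypothetical resolver: the stamp's key wins; otherwise fall back to the
// default queue name + message class + serialized body combination.
function resolveDeduplicationKey(
    string $queueName,
    object $message,
    ?DeduplicationKeyStamp $stamp = null
): string {
    if ($stamp !== null) {
        return hash('sha256', $stamp->getKey());
    }

    // json_encode() stands in for the transport's serializer.
    $body = json_encode($message, JSON_THROW_ON_ERROR);

    return hash('sha256', sprintf('%s-%s-%s', $queueName, get_class($message), $body));
}

// Example message class, made up for this sketch.
final class SyncInventory
{
    public function __construct(public int $warehouseId) {}
}

// Message-type-based deduplication: every SyncInventory shares one key,
// regardless of its body.
$typeOnly = new DeduplicationKeyStamp(SyncInventory::class);
$keyA = resolveDeduplicationKey('default', new SyncInventory(1), $typeOnly);
$keyB = resolveDeduplicationKey('default', new SyncInventory(2), $typeOnly);

// Without a stamp, different bodies produce different keys.
$defaultA = resolveDeduplicationKey('default', new SyncInventory(1));
$defaultB = resolveDeduplicationKey('default', new SyncInventory(2));
```

Each transport could then map the resolved key to its own mechanism (a unique column for Doctrine, a native deduplication ID where the broker has one), which keeps the stamp generic while the implementation stays transport-specific.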
At least for the Doctrine transport, I'm available to do the implementation myself (and contribute some of my company's time to it).
Example
No response