
[Messenger] Adding message de-duplication to transports that do not have it (doctrine transport being inspiration) #58634


Open
psihius opened this issue Oct 22, 2024 · 8 comments

Comments


psihius commented Oct 22, 2024

Description

I want to add message-queue de-duplication based on a hash of the message content for the Doctrine transport, since SQL databases have a natural way to do it: the INSERT ... ON DUPLICATE KEY UPDATE statement (and its analogues where supported, such as PostgreSQL's INSERT ... ON CONFLICT).

While you could use the Lock component to acquire and release locks, either in custom middleware or inside the handlers themselves, there are significant performance considerations for Doctrine transports: it requires at least one extra table and additional queries, and there are race conditions that are hard to overcome, especially in high-load scenarios with many workers running in parallel across many servers.

RabbitMQ and Amazon SQS have built-in queue de-duplication that can be used (I didn't find any mention of it for Beanstalkd).
Redis carries a big warning that you can't run more than one consumer per specific combination of stream, group, and consumer, so parallel processing of messages does not seem to be an issue there due to how Redis works for queues: consumers always run in sequence.

My idea for Doctrine-based transports is pretty simple: we add a nullable hash field to the messenger_messages table with a unique index on it, the default hash implementation being $hash = hash('whatever we pick, md5?', sprintf('%s-%s-%s', $queueName, get_class($messageObject), json_encode($messageObject))). On duplicate, since we are inserting the same body for the same message type on the same queue, we just do ON DUPLICATE KEY UPDATE body = body; the body is identical, so nothing needs to change. This definitely works for MySQL, MariaDB, and PostgreSQL. I don't really have any experience with Oracle or MSSQL, so for those I would like to ask people in the know to chip in.
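As a rough illustration of the idea (column name `dedup_hash` and the exact column list are assumptions, loosely based on the default messenger_messages schema), the transport's insert could look like:

```sql
-- MySQL / MariaDB: a unique index on the hypothetical dedup_hash column
-- turns a duplicate insert into a no-op "update" of an identical value.
INSERT INTO messenger_messages (body, headers, queue_name, created_at, available_at, dedup_hash)
VALUES (:body, :headers, :queue_name, :created_at, :available_at, :dedup_hash)
ON DUPLICATE KEY UPDATE body = body;

-- PostgreSQL analogue: skip the conflicting row entirely.
INSERT INTO messenger_messages (body, headers, queue_name, created_at, available_at, dedup_hash)
VALUES (:body, :headers, :queue_name, :created_at, :available_at, :dedup_hash)
ON CONFLICT (dedup_hash) DO NOTHING;
```

Messages dispatched without deduplication enabled would insert NULL for `dedup_hash`; since a unique index permits multiple NULLs on these platforms, those rows are unaffected.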

Why is this good?

  • A nullable field defaulting to NULL makes it optional, so deduplication can be configured per queue or even per message type if we implement it that way.
  • We are not adding any additional locks, which matters in a multi-server consumer environment with a single queue server; there is nothing extra to set up, and it works out of the box when enabled, at least for the Doctrine transport.
  • We add minimal processing to the hot path: the serializer is already creating the JSON to insert into the database, so ideally it's just a hash calculation plus some simple logic.
  • It's high-throughput friendly, since the database handles conflicts (avoiding the contention issues of locks).
  • Long-running or delayed messages have no issues with long TTLs. For example, my system has messages that can be delayed for up to 10 days before delivery; maintaining locks for that long via the Lock component is not going to be great, especially at a run rate of 200-300 messages a second.

As long as the message sits in the queue table, it gets deduplicated automatically until it's processed, whatever its run time is, without relying on any third-party systems.

And we can provide a DeduplicationKeyStamp so people can use their own key logic. If they want deduplication based on message type alone instead of message type + body + queue name, that's easy to implement; it could even just take a callback.
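A minimal sketch of what such a stamp could look like (the class is hypothetical, not part of Messenger today; only StampInterface is existing API):

```php
<?php

use Symfony\Component\Messenger\Stamp\StampInterface;

// Hypothetical stamp: carries a caller-chosen deduplication key so the
// transport can use it instead of the default queue+class+body hash.
final class DeduplicationKeyStamp implements StampInterface
{
    public function __construct(private readonly string $key)
    {
    }

    public function getKey(): string
    {
        return $this->key;
    }
}

// Usage sketch: deduplicate purely by message type, ignoring the body.
// $bus->dispatch(new SyncProductMessage($id), [new DeduplicationKeyStamp(SyncProductMessage::class)]);
```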

I was told that this would probably end up as a more general implementation in the abstract layer. I'm fine if it goes that way, but I see a problem: some transports afford far more efficient ways to do it than others. SQL servers and RabbitMQ have deduplication in the software itself, and the Doctrine transport can leverage SQL features, but transports like Redis and Beanstalkd will definitely have to rely on external locks (the Lock component). I'm not sure there's a good way to make it non-transport-specific; at best, each transport will have its own implementation under the hood behind some generic deduplication stamp. I was also thinking this could be a configuration-level thing, declared per message type or even per queue (maybe a config option that applies the stamp automatically, so you don't have to add it manually on every dispatch in the code).

At least for the Doctrine transport, I'm available to do the implementation myself (contributing some of my company's time for it).

Example

No response


ro0NL commented Oct 22, 2024

i'd like to see a solution at the abstraction level, aka TransportInterface

ref nats-io/nats-server#5142
also ref #54141


ro0NL commented Oct 22, 2024

TLDR; the problem to solve is "dont send while queued"


ro0NL commented Oct 22, 2024

IF we want to dedup per msg body:

$encodedMessage = $this->serializer->encode($envelope);
$hash = hash('xxh128', $encodedMessage['body']);

see Symfony\Component\Messenger\Transport\Serialization\SerializerInterface


ro0NL commented Oct 22, 2024

And we can provide via a stamp DeduplicationKeyStamp

in our codebase, we only provide a DeduplicateStamp::$enabled = bool, to tell whether the message should be deduplicated yes/no


psihius commented Oct 22, 2024

I have seen the #54141 PR. It is Lock-component based, and the Lock component has major limitations across its different stores: https://symfony.com/doc/current/components/lock.html#available-stores
Many of the stores do not support blocking, and many that do are local only.
Those that are remote and support blocking are SQL-based stores, which, in the case of the Doctrine transport, is one hell of an overhead that can be entirely avoided.

I understand that making it as universal as possible is the target, but I've run into my share of issues with Messenger under traffic, including deadlocks (thankfully there are a bunch of fixes in 7.1, though I have yet to upgrade to it). That's beside the point, though: why would you use a slow external deduplication method when Amazon SQS has built-in deduplication that you can just enable in the service settings (or via an Amazon-specific stamp)? Same for RabbitMQ: it's a plugin that does it server side, not client side. And the same goes for the Doctrine transport: client-side deduplication would be horribly slow due to either filesystem I/O or network delays (in the case of the Redis/Doctrine/PostgreSQL Lock component stores).

If you mean that we should have an abstract DeduplicateStamp that each transport accepts and implements with its own, more efficient, transport-specific mechanism (with the ability to fall back to just the Lock component), that's something I can get behind as a configurable option.
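A rough sketch of that shape (DeduplicateStamp and its methods are assumptions; Envelope::last() is existing Messenger API): each transport's sender inspects one generic stamp and maps it to its native mechanism:

```php
<?php

use Symfony\Component\Messenger\Envelope;

// Hypothetical: inside a transport's send() method, a generic stamp is
// translated into that transport's native deduplication mechanism.
/** @var Envelope $envelope */
$stamp = $envelope->last(DeduplicateStamp::class); // hypothetical stamp class

if (null !== $stamp && $stamp->isEnabled()) {
    // Doctrine: fill the hash column and rely on the unique index.
    // SQS: set the MessageDeduplicationId attribute on a FIFO queue.
    // Redis/Beanstalkd: fall back to a Lock-component-based check.
}
```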


psihius commented Oct 22, 2024

IF we want to dedup per msg body:

$encodedMessage = $this->serializer->encode($envelope);
$hash = hash('xxh128', $encodedMessage['body']);

see Symfony\Component\Messenger\Transport\Serialization\SerializerInterface

Yes, the idea is to reuse as much of the existing infrastructure as we can and avoid adding overhead wherever possible :) That's why this whole deduplication thing doesn't work that well with just custom stamps, as done in https://github.com/ByteSpin/MessengerDedupeBundle

I just see this a bit differently because I have a high-load use case, for which I'm putting effort into making deduplication in the Messenger component a reality, and I am willing and able to spend dev time on it. I know for a fact that adding filesystem or network I/O into the mix will make things a lot worse in an already very hot code path, one that has had lots of deadlock issues and has been updated multiple times with DB-platform-specific locking: #54105, #52639

I don't want to do just a naive implementation; I also want to make sure we implement the efficient methods right out of the gate. Quite a lot of people have asked for this, and as you saw on the Symfony Slack, there is a fairly constant stream of questions about deduplication over time, with a very varied list of needs that is not going to be covered by the one-size-fits-all approach of the Lock component.


ro0NL commented Oct 23, 2024

i realize solving this in an abstract manner is not as easy

it's mostly an implementation detail per transport/queue

that said, we should strive for generic reusable stamps

@carsonbot

Thank you for this suggestion.
There has not been a lot of activity here for a while. Would you still like to see this feature?
Every feature is developed by the community.
Perhaps someone would like to try?
You can read how to contribute to get started.
