fix SNS FIFO ordering #12285
Conversation
LocalStack Community integration with Pro

2 files ±0, 2 suites ±0 — 1h 10m 17s ⏱️ (-40m 57s)
Results for commit 2550f06. ± Comparison against base commit 9bc5bc9.
This pull request removes 1456 tests and adds 3 tests. Note that renamed tests count towards both.
This pull request removes 224 skipped tests and adds 1 skipped test. Note that renamed tests count towards both.

♻️ This comment has been updated with latest results.
"tests/aws/services/sns/test_sns.py::TestSNSSubscriptionSQSFifo::test_message_to_fifo_sqs_ordering": { | ||
"recorded-date": "19-02-2025, 01:29:15", | ||
"recorded-content": {} |
The empty snapshot is because we're using a transformer fixture with autouse, so it creates an entry even if no snapshots are recorded 😅
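For context, a minimal sketch of the autouse pattern being described, assuming the pytest snapshot fixture used in the LocalStack test suite (the fixture name here is illustrative, not the actual one):

```python
import pytest


@pytest.fixture(autouse=True)
def _register_transformers(snapshot):
    # Because this fixture is autouse, it runs for every test in scope and
    # touches the snapshot state, so the snapshot file gets an entry for the
    # test even when the test body never calls snapshot.match(...).
    snapshot.add_transformer(snapshot.transform.sqs_api())
```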
This is great! I'm wondering, though, if perhaps there's an approach with less operational overhead.
Like, what if we instead just created a subclass of ThreadPoolExecutor and added a constraint that each new task for the same topic blocks until the previous job has completed — emulating this FIFO behaviour (I'm aware there's a memory leak with the dictionary, but ignore that for now lol).
I created the below and ran it against tests/aws/services/sns/test_sns.py::TestSNSSubscriptionSQSFifo (including your new test) and it passed.
What're your thoughts?
import threading
from concurrent.futures import ThreadPoolExecutor


class FifoThreadPoolExecutor(ThreadPoolExecutor):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # maps a topic ARN to the completion event of its latest submitted task
        self._topic_events = {}
        self._lock = threading.RLock()

    # pass in the topic_arn at submission time
    def submit(self, fn, topic_arn, /, *args, **kwargs):
        with self._lock:
            prev_event = self._topic_events.get(topic_arn)
            new_event = threading.Event()
            self._topic_events[topic_arn] = new_event

        def wrapped_fn():
            # wait until the previous task for this topic, if any, has completed
            if prev_event:
                prev_event.wait()
            try:
                return fn(*args, **kwargs)
            finally:
                # unblock the next task queued for this topic
                new_event.set()

        return super().submit(wrapped_fn)
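For illustration, a hypothetical call site for the submit signature above (the topic ARN and the publish_message helper are made up):

```python
def publish_message(message: str) -> None:
    print(f"delivering {message}")


executor = FifoThreadPoolExecutor(max_workers=10)
topic_arn = "arn:aws:sns:us-east-1:000000000000:my-topic.fifo"

# tasks for the same topic ARN run strictly one after another;
# tasks for different topics can still run concurrently
executor.submit(publish_message, topic_arn, "message-1")
executor.submit(publish_message, topic_arn, "message-2")
executor.shutdown(wait=True)
```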
I agree that creating a self-managed ThreadPoolExecutor subclass like this would work. But the fact that all workers share the same work queue will linearize the threads if you get multiple tasks for the same topic, because each thread will pick up a task related to the same topic, and they will wait on each other.

This effect gets worse the more topics you add: if you first send 20 messages to topic A and then 1 to topic B with only 10 workers, you'll probably have to wait until most of the topic A messages have been dispatched before your topic B message is sent.

I've tried to draw this to explain the behavior (diagram not reproduced here): on top is the (virtual) task queue, imagining that queuing a task takes a negligible amount of time compared to executing one.

Not sure what the right way is, or if this is really bad, because this executor would only be used for FIFO topics, and the default implementation will still rely on the standard library one.
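A minimal sketch of the scenario described above, reusing the proposed FifoThreadPoolExecutor (the sleep is just a stand-in for real dispatch work):

```python
import time

# reuses the FifoThreadPoolExecutor defined in the snippet above
executor = FifoThreadPoolExecutor(max_workers=10)


def dispatch(topic: str, i: int) -> None:
    time.sleep(0.1)  # stand-in for delivering one message
    print(f"{topic} message {i} dispatched")


# The 20 topic-a tasks occupy all 10 workers: each picked-up task blocks
# waiting for its predecessor's event, so workers mostly sit idle, and the
# single topic-b task queued afterwards has to wait behind many of them.
for i in range(20):
    executor.submit(dispatch, "topic-a", "topic-a", i)
executor.submit(dispatch, "topic-b", "topic-b", 0)
executor.shutdown(wait=True)
```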
Thinking about this again, the right ordering needs to be at the topic + subscriber level. As it is now, if we have a topic A with 10 subscribers and a second topic B with only one, things are really unbalanced (because a "task" above would actually be 10 tasks, one for each subscriber). Only individual subscribers need to keep the ordering true relative to the order it came from the topic, so to get even more performance, it'd almost be better to partition on the topic ARN + subscriber ARN key.
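A minimal sketch of that partitioning key, assuming both ARNs are available at dispatch time:

```python
def get_partition_key(topic_arn: str, subscription_arn: str) -> str:
    # Keying on topic + subscription keeps each subscriber's deliveries
    # ordered while letting different subscribers of the same topic be
    # dispatched by different workers.
    return f"{topic_arn}|{subscription_arn}"
```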
I think this is a good change. Using the subscription key as well seems like a better distribution of tasks, though.
if index <= len(self._work_queues):
    self._add_worker()
This was confusing on the first read - it almost seems like we are adding threads unbounded, but the _add_worker method has a check in it. Maybe we can rename it a bit so it is clearer? _add_worker_if_possible or something. Just a minor nit though.
I think the biggest safeguard is that we're using cycle, so the index will always circle back to 0. The check in _add_worker is mostly for safety; I think it should actually never trigger. I'll add a comment on the next call on the cycle.
I'm actually looking at the code and yeah, the check is bad actually 😂 I'll update it; I think I need to move it into get_worker_index instead. Will update, thanks a lot, that makes it clearer.
edit: we actually need to check against the number of threads instead of the index, which makes it much clearer and allows us to remove the check in _add_worker.
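A minimal sketch of the shape this fix could take (class and attribute names are assumptions for illustration, not the actual PR code):

```python
import itertools
import threading


class WorkerIndexSketch:
    """Illustrative only: lazy worker creation keyed on thread count."""

    def __init__(self, max_workers: int):
        self._max_workers = max_workers
        self._cycle = itertools.cycle(range(max_workers))
        self._threads: list[threading.Thread] = []

    def _get_worker_index(self) -> int:
        index = next(self._cycle)  # cycle() wraps back to 0 on its own
        # Compare the number of started threads against the bound instead of
        # comparing the index: the intent is clearer, and _add_worker no
        # longer needs its own safety check.
        if len(self._threads) < self._max_workers:
            self._add_worker()
        return index

    def _add_worker(self) -> None:
        thread = threading.Thread(target=lambda: None, daemon=True)
        self._threads.append(thread)
        thread.start()
```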
Co-authored-by: Daniel Fangl <daniel.fangl@localstack.cloud>
Thanks a lot for the reviews!
Motivation
We received a report in #12276 that SNS FIFO ordering wasn't guaranteed between subscriptions of the same topic (usually the first messages are unordered, then they become ordered).
This PR introduces a "partitioned" thread pool executor: each "topic" (it can be anything; it is not scoped only to SNS) will have its own worker, which guarantees that the work executed for that topic is sequential. You can still have multiple workers, and each will have some topics assigned to it.
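A minimal sketch of the idea — not the actual TopicPartitionedThreadPoolExecutor implementation — with one queue and one worker thread per slot, and every partition key pinned to a single slot:

```python
import itertools
import queue
import threading


class PartitionedExecutorSketch:
    """Tasks sharing a partition key run strictly in submission order;
    distinct keys can run concurrently on different workers."""

    def __init__(self, max_workers: int = 10):
        self._queues = [queue.SimpleQueue() for _ in range(max_workers)]
        self._cycle = itertools.cycle(range(max_workers))
        self._assignments: dict[str, int] = {}
        self._lock = threading.Lock()
        for q in self._queues:
            threading.Thread(target=self._worker, args=(q,), daemon=True).start()

    def _worker(self, q: queue.SimpleQueue) -> None:
        while True:
            fn, args, kwargs = q.get()
            fn(*args, **kwargs)

    def submit(self, fn, partition_key: str, /, *args, **kwargs) -> None:
        with self._lock:
            # pin each new key to the next worker slot, round-robin
            if partition_key not in self._assignments:
                self._assignments[partition_key] = next(self._cycle)
            index = self._assignments[partition_key]
        self._queues[index].put((fn, args, kwargs))
```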
The user also added a very good reproducer in Rust, https://github.com/dkales/fifo-example, which verifies that ordering is the same across all queues; with this change it now passes every time.
Some more context
This was something we were aware of with @thrau and had already discussed a long time ago, but no user request had come in yet.
A quick fix we thought of was to create a ThreadPoolExecutor with only one worker, so that things would be executed sequentially, or to create our own workers.

What's happening today is that the ThreadPoolExecutor does not have all its threads spawned at the beginning. It takes a while for them to start, and they start in random order, so publishing 10 messages will wake up 10 threads, and thread no. 4 may finish faster than thread no. 2.

After a while, things get ordered because all threads are "warm", and they pick up work from the single work queue of the executor in the same "rhythm".
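A small sketch that can make this cold-start effect visible with the standard library executor alone (results vary from run to run):

```python
from concurrent.futures import ThreadPoolExecutor

order = []


def record(i: int) -> None:
    order.append(i)


with ThreadPoolExecutor(max_workers=10) as executor:
    # The stdlib executor spawns worker threads lazily, up to one per
    # submit; while the pool is "cold", freshly started threads race for
    # the shared work queue, so the first tasks may complete out of order.
    for i in range(10):
        executor.submit(record, i)

print(order)  # often not [0, 1, ..., 9] on a cold pool
```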
Changes
- introduce a TopicPartitionedThreadPoolExecutor, loosely based on the standard library one
- fixes #12276
Testing
Removing the new thread pool logic and running the new test reproduces the ordering issue.
I've also tried the new test with 15 topics, to validate that it works with more than 10 topics (with the 10 workers).
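For reference, a hedged sketch of what such an ordering check can look like with boto3 against a LocalStack endpoint (resource names are made up; the SQS queue access policy is omitted here, whereas on real AWS the queues would also need a policy allowing SNS to send):

```python
import boto3

endpoint = "http://localhost:4566"
sns = boto3.client("sns", endpoint_url=endpoint, region_name="us-east-1")
sqs = boto3.client("sqs", endpoint_url=endpoint, region_name="us-east-1")

topic_arn = sns.create_topic(
    Name="order-test.fifo",
    Attributes={"FifoTopic": "true", "ContentBasedDeduplication": "true"},
)["TopicArn"]

# fan the FIFO topic out to several FIFO queues
queue_urls = []
for i in range(3):
    queue_url = sqs.create_queue(
        QueueName=f"order-test-{i}.fifo",
        Attributes={"FifoQueue": "true", "ContentBasedDeduplication": "true"},
    )["QueueUrl"]
    queue_arn = sqs.get_queue_attributes(
        QueueUrl=queue_url, AttributeNames=["QueueArn"]
    )["Attributes"]["QueueArn"]
    sns.subscribe(
        TopicArn=topic_arn,
        Protocol="sqs",
        Endpoint=queue_arn,
        Attributes={"RawMessageDelivery": "true"},
    )
    queue_urls.append(queue_url)

# publish in a single message group, so ordering must be preserved
for i in range(10):
    sns.publish(TopicArn=topic_arn, Message=str(i), MessageGroupId="group-1")

# every queue must see the messages in publish order
for queue_url in queue_urls:
    received = []
    while len(received) < 10:  # sketch only: no timeout handling
        messages = sqs.receive_message(
            QueueUrl=queue_url, MaxNumberOfMessages=10
        ).get("Messages", [])
        for msg in messages:
            received.append(msg["Body"])
            sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=msg["ReceiptHandle"])
    assert received == [str(i) for i in range(10)]
```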