
fix SNS FIFO ordering #12285


Merged
merged 7 commits into master from fix-sns-fifo on Feb 21, 2025

Conversation

bentsku
Contributor

@bentsku bentsku commented Feb 19, 2025

Motivation

We got a report in #12276 that SNS FIFO ordering wasn't guaranteed across subscriptions of the same topic (usually the first messages arrive unordered, and ordering only stabilizes afterwards).

This PR introduces a "partitioned" thread pool executor: each "topic" (which can be any key; the executor is not scoped to SNS) is assigned its own worker, which guarantees that the work for a given topic is executed sequentially.
There can still be multiple workers, and each of them has a set of topics assigned to it.
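As a rough sketch of the concept (hypothetical, simplified names; not the actual TopicPartitionedThreadPoolExecutor merged here):

```python
import itertools
import queue
import threading


class PartitionedExecutor:
    """Minimal sketch: each partition key is pinned to exactly one worker thread,
    so tasks sharing a key run sequentially, while different keys can run in
    parallel on different workers (each worker owns several keys)."""

    def __init__(self, max_workers: int = 4):
        self._max_workers = max_workers
        self._work_queues: list[queue.Queue] = []
        self._threads: list[threading.Thread] = []
        self._key_to_index: dict[str, int] = {}
        self._cycle = itertools.cycle(range(max_workers))
        self._lock = threading.Lock()

    def submit(self, key: str, fn, *args, **kwargs) -> None:
        with self._lock:
            if key not in self._key_to_index:
                # assign the key to the next worker in round-robin fashion
                self._key_to_index[key] = next(self._cycle)
                if len(self._threads) < self._max_workers:
                    self._add_worker()
            index = self._key_to_index[key]
        self._work_queues[index].put((fn, args, kwargs))

    def _add_worker(self) -> None:
        work_queue: queue.Queue = queue.Queue()
        self._work_queues.append(work_queue)
        thread = threading.Thread(target=self._run, args=(work_queue,), daemon=True)
        self._threads.append(thread)
        thread.start()

    def _run(self, work_queue: queue.Queue) -> None:
        # each worker only consumes its own queue, so tasks for a given key
        # are executed in exactly the order they were submitted
        while True:
            fn, args, kwargs = work_queue.get()
            fn(*args, **kwargs)
```

With such a scheme, all tasks for the same key land on the same worker's queue and run in submission order, while keys assigned to different workers still run in parallel.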

The reporter also added a very good reproducer in Rust: https://github.com/dkales/fifo-example, which verifies that the ordering is the same across all queues; with this change it now passes consistently.

Some more context

This was something @thrau and I were aware of and had already discussed a long time ago, but no one had requested it until now.

A quick fix we had considered was to create a ThreadPoolExecutor with only one worker, so that everything would be executed sequentially, or to create our own workers.

What happens today is that the ThreadPoolExecutor does not spawn all of its threads up front. It takes a while for them to start, and they start in arbitrary order, so publishing 10 messages wakes up 10 threads, and thread no. 4 might finish faster than thread no. 2.
After a while, things get ordered because all threads are "warm" and pick up work from the executor's single work queue at the same "rhythm".
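A hypothetical standalone snippet (not the LocalStack code) that illustrates the effect on a cold pool:

```python
import time
from concurrent.futures import ThreadPoolExecutor

completed = []

def deliver(message_id: int):
    time.sleep(0.001)  # simulate delivering one message to a subscriber
    completed.append(message_id)

# a freshly created ("cold") pool spawns its worker threads lazily, so the
# first submissions often complete out of order
with ThreadPoolExecutor(max_workers=10) as executor:
    for i in range(10):
        executor.submit(deliver, i)

print(completed)  # frequently not [0, 1, ..., 9] on the first run
```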

Changes

  • introduced a TopicPartitionedThreadPoolExecutor loosely based on the standard library one
  • used it for FIFO topics in SNS so that ordering is guaranteed
  • added a test for it

fixes #12276

Testing

Removing the new thread pool logic and trying the new test:

tests/aws/services/sns/test_sns.py:2999 (TestSNSSubscriptionSQSFifo.test_message_to_fifo_sqs_ordering)
['0', '1', '2', '3', '4', '5', '6', '7', '8', '9'] != ['0', '2', '1', '3', '4', '5', '6', '7', '8', '9']

Expected :['0', '2', '1', '3', '4', '5', '6', '7', '8', '9']
Actual   :['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']

I've also run the new test with 15 topics to validate that it works with more topics than the 10 workers.

@bentsku bentsku added aws:sns Amazon Simple Notification Service semver: minor Non-breaking changes which can be included in minor releases, but not in patch releases labels Feb 19, 2025
@bentsku bentsku self-assigned this Feb 19, 2025
@bentsku bentsku added this to the 4.2 milestone Feb 19, 2025

github-actions bot commented Feb 19, 2025

LocalStack Community integration with Pro

    2 files  ±    0      2 suites  ±0   1h 10m 17s ⏱️ - 40m 57s
2 646 tests  - 1 453  2 537 ✅  - 1 230  109 💤  - 223  0 ❌ ±0 
2 648 runs   - 1 453  2 537 ✅  - 1 230  111 💤  - 223  0 ❌ ±0 

Results for commit 2550f06. ± Comparison against base commit 9bc5bc9.

This pull request removes 1 456 tests and adds 3 tests. Note that renamed tests count towards both.
tests.aws.scenario.bookstore.test_bookstore.TestBookstoreApplication ‑ test_lambda_dynamodb
tests.aws.scenario.bookstore.test_bookstore.TestBookstoreApplication ‑ test_opensearch_crud
tests.aws.scenario.bookstore.test_bookstore.TestBookstoreApplication ‑ test_search_books
tests.aws.scenario.bookstore.test_bookstore.TestBookstoreApplication ‑ test_setup
tests.aws.scenario.kinesis_firehose.test_kinesis_firehose.TestKinesisFirehoseScenario ‑ test_kinesis_firehose_s3
tests.aws.scenario.lambda_destination.test_lambda_destination_scenario.TestLambdaDestinationScenario ‑ test_destination_sns
tests.aws.scenario.lambda_destination.test_lambda_destination_scenario.TestLambdaDestinationScenario ‑ test_infra
tests.aws.scenario.loan_broker.test_loan_broker.TestLoanBrokerScenario ‑ test_prefill_dynamodb_table
tests.aws.scenario.loan_broker.test_loan_broker.TestLoanBrokerScenario ‑ test_stepfunctions_input_recipient_list[step_function_input0-SUCCEEDED]
tests.aws.scenario.loan_broker.test_loan_broker.TestLoanBrokerScenario ‑ test_stepfunctions_input_recipient_list[step_function_input1-SUCCEEDED]
…
tests.aws.services.lambda_.test_lambda_api.TestLambdaFunction ‑ test_invalid_vpc_config_security_group
tests.aws.services.lambda_.test_lambda_api.TestLambdaFunction ‑ test_invalid_vpc_config_subnet
tests.aws.services.sns.test_sns.TestSNSSubscriptionSQSFifo ‑ test_message_to_fifo_sqs_ordering
This pull request removes 224 skipped tests and adds 1 skipped test. Note that renamed tests count towards both.
tests.aws.scenario.kinesis_firehose.test_kinesis_firehose.TestKinesisFirehoseScenario ‑ test_kinesis_firehose_s3
tests.aws.scenario.loan_broker.test_loan_broker.TestLoanBrokerScenario ‑ test_stepfunctions_input_recipient_list[step_function_input4-FAILED]
tests.aws.scenario.mythical_mysfits.test_mythical_misfits.TestMythicalMisfitsScenario ‑ test_deployed_infra_state
tests.aws.scenario.mythical_mysfits.test_mythical_misfits.TestMythicalMisfitsScenario ‑ test_populate_data
tests.aws.scenario.mythical_mysfits.test_mythical_misfits.TestMythicalMisfitsScenario ‑ test_user_clicks_are_stored
tests.aws.services.cloudcontrol.test_cloudcontrol_api.TestCloudControlResourceApi ‑ test_api_exceptions
tests.aws.services.cloudcontrol.test_cloudcontrol_api.TestCloudControlResourceApi ‑ test_create_exceptions
tests.aws.services.cloudcontrol.test_cloudcontrol_api.TestCloudControlResourceApi ‑ test_create_invalid_desiredstate
tests.aws.services.cloudcontrol.test_cloudcontrol_api.TestCloudControlResourceApi ‑ test_double_create_with_client_token
tests.aws.services.cloudcontrol.test_cloudcontrol_api.TestCloudControlResourceApi ‑ test_lifecycle
…
tests.aws.services.lambda_.test_lambda_api.TestLambdaFunction ‑ test_invalid_vpc_config_security_group

♻️ This comment has been updated with latest results.

@bentsku bentsku requested review from dfangl and gregfurman February 19, 2025 10:03
@bentsku bentsku marked this pull request as ready for review February 19, 2025 10:03
@bentsku bentsku requested a review from baermat as a code owner February 19, 2025 10:03
Comment on lines +5082 to +5084
"tests/aws/services/sns/test_sns.py::TestSNSSubscriptionSQSFifo::test_message_to_fifo_sqs_ordering": {
"recorded-date": "19-02-2025, 01:29:15",
"recorded-content": {}
Contributor Author


The empty snapshot is because we're using a transformer fixture with autouse=True, so it creates an entry even if no snapshots are recorded 😅

Contributor

@gregfurman gregfurman left a comment


This is great! I'm wondering, though, whether there's an approach with less operational overhead.

Like, what if we instead just created a subclass of ThreadPoolExecutor and added a constraint that each new task for the same topic blocks until the previous job has completed, emulating this FIFO behaviour (I'm aware there's a memory leak with the dictionary, but ignore that for now lol).

I created the below and ran it against tests/aws/services/sns/test_sns.py::TestSNSSubscriptionSQSFifo (including your new test) and it passed.

What're your thoughts?

import threading
from concurrent.futures import ThreadPoolExecutor


class FifoThreadPoolExecutor(ThreadPoolExecutor):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # maps a topic ARN to the Event of the last task submitted for it
        self._topic_events = {}
        self._lock = threading.RLock()

    # pass in the topic_arn at submission time
    def submit(self, fn, topic_arn, /, *args, **kwargs):
        with self._lock:
            prev_event = self._topic_events.get(topic_arn)
            # the next task for this topic waits on new_event, which is only
            # set once this task has finished executing
            new_event = threading.Event()
            self._topic_events[topic_arn] = new_event

        def wrapped_fn():
            # block until the previous task for the same topic has completed
            if prev_event:
                prev_event.wait()
            try:
                return fn(*args, **kwargs)
            finally:
                new_event.set()

        return super().submit(wrapped_fn)

@bentsku
Contributor Author

bentsku commented Feb 19, 2025

I agree that a self-managed ThreadPoolExecutor-like class is quite a lot of overhead, but sadly there's nothing in the standard library that fits this kind of need.

The fact that all workers share the same work queue will effectively linearize the threads when multiple tasks arrive for the same topic, because each thread picks up a task for that same topic and then they all wait on each other. This effect gets worse as more tasks queue up: if you first send 20 messages to topic A and then 1 to topic B with only 10 workers, you'll probably have to wait until most of the topic A messages have been dispatched before your topic B message is sent.
This is because the workers actively pick tasks off the queue and then block until the previous ones for that topic have been executed.

I've tried to draw this to explain the behavior:

On top is the (virtual) task queue, imagining that queuing a task takes a negligible amount of time compared to executing one.
On the left is your implementation, and on the right the one with one work queue per worker.
On the left, the worker queue is the same as the task queue. On the right, tasks are split into different queues depending on the topic, allowing workers to pick up what they want and never wait.
If you take the same scenario as in the drawing, but with 10 Topic A tasks, the Topic B tasks are going to wait a very long time, which is why this implementation is similar to having only one worker thread.

(diagram: shared work queue on the left vs. one work queue per worker on the right)

I'm not sure what the right approach is, or whether this is really that bad, because this executor will only be used for FIFO topics; the default implementation will still rely on the standard library one.
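As a hypothetical standalone illustration of that effect, reusing the FifoThreadPoolExecutor sketch from the comment above:

```python
import time
from functools import partial

executor = FifoThreadPoolExecutor(max_workers=10)

def deliver(topic: str, i: int):
    time.sleep(0.1)  # simulate one delivery
    print(f"delivered {topic} #{i}")

# 20 messages for topic-a first, then a single message for topic-b
for i in range(20):
    executor.submit(partial(deliver, "topic-a", i), "topic-a")
executor.submit(partial(deliver, "topic-b", 0), "topic-b")

# all 10 workers end up holding topic-a tasks that wait on each other, so the
# single topic-b delivery only happens after a large part of the topic-a
# backlog has been worked off, even though topic-b is independent
executor.shutdown(wait=True)
```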

@bentsku
Contributor Author

bentsku commented Feb 21, 2025

Thinking about this again, the ordering really needs to be guaranteed at the topic + subscriber level: right now, if we have a topic A with 10 subscribers and a second topic B with only one, things are really unbalanced (because one "task" above is actually 10 tasks, one per subscriber). Only each individual subscriber needs to preserve the order in which messages came from the topic, so to get even more performance, it would almost be better to partition on the topic ARN + subscriber ARN key.

Member

@dfangl dfangl left a comment


I think this is a good change. Using the subscription key as well seems like a better distribution of tasks, though.

Comment on lines 86 to 87
if index <= len(self._work_queues):
self._add_worker()
Member


This was confusing on first read: it almost seems like we are adding threads without bound, but the _add_worker method has a check in it. Maybe we can rename this a bit so it is clearer? _add_worker_if_possible or something. Just a minor nit though.

Contributor Author


I think the biggest safeguard is that we're using cycle, so the index will always circle back to 0. The check in _add_worker is mostly there for safety, but it should actually never trigger, I think. I'll add a comment on the next cycle call.

Contributor Author

@bentsku bentsku Feb 21, 2025


I'm actually looking at the code and yeah, the check is wrong actually 😂 I'll update it; I think I need to move it into get_worker_index instead. Will update, thanks a lot, that makes it clearer.
edit: we actually need to check the number of threads instead of the index, which makes it much clearer and allows us to remove the check in _add_worker.
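In other words, something along these lines (a sketch with assumed attribute names; the merged code may differ):

```python
# illustrative sketch with assumed attribute names; the merged code may differ
def _get_worker_index(self, key) -> int:
    index = self._key_to_index.get(key)
    if index is None:
        # cycle() wraps back to 0 once every worker index has been handed out
        index = next(self._cycle)
        self._key_to_index[key] = index
        # grow the pool only while it is still below max_workers, instead of
        # comparing the index against the number of work queues
        if len(self._threads) < self._max_workers:
            self._add_worker()
    return index
```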

bentsku and others added 3 commits February 21, 2025 15:53
Co-authored-by: Daniel Fangl <daniel.fangl@localstack.cloud>
Co-authored-by: Daniel Fangl <daniel.fangl@localstack.cloud>
@bentsku
Contributor Author

bentsku commented Feb 21, 2025

Thanks a lot for the reviews!
To keep the first iteration simple, we will keep partitioning on the topic ARN only, but we will follow up with more fine-grained partitioning on topic ARN + subscriber ARN, as well as some cleanup when topics/subscriptions are deleted.

@bentsku bentsku merged commit 205ecb8 into master Feb 21, 2025
31 checks passed
@bentsku bentsku deleted the fix-sns-fifo branch February 21, 2025 20:33
Labels
aws:sns Amazon Simple Notification Service semver: minor Non-breaking changes which can be included in minor releases, but not in patch releases
Development

Successfully merging this pull request may close these issues.

bug: FIFO SNS topic to FIFO SQS queue fan-out does not produce same order in subscribed queues