
Commit 205ecb8

bentsku and dfangl authored
fix SNS FIFO ordering (#12285)
Co-authored-by: Daniel Fangl <daniel.fangl@localstack.cloud>
1 parent 5d463b5 commit 205ecb8

5 files changed (+190, -2 lines)

localstack-core/localstack/services/sns/executor.py (new file)

Lines changed: 114 additions & 0 deletions
```python
import itertools
import logging
import os
import queue
import threading

LOG = logging.getLogger(__name__)


def _worker(work_queue: queue.Queue):
    try:
        while True:
            work_item = work_queue.get(block=True)
            if work_item is None:
                return
            work_item.run()
            # delete the reference to the work item so it does not stay in memory
            # until the next blocking `queue.get` call returns
            del work_item

    except Exception:
        LOG.exception("Exception in worker")


class _WorkItem:
    def __init__(self, fn, args, kwargs):
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        try:
            self.fn(*self.args, **self.kwargs)
        except Exception:
            LOG.exception("Unhandled exception while running %s", self.fn.__name__)


class TopicPartitionedThreadPoolExecutor:
    """
    This executor partitions work between workers based on topics.
    It guarantees that each topic has only one worker assigned, and thus that its tasks are executed sequentially.

    Loosely based on the stdlib ThreadPoolExecutor, but does not return a Future, as SNS does not need it
    (fire & forget). Could be extended if needed to fit other needs.

    Currently, we do not re-balance between workers if some of them have more load. This could be investigated.
    """

    # Used to assign unique thread names when thread_name_prefix is not supplied.
    _counter = itertools.count().__next__

    def __init__(self, max_workers: int = None, thread_name_prefix: str = ""):
        if max_workers is None:
            max_workers = min(32, (os.cpu_count() or 1) + 4)
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        self._max_workers = max_workers
        self._thread_name_prefix = (
            thread_name_prefix or f"TopicThreadPoolExecutor-{self._counter()}"
        )

        # for now, the pool isn't fair and is not redistributed depending on load
        self._pool = {}
        self._shutdown = False
        self._lock = threading.Lock()
        self._threads = set()
        self._work_queues = []
        self._cycle = itertools.cycle(range(max_workers))

    def _add_worker(self):
        work_queue = queue.SimpleQueue()
        self._work_queues.append(work_queue)
        thread_name = f"{self._thread_name_prefix}_{len(self._threads)}"
        t = threading.Thread(name=thread_name, target=_worker, args=(work_queue,))
        t.daemon = True
        t.start()
        self._threads.add(t)

    def _get_work_queue(self, topic: str) -> queue.SimpleQueue:
        if not (work_queue := self._pool.get(topic)):
            if len(self._threads) < self._max_workers:
                self._add_worker()

            # we cycle through the possible indexes for a work queue, in order to distribute the load;
            # once we reach the maximum number of workers, the cycle starts back at 0
            index = next(self._cycle)
            work_queue = self._work_queues[index]

            # TODO: the pool is not cleaned up at the moment, think about the clean-up interface
            self._pool[topic] = work_queue
        return work_queue

    def submit(self, fn, topic, /, *args, **kwargs) -> None:
        with self._lock:
            work_queue = self._get_work_queue(topic)

            if self._shutdown:
                raise RuntimeError("cannot schedule new futures after shutdown")

            w = _WorkItem(fn, args, kwargs)
            work_queue.put(w)

    def shutdown(self, wait=True):
        with self._lock:
            self._shutdown = True

            # Send a wake-up to prevent threads calling
            # _work_queue.get(block=True) from permanently blocking.
            for work_queue in self._work_queues:
                work_queue.put(None)

        if wait:
            for t in self._threads:
                t.join()
```
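
To make the ordering guarantee concrete, here is a minimal usage sketch (not part of the commit; `deliver`, the topic names, and the worker count are illustrative) that assumes `TopicPartitionedThreadPoolExecutor` from the module above is in scope. All tasks submitted under one topic key land on the same worker queue and therefore run in submission order, while distinct topics may be served concurrently:

```python
import time


def deliver(topic: str, seq: int):
    # simulate a delivery that takes some time
    time.sleep(0.01)
    print(f"{topic}: message {seq}")


executor = TopicPartitionedThreadPoolExecutor(max_workers=4)
for seq in range(5):
    # submit(fn, topic, *args): the second argument is only the partitioning key;
    # the remaining arguments are passed through to `fn`
    executor.submit(deliver, "topic-a.fifo", "topic-a.fifo", seq)
    executor.submit(deliver, "topic-b.fifo", "topic-b.fifo", seq)

# shutdown() puts a `None` sentinel on every worker queue to wake blocked
# workers, then joins the threads when wait=True
executor.shutdown(wait=True)
# each topic prints its messages strictly in order 0..4
```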

localstack-core/localstack/services/sns/publisher.py

Lines changed: 15 additions & 2 deletions
```diff
@@ -22,6 +22,7 @@
 from localstack.config import external_service_url
 from localstack.services.sns import constants as sns_constants
 from localstack.services.sns.certificate import SNS_SERVER_PRIVATE_KEY
+from localstack.services.sns.executor import TopicPartitionedThreadPoolExecutor
 from localstack.services.sns.filter import SubscriptionFilter
 from localstack.services.sns.models import (
     SnsApplicationPlatforms,
@@ -1176,9 +1177,13 @@ class PublishDispatcher:
 
     def __init__(self, num_thread: int = 10):
         self.executor = ThreadPoolExecutor(num_thread, thread_name_prefix="sns_pub")
+        self.topic_partitioned_executor = TopicPartitionedThreadPoolExecutor(
+            max_workers=num_thread, thread_name_prefix="sns_pub_fifo"
+        )
 
     def shutdown(self):
         self.executor.shutdown(wait=False)
+        self.topic_partitioned_executor.shutdown(wait=False)
 
     def _should_publish(
         self,
@@ -1295,8 +1300,16 @@ def publish_batch_to_topic(self, ctx: SnsBatchPublishContext, topic_arn: str) ->
             )
             self._submit_notification(notifier, individual_ctx, subscriber)
 
-    def _submit_notification(self, notifier, ctx: SnsPublishContext, subscriber: SnsSubscription):
-        self.executor.submit(notifier.publish, context=ctx, subscriber=subscriber)
+    def _submit_notification(
+        self, notifier, ctx: SnsPublishContext | SnsBatchPublishContext, subscriber: SnsSubscription
+    ):
+        if (topic_arn := subscriber.get("TopicArn", "")).endswith(".fifo"):
+            # TODO: we still need to implement Message deduplication on the topic level with `should_publish` for FIFO
+            self.topic_partitioned_executor.submit(
+                notifier.publish, topic_arn, context=ctx, subscriber=subscriber
+            )
+        else:
+            self.executor.submit(notifier.publish, context=ctx, subscriber=subscriber)
 
     def publish_to_phone_number(self, ctx: SnsPublishContext, phone_number: str) -> None:
         LOG.debug(
```
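
The routing change above is the heart of the fix: deliveries for a topic whose ARN ends in `.fifo` now go through the partitioned executor, keyed by topic ARN, so one worker processes them in publish order, while standard topics keep the shared pool. Here is a small standalone sketch (illustrative only, not from the commit) of why the shared pool alone could not guarantee FIFO ordering:

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor


def deliver(seq: int):
    # simulate variable network latency per delivery
    time.sleep(random.random() / 100)
    print("delivered", seq)


# several workers pull from one shared queue, so two messages for the same
# FIFO topic can be in flight at once and complete out of order
with ThreadPoolExecutor(max_workers=4) as pool:
    for seq in range(5):
        pool.submit(deliver, seq)
```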

tests/aws/services/sns/test_sns.py

Lines changed: 54 additions & 0 deletions
```diff
@@ -2997,6 +2997,60 @@ def test_publish_to_fifo_with_target_arn(self, sns_create_topic, aws_client):
         )
         assert "MessageId" in response
 
+    @markers.aws.validated
+    def test_message_to_fifo_sqs_ordering(
+        self,
+        sns_create_topic,
+        sqs_create_queue,
+        sns_create_sqs_subscription,
+        snapshot,
+        aws_client,
+        sqs_collect_messages,
+    ):
+        topic_name = f"topic-{short_uid()}.fifo"
+        topic_attributes = {"FifoTopic": "true", "ContentBasedDeduplication": "true"}
+        topic_arn = sns_create_topic(
+            Name=topic_name,
+            Attributes=topic_attributes,
+        )["TopicArn"]
+
+        queue_attributes = {"FifoQueue": "true", "ContentBasedDeduplication": "true"}
+        queues = []
+        queue_amount = 5
+        message_amount = 10
+
+        for _ in range(queue_amount):
+            queue_name = f"queue-{short_uid()}.fifo"
+            queue_url = sqs_create_queue(
+                QueueName=queue_name,
+                Attributes=queue_attributes,
+            )
+            sns_create_sqs_subscription(
+                topic_arn=topic_arn, queue_url=queue_url, Attributes={"RawMessageDelivery": "true"}
+            )
+            queues.append(queue_url)
+
+        for i in range(message_amount):
+            aws_client.sns.publish(
+                TopicArn=topic_arn, Message=str(i), MessageGroupId="message-group-id-1"
+            )
+
+        all_messages = []
+        for queue_url in queues:
+            messages = sqs_collect_messages(
+                queue_url,
+                expected=message_amount,
+                timeout=10,
+                max_number_of_messages=message_amount,
+            )
+            contents = [message["Body"] for message in messages]
+            all_messages.append(contents)
+
+        # we're expecting the order to be the same across all queues
+        reference_order = all_messages[0]
+        for received_content in all_messages[1:]:
+            assert received_content == reference_order
+
 
 class TestSNSSubscriptionSES:
     @markers.aws.only_localstack
```

tests/aws/services/sns/test_sns.snapshot.json

Lines changed: 4 additions & 0 deletions
```diff
@@ -5078,5 +5078,9 @@
         ]
       }
     }
+  },
+  "tests/aws/services/sns/test_sns.py::TestSNSSubscriptionSQSFifo::test_message_to_fifo_sqs_ordering": {
+    "recorded-date": "19-02-2025, 01:29:15",
+    "recorded-content": {}
   }
 }
```

tests/aws/services/sns/test_sns.validation.json

Lines changed: 3 additions & 0 deletions
```diff
@@ -185,6 +185,9 @@
   "tests/aws/services/sns/test_sns.py::TestSNSSubscriptionSQSFifo::test_message_to_fifo_sqs[True]": {
     "last_validated_date": "2023-11-09T20:12:03+00:00"
   },
+  "tests/aws/services/sns/test_sns.py::TestSNSSubscriptionSQSFifo::test_message_to_fifo_sqs_ordering": {
+    "last_validated_date": "2025-02-19T01:29:14+00:00"
+  },
   "tests/aws/services/sns/test_sns.py::TestSNSSubscriptionSQSFifo::test_publish_batch_messages_from_fifo_topic_to_fifo_queue[False]": {
     "last_validated_date": "2023-11-09T20:10:33+00:00"
   },
```
