[ESM] Improve SQS batch collection and flushing #12002
Changes from all commits
@@ -16,13 +16,21 @@
    Poller,
    parse_batch_item_failures,
)
from localstack.services.sqs.constants import HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT
from localstack.services.lambda_.event_source_mapping.senders.sender_utils import (
    batched,
)
from localstack.services.sqs.constants import (
    HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT,
    HEADER_LOCALSTACK_SQS_OVERRIDE_WAIT_TIME_SECONDS,
)
from localstack.utils.aws.arns import parse_arn
from localstack.utils.strings import first_char_to_lower

LOG = logging.getLogger(__name__)

DEFAULT_MAX_RECEIVE_COUNT = 10
# See https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html
DEFAULT_MAX_WAIT_TIME_SECONDS = 20


class SqsPoller(Poller):
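For context, the two constants above mirror the hard caps of the public SQS API: `ReceiveMessage` accepts at most 10 messages per call (`MaxNumberOfMessages`) and at most 20 seconds of long-polling (`WaitTimeSeconds`). A minimal sketch of plain boto3 long-polling within those caps, with a made-up queue URL and region purely for illustration:

```python
import boto3

# Hypothetical queue URL, for illustration only.
QUEUE_URL = "http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/my-queue"

sqs = boto3.client("sqs", region_name="us-east-1")

# Standard long-polling: the call returns as soon as at least one message is
# available, or with an empty response once WaitTimeSeconds elapses.
response = sqs.receive_message(
    QueueUrl=QUEUE_URL,
    MaxNumberOfMessages=10,  # API maximum per ReceiveMessage call
    WaitTimeSeconds=20,      # API maximum long-poll duration
    MessageAttributeNames=["All"],
)
for message in response.get("Messages", []):
    print(message["MessageId"])
```

The custom `sqs_override_*` parameters introduced in this PR exist precisely to lift these two caps for LocalStack-internal polling.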
@@ -71,13 +79,34 @@ def handle_message_count_override(params, context, **kwargs):
            context[HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT] = str(requested_count)

        def handle_message_wait_time_seconds_override(params, context, **kwargs):
            requested_wait = params.pop("sqs_override_wait_time_seconds", None)
            if not requested_wait or requested_wait <= DEFAULT_MAX_WAIT_TIME_SECONDS:
                return

            context[HEADER_LOCALSTACK_SQS_OVERRIDE_WAIT_TIME_SECONDS] = str(requested_wait)

        def handle_inject_headers(params, context, **kwargs):
            if override := context.pop(HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT, None):
                params["headers"][HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT] = override
            if override_message_count := context.pop(
                HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT, None
            ):
                params["headers"][HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT] = (
                    override_message_count
                )

            if override_wait_time := context.pop(
                HEADER_LOCALSTACK_SQS_OVERRIDE_WAIT_TIME_SECONDS, None
            ):
                params["headers"][HEADER_LOCALSTACK_SQS_OVERRIDE_WAIT_TIME_SECONDS] = (
                    override_wait_time
                )

        event_system.register(
            "provide-client-params.sqs.ReceiveMessage", handle_message_count_override
        )
        event_system.register(
            "provide-client-params.sqs.ReceiveMessage", handle_message_wait_time_seconds_override
        )
        # Since we delete SQS messages after processing, this allows us to remove up to 10K entries at a time.
        event_system.register(
            "provide-client-params.sqs.DeleteMessageBatch", handle_message_count_override
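Since `sqs_override_max_message_count` and `sqs_override_wait_time_seconds` are not real SQS API parameters, they have to be removed before botocore validates the request and are forwarded to the LocalStack backend as HTTP headers instead. The following self-contained sketch shows that botocore event-system pattern in isolation; the parameter name, header name, and the `before-call` registration used for header injection here are assumptions for illustration (the hook that actually registers `handle_inject_headers` is not part of this hunk):

```python
import boto3

# Hypothetical header and parameter names, for illustration only.
CUSTOM_HEADER = "x-example-sqs-override"

client = boto3.client("sqs", region_name="us-east-1")
events = client.meta.events


def strip_custom_param(params, context, **kwargs):
    # "provide-client-params" fires before parameter validation, so a non-API
    # keyword must be popped here and stashed in the per-request context.
    if (value := params.pop("example_override", None)) is not None:
        context[CUSTOM_HEADER] = str(value)


def inject_header(params, context, **kwargs):
    # "before-call" exposes the serialized request; params["headers"] is the
    # outgoing HTTP header dict, so the stashed value becomes a header.
    if value := context.pop(CUSTOM_HEADER, None):
        params["headers"][CUSTOM_HEADER] = value


events.register("provide-client-params.sqs.ReceiveMessage", strip_custom_param)
events.register("before-call.sqs.ReceiveMessage", inject_header)

# The extra keyword is now accepted by the client and travels as a header:
# client.receive_message(QueueUrl=queue_url, example_override=500)
```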
@@ -98,30 +127,62 @@ def event_source(self) -> str:
return "aws:sqs" | ||
|
||
def poll_events(self) -> None: | ||
# SQS pipe source: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-sqs.html | ||
# "The 9 Ways an SQS Message can be Deleted": https://lucvandonkersgoed.com/2022/01/20/the-9-ways-an-sqs-message-can-be-deleted/ | ||
# TODO: implement batch window expires based on MaximumBatchingWindowInSeconds | ||
# TODO: implement invocation payload size quota | ||
# TODO: consider long-polling vs. short-polling trade-off. AWS uses long-polling: | ||
# https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-sqs.html#pipes-sqs-scaling | ||
# In order to improve performance, we've adopted long-polling for the SQS poll operation `ReceiveMessage` [1]. | ||
# * Our LS-internal optimizations leverage custom boto-headers to set larger batch sizes and longer wait times than what the AWS API allows [2]. | ||
# * Higher batch collection durations and no. of records retrieved per request mean fewer calls to the LocalStack gateway [3] when polling an event-source [4]. | ||
# * LocalStack shutdown works because the LocalStack gateway shuts down and terminates the open connection. | ||
# * Provider lifecycle hooks have been added to ensure blocking long-poll calls are gracefully interrupted and returned. | ||
# | ||
# Pros (+) / Cons (-): | ||
# + Alleviates pressure on the gateway since each `ReceiveMessage` call only returns once we reach the desired `BatchSize` or the `WaitTimeSeconds` elapses. | ||
        # + Matches the AWS behavior also using long-polling
        # - Blocks a LocalStack gateway thread (default 1k) for every open connection, which could lead to resource contention if used at scale.
        #
        # Refs / Notes:
        # [1] Amazon SQS short and long polling: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html
        # [2] PR (2025-02): https://github.com/localstack/localstack/pull/12002
        # [3] Note: Under high volumes of requests, the LocalStack gateway becomes a major performance bottleneck.
        # [4] ESM blog mentioning long-polling: https://aws.amazon.com/de/blogs/aws/aws-lambda-adds-amazon-simple-queue-service-to-supported-event-sources/

        # TODO: Handle exceptions differently i.e QueueNotExist or ConnectionFailed should retry with backoff
        response = self.source_client.receive_message(
            QueueUrl=self.queue_url,
            MaxNumberOfMessages=min(self.batch_size, DEFAULT_MAX_RECEIVE_COUNT),
            WaitTimeSeconds=min(self.maximum_batching_window, DEFAULT_MAX_WAIT_TIME_SECONDS),
            MessageAttributeNames=["All"],
            MessageSystemAttributeNames=[MessageSystemAttributeName.All],
            # Override how many messages we can receive per call
            sqs_override_max_message_count=self.batch_size,
            # Override how long to wait until batching conditions are met
            sqs_override_wait_time_seconds=self.maximum_batching_window,
        )
        if messages := response.get("Messages"):
            LOG.debug("Polled %d events from %s", len(messages), self.source_arn)

        messages = response.get("Messages", [])
        if not messages:
            # TODO: Consider this case triggering longer wait-times (with backoff) between poll_events calls in the outer-loop.
            return

        LOG.debug("Polled %d events from %s", len(messages), self.source_arn)
Review discussion on this logging change:
- It's probably good for debugging to be explicit here. I'm just wondering whether it's intentional to log empty polls as well?
- I was thinking it could be useful for debugging to log explicitly whether nothing was polled from the event source. Perhaps we can distinguish this better with a
- My main thought is around avoiding log pollution (imagine 100 ESMs printing every second), but it's probably worth keeping for now. For example: it would help to identify whether jitter around the 1s interval is needed 💡. The format is fine, being consistent is good 👍
- I think we should be very careful about this - many people have
        # TODO: implement invocation payload size quota
        # NOTE: Split up a batch into mini-batches of up to 2.5K records each. This is to prevent exceeding the 6MB size-limit
        # imposed on payloads sent to a Lambda as well as LocalStack Lambdas failing to handle large payloads efficiently.
        # See https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html#invocation-eventsourcemapping-batching

Review comment: nit: we could move the
        for message_batch in batched(messages, 2500):
            if len(message_batch) < len(messages):
                LOG.debug(
                    "Splitting events from %s into mini-batch (%d/%d)",
                    self.source_arn,
                    len(message_batch),
                    len(messages),
                )
            try:
                if self.is_fifo_queue:
                    # TODO: think about starvation behavior because once failing message could block other groups
                    fifo_groups = split_by_message_group_id(messages)
                    fifo_groups = split_by_message_group_id(message_batch)
                    for fifo_group_messages in fifo_groups.values():
                        self.handle_messages(fifo_group_messages)
                else:
                    self.handle_messages(messages)
                    self.handle_messages(message_batch)

            # TODO: unify exception handling across pollers: should we catch and raise?
            except Exception as e:
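To make the mini-batching and FIFO grouping above concrete, here is a small self-contained sketch. The two helpers are rough stand-ins, assumed to approximate `batched` from `sender_utils` and `split_by_message_group_id`, not the actual implementations:

```python
from itertools import islice
from typing import Iterable, Iterator


def batched(iterable: Iterable[dict], batch_size: int) -> Iterator[list[dict]]:
    # Stand-in for sender_utils.batched: yield consecutive chunks of at most
    # batch_size items from the iterable.
    iterator = iter(iterable)
    while chunk := list(islice(iterator, batch_size)):
        yield chunk


def split_by_message_group_id(messages: list[dict]) -> dict[str, list[dict]]:
    # Stand-in for split_by_message_group_id: FIFO ordering only needs to hold
    # within a message group, so bucket messages by their MessageGroupId attribute.
    groups: dict[str, list[dict]] = {}
    for message in messages:
        group_id = message.get("Attributes", {}).get("MessageGroupId", "")
        groups.setdefault(group_id, []).append(message)
    return groups


# Example: 6,000 polled messages become mini-batches of 2,500 / 2,500 / 1,000,
# and each mini-batch of a FIFO queue is further split per message group.
messages = [
    {"MessageId": str(i), "Attributes": {"MessageGroupId": f"group-{i % 3}"}}
    for i in range(6000)
]
for mini_batch in batched(messages, 2500):
    for group_id, group_messages in split_by_message_group_id(mini_batch).items():
        print(group_id, len(group_messages), "of", len(mini_batch))
```

Keeping each mini-batch well below the 6MB invocation payload limit is the design goal here; 2,500 records per chunk is the heuristic chosen in this PR rather than an AWS-defined quota.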