Merged
29 commits
0c13876
[ESM] Create SQS message collector for batched processing
gregfurman Nov 27, 2024
795ad69
[ESM] Change batch collection method to use threading
gregfurman Nov 28, 2024
3dd840f
Remove old implementation
gregfurman Nov 29, 2024
8f162b6
Address comments
gregfurman Nov 29, 2024
0237cab
Remove adaptive backoff and add size-based flushing
gregfurman Jan 9, 2025
6dd8fc5
[ESM] Handle polling of batches exceeding SQS message limits (#12118)
gregfurman Jan 20, 2025
e1ca826
[ESM] Use SQS long polling, override parameter, and set boto timeout
gregfurman Jan 21, 2025
d600448
Skip failing test
gregfurman Jan 21, 2025
4d9e087
Merge branch 'master' into fix/esm/batching
gregfurman Jan 23, 2025
1ae029c
Address comments
gregfurman Jan 29, 2025
c9df467
Remove last of SQS work
gregfurman Jan 29, 2025
9ff29b7
Revert long polling for MaximumBatchingWindowInSeconds duration
gregfurman Jan 29, 2025
fde11fc
Merge branch 'master' into fix/esm/batching
gregfurman Jan 30, 2025
fbefdd6
Merge remote-tracking branch 'origin/master' into fix/esm/batching
gregfurman Jan 30, 2025
e597a3c
Merge branch 'master' into fix/esm/batching
gregfurman Feb 4, 2025
3216d7a
Merge branch 'master' into fix/esm/batching
gregfurman Feb 4, 2025
4e65ee0
WIP: address comments
gregfurman Feb 6, 2025
f5ed105
Merge branch 'master' into fix/esm/batching
gregfurman Feb 10, 2025
3684339
Fix logging of mini-batches
gregfurman Feb 10, 2025
397d354
Merge remote-tracking branch 'origin/master' into fix/esm/batching
gregfurman Feb 10, 2025
f747b4a
Add batching window override for SQS long polling
gregfurman Feb 10, 2025
fa1a10a
remove unnecessary enumerate
gregfurman Feb 10, 2025
275f942
Integrate with new SQS changes
gregfurman Feb 10, 2025
fa3cc11
Allow outer loop to handle exception
gregfurman Feb 10, 2025
95782dc
docs: Add documentation on performance optimisations
gregfurman Feb 12, 2025
5edb709
Some documentation clarifications
gregfurman Feb 24, 2025
6ebbf2f
Merge branch 'master' into fix/esm/batching
gregfurman Feb 25, 2025
72ea973
Address final comments and rebase
gregfurman Feb 25, 2025
23d6cb6
fix: Load up SQS queue prior to polling in flaky test
gregfurman Feb 25, 2025
@@ -32,7 +32,10 @@
from localstack.services.lambda_.event_source_mapping.pollers.dynamodb_poller import DynamoDBPoller
from localstack.services.lambda_.event_source_mapping.pollers.kinesis_poller import KinesisPoller
from localstack.services.lambda_.event_source_mapping.pollers.poller import Poller
from localstack.services.lambda_.event_source_mapping.pollers.sqs_poller import SqsPoller
from localstack.services.lambda_.event_source_mapping.pollers.sqs_poller import (
DEFAULT_MAX_WAIT_TIME_SECONDS,
SqsPoller,
)
from localstack.services.lambda_.event_source_mapping.senders.lambda_sender import LambdaSender
from localstack.utils.aws.arns import parse_arn
from localstack.utils.aws.client_types import ServicePrincipal
@@ -111,6 +114,24 @@ def get_esm_worker(self) -> EsmWorker:
role_arn=self.function_role_arn,
service_principal=ServicePrincipal.lambda_,
source_arn=self.esm_config["FunctionArn"],
client_config=botocore.config.Config(
retries={"total_max_attempts": 1}, # Disable retries
read_timeout=max(
self.esm_config.get(
"MaximumBatchingWindowInSeconds", DEFAULT_MAX_WAIT_TIME_SECONDS
),
60,
)
+ 5, # Extend read timeout (with 5s buffer) for long-polling
# Setting tcp_keepalive to true allows the boto client to keep
# a long-running TCP connection when making calls to the gateway.
# This ensures long-poll calls do not have their socket connection
# marked as stale when no data is transferred for a while, which would
# otherwise cause the connection to be dropped or reset prematurely.
# See https://aws.amazon.com/blogs/networking-and-content-delivery/implementing-long-running-tcp-connections-within-vpc-networking/
tcp_keepalive=True,
),
)

filter_criteria = self.esm_config.get("FilterCriteria", {"Filters": []})
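The timeout arithmetic above can be read as a small helper. The sketch below is illustrative only (build_client_config is a hypothetical name, not part of this PR): the read timeout is at least 65 seconds and grows with the configured batching window.

```python
# Sketch only: illustrates the read_timeout calculation used above;
# names and values are assumptions for illustration.
import botocore.config

DEFAULT_MAX_WAIT_TIME_SECONDS = 20  # SQS long-polling maximum WaitTimeSeconds


def build_client_config(maximum_batching_window: int) -> botocore.config.Config:
    # Wait at least 60s (or the batching window, if larger) plus a 5s buffer,
    # so the socket read timeout never fires before the long poll returns.
    return botocore.config.Config(
        retries={"total_max_attempts": 1},  # disable retries
        read_timeout=max(maximum_batching_window or DEFAULT_MAX_WAIT_TIME_SECONDS, 60) + 5,
        tcp_keepalive=True,  # keep the long-running TCP connection alive
    )


print(build_client_config(30).read_timeout)   # 65
print(build_client_config(300).read_timeout)  # 305
```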
@@ -16,13 +16,21 @@
Poller,
parse_batch_item_failures,
)
from localstack.services.sqs.constants import HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT
from localstack.services.lambda_.event_source_mapping.senders.sender_utils import (
batched,
)
from localstack.services.sqs.constants import (
HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT,
HEADER_LOCALSTACK_SQS_OVERRIDE_WAIT_TIME_SECONDS,
)
from localstack.utils.aws.arns import parse_arn
from localstack.utils.strings import first_char_to_lower

LOG = logging.getLogger(__name__)

DEFAULT_MAX_RECEIVE_COUNT = 10
# See https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html
DEFAULT_MAX_WAIT_TIME_SECONDS = 20


class SqsPoller(Poller):
@@ -71,13 +79,34 @@ def handle_message_count_override(params, context, **kwargs):

context[HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT] = str(requested_count)

def handle_message_wait_time_seconds_override(params, context, **kwargs):
requested_wait = params.pop("sqs_override_wait_time_seconds", None)
if not requested_wait or requested_wait <= DEFAULT_MAX_WAIT_TIME_SECONDS:
return

context[HEADER_LOCALSTACK_SQS_OVERRIDE_WAIT_TIME_SECONDS] = str(requested_wait)

def handle_inject_headers(params, context, **kwargs):
if override := context.pop(HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT, None):
params["headers"][HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT] = override
if override_message_count := context.pop(
HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT, None
):
params["headers"][HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT] = (
override_message_count
)

if override_wait_time := context.pop(
HEADER_LOCALSTACK_SQS_OVERRIDE_WAIT_TIME_SECONDS, None
):
params["headers"][HEADER_LOCALSTACK_SQS_OVERRIDE_WAIT_TIME_SECONDS] = (
override_wait_time
)

event_system.register(
"provide-client-params.sqs.ReceiveMessage", handle_message_count_override
)
event_system.register(
"provide-client-params.sqs.ReceiveMessage", handle_message_wait_time_seconds_override
)
# Since we delete SQS messages after processing, this allows us to remove up to 10K entries at a time.
event_system.register(
"provide-client-params.sqs.DeleteMessageBatch", handle_message_count_override
@@ -98,30 +127,62 @@ def event_source(self) -> str:
return "aws:sqs"

def poll_events(self) -> None:
# SQS pipe source: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-sqs.html
# "The 9 Ways an SQS Message can be Deleted": https://lucvandonkersgoed.com/2022/01/20/the-9-ways-an-sqs-message-can-be-deleted/
# TODO: implement batch window expires based on MaximumBatchingWindowInSeconds
# TODO: implement invocation payload size quota
# TODO: consider long-polling vs. short-polling trade-off. AWS uses long-polling:
# https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-sqs.html#pipes-sqs-scaling
# In order to improve performance, we've adopted long-polling for the SQS poll operation `ReceiveMessage` [1].
# * Our LS-internal optimizations leverage custom boto-headers to set larger batch sizes and longer wait times than what the AWS API allows [2].
# * Longer batch-collection windows and more records retrieved per request mean fewer calls to the LocalStack gateway [3] when polling an event source [4].
# * LocalStack shutdown works because the LocalStack gateway shuts down and terminates the open connection.
# * Provider lifecycle hooks have been added to ensure blocking long-poll calls are gracefully interrupted and returned.
#
# Pros (+) / Cons (-):
# + Alleviates pressure on the gateway since each `ReceiveMessage` call only returns once we reach the desired `BatchSize` or the `WaitTimeSeconds` elapses.
# + Matches AWS behavior, which also uses long-polling
# - Blocks a LocalStack gateway thread (default 1k) for every open connection, which could lead to resource contention if used at scale.
#
# Refs / Notes:
# [1] Amazon SQS short and long polling: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html
# [2] PR (2025-02): https://github.com/localstack/localstack/pull/12002
# [3] Note: Under high volumes of requests, the LocalStack gateway becomes a major performance bottleneck.
# [4] ESM blog mentioning long-polling: https://aws.amazon.com/de/blogs/aws/aws-lambda-adds-amazon-simple-queue-service-to-supported-event-sources/

# TODO: Handle exceptions differently, e.g. QueueNotExist or ConnectionFailed should retry with backoff
response = self.source_client.receive_message(
QueueUrl=self.queue_url,
MaxNumberOfMessages=min(self.batch_size, DEFAULT_MAX_RECEIVE_COUNT),
WaitTimeSeconds=min(self.maximum_batching_window, DEFAULT_MAX_WAIT_TIME_SECONDS),
MessageAttributeNames=["All"],
MessageSystemAttributeNames=[MessageSystemAttributeName.All],
# Override how many messages we can receive per call
sqs_override_max_message_count=self.batch_size,
# Override how long to wait until batching conditions are met
sqs_override_wait_time_seconds=self.maximum_batching_window,
)
if messages := response.get("Messages"):
LOG.debug("Polled %d events from %s", len(messages), self.source_arn)

messages = response.get("Messages", [])
if not messages:
# TODO: Consider this case triggering longer wait-times (with backoff) between poll_events calls in the outer-loop.
return

LOG.debug("Polled %d events from %s", len(messages), self.source_arn)
Member:
It's probably good for debugging to be explicit here. I'm just wondering whether it's intentional to log empty polls as well?

Contributor Author:
I was thinking it could be useful for debugging to log explicitly whether nothing was polled from the event source. Perhaps we can distinguish this better with a "Polled no events from %s" -- wdyt?

Member:
My main thought is around avoiding log pollution (imagine 100 ESMs printing every second), but it's probably worth keeping for now. For example: it would help to identify whether jitter around the 1s interval is needed 💡.

The format is fine, being consistent is good 👍

Member:
I think we should be very careful about this - many people have DEBUG=1 by default, and this can be a lot. I agree it is not urgent to remove - but especially the message for no events could be removed in the future.

# TODO: implement invocation payload size quota
# NOTE: Split up a batch into mini-batches of up to 2.5K records each. This is to prevent exceeding the 6MB size-limit
Member:
nit: we could move the # TODO: implement invocation payload size quota here, clarifying that's only a heuristic and not a perfect parity implementation
# imposed on Lambda invocation payloads, and because LocalStack's Lambda provider handles very large payloads inefficiently.
# See https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html#invocation-eventsourcemapping-batching
for message_batch in batched(messages, 2500):
if len(message_batch) < len(messages):
LOG.debug(
"Splitting events from %s into mini-batch (%d/%d)",
self.source_arn,
len(message_batch),
len(messages),
)
try:
if self.is_fifo_queue:
# TODO: think about starvation behavior because once failing message could block other groups
fifo_groups = split_by_message_group_id(messages)
fifo_groups = split_by_message_group_id(message_batch)
for fifo_group_messages in fifo_groups.values():
self.handle_messages(fifo_group_messages)
else:
self.handle_messages(messages)
self.handle_messages(message_batch)

# TODO: unify exception handling across pollers: should we catch and raise?
except Exception as e:
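The mini-batch split in poll_events relies on the batched() helper imported from sender_utils, whose implementation is not part of this diff; below is a minimal itertools-based sketch of such a helper, under that assumption.

```python
# Hedged sketch of a batched() helper (the real one lives in sender_utils and
# may differ): yields successive tuples of at most n items from an iterable.
import itertools
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def batched(iterable: Iterable[T], n: int) -> Iterator[tuple[T, ...]]:
    if n < 1:
        raise ValueError("n must be at least one")
    it = iter(iterable)
    while batch := tuple(itertools.islice(it, n)):
        yield batch


# Example: 6,000 polled messages are split into mini-batches of 2,500 + 2,500 + 1,000.
assert [len(b) for b in batched(range(6_000), 2_500)] == [2_500, 2_500, 1_000]
```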
@@ -1,4 +1,5 @@
import json
import math
import time

import pytest
@@ -1077,9 +1078,6 @@ def get_msg_from_q():
events = retry(get_msg_from_q, retries=15, sleep=5)
snapshot.match("Records", events)

# FIXME: this fails due to ESM not correctly collecting and sending batches
# where size exceeds 10 messages.
@markers.snapshot.skip_snapshot_verify(paths=["$..total_batches_received"])
@markers.aws.validated
def test_sqs_event_source_mapping_batching_reserved_concurrency(
self,
@@ -1117,10 +1115,16 @@ def test_sqs_event_source_mapping_batching_reserved_concurrency(
queue_url = sqs_create_queue(QueueName=source_queue_name)
queue_arn = sqs_get_queue_arn(queue_url)

for b in range(3):
aws_client.sqs.send_message_batch(
QueueUrl=queue_url,
Entries=[{"Id": f"{i}-{b}", "MessageBody": f"{i}-{b}-message"} for i in range(10)],
)

create_event_source_mapping_response = aws_client.lambda_.create_event_source_mapping(
EventSourceArn=queue_arn,
FunctionName=function_name,
MaximumBatchingWindowInSeconds=10,
MaximumBatchingWindowInSeconds=1,
BatchSize=20,
ScalingConfig={
"MaximumConcurrency": 2
@@ -1131,12 +1135,6 @@
snapshot.match("create-event-source-mapping-response", create_event_source_mapping_response)
_await_event_source_mapping_enabled(aws_client.lambda_, mapping_uuid)

for b in range(3):
aws_client.sqs.send_message_batch(
QueueUrl=queue_url,
Entries=[{"Id": f"{i}-{b}", "MessageBody": f"{i}-{b}-message"} for i in range(10)],
)

batches = []

def get_msg_from_q():
@@ -1566,13 +1564,7 @@ def test_duplicate_event_source_mappings(
20,
100,
1_000,
pytest.param(
10_000,
marks=pytest.mark.skip(
reason="Flushing based on payload sizes not yet implemented so large payloads are causing issues."
),
id="10000",
),
10_000,
],
)
@markers.aws.only_localstack
@@ -1617,17 +1609,72 @@ def test_sqs_event_source_mapping_batch_size_override(
cleanups.append(lambda: aws_client.lambda_.delete_event_source_mapping(UUID=mapping_uuid))
_await_event_source_mapping_enabled(aws_client.lambda_, mapping_uuid)

expected_invocations = math.ceil(batch_size / 2500)
events = retry(
check_expected_lambda_log_events_length,
retries=10,
sleep=1,
function_name=function_name,
expected_length=expected_invocations,
logs_client=aws_client.logs,
)

assert sum(len(event.get("Records", [])) for event in events) == batch_size

rs = aws_client.sqs.receive_message(QueueUrl=queue_url)
assert rs.get("Messages", []) == []

@markers.aws.only_localstack
def test_sqs_event_source_mapping_batching_window_size_override(
self,
create_lambda_function,
sqs_create_queue,
sqs_get_queue_arn,
lambda_su_role,
cleanups,
aws_client,
):
function_name = f"lambda_func-{short_uid()}"
queue_name = f"queue-{short_uid()}"
mapping_uuid = None

create_lambda_function(
func_name=function_name,
handler_file=TEST_LAMBDA_PYTHON_ECHO,
runtime=Runtime.python3_12,
role=lambda_su_role,
)
queue_url = sqs_create_queue(QueueName=queue_name)
queue_arn = sqs_get_queue_arn(queue_url)

create_event_source_mapping_response = aws_client.lambda_.create_event_source_mapping(
EventSourceArn=queue_arn,
FunctionName=function_name,
MaximumBatchingWindowInSeconds=30,
BatchSize=10_000,
)
mapping_uuid = create_event_source_mapping_response["UUID"]
cleanups.append(lambda: aws_client.lambda_.delete_event_source_mapping(UUID=mapping_uuid))
_await_event_source_mapping_enabled(aws_client.lambda_, mapping_uuid)

# Send 4 messages and delay their arrival by 5, 10, 15, and 25 seconds respectively
for s in [5, 10, 15, 25]:
aws_client.sqs.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps({"delayed": f"{s}"}),
DelaySeconds=s,
)

events = retry(
check_expected_lambda_log_events_length,
retries=60,
sleep=1,
function_name=function_name,
expected_length=1,
logs_client=aws_client.logs,
)

assert len(events) == 1
assert len(events[0].get("Records", [])) == batch_size
assert len(events[0].get("Records", [])) == 4

rs = aws_client.sqs.receive_message(QueueUrl=queue_url)
assert rs.get("Messages", []) == []
@@ -2033,7 +2033,7 @@
}
},
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py::TestSQSEventSourceMapping::test_sqs_event_source_mapping_batching_reserved_concurrency": {
"recorded-date": "29-11-2024, 13:29:56",
"recorded-date": "25-02-2025, 16:35:01",
"recorded-content": {
"put_concurrency_resp": {
"ReservedConcurrentExecutions": 2,
@@ -2049,7 +2049,7 @@
"FunctionArn": "arn:<partition>:lambda:<region>:111111111111:function:<resource:2>",
"FunctionResponseTypes": [],
"LastModified": "<datetime>",
"MaximumBatchingWindowInSeconds": 10,
"MaximumBatchingWindowInSeconds": 1,
"ScalingConfig": {
"MaximumConcurrency": 2
},
@@ -87,7 +87,7 @@
"last_validated_date": "2024-12-11T13:42:55+00:00"
},
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py::TestSQSEventSourceMapping::test_sqs_event_source_mapping_batching_reserved_concurrency": {
"last_validated_date": "2024-11-29T13:29:53+00:00"
"last_validated_date": "2025-02-25T16:34:59+00:00"
},
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py::TestSQSEventSourceMapping::test_sqs_event_source_mapping_update": {
"last_validated_date": "2024-10-12T13:45:43+00:00"