Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ def poll_events(self) -> None:
# https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-sqs.html#pipes-sqs-scaling
response = self.source_client.receive_message(
QueueUrl=self.queue_url,
MaxNumberOfMessages=self.sqs_queue_parameters["BatchSize"],
MaxNumberOfMessages=min(
self.sqs_queue_parameters["BatchSize"], DEFAULT_MAX_RECEIVE_COUNT
), # SQS ReceiveMessage accepts at most 10 for MaxNumberOfMessages, so cap the configured BatchSize
MessageAttributeNames=["All"],
MessageSystemAttributeNames=[MessageSystemAttributeName.All],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import pytest
from botocore.exceptions import ClientError
from localstack_snapshot.snapshots.transformer import KeyValueBasedTransformer
from localstack_snapshot.snapshots.transformer import KeyValueBasedTransformer, SortingTransformer

from localstack import config
from localstack.aws.api.lambda_ import InvalidParameterValueException, Runtime
Expand All @@ -15,6 +15,7 @@
from localstack.utils.testutil import check_expected_lambda_log_events_length, get_lambda_log_events
from tests.aws.services.lambda_.functions import FUNCTIONS_PATH, lambda_integration
from tests.aws.services.lambda_.test_lambda import (
TEST_LAMBDA_EVENT_SOURCE_MAPPING_SEND_MESSAGE,
TEST_LAMBDA_PYTHON,
TEST_LAMBDA_PYTHON_ECHO,
TEST_LAMBDA_PYTHON_ECHO_VERSION_ENV,
Expand Down Expand Up @@ -1042,6 +1043,182 @@ def test_sqs_event_source_mapping(
rs = aws_client.sqs.receive_message(QueueUrl=queue_url_1)
assert rs.get("Messages", []) == []

@pytest.mark.parametrize("batch_size", [15, 100, 1_000, 10_000])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice coverage 💯

they took ~45s on my M1, but could be slower after we implement the window mechanism, hence it's good to adjust the window for LocalStack here 💡

@markers.aws.validated
def test_sqs_event_source_mapping_batch_size(
    self,
    create_lambda_function,
    sqs_create_queue,
    sqs_get_queue_arn,
    lambda_su_role,
    snapshot,
    cleanups,
    aws_client,
    batch_size,
):
    """Check that every sent message is delivered when the mapping's BatchSize exceeds 10.

    The test wires a source queue to a Lambda through an event source mapping
    created with the parametrized ``batch_size``, sends 10 + 5 messages to the
    source queue, and then polls a destination queue until the JSON bodies
    received there add up to 15 records. The flattened records are snapshotted
    (sorted by ``body`` via the ``SortingTransformer`` registered below).

    The Lambda receives the destination queue URL through the ``SQS_QUEUE_URL``
    environment variable; presumably the handler forwards each batch it is
    invoked with as one JSON message — the handler itself is not visible here.
    """
    snapshot.add_transformer(snapshot.transform.sqs_api())
    snapshot.add_transformer(SortingTransformer("Records", lambda s: s["body"]), priority=-1)

    destination_queue_name = f"destination-queue-{short_uid()}"
    function_name = f"lambda_func-{short_uid()}"
    source_queue_name = f"source-queue-{short_uid()}"

    destination_queue_url = sqs_create_queue(QueueName=destination_queue_name)
    create_lambda_function(
        func_name=function_name,
        handler_file=TEST_LAMBDA_EVENT_SOURCE_MAPPING_SEND_MESSAGE,
        runtime=Runtime.python3_12,
        envvars={"SQS_QUEUE_URL": destination_queue_url},
        role=lambda_su_role,
    )

    queue_url = sqs_create_queue(QueueName=source_queue_name)
    queue_arn = sqs_get_queue_arn(queue_url)

    create_event_source_mapping_response = aws_client.lambda_.create_event_source_mapping(
        EventSourceArn=queue_arn,
        FunctionName=function_name,
        # Shorter batching window when not running against real AWS.
        MaximumBatchingWindowInSeconds=10 if is_aws_cloud() else 2,
        BatchSize=batch_size,
    )
    mapping_uuid = create_event_source_mapping_response["UUID"]
    cleanups.append(lambda: aws_client.lambda_.delete_event_source_mapping(UUID=mapping_uuid))
    snapshot.match("create-event-source-mapping-response", create_event_source_mapping_response)
    _await_event_source_mapping_enabled(aws_client.lambda_, mapping_uuid)

    response_batch_send_10 = aws_client.sqs.send_message_batch(
        QueueUrl=queue_url,
        Entries=[{"Id": f"{i}-0", "MessageBody": f"{i}-0-message-{i}"} for i in range(10)],
    )
    snapshot.match("send-message-batch-result-10", response_batch_send_10)

    response_batch_send_5 = aws_client.sqs.send_message_batch(
        QueueUrl=queue_url,
        Entries=[{"Id": f"{i}-1", "MessageBody": f"{i}-1-message-{i}"} for i in range(5)],
    )
    snapshot.match("send-message-batch-result-5", response_batch_send_5)

    # Accumulates across retries: each destination message body is one decoded batch.
    batches = []

    def get_msg_from_q():
        """Drain one receive call into ``batches`` and return all records once 15 arrived."""
        messages_to_delete = []
        receive_message_response = aws_client.sqs.receive_message(
            QueueUrl=destination_queue_url,
            MaxNumberOfMessages=10,
            VisibilityTimeout=120,
            WaitTimeSeconds=5 if is_aws_cloud() else 1,
        )
        messages = receive_message_response.get("Messages", [])
        for message in messages:
            received_batch = json.loads(message["Body"])
            batches.append(received_batch)
            messages_to_delete.append(
                {"Id": message["MessageId"], "ReceiptHandle": message["ReceiptHandle"]}
            )

        # An empty receive must not trigger a delete: a batch request without
        # entries is rejected, which would mask the assertion below as the
        # reason for the retry.
        if messages_to_delete:
            aws_client.sqs.delete_message_batch(
                QueueUrl=destination_queue_url, Entries=messages_to_delete
            )
        # 10 + 5 messages were sent to the source queue above.
        assert sum(len(batch) for batch in batches) == 15
        return [message for batch in batches for message in batch]

    events = retry(get_msg_from_q, retries=15, sleep=5)
    snapshot.match("Records", events)

# FIXME: this fails due to ESM not correctly collecting and sending batches
# where size exceeds 10 messages.
@markers.snapshot.skip_snapshot_verify(paths=["$..total_batches_received"])
@markers.aws.validated
def test_sqs_event_source_mapping_batching_reserved_concurrency(
    self,
    create_lambda_function,
    sqs_create_queue,
    sqs_get_queue_arn,
    lambda_su_role,
    snapshot,
    cleanups,
    aws_client,
):
    """Snapshot records and batch count for a BatchSize=20 mapping with concurrency capped at 2.

    Thirty messages are sent to the source queue in three batches of ten. The
    test then polls the destination queue until the decoded message bodies add
    up to 30 records, and snapshots both the flattened records and the number
    of batches that were received.
    """
    snapshot.add_transformer(snapshot.transform.sqs_api())
    snapshot.add_transformer(
        SortingTransformer("Records", lambda record: record["body"]), priority=-1
    )

    sink_queue_name = f"destination-queue-{short_uid()}"
    fn_name = f"lambda_func-{short_uid()}"
    origin_queue_name = f"source-queue-{short_uid()}"

    sink_queue_url = sqs_create_queue(QueueName=sink_queue_name)
    create_lambda_function(
        func_name=fn_name,
        handler_file=TEST_LAMBDA_EVENT_SOURCE_MAPPING_SEND_MESSAGE,
        runtime=Runtime.python3_12,
        envvars={"SQS_QUEUE_URL": sink_queue_url},
        role=lambda_su_role,
    )

    # Reserve 2 concurrent executions so no more than 2 Lambdas run at once.
    concurrency_response = aws_client.lambda_.put_function_concurrency(
        FunctionName=fn_name, ReservedConcurrentExecutions=2
    )
    snapshot.match("put_concurrency_resp", concurrency_response)

    origin_queue_url = sqs_create_queue(QueueName=origin_queue_name)
    origin_queue_arn = sqs_get_queue_arn(origin_queue_url)

    # Cap the mapping at 2 concurrent SQS workers.
    # NOTE(review): a reviewer remarked that this presumably refers to Lambda
    # pollers, which LocalStack does not support yet — confirm.
    scaling_config = {"MaximumConcurrency": 2}
    esm_response = aws_client.lambda_.create_event_source_mapping(
        EventSourceArn=origin_queue_arn,
        FunctionName=fn_name,
        MaximumBatchingWindowInSeconds=10,
        BatchSize=20,
        ScalingConfig=scaling_config,
    )
    esm_uuid = esm_response["UUID"]
    cleanups.append(lambda: aws_client.lambda_.delete_event_source_mapping(UUID=esm_uuid))
    snapshot.match("create-event-source-mapping-response", esm_response)
    _await_event_source_mapping_enabled(aws_client.lambda_, esm_uuid)

    # Three sends of ten messages each: 30 messages in total.
    for send_round in range(3):
        entries = [
            {"Id": f"{idx}-{send_round}", "MessageBody": f"{idx}-{send_round}-message"}
            for idx in range(10)
        ]
        aws_client.sqs.send_message_batch(QueueUrl=origin_queue_url, Entries=entries)

    # Grows across retries; every destination message body decodes to one batch.
    batches = []

    # NOTE(review): this receive/delete/assert polling pattern is re-implemented
    # in several tests; a shared helper or fixture was suggested for the future.
    def collect_forwarded_batches():
        """Pull one receive call's worth of batches; return all records once 30 arrived."""
        receive_response = aws_client.sqs.receive_message(
            QueueUrl=sink_queue_url,
            MaxNumberOfMessages=10,
            VisibilityTimeout=120,
            WaitTimeSeconds=5,
        )

        delete_entries = []
        for msg in receive_response.get("Messages", []):
            batches.append(json.loads(msg["Body"]))
            delete_entries.append({"Id": msg["MessageId"], "ReceiptHandle": msg["ReceiptHandle"]})

        if delete_entries:
            aws_client.sqs.delete_message_batch(QueueUrl=sink_queue_url, Entries=delete_entries)

        record_count = 0
        for batch in batches:
            record_count += len(batch)
        assert record_count == 30

        flattened = []
        for batch in batches:
            flattened.extend(batch)
        return flattened

    events = retry(collect_forwarded_batches, retries=15, sleep=5)

    # The 30 messages are expected to arrive as 2 batches given the ESM batch size
    # of 20, but how they are split cannot be determined up front. So only the
    # events and the number of batches are snapshotted.
    snapshot.match("batch_info", {"total_batches_received": len(batches)})
    snapshot.match("Records", events)

@markers.aws.validated
@pytest.mark.parametrize(
"filter, item_matching, item_not_matching",
Expand Down
Loading
Loading