-
-
Notifications
You must be signed in to change notification settings - Fork 4.2k
[ESM] Fix polling of SQS queue when batch size exceeds 10 #11945
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
5fae1b2
10645b6
a15b87e
c2f1498
7490ef5
d4314f9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,7 +3,7 @@ | |
|
||
import pytest | ||
from botocore.exceptions import ClientError | ||
from localstack_snapshot.snapshots.transformer import KeyValueBasedTransformer | ||
from localstack_snapshot.snapshots.transformer import KeyValueBasedTransformer, SortingTransformer | ||
|
||
from localstack import config | ||
from localstack.aws.api.lambda_ import InvalidParameterValueException, Runtime | ||
|
@@ -15,6 +15,7 @@ | |
from localstack.utils.testutil import check_expected_lambda_log_events_length, get_lambda_log_events | ||
from tests.aws.services.lambda_.functions import FUNCTIONS_PATH, lambda_integration | ||
from tests.aws.services.lambda_.test_lambda import ( | ||
TEST_LAMBDA_EVENT_SOURCE_MAPPING_SEND_MESSAGE, | ||
TEST_LAMBDA_PYTHON, | ||
TEST_LAMBDA_PYTHON_ECHO, | ||
TEST_LAMBDA_PYTHON_ECHO_VERSION_ENV, | ||
|
@@ -1042,6 +1043,182 @@ def test_sqs_event_source_mapping( | |
rs = aws_client.sqs.receive_message(QueueUrl=queue_url_1) | ||
assert rs.get("Messages", []) == [] | ||
|
||
@pytest.mark.parametrize("batch_size", [15, 100, 1_000, 10_000]) | ||
@markers.aws.validated | ||
def test_sqs_event_source_mapping_batch_size( | ||
self, | ||
create_lambda_function, | ||
sqs_create_queue, | ||
sqs_get_queue_arn, | ||
lambda_su_role, | ||
snapshot, | ||
cleanups, | ||
aws_client, | ||
batch_size, | ||
): | ||
snapshot.add_transformer(snapshot.transform.sqs_api()) | ||
snapshot.add_transformer(SortingTransformer("Records", lambda s: s["body"]), priority=-1) | ||
|
||
destination_queue_name = f"destination-queue-{short_uid()}" | ||
function_name = f"lambda_func-{short_uid()}" | ||
source_queue_name = f"source-queue-{short_uid()}" | ||
mapping_uuid = None | ||
|
||
destination_queue_url = sqs_create_queue(QueueName=destination_queue_name) | ||
create_lambda_function( | ||
func_name=function_name, | ||
handler_file=TEST_LAMBDA_EVENT_SOURCE_MAPPING_SEND_MESSAGE, | ||
runtime=Runtime.python3_12, | ||
envvars={"SQS_QUEUE_URL": destination_queue_url}, | ||
role=lambda_su_role, | ||
) | ||
|
||
queue_url = sqs_create_queue(QueueName=source_queue_name) | ||
queue_arn = sqs_get_queue_arn(queue_url) | ||
|
||
create_event_source_mapping_response = aws_client.lambda_.create_event_source_mapping( | ||
EventSourceArn=queue_arn, | ||
FunctionName=function_name, | ||
MaximumBatchingWindowInSeconds=10 if is_aws_cloud() else 2, | ||
BatchSize=batch_size, | ||
) | ||
mapping_uuid = create_event_source_mapping_response["UUID"] | ||
cleanups.append(lambda: aws_client.lambda_.delete_event_source_mapping(UUID=mapping_uuid)) | ||
snapshot.match("create-event-source-mapping-response", create_event_source_mapping_response) | ||
_await_event_source_mapping_enabled(aws_client.lambda_, mapping_uuid) | ||
|
||
reponse_batch_send_10 = aws_client.sqs.send_message_batch( | ||
QueueUrl=queue_url, | ||
Entries=[{"Id": f"{i}-0", "MessageBody": f"{i}-0-message-{i}"} for i in range(10)], | ||
) | ||
snapshot.match("send-message-batch-result-10", reponse_batch_send_10) | ||
|
||
reponse_batch_send_5 = aws_client.sqs.send_message_batch( | ||
QueueUrl=queue_url, | ||
Entries=[{"Id": f"{i}-1", "MessageBody": f"{i}-1-message-{i}"} for i in range(5)], | ||
) | ||
snapshot.match("send-message-batch-result-5", reponse_batch_send_5) | ||
|
||
batches = [] | ||
|
||
def get_msg_from_q(): | ||
messages_to_delete = [] | ||
receive_message_response = aws_client.sqs.receive_message( | ||
QueueUrl=destination_queue_url, | ||
MaxNumberOfMessages=10, | ||
VisibilityTimeout=120, | ||
WaitTimeSeconds=5 if is_aws_cloud() else 1, | ||
) | ||
messages = receive_message_response.get("Messages", []) | ||
for message in messages: | ||
received_batch = json.loads(message["Body"]) | ||
batches.append(received_batch) | ||
messages_to_delete.append( | ||
{"Id": message["MessageId"], "ReceiptHandle": message["ReceiptHandle"]} | ||
) | ||
|
||
aws_client.sqs.delete_message_batch( | ||
QueueUrl=destination_queue_url, Entries=messages_to_delete | ||
) | ||
assert sum([len(batch) for batch in batches]) == 15 | ||
return [message for batch in batches for message in batch] | ||
|
||
events = retry(get_msg_from_q, retries=15, sleep=5) | ||
snapshot.match("Records", events) | ||
|
||
# FIXME: this fails due to ESM not correctly collecting and sending batches | ||
# where size exceeds 10 messages. | ||
@markers.snapshot.skip_snapshot_verify(paths=["$..total_batches_received"]) | ||
@markers.aws.validated | ||
def test_sqs_event_source_mapping_batching_reserved_concurrency( | ||
self, | ||
create_lambda_function, | ||
sqs_create_queue, | ||
sqs_get_queue_arn, | ||
lambda_su_role, | ||
snapshot, | ||
cleanups, | ||
aws_client, | ||
): | ||
snapshot.add_transformer(snapshot.transform.sqs_api()) | ||
snapshot.add_transformer(SortingTransformer("Records", lambda s: s["body"]), priority=-1) | ||
|
||
destination_queue_name = f"destination-queue-{short_uid()}" | ||
function_name = f"lambda_func-{short_uid()}" | ||
source_queue_name = f"source-queue-{short_uid()}" | ||
mapping_uuid = None | ||
|
||
destination_queue_url = sqs_create_queue(QueueName=destination_queue_name) | ||
create_lambda_function( | ||
func_name=function_name, | ||
handler_file=TEST_LAMBDA_EVENT_SOURCE_MAPPING_SEND_MESSAGE, | ||
runtime=Runtime.python3_12, | ||
envvars={"SQS_QUEUE_URL": destination_queue_url}, | ||
role=lambda_su_role, | ||
) | ||
|
||
# Prevent more than 2 Lambdas from being spun up at a time | ||
put_concurrency_resp = aws_client.lambda_.put_function_concurrency( | ||
FunctionName=function_name, ReservedConcurrentExecutions=2 | ||
) | ||
snapshot.match("put_concurrency_resp", put_concurrency_resp) | ||
|
||
queue_url = sqs_create_queue(QueueName=source_queue_name) | ||
queue_arn = sqs_get_queue_arn(queue_url) | ||
|
||
create_event_source_mapping_response = aws_client.lambda_.create_event_source_mapping( | ||
EventSourceArn=queue_arn, | ||
FunctionName=function_name, | ||
MaximumBatchingWindowInSeconds=10, | ||
BatchSize=20, | ||
ScalingConfig={ | ||
"MaximumConcurrency": 2 | ||
}, # Prevent more than 2 concurrent SQS workers from being spun up at a time | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: these refer to Lambda pollers I guess (also something we don't support in LS yet ;) |
||
) | ||
mapping_uuid = create_event_source_mapping_response["UUID"] | ||
cleanups.append(lambda: aws_client.lambda_.delete_event_source_mapping(UUID=mapping_uuid)) | ||
snapshot.match("create-event-source-mapping-response", create_event_source_mapping_response) | ||
_await_event_source_mapping_enabled(aws_client.lambda_, mapping_uuid) | ||
|
||
for b in range(3): | ||
aws_client.sqs.send_message_batch( | ||
QueueUrl=queue_url, | ||
Entries=[{"Id": f"{i}-{b}", "MessageBody": f"{i}-{b}-message"} for i in range(10)], | ||
) | ||
|
||
batches = [] | ||
|
||
def get_msg_from_q(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. idea for future: It feels we re-implement this pattern all over the place in different ways and should consider using an appropriate helper/fixture 🙈 |
||
messages_to_delete = [] | ||
receive_message_response = aws_client.sqs.receive_message( | ||
QueueUrl=destination_queue_url, | ||
MaxNumberOfMessages=10, | ||
VisibilityTimeout=120, | ||
WaitTimeSeconds=5, | ||
) | ||
messages = receive_message_response.get("Messages", []) | ||
for message in messages: | ||
received_batch = json.loads(message["Body"]) | ||
batches.append(received_batch) | ||
messages_to_delete.append( | ||
{"Id": message["MessageId"], "ReceiptHandle": message["ReceiptHandle"]} | ||
) | ||
|
||
if messages_to_delete: | ||
aws_client.sqs.delete_message_batch( | ||
QueueUrl=destination_queue_url, Entries=messages_to_delete | ||
) | ||
assert sum([len(batch) for batch in batches]) == 30 | ||
return [message for batch in batches for message in batch] | ||
|
||
events = retry(get_msg_from_q, retries=15, sleep=5) | ||
|
||
# We expect to receive 2 batches where each batch contains some proportion of the | ||
# 30 messages we sent through, divided by the 20 ESM batch size. How this is split is | ||
# not determinable a priori so rather just snapshots the events and the no. of batches. | ||
snapshot.match("batch_info", {"total_batches_received": len(batches)}) | ||
snapshot.match("Records", events) | ||
|
||
@markers.aws.validated | ||
@pytest.mark.parametrize( | ||
"filter, item_matching, item_not_matching", | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice coverage 💯
they took ~45s on my M1, but could be slower after we implement the window mechanism, hence it's good to adjust the window for LocalStack here 💡