Skip to content

Commit ccddefd

Browse files
authored
Fix flaky lambda test event retry reserved concurrency (#12441)
1 parent 90ccf60 commit ccddefd

File tree

5 files changed

+157
-33
lines changed

5 files changed

+157
-33
lines changed

localstack-core/localstack/testing/pytest/fixtures.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,38 @@
6767
from mypy_boto3_sqs.type_defs import MessageTypeDef
6868

6969

70+
@pytest.fixture(scope="session")
def aws_client_no_retry(aws_client_factory):
    """
    Provide Boto clients that never retry a failed request.

    botocore docs: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#configuring-a-retry-mode

    Use this client when testing exceptions (i.e., with pytest.raises(...)) or expected errors (e.g., status code 500)
    to avoid unnecessary retries and mitigate test flakiness if the tested error condition is time-bound.

    This client is needed for the following errors, exceptions, and HTTP status codes defined by the legacy retry mode:
    https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#legacy-retry-mode

    General socket/connection errors:
    * ConnectionError
    * ConnectionClosedError
    * ReadTimeoutError
    * EndpointConnectionError

    Service-side throttling/limit errors and exceptions:
    * Throttling
    * ThrottlingException
    * ThrottledException
    * RequestThrottledException
    * ProvisionedThroughputExceededException

    HTTP status codes: 429, 500, 502, 503, 504, and 509

    Hence, this client is not needed for a `ResourceNotFound` error (although using it does no harm).

    :param aws_client_factory: factory fixture that builds a client collection from a botocore config
    :return: the client collection created by ``aws_client_factory`` with retries disabled
    """
    # `total_max_attempts` includes the initial request, so 1 means "send once, never retry".
    # The `max_attempts` key of a botocore Config counts *retries after* the initial request,
    # hence `max_attempts=1` would still allow one retry and defeat the purpose of this fixture.
    no_retry_config = botocore.config.Config(retries={"total_max_attempts": 1})
    return aws_client_factory(config=no_retry_config)
100+
101+
70102
@pytest.fixture(scope="class")
71103
def aws_http_client_factory(aws_session):
72104
"""
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import datetime
2+
import json
3+
import os
4+
import time
5+
6+
import boto3
7+
8+
sqs_client = boto3.client("sqs", endpoint_url=os.environ.get("AWS_ENDPOINT_URL"))
9+
10+
11+
def handler(event, context):
    """Optionally notify an SQS queue about this invocation, then optionally sleep.

    Example: send a message to the queue URL given in `notify` and then wait for 7 seconds.
    The message includes the value of the environment variable called "FUNCTION_VARIANT".
    aws_client.lambda_.invoke(
        FunctionName=fn_arn,
        InvocationType="Event",
        Payload=json.dumps({"notify": queue_url, "env_var": "FUNCTION_VARIANT", "label": "01-sleep", "wait": 7})
    )

    Event keys (all optional; a missing or falsy value disables the corresponding step):
    * `notify`: SQS queue URL that receives the notification message
    * `env_var`: Name of an environment variable whose value is added to the message under that name
    * `label`: Label to be included in the message
    * `wait`: Time in seconds to sleep after the (optional) notification
    """
    target_queue = event.get("notify")
    if target_queue:
        # The request id lets the receiver correlate the message with a specific invocation.
        payload = {
            "request_id": context.aws_request_id,
            "timestamp": datetime.datetime.now(datetime.UTC).isoformat(),
        }

        env_name = event.get("env_var")
        if env_name:
            # Raises KeyError if the requested variable is not set in the function environment.
            payload[env_name] = os.environ[env_name]

        tag = event.get("label")
        if tag:
            payload["label"] = tag

        print(f"Notify message: {payload}")
        sqs_client.send_message(QueueUrl=target_queue, MessageBody=json.dumps(payload))

    delay = event.get("wait")
    if delay:
        print(f"Sleeping for {delay} seconds ...")
        time.sleep(delay)

tests/aws/services/lambda_/test_lambda.py

Lines changed: 82 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,10 @@
3030
get_invoke_init_type,
3131
update_done,
3232
)
33-
from localstack.testing.aws.util import create_client_with_keys, is_aws_cloud
33+
from localstack.testing.aws.util import (
34+
create_client_with_keys,
35+
is_aws_cloud,
36+
)
3437
from localstack.testing.pytest import markers
3538
from localstack.testing.snapshots.transformer_utility import PATTERN_UUID
3639
from localstack.utils import files, platform, testutil
@@ -123,6 +126,7 @@
123126
TEST_LAMBDA_PYTHON_MULTIPLE_HANDLERS = os.path.join(
124127
THIS_FOLDER, "functions/lambda_multiple_handlers.py"
125128
)
129+
TEST_LAMBDA_NOTIFIER = os.path.join(THIS_FOLDER, "functions/lambda_notifier.py")
126130

127131
PYTHON_TEST_RUNTIMES = RUNTIMES_AGGREGATED["python"]
128132
NODE_TEST_RUNTIMES = RUNTIMES_AGGREGATED["nodejs"]
@@ -2614,18 +2618,37 @@ def _invoke_lambda():
26142618
assert not errored
26152619

26162620
@markers.aws.validated
2617-
@pytest.mark.skip(reason="flaky")
2618-
def test_reserved_concurrency_async_queue(self, create_lambda_function, snapshot, aws_client):
2621+
def test_reserved_concurrency_async_queue(
2622+
self,
2623+
create_lambda_function,
2624+
sqs_create_queue,
2625+
sqs_collect_messages,
2626+
snapshot,
2627+
aws_client,
2628+
aws_client_no_retry,
2629+
):
2630+
"""Test async/event invoke retry behavior due to limited reserved concurrency.
2631+
Timeline:
2632+
1) Set ReservedConcurrentExecutions=1
2633+
2) sync_invoke_warm_up => ok
2634+
3) async_invoke_one => ok
2635+
4) async_invoke_two => gets retried
2636+
5) sync invoke => fails with TooManyRequestsException
2637+
6) Set ReservedConcurrentExecutions=3
2638+
7) sync_invoke_final => ok
2639+
"""
26192640
min_concurrent_executions = 10 + 3
26202641
check_concurrency_quota(aws_client, min_concurrent_executions)
26212642

2643+
queue_url = sqs_create_queue()
2644+
26222645
func_name = f"test_lambda_{short_uid()}"
26232646
create_lambda_function(
26242647
func_name=func_name,
2625-
handler_file=TEST_LAMBDA_INTROSPECT_PYTHON,
2648+
handler_file=TEST_LAMBDA_NOTIFIER,
26262649
runtime=Runtime.python3_12,
26272650
client=aws_client.lambda_,
2628-
timeout=20,
2651+
timeout=30,
26292652
)
26302653

26312654
fn = aws_client.lambda_.get_function_configuration(
@@ -2641,46 +2664,75 @@ def test_reserved_concurrency_async_queue(self, create_lambda_function, snapshot
26412664
snapshot.match("put_fn_concurrency", put_fn_concurrency)
26422665

26432666
# warm up the Lambda function to mitigate flakiness due to cold start
2644-
aws_client.lambda_.invoke(FunctionName=fn_arn, InvocationType="RequestResponse")
2667+
sync_invoke_warm_up = aws_client.lambda_.invoke(
2668+
FunctionName=fn_arn, InvocationType="RequestResponse"
2669+
)
2670+
assert "FunctionError" not in sync_invoke_warm_up
26452671

2646-
# simultaneously queue two event invocations
2647-
# The first event invoke gets executed immediately
2648-
aws_client.lambda_.invoke(
2649-
FunctionName=fn_arn, InvocationType="Event", Payload=json.dumps({"wait": 15})
2672+
# Immediately queue two event invocations:
2673+
# 1) The first event invoke gets executed immediately
2674+
async_invoke_one = aws_client.lambda_.invoke(
2675+
FunctionName=fn_arn,
2676+
InvocationType="Event",
2677+
Payload=json.dumps({"notify": queue_url, "wait": 15}),
26502678
)
2651-
# The second event invoke gets throttled and re-scheduled with an internal retry
2652-
aws_client.lambda_.invoke(
2653-
FunctionName=fn_arn, InvocationType="Event", Payload=json.dumps({"wait": 10})
2679+
assert "FunctionError" not in async_invoke_one
2680+
# 2) The second event invoke gets throttled and re-scheduled with an internal retry
2681+
async_invoke_two = aws_client.lambda_.invoke(
2682+
FunctionName=fn_arn,
2683+
InvocationType="Event",
2684+
Payload=json.dumps({"notify": queue_url}),
26542685
)
2686+
assert "FunctionError" not in async_invoke_two
26552687

2656-
# Ensure one event invocation is being executed and the other one is in the queue.
2657-
time.sleep(5)
2688+
# Wait until the first async invoke is being executed while the second async invoke is in the queue.
2689+
messages = sqs_collect_messages(
2690+
queue_url,
2691+
expected=1,
2692+
timeout=15,
2693+
)
2694+
async_invoke_one_notification = json.loads(messages[0]["Body"])
2695+
assert (
2696+
async_invoke_one_notification["request_id"]
2697+
== async_invoke_one["ResponseMetadata"]["RequestId"]
2698+
)
26582699

26592700
# Synchronous invocations raise an exception because insufficient reserved concurrency is available
2701+
# It is important to disable botocore retries here: the concurrency limit is time-bound because it only
2702+
# triggers as long as the first async invoke is processing!
26602703
with pytest.raises(aws_client.lambda_.exceptions.TooManyRequestsException) as e:
2661-
aws_client.lambda_.invoke(FunctionName=fn_arn, InvocationType="RequestResponse")
2704+
aws_client_no_retry.lambda_.invoke(
2705+
FunctionName=fn_arn, InvocationType="RequestResponse"
2706+
)
26622707
snapshot.match("too_many_requests_exc", e.value.response)
26632708

26642709
# ReservedConcurrentExecutions=2 is insufficient because the throttled async event invoke might be
26652710
# re-scheduled before the synchronous invoke while the first async invoke is still running.
26662711
aws_client.lambda_.put_function_concurrency(
26672712
FunctionName=func_name, ReservedConcurrentExecutions=3
26682713
)
2669-
aws_client.lambda_.invoke(FunctionName=fn_arn, InvocationType="RequestResponse")
2670-
2671-
def assert_events():
2672-
log_events = aws_client.logs.filter_log_events(
2673-
logGroupName=f"/aws/lambda/{func_name}",
2674-
)["events"]
2675-
invocation_count = len(
2676-
[event["message"] for event in log_events if event["message"].startswith("REPORT")]
2677-
)
2678-
assert invocation_count == 4
2679-
2680-
retry(assert_events, retries=120, sleep=2)
2714+
# Invocations succeed after raising reserved concurrency
2715+
sync_invoke_final = aws_client.lambda_.invoke(
2716+
FunctionName=fn_arn,
2717+
InvocationType="RequestResponse",
2718+
Payload=json.dumps({"notify": queue_url}),
2719+
)
2720+
assert "FunctionError" not in sync_invoke_final
26812721

2682-
# TODO: snapshot logs & request ID for correlation after request id gets propagated
2683-
# https://github.com/localstack/localstack/pull/7874
2722+
# Contains the re-queued `async_invoke_two` and the `sync_invoke_final`, but the order might differ
2723+
# depending on whether `async_invoke_two` gets re-scheduled before or after the final invoke.
2724+
# AWS docs: https://docs.aws.amazon.com/lambda/latest/dg/invocation-async-error-handling.html
2725+
# "The retry interval increases exponentially from 1 second after the first attempt to a maximum of 5 minutes."
2726+
final_messages = sqs_collect_messages(
2727+
queue_url,
2728+
expected=2,
2729+
timeout=20,
2730+
)
2731+
invoked_request_ids = {json.loads(msg["Body"])["request_id"] for msg in final_messages}
2732+
assert {
2733+
async_invoke_two["ResponseMetadata"]["RequestId"],
2734+
sync_invoke_final["ResponseMetadata"]["RequestId"],
2735+
} == invoked_request_ids
26842736

26852737
@markers.snapshot.skip_snapshot_verify(paths=["$..Attributes.AWSTraceHeader"])
26862738
@markers.aws.validated

tests/aws/services/lambda_/test_lambda.snapshot.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2982,7 +2982,7 @@
29822982
}
29832983
},
29842984
"tests/aws/services/lambda_/test_lambda.py::TestLambdaConcurrency::test_reserved_concurrency_async_queue": {
2985-
"recorded-date": "08-04-2024, 17:07:59",
2985+
"recorded-date": "26-03-2025, 10:53:54",
29862986
"recorded-content": {
29872987
"fn": {
29882988
"Architectures": [
@@ -3019,7 +3019,7 @@
30193019
"OptimizationStatus": "Off"
30203020
},
30213021
"State": "Active",
3022-
"Timeout": 20,
3022+
"Timeout": 30,
30233023
"TracingConfig": {
30243024
"Mode": "PassThrough"
30253025
},

tests/aws/services/lambda_/test_lambda.validation.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@
7878
"last_validated_date": "2024-04-08T17:08:10+00:00"
7979
},
8080
"tests/aws/services/lambda_/test_lambda.py::TestLambdaConcurrency::test_reserved_concurrency_async_queue": {
81-
"last_validated_date": "2024-04-08T17:07:56+00:00"
81+
"last_validated_date": "2025-03-26T10:54:29+00:00"
8282
},
8383
"tests/aws/services/lambda_/test_lambda.py::TestLambdaConcurrency::test_reserved_provisioned_overlap": {
8484
"last_validated_date": "2024-04-08T17:10:36+00:00"

0 commit comments

Comments
 (0)