30
30
get_invoke_init_type ,
31
31
update_done ,
32
32
)
33
- from localstack .testing .aws .util import create_client_with_keys , is_aws_cloud
33
+ from localstack .testing .aws .util import (
34
+ create_client_with_keys ,
35
+ is_aws_cloud ,
36
+ )
34
37
from localstack .testing .pytest import markers
35
38
from localstack .testing .snapshots .transformer_utility import PATTERN_UUID
36
39
from localstack .utils import files , platform , testutil
123
126
TEST_LAMBDA_PYTHON_MULTIPLE_HANDLERS = os .path .join (
124
127
THIS_FOLDER , "functions/lambda_multiple_handlers.py"
125
128
)
129
+ TEST_LAMBDA_NOTIFIER = os .path .join (THIS_FOLDER , "functions/lambda_notifier.py" )
126
130
127
131
PYTHON_TEST_RUNTIMES = RUNTIMES_AGGREGATED ["python" ]
128
132
NODE_TEST_RUNTIMES = RUNTIMES_AGGREGATED ["nodejs" ]
@@ -2614,18 +2618,37 @@ def _invoke_lambda():
2614
2618
assert not errored
2615
2619
2616
2620
@markers .aws .validated
2617
- @pytest .mark .skip (reason = "flaky" )
2618
- def test_reserved_concurrency_async_queue (self , create_lambda_function , snapshot , aws_client ):
2621
+ def test_reserved_concurrency_async_queue (
2622
+ self ,
2623
+ create_lambda_function ,
2624
+ sqs_create_queue ,
2625
+ sqs_collect_messages ,
2626
+ snapshot ,
2627
+ aws_client ,
2628
+ aws_client_no_retry ,
2629
+ ):
2630
+ """Test async/event invoke retry behavior due to limited reserved concurrency.
2631
+ Timeline:
2632
+ 1) Set ReservedConcurrentExecutions=1
2633
+ 2) sync_invoke_warm_up => ok
2634
+ 3) async_invoke_one => ok
2635
+ 4) async_invoke_two => gets retried
2636
+ 5) sync invoke => fails with TooManyRequestsException
2637
+ 6) Set ReservedConcurrentExecutions=3
2638
+ 7) sync_invoke_final => ok
2639
+ """
2619
2640
min_concurrent_executions = 10 + 3
2620
2641
check_concurrency_quota (aws_client , min_concurrent_executions )
2621
2642
2643
+ queue_url = sqs_create_queue ()
2644
+
2622
2645
func_name = f"test_lambda_{ short_uid ()} "
2623
2646
create_lambda_function (
2624
2647
func_name = func_name ,
2625
- handler_file = TEST_LAMBDA_INTROSPECT_PYTHON ,
2648
+ handler_file = TEST_LAMBDA_NOTIFIER ,
2626
2649
runtime = Runtime .python3_12 ,
2627
2650
client = aws_client .lambda_ ,
2628
- timeout = 20 ,
2651
+ timeout = 30 ,
2629
2652
)
2630
2653
2631
2654
fn = aws_client .lambda_ .get_function_configuration (
@@ -2641,46 +2664,75 @@ def test_reserved_concurrency_async_queue(self, create_lambda_function, snapshot
2641
2664
snapshot .match ("put_fn_concurrency" , put_fn_concurrency )
2642
2665
2643
2666
# warm up the Lambda function to mitigate flakiness due to cold start
2644
- aws_client .lambda_ .invoke (FunctionName = fn_arn , InvocationType = "RequestResponse" )
2667
+ sync_invoke_warm_up = aws_client .lambda_ .invoke (
2668
+ FunctionName = fn_arn , InvocationType = "RequestResponse"
2669
+ )
2670
+ assert "FunctionError" not in sync_invoke_warm_up
2645
2671
2646
- # simultaneously queue two event invocations
2647
- # The first event invoke gets executed immediately
2648
- aws_client .lambda_ .invoke (
2649
- FunctionName = fn_arn , InvocationType = "Event" , Payload = json .dumps ({"wait" : 15 })
2672
+ # Immediately queue two event invocations:
2673
+ # 1) The first event invoke gets executed immediately
2674
+ async_invoke_one = aws_client .lambda_ .invoke (
2675
+ FunctionName = fn_arn ,
2676
+ InvocationType = "Event" ,
2677
+ Payload = json .dumps ({"notify" : queue_url , "wait" : 15 }),
2650
2678
)
2651
- # The second event invoke gets throttled and re-scheduled with an internal retry
2652
- aws_client .lambda_ .invoke (
2653
- FunctionName = fn_arn , InvocationType = "Event" , Payload = json .dumps ({"wait" : 10 })
2679
+ assert "FunctionError" not in async_invoke_one
2680
+ # 2) The second event invoke gets throttled and re-scheduled with an internal retry
2681
+ async_invoke_two = aws_client .lambda_ .invoke (
2682
+ FunctionName = fn_arn ,
2683
+ InvocationType = "Event" ,
2684
+ Payload = json .dumps ({"notify" : queue_url }),
2654
2685
)
2686
+ assert "FunctionError" not in async_invoke_two
2655
2687
2656
- # Ensure one event invocation is being executed and the other one is in the queue.
2657
- time .sleep (5 )
2688
+ # Wait until the first async invoke is being executed while the second async invoke is in the queue.
2689
+ messages = sqs_collect_messages (
2690
+ queue_url ,
2691
+ expected = 1 ,
2692
+ timeout = 15 ,
2693
+ )
2694
+ async_invoke_one_notification = json .loads (messages [0 ]["Body" ])
2695
+ assert (
2696
+ async_invoke_one_notification ["request_id" ]
2697
+ == async_invoke_one ["ResponseMetadata" ]["RequestId" ]
2698
+ )
2658
2699
2659
2700
# Synchronous invocations raise an exception because insufficient reserved concurrency is available
2701
+ # It is important to disable botocore retries because the concurrency limit is time-bound because it only
2702
+ # triggers as long as the first async invoke is processing!
2660
2703
with pytest .raises (aws_client .lambda_ .exceptions .TooManyRequestsException ) as e :
2661
- aws_client .lambda_ .invoke (FunctionName = fn_arn , InvocationType = "RequestResponse" )
2704
+ aws_client_no_retry .lambda_ .invoke (
2705
+ FunctionName = fn_arn , InvocationType = "RequestResponse"
2706
+ )
2662
2707
snapshot .match ("too_many_requests_exc" , e .value .response )
2663
2708
2664
2709
# ReservedConcurrentExecutions=2 is insufficient because the throttled async event invoke might be
2665
2710
# re-scheduled before the synchronous invoke while the first async invoke is still running.
2666
2711
aws_client .lambda_ .put_function_concurrency (
2667
2712
FunctionName = func_name , ReservedConcurrentExecutions = 3
2668
2713
)
2669
- aws_client .lambda_ .invoke (FunctionName = fn_arn , InvocationType = "RequestResponse" )
2670
-
2671
- def assert_events ():
2672
- log_events = aws_client .logs .filter_log_events (
2673
- logGroupName = f"/aws/lambda/{ func_name } " ,
2674
- )["events" ]
2675
- invocation_count = len (
2676
- [event ["message" ] for event in log_events if event ["message" ].startswith ("REPORT" )]
2677
- )
2678
- assert invocation_count == 4
2679
-
2680
- retry (assert_events , retries = 120 , sleep = 2 )
2714
+ # Invocations succeed after raising reserved concurrency
2715
+ sync_invoke_final = aws_client .lambda_ .invoke (
2716
+ FunctionName = fn_arn ,
2717
+ InvocationType = "RequestResponse" ,
2718
+ Payload = json .dumps ({"notify" : queue_url }),
2719
+ )
2720
+ assert "FunctionError" not in sync_invoke_final
2681
2721
2682
- # TODO: snapshot logs & request ID for correlation after request id gets propagated
2683
- # https://github.com/localstack/localstack/pull/7874
2722
+ # Contains the re-queued `async_invoke_two` and the `sync_invoke_final`, but the order might differ
2723
+ # depending on whether invoke_two gets re-schedule before or after the final invoke.
2724
+ # AWS docs: https://docs.aws.amazon.com/lambda/latest/dg/invocation-async-error-handling.html
2725
+ # "The retry interval increases exponentially from 1 second after the first attempt to a maximum of 5 minutes."
2726
+ final_messages = sqs_collect_messages (
2727
+ queue_url ,
2728
+ expected = 2 ,
2729
+ timeout = 20 ,
2730
+ )
2731
+ invoked_request_ids = {json .loads (msg ["Body" ])["request_id" ] for msg in final_messages }
2732
+ assert {
2733
+ async_invoke_two ["ResponseMetadata" ]["RequestId" ],
2734
+ sync_invoke_final ["ResponseMetadata" ]["RequestId" ],
2735
+ } == invoked_request_ids
2684
2736
2685
2737
@markers .snapshot .skip_snapshot_verify (paths = ["$..Attributes.AWSTraceHeader" ])
2686
2738
@markers .aws .validated
0 commit comments