From ce759ddec0cbffe62a23942fdeb6292331a68075 Mon Sep 17 00:00:00 2001 From: Raul Gallegos Date: Thu, 27 Feb 2025 21:11:20 -0500 Subject: [PATCH] [ESM] Validate event sources existence --- .../localstack/services/lambda_/provider.py | 70 +++++++++++++++++-- ...test_lambda_integration_dynamodbstreams.py | 24 +++++++ ..._integration_dynamodbstreams.snapshot.json | 19 ++++- ...ntegration_dynamodbstreams.validation.json | 5 +- .../test_lambda_integration_kinesis.py | 23 ++++++ ...t_lambda_integration_kinesis.snapshot.json | 17 +++++ ...lambda_integration_kinesis.validation.json | 3 + .../test_lambda_integration_sqs.py | 24 +++++++ .../test_lambda_integration_sqs.snapshot.json | 17 +++++ ...est_lambda_integration_sqs.validation.json | 3 + 10 files changed, 196 insertions(+), 9 deletions(-) diff --git a/localstack-core/localstack/services/lambda_/provider.py b/localstack-core/localstack/services/lambda_/provider.py index 71415a62902b4..c6fd4434d707d 100644 --- a/localstack-core/localstack/services/lambda_/provider.py +++ b/localstack-core/localstack/services/lambda_/provider.py @@ -9,6 +9,8 @@ import time from typing import IO, Any, Optional, Tuple +from botocore.exceptions import ClientError + from localstack import config from localstack.aws.api import RequestContext, ServiceException, handler from localstack.aws.api.lambda_ import ( @@ -160,6 +162,7 @@ from localstack.services.lambda_.event_source_mapping.esm_worker_factory import ( EsmWorkerFactory, ) +from localstack.services.lambda_.event_source_mapping.pipe_utils import get_internal_client from localstack.services.lambda_.invocation import AccessDeniedException from localstack.services.lambda_.invocation.execution_environment import ( EnvironmentStartupTimeoutException, @@ -224,6 +227,7 @@ lambda_event_source_mapping_arn, parse_arn, ) +from localstack.utils.aws.client_types import ServicePrincipal from localstack.utils.bootstrap import is_api_enabled from localstack.utils.collections import PaginatedList from localstack.utils.event_matcher import validate_event_pattern @@ -1866,6 +1870,52 @@ def update_alias( # ======================================= # ======= EVENT SOURCE MAPPINGS ========= # ======================================= + def check_service_resource_exists( + self, service: str, resource_arn: str, function_arn: str, function_role_arn: str + ): + """ + Check if the service resource exists and if the function has access to it. + + Raises: + InvalidParameterValueException: If the service resource does not exist or the function does not have access to it. + """ + arn = parse_arn(resource_arn) + source_client = get_internal_client( + arn=resource_arn, + role_arn=function_role_arn, + service_principal=ServicePrincipal.lambda_, + source_arn=function_arn, + ) + if service in ["sqs", "sqs-fifo"]: + try: + source_client.get_queue_attributes(QueueUrl=arn["resource"]) + except ClientError as e: + if e.response["Error"]["Code"] == "AWS.SimpleQueueService.NonExistentQueue": + raise InvalidParameterValueException( + f"Error occurred while ReceiveMessage. SQS Error Code: {e.response['Error']['Code']}. SQS Error Message: {e.response['Error']['Message']}", + Type="User", + ) + raise e + elif service in ["kinesis"]: + try: + source_client.describe_stream(StreamARN=resource_arn) + except ClientError as e: + if e.response["Error"]["Code"] == "ResourceNotFoundException": + raise InvalidParameterValueException( + f"Stream not found: {resource_arn}", + Type="User", + ) + raise e + elif service in ["dynamodb"]: + try: + source_client.describe_stream(StreamArn=resource_arn) + except ClientError as e: + if e.response["Error"]["Code"] == "ResourceNotFoundException": + raise InvalidParameterValueException( + f"Stream not found: {resource_arn}", + Type="User", + ) + raise e @handler("CreateEventSourceMapping", expand=False) def create_event_source_mapping( @@ -1881,14 +1931,14 @@ def create_event_source_mapping_v2( request: CreateEventSourceMappingRequest, ) -> EventSourceMappingConfiguration: # Validations - function_arn, function_name, state = self.validate_event_source_mapping(context, request) + function_arn, function_name, state, function_version, function_role = ( + self.validate_event_source_mapping(context, request) + ) esm_config = EsmConfigFactory(request, context, function_arn).get_esm_config() # Copy esm_config to avoid a race condition with potential async update in the store state.event_source_mappings[esm_config["UUID"]] = esm_config.copy() - function_version = get_function_version_from_arn(function_arn) - function_role = function_version.config.role enabled = request.get("Enabled", True) # TODO: check for potential async race condition update -> think about locking esm_worker = EsmWorkerFactory(esm_config, function_role, enabled).get_esm_worker() @@ -1962,6 +2012,7 @@ def validate_event_source_mapping(self, context, request): fn = state.functions.get(function_name) if not fn: raise InvalidParameterValueException("Function does not exist", Type="User") + if qualifier: # make sure the function version/alias exists if api_utils.qualifier_is_alias(qualifier): @@ -1981,6 +2032,11 @@ def validate_event_source_mapping(self, context, request): else: fn_arn = api_utils.unqualified_lambda_arn(function_name, account, region) + function_version = get_function_version_from_arn(fn_arn) + function_role = function_version.config.role + + if source_arn := request.get("EventSourceArn"): + self.check_service_resource_exists(service, source_arn, fn_arn, function_role) # Check we are validating a CreateEventSourceMapping request if is_create_esm_request: @@ -2026,7 +2082,7 @@ def _get_mapping_sources(mapping: dict[str, Any]) -> list[str]: f"existing mapping with UUID {uuid}", Type="User", ) - return fn_arn, function_name, state + return fn_arn, function_name, state, function_version, function_role @handler("UpdateEventSourceMapping", expand=False) def update_event_source_mapping( @@ -2065,7 +2121,9 @@ def update_event_source_mapping_v2( temp_params = {} # values only set for the returned response, not saved internally (e.g. transient state) # Validate the newly updated ESM object. We ignore the output here since we only care whether an Exception is raised. - function_arn, _, _ = self.validate_event_source_mapping(context, event_source_mapping) + function_arn, _, _, function_version, function_role = self.validate_event_source_mapping( + context, event_source_mapping + ) # remove the FunctionName field event_source_mapping.pop("FunctionName", None) @@ -2090,8 +2148,6 @@ def update_event_source_mapping_v2( state.event_source_mappings[uuid] = event_source_mapping # TODO: Currently, we re-create the entire ESM worker. Look into approach with better performance. - function_version = get_function_version_from_arn(function_arn) - function_role = function_version.config.role worker_factory = EsmWorkerFactory( event_source_mapping, function_role, request.get("Enabled", esm_worker.enabled) ) diff --git a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py index 0ea4c3cea639e..fed8e9c4a8723 100644 --- a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py +++ b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py @@ -93,6 +93,30 @@ def _get_lambda_logs_event(function_name, expected_num_events, retries=30): ], ) class TestDynamoDBEventSourceMapping: + @markers.aws.validated + def test_esm_with_not_existing_dynamodb_stream( + self, aws_client, create_lambda_function, lambda_su_role, account_id, region_name, snapshot + ): + function_name = f"simple-lambda-{short_uid()}" + create_lambda_function( + func_name=function_name, + handler_file=TEST_LAMBDA_PYTHON_ECHO, + runtime=Runtime.python3_12, + role=lambda_su_role, + timeout=5, + ) + not_existing_dynamodb_stream_arn = f"arn:aws:dynamodb:{region_name}:{account_id}:table/test-table-4a53f4e8/stream/2025-02-22T03:03:25.490" + with pytest.raises(ClientError) as e: + aws_client.lambda_.create_event_source_mapping( + FunctionName=function_name, + BatchSize=1, + StartingPosition="TRIM_HORIZON", + EventSourceArn=not_existing_dynamodb_stream_arn, + MaximumBatchingWindowInSeconds=1, + MaximumRetryAttempts=1, + ) + snapshot.match("error", e.value.response) + @markers.aws.validated def test_dynamodb_event_source_mapping( self, diff --git a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.snapshot.json b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.snapshot.json index 32c96cd776bab..709bfc346d2f0 100644 --- a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.snapshot.json +++ b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.snapshot.json @@ -1,6 +1,6 @@ { "tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py::TestDynamoDBEventSourceMapping::test_dynamodb_event_source_mapping": { - "recorded-date": "12-10-2024, 10:55:29", + "recorded-date": "22-02-2025, 03:03:03", "recorded-content": { "create-table-result": { "TableDescription": { @@ -4459,5 +4459,22 @@ "version": "1.0" } } + }, + "tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py::TestDynamoDBEventSourceMapping::test_esm_with_not_existing_dynamodb_stream": { + "recorded-date": "26-02-2025, 03:08:09", + "recorded-content": { + "error": { + "Error": { + "Code": "InvalidParameterValueException", + "Message": "Stream not found: arn::dynamodb::111111111111:table/test-table-4a53f4e8/stream/2025-02-22T03:03:25.490" + }, + "Type": "User", + "message": "Stream not found: arn::dynamodb::111111111111:table/test-table-4a53f4e8/stream/2025-02-22T03:03:25.490", + "ResponseMetadata": { + "HTTPHeaders": {}, + "HTTPStatusCode": 400 + } + } + } } } diff --git a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.validation.json b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.validation.json index afba5b1342707..0bfbdbc8c52c6 100644 --- a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.validation.json +++ b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.validation.json @@ -36,7 +36,7 @@ "last_validated_date": "2024-10-12T11:11:04+00:00" }, "tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py::TestDynamoDBEventSourceMapping::test_dynamodb_event_source_mapping": { - "last_validated_date": "2024-10-12T10:55:26+00:00" + "last_validated_date": "2025-02-22T03:03:01+00:00" }, "tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py::TestDynamoDBEventSourceMapping::test_dynamodb_event_source_mapping_with_on_failure_destination_config": { "last_validated_date": "2024-10-12T11:01:14+00:00" @@ -88,5 +88,8 @@ }, "tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py::TestDynamoDBEventSourceMapping::test_dynamodb_report_batch_item_success_scenarios[null_success]": { "last_validated_date": "2024-10-12T11:42:04+00:00" + }, + "tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py::TestDynamoDBEventSourceMapping::test_esm_with_not_existing_dynamodb_stream": { + "last_validated_date": "2025-02-26T03:08:08+00:00" } } diff --git a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py index a21ca7de3bec5..e7ce14e770f08 100644 --- a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py +++ b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py @@ -70,6 +70,29 @@ def _snapshot_transformers(snapshot): ], ) class TestKinesisSource: + @markers.aws.validated + def test_esm_with_not_existing_kinesis_stream( + self, aws_client, create_lambda_function, lambda_su_role, snapshot, account_id, region_name + ): + function_name = f"simple-lambda-{short_uid()}" + create_lambda_function( + func_name=function_name, + handler_file=TEST_LAMBDA_PYTHON_ECHO, + runtime=Runtime.python3_12, + role=lambda_su_role, + timeout=5, + ) + not_existing_stream_arn = ( + f"arn:aws:kinesis:{region_name}:{account_id}:stream/test-foobar-81ded7e8" + ) + with pytest.raises(ClientError) as e: + aws_client.lambda_.create_event_source_mapping( + EventSourceArn=not_existing_stream_arn, + FunctionName=function_name, + StartingPosition="LATEST", + ) + snapshot.match("error", e.value.response) + @markers.aws.validated def test_create_kinesis_event_source_mapping( self, diff --git a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.snapshot.json b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.snapshot.json index 244a214c1c36f..ee96a18aa4aa0 100644 --- a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.snapshot.json +++ b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.snapshot.json @@ -3156,5 +3156,22 @@ "version": "1.0" } } + }, + "tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py::TestKinesisSource::test_esm_with_not_existing_kinesis_stream": { + "recorded-date": "26-02-2025, 03:05:30", + "recorded-content": { + "error": { + "Error": { + "Code": "InvalidParameterValueException", + "Message": "Stream not found: arn::kinesis::111111111111:stream/test-foobar-81ded7e8" + }, + "Type": "User", + "message": "Stream not found: arn::kinesis::111111111111:stream/test-foobar-81ded7e8", + "ResponseMetadata": { + "HTTPHeaders": {}, + "HTTPStatusCode": 400 + } + } + } } } diff --git a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.validation.json b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.validation.json index c398c32fbeea4..ef98dfb806d7e 100644 --- a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.validation.json +++ b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.validation.json @@ -14,6 +14,9 @@ "tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py::TestKinesisSource::test_duplicate_event_source_mappings": { "last_validated_date": "2024-12-13T14:03:01+00:00" }, + "tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py::TestKinesisSource::test_esm_with_not_existing_kinesis_stream": { + "last_validated_date": "2025-02-26T03:05:29+00:00" + }, "tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py::TestKinesisSource::test_kinesis_empty_provided": { "last_validated_date": "2024-12-13T14:45:29+00:00" }, diff --git a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py index f98eda35f4f16..43244beff29f2 100644 --- a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py +++ b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py @@ -55,6 +55,30 @@ def _snapshot_transformers(snapshot): ) +@markers.aws.validated +def test_esm_with_not_existing_sqs_queue( + aws_client, account_id, region_name, create_lambda_function, lambda_su_role, snapshot +): + function_name = f"simple-lambda-{short_uid()}" + create_lambda_function( + func_name=function_name, + handler_file=LAMBDA_SQS_INTEGRATION_FILE, + runtime=Runtime.python3_12, + role=lambda_su_role, + timeout=5, + ) + not_existing_queue_arn = ( + f"arn:aws:sqs:{region_name}:{account_id}:not-existing-queue-{short_uid()}" + ) + with pytest.raises(ClientError) as e: + aws_client.lambda_.create_event_source_mapping( + EventSourceArn=not_existing_queue_arn, + FunctionName=function_name, + BatchSize=1, + ) + snapshot.match("error", e.value.response) + + @markers.aws.validated def test_failing_lambda_retries_after_visibility_timeout( create_lambda_function, diff --git a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.snapshot.json b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.snapshot.json index b96ff2cf5edb1..db1e05521cb70 100644 --- a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.snapshot.json +++ b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.snapshot.json @@ -4559,5 +4559,22 @@ } ] } + }, + "tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py::test_esm_with_not_existing_sqs_queue": { + "recorded-date": "26-02-2025, 03:01:33", + "recorded-content": { + "error": { + "Error": { + "Code": "InvalidParameterValueException", + "Message": "Error occurred while ReceiveMessage. SQS Error Code: AWS.SimpleQueueService.NonExistentQueue. SQS Error Message: The specified queue does not exist." + }, + "Type": "User", + "message": "Error occurred while ReceiveMessage. SQS Error Code: AWS.SimpleQueueService.NonExistentQueue. SQS Error Message: The specified queue does not exist.", + "ResponseMetadata": { + "HTTPHeaders": {}, + "HTTPStatusCode": 400 + } + } + } } } diff --git a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.validation.json b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.validation.json index 17c1d997c2153..cd2ec152bd499 100644 --- a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.validation.json +++ b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.validation.json @@ -104,6 +104,9 @@ "tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py::TestSQSEventSourceMapping::test_sqs_invalid_event_filter[simple string]": { "last_validated_date": "2024-10-12T13:43:40+00:00" }, + "tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py::test_esm_with_not_existing_sqs_queue": { + "last_validated_date": "2025-02-26T03:01:32+00:00" + }, "tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py::test_failing_lambda_retries_after_visibility_timeout": { "last_validated_date": "2024-11-25T12:12:47+00:00" },