Skip to content

Commit dd0d96e

Browse files
authored
Lambda ESM: Validate event sources existence (#12297)
1 parent 288a34a commit dd0d96e

10 files changed

+196
-9
lines changed

localstack-core/localstack/services/lambda_/provider.py

Lines changed: 63 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
import time
1010
from typing import IO, Any, Optional, Tuple
1111

12+
from botocore.exceptions import ClientError
13+
1214
from localstack import config
1315
from localstack.aws.api import RequestContext, ServiceException, handler
1416
from localstack.aws.api.lambda_ import (
@@ -160,6 +162,7 @@
160162
from localstack.services.lambda_.event_source_mapping.esm_worker_factory import (
161163
EsmWorkerFactory,
162164
)
165+
from localstack.services.lambda_.event_source_mapping.pipe_utils import get_internal_client
163166
from localstack.services.lambda_.invocation import AccessDeniedException
164167
from localstack.services.lambda_.invocation.execution_environment import (
165168
EnvironmentStartupTimeoutException,
@@ -224,6 +227,7 @@
224227
lambda_event_source_mapping_arn,
225228
parse_arn,
226229
)
230+
from localstack.utils.aws.client_types import ServicePrincipal
227231
from localstack.utils.bootstrap import is_api_enabled
228232
from localstack.utils.collections import PaginatedList
229233
from localstack.utils.event_matcher import validate_event_pattern
@@ -1866,6 +1870,52 @@ def update_alias(
18661870
# =======================================
18671871
# ======= EVENT SOURCE MAPPINGS =========
18681872
# =======================================
1873+
def check_service_resource_exists(
1874+
self, service: str, resource_arn: str, function_arn: str, function_role_arn: str
1875+
):
1876+
"""
1877+
Check if the service resource exists and if the function has access to it.
1878+
1879+
Raises:
1880+
InvalidParameterValueException: If the service resource does not exist or the function does not have access to it.
1881+
"""
1882+
arn = parse_arn(resource_arn)
1883+
source_client = get_internal_client(
1884+
arn=resource_arn,
1885+
role_arn=function_role_arn,
1886+
service_principal=ServicePrincipal.lambda_,
1887+
source_arn=function_arn,
1888+
)
1889+
if service in ["sqs", "sqs-fifo"]:
1890+
try:
1891+
source_client.get_queue_attributes(QueueUrl=arn["resource"])
1892+
except ClientError as e:
1893+
if e.response["Error"]["Code"] == "AWS.SimpleQueueService.NonExistentQueue":
1894+
raise InvalidParameterValueException(
1895+
f"Error occurred while ReceiveMessage. SQS Error Code: {e.response['Error']['Code']}. SQS Error Message: {e.response['Error']['Message']}",
1896+
Type="User",
1897+
)
1898+
raise e
1899+
elif service in ["kinesis"]:
1900+
try:
1901+
source_client.describe_stream(StreamARN=resource_arn)
1902+
except ClientError as e:
1903+
if e.response["Error"]["Code"] == "ResourceNotFoundException":
1904+
raise InvalidParameterValueException(
1905+
f"Stream not found: {resource_arn}",
1906+
Type="User",
1907+
)
1908+
raise e
1909+
elif service in ["dynamodb"]:
1910+
try:
1911+
source_client.describe_stream(StreamArn=resource_arn)
1912+
except ClientError as e:
1913+
if e.response["Error"]["Code"] == "ResourceNotFoundException":
1914+
raise InvalidParameterValueException(
1915+
f"Stream not found: {resource_arn}",
1916+
Type="User",
1917+
)
1918+
raise e
18691919

18701920
@handler("CreateEventSourceMapping", expand=False)
18711921
def create_event_source_mapping(
@@ -1881,14 +1931,14 @@ def create_event_source_mapping_v2(
18811931
request: CreateEventSourceMappingRequest,
18821932
) -> EventSourceMappingConfiguration:
18831933
# Validations
1884-
function_arn, function_name, state = self.validate_event_source_mapping(context, request)
1934+
function_arn, function_name, state, function_version, function_role = (
1935+
self.validate_event_source_mapping(context, request)
1936+
)
18851937

18861938
esm_config = EsmConfigFactory(request, context, function_arn).get_esm_config()
18871939

18881940
# Copy esm_config to avoid a race condition with potential async update in the store
18891941
state.event_source_mappings[esm_config["UUID"]] = esm_config.copy()
1890-
function_version = get_function_version_from_arn(function_arn)
1891-
function_role = function_version.config.role
18921942
enabled = request.get("Enabled", True)
18931943
# TODO: check for potential async race condition update -> think about locking
18941944
esm_worker = EsmWorkerFactory(esm_config, function_role, enabled).get_esm_worker()
@@ -1962,6 +2012,7 @@ def validate_event_source_mapping(self, context, request):
19622012
fn = state.functions.get(function_name)
19632013
if not fn:
19642014
raise InvalidParameterValueException("Function does not exist", Type="User")
2015+
19652016
if qualifier:
19662017
# make sure the function version/alias exists
19672018
if api_utils.qualifier_is_alias(qualifier):
@@ -1981,6 +2032,11 @@ def validate_event_source_mapping(self, context, request):
19812032
else:
19822033
fn_arn = api_utils.unqualified_lambda_arn(function_name, account, region)
19832034

2035+
function_version = get_function_version_from_arn(fn_arn)
2036+
function_role = function_version.config.role
2037+
2038+
if source_arn := request.get("EventSourceArn"):
2039+
self.check_service_resource_exists(service, source_arn, fn_arn, function_role)
19842040
# Check we are validating a CreateEventSourceMapping request
19852041
if is_create_esm_request:
19862042

@@ -2026,7 +2082,7 @@ def _get_mapping_sources(mapping: dict[str, Any]) -> list[str]:
20262082
f"existing mapping with UUID {uuid}",
20272083
Type="User",
20282084
)
2029-
return fn_arn, function_name, state
2085+
return fn_arn, function_name, state, function_version, function_role
20302086

20312087
@handler("UpdateEventSourceMapping", expand=False)
20322088
def update_event_source_mapping(
@@ -2065,7 +2121,9 @@ def update_event_source_mapping_v2(
20652121
temp_params = {} # values only set for the returned response, not saved internally (e.g. transient state)
20662122

20672123
# Validate the newly updated ESM object. We ignore the output here since we only care whether an Exception is raised.
2068-
function_arn, _, _ = self.validate_event_source_mapping(context, event_source_mapping)
2124+
function_arn, _, _, function_version, function_role = self.validate_event_source_mapping(
2125+
context, event_source_mapping
2126+
)
20692127

20702128
# remove the FunctionName field
20712129
event_source_mapping.pop("FunctionName", None)
@@ -2090,8 +2148,6 @@ def update_event_source_mapping_v2(
20902148
state.event_source_mappings[uuid] = event_source_mapping
20912149

20922150
# TODO: Currently, we re-create the entire ESM worker. Look into approach with better performance.
2093-
function_version = get_function_version_from_arn(function_arn)
2094-
function_role = function_version.config.role
20952151
worker_factory = EsmWorkerFactory(
20962152
event_source_mapping, function_role, request.get("Enabled", esm_worker.enabled)
20972153
)

tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,30 @@ def _get_lambda_logs_event(function_name, expected_num_events, retries=30):
9393
],
9494
)
9595
class TestDynamoDBEventSourceMapping:
96+
@markers.aws.validated
97+
def test_esm_with_not_existing_dynamodb_stream(
98+
self, aws_client, create_lambda_function, lambda_su_role, account_id, region_name, snapshot
99+
):
100+
function_name = f"simple-lambda-{short_uid()}"
101+
create_lambda_function(
102+
func_name=function_name,
103+
handler_file=TEST_LAMBDA_PYTHON_ECHO,
104+
runtime=Runtime.python3_12,
105+
role=lambda_su_role,
106+
timeout=5,
107+
)
108+
not_existing_dynamodb_stream_arn = f"arn:aws:dynamodb:{region_name}:{account_id}:table/test-table-4a53f4e8/stream/2025-02-22T03:03:25.490"
109+
with pytest.raises(ClientError) as e:
110+
aws_client.lambda_.create_event_source_mapping(
111+
FunctionName=function_name,
112+
BatchSize=1,
113+
StartingPosition="TRIM_HORIZON",
114+
EventSourceArn=not_existing_dynamodb_stream_arn,
115+
MaximumBatchingWindowInSeconds=1,
116+
MaximumRetryAttempts=1,
117+
)
118+
snapshot.match("error", e.value.response)
119+
96120
@markers.aws.validated
97121
def test_dynamodb_event_source_mapping(
98122
self,

tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.snapshot.json

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py::TestDynamoDBEventSourceMapping::test_dynamodb_event_source_mapping": {
3-
"recorded-date": "12-10-2024, 10:55:29",
3+
"recorded-date": "22-02-2025, 03:03:03",
44
"recorded-content": {
55
"create-table-result": {
66
"TableDescription": {
@@ -4459,5 +4459,22 @@
44594459
"version": "1.0"
44604460
}
44614461
}
4462+
},
4463+
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py::TestDynamoDBEventSourceMapping::test_esm_with_not_existing_dynamodb_stream": {
4464+
"recorded-date": "26-02-2025, 03:08:09",
4465+
"recorded-content": {
4466+
"error": {
4467+
"Error": {
4468+
"Code": "InvalidParameterValueException",
4469+
"Message": "Stream not found: arn:<partition>:dynamodb:<region>:111111111111:table/test-table-4a53f4e8/stream/2025-02-22T03:03:25.490"
4470+
},
4471+
"Type": "User",
4472+
"message": "Stream not found: arn:<partition>:dynamodb:<region>:111111111111:table/test-table-4a53f4e8/stream/2025-02-22T03:03:25.490",
4473+
"ResponseMetadata": {
4474+
"HTTPHeaders": {},
4475+
"HTTPStatusCode": 400
4476+
}
4477+
}
4478+
}
44624479
}
44634480
}

tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.validation.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
"last_validated_date": "2024-10-12T11:11:04+00:00"
3737
},
3838
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py::TestDynamoDBEventSourceMapping::test_dynamodb_event_source_mapping": {
39-
"last_validated_date": "2024-10-12T10:55:26+00:00"
39+
"last_validated_date": "2025-02-22T03:03:01+00:00"
4040
},
4141
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py::TestDynamoDBEventSourceMapping::test_dynamodb_event_source_mapping_with_on_failure_destination_config": {
4242
"last_validated_date": "2024-10-12T11:01:14+00:00"
@@ -88,5 +88,8 @@
8888
},
8989
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py::TestDynamoDBEventSourceMapping::test_dynamodb_report_batch_item_success_scenarios[null_success]": {
9090
"last_validated_date": "2024-10-12T11:42:04+00:00"
91+
},
92+
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py::TestDynamoDBEventSourceMapping::test_esm_with_not_existing_dynamodb_stream": {
93+
"last_validated_date": "2025-02-26T03:08:08+00:00"
9194
}
9295
}

tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,29 @@ def _snapshot_transformers(snapshot):
7070
],
7171
)
7272
class TestKinesisSource:
73+
@markers.aws.validated
74+
def test_esm_with_not_existing_kinesis_stream(
75+
self, aws_client, create_lambda_function, lambda_su_role, snapshot, account_id, region_name
76+
):
77+
function_name = f"simple-lambda-{short_uid()}"
78+
create_lambda_function(
79+
func_name=function_name,
80+
handler_file=TEST_LAMBDA_PYTHON_ECHO,
81+
runtime=Runtime.python3_12,
82+
role=lambda_su_role,
83+
timeout=5,
84+
)
85+
not_existing_stream_arn = (
86+
f"arn:aws:kinesis:{region_name}:{account_id}:stream/test-foobar-81ded7e8"
87+
)
88+
with pytest.raises(ClientError) as e:
89+
aws_client.lambda_.create_event_source_mapping(
90+
EventSourceArn=not_existing_stream_arn,
91+
FunctionName=function_name,
92+
StartingPosition="LATEST",
93+
)
94+
snapshot.match("error", e.value.response)
95+
7396
@markers.aws.validated
7497
def test_create_kinesis_event_source_mapping(
7598
self,

tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.snapshot.json

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3156,5 +3156,22 @@
31563156
"version": "1.0"
31573157
}
31583158
}
3159+
},
3160+
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py::TestKinesisSource::test_esm_with_not_existing_kinesis_stream": {
3161+
"recorded-date": "26-02-2025, 03:05:30",
3162+
"recorded-content": {
3163+
"error": {
3164+
"Error": {
3165+
"Code": "InvalidParameterValueException",
3166+
"Message": "Stream not found: arn:<partition>:kinesis:<region>:111111111111:stream/test-foobar-81ded7e8"
3167+
},
3168+
"Type": "User",
3169+
"message": "Stream not found: arn:<partition>:kinesis:<region>:111111111111:stream/test-foobar-81ded7e8",
3170+
"ResponseMetadata": {
3171+
"HTTPHeaders": {},
3172+
"HTTPStatusCode": 400
3173+
}
3174+
}
3175+
}
31593176
}
31603177
}

tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.validation.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py::TestKinesisSource::test_duplicate_event_source_mappings": {
1515
"last_validated_date": "2024-12-13T14:03:01+00:00"
1616
},
17+
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py::TestKinesisSource::test_esm_with_not_existing_kinesis_stream": {
18+
"last_validated_date": "2025-02-26T03:05:29+00:00"
19+
},
1720
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py::TestKinesisSource::test_kinesis_empty_provided": {
1821
"last_validated_date": "2024-12-13T14:45:29+00:00"
1922
},

tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,30 @@ def _snapshot_transformers(snapshot):
5656
)
5757

5858

59+
@markers.aws.validated
60+
def test_esm_with_not_existing_sqs_queue(
61+
aws_client, account_id, region_name, create_lambda_function, lambda_su_role, snapshot
62+
):
63+
function_name = f"simple-lambda-{short_uid()}"
64+
create_lambda_function(
65+
func_name=function_name,
66+
handler_file=LAMBDA_SQS_INTEGRATION_FILE,
67+
runtime=Runtime.python3_12,
68+
role=lambda_su_role,
69+
timeout=5,
70+
)
71+
not_existing_queue_arn = (
72+
f"arn:aws:sqs:{region_name}:{account_id}:not-existing-queue-{short_uid()}"
73+
)
74+
with pytest.raises(ClientError) as e:
75+
aws_client.lambda_.create_event_source_mapping(
76+
EventSourceArn=not_existing_queue_arn,
77+
FunctionName=function_name,
78+
BatchSize=1,
79+
)
80+
snapshot.match("error", e.value.response)
81+
82+
5983
@markers.aws.validated
6084
def test_failing_lambda_retries_after_visibility_timeout(
6185
create_lambda_function,

tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.snapshot.json

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4559,5 +4559,22 @@
45594559
}
45604560
]
45614561
}
4562+
},
4563+
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py::test_esm_with_not_existing_sqs_queue": {
4564+
"recorded-date": "26-02-2025, 03:01:33",
4565+
"recorded-content": {
4566+
"error": {
4567+
"Error": {
4568+
"Code": "InvalidParameterValueException",
4569+
"Message": "Error occurred while ReceiveMessage. SQS Error Code: AWS.SimpleQueueService.NonExistentQueue. SQS Error Message: The specified queue does not exist."
4570+
},
4571+
"Type": "User",
4572+
"message": "Error occurred while ReceiveMessage. SQS Error Code: AWS.SimpleQueueService.NonExistentQueue. SQS Error Message: The specified queue does not exist.",
4573+
"ResponseMetadata": {
4574+
"HTTPHeaders": {},
4575+
"HTTPStatusCode": 400
4576+
}
4577+
}
4578+
}
45624579
}
45634580
}

tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.validation.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,9 @@
104104
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py::TestSQSEventSourceMapping::test_sqs_invalid_event_filter[simple string]": {
105105
"last_validated_date": "2024-10-12T13:43:40+00:00"
106106
},
107+
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py::test_esm_with_not_existing_sqs_queue": {
108+
"last_validated_date": "2025-02-26T03:01:32+00:00"
109+
},
107110
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py::test_failing_lambda_retries_after_visibility_timeout": {
108111
"last_validated_date": "2024-11-25T12:12:47+00:00"
109112
},

0 commit comments

Comments
 (0)