Skip to content

Commit 263e556

Browse files
committed
[ESM] Validate event sources existence
1 parent 958b4f1 commit 263e556

10 files changed

+189
-2
lines changed

localstack-core/localstack/services/lambda_/provider.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
import time
1010
from typing import IO, Any, Optional, Tuple
1111

12+
from botocore.exceptions import ClientError
13+
1214
from localstack import config
1315
from localstack.aws.api import RequestContext, ServiceException, handler
1416
from localstack.aws.api.lambda_ import (
@@ -160,6 +162,7 @@
160162
from localstack.services.lambda_.event_source_mapping.esm_worker_factory import (
161163
EsmWorkerFactory,
162164
)
165+
from localstack.services.lambda_.event_source_mapping.pipe_utils import get_internal_client
163166
from localstack.services.lambda_.invocation import AccessDeniedException
164167
from localstack.services.lambda_.invocation.execution_environment import (
165168
EnvironmentStartupTimeoutException,
@@ -1866,6 +1869,39 @@ def update_alias(
18661869
# =======================================
18671870
# ======= EVENT SOURCE MAPPINGS =========
18681871
# =======================================
1872+
def check_service_resource_exists(self, service, resource_arn):
1873+
arn = parse_arn(resource_arn)
1874+
source_client = get_internal_client(resource_arn)
1875+
if service in ["sqs", "sqs-fifo"]:
1876+
try:
1877+
source_client.get_queue_url(QueueName=arn["resource"])
1878+
except ClientError as e:
1879+
if e.response["Error"]["QueryErrorCode"] == "QueueDoesNotExist":
1880+
raise InvalidParameterValueException(
1881+
f"Error occurred while ReceiveMessage. SQS Error Code: InvalidAddress. SQS Error Message: The address https://sqs.<region>.amazonaws.com/{arn['account']}/{arn['resource']} is not valid for this endpoint.",
1882+
Type="User",
1883+
)
1884+
raise e
1885+
elif service in ["kinesis"]:
1886+
try:
1887+
source_client.describe_stream(StreamARN=resource_arn)
1888+
except ClientError as e:
1889+
if e.response["Error"]["Code"] == "ResourceNotFoundException":
1890+
raise InvalidParameterValueException(
1891+
"Received Exception while reading from provided stream. No account found for the given parameters",
1892+
Type="User",
1893+
)
1894+
raise e
1895+
elif service in ["dynamodb"]:
1896+
try:
1897+
source_client.describe_stream(StreamArn=resource_arn)
1898+
except ClientError as e:
1899+
if e.response["Error"]["Code"] == "ResourceNotFoundException":
1900+
raise InvalidParameterValueException(
1901+
"The resource that you specified for the EventSourceArn parameter doesn't exist.",
1902+
Type="User",
1903+
)
1904+
raise e
18691905

18701906
@handler("CreateEventSourceMapping", expand=False)
18711907
def create_event_source_mapping(
@@ -1935,6 +1971,8 @@ def validate_event_source_mapping(self, context, request):
19351971
Type="User",
19361972
)
19371973

1974+
self.check_service_resource_exists(service, request["EventSourceArn"])
1975+
19381976
if (filter_criteria := request.get("FilterCriteria")) is not None:
19391977
for filter_ in filter_criteria.get("Filters", []):
19401978
pattern_str = filter_.get("Pattern")

tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,36 @@ def _get_lambda_logs_event(function_name, expected_num_events, retries=30):
9393
],
9494
)
9595
class TestDynamoDBEventSourceMapping:
96+
@markers.snapshot.skip_snapshot_verify(
97+
paths=[
98+
"$..message",
99+
"$..Error.Message",
100+
]
101+
)
102+
@markers.aws.validated
103+
def test_esm_with_not_existing_dynamodb_stream(
104+
self, aws_client, create_lambda_function, lambda_su_role, account_id, region_name, snapshot
105+
):
106+
function_name = f"simple-lambda-{short_uid()}"
107+
create_lambda_function(
108+
func_name=function_name,
109+
handler_file=TEST_LAMBDA_PYTHON_ECHO,
110+
runtime=Runtime.python3_12,
111+
role=lambda_su_role,
112+
timeout=5,
113+
)
114+
not_existing_dynamodb_stream_arn = f"arn:aws:dynamodb:{region_name}:{account_id}:table/test-table-{short_uid()}/stream/2025-02-22T03:03:25.490"
115+
with pytest.raises(ClientError) as e:
116+
aws_client.lambda_.create_event_source_mapping(
117+
FunctionName=function_name,
118+
BatchSize=1,
119+
StartingPosition="TRIM_HORIZON",
120+
EventSourceArn=not_existing_dynamodb_stream_arn,
121+
MaximumBatchingWindowInSeconds=1,
122+
MaximumRetryAttempts=1,
123+
)
124+
snapshot.match("error", e.value.response)
125+
96126
@markers.aws.validated
97127
def test_dynamodb_event_source_mapping(
98128
self,

tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.snapshot.json

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py::TestDynamoDBEventSourceMapping::test_dynamodb_event_source_mapping": {
3-
"recorded-date": "12-10-2024, 10:55:29",
3+
"recorded-date": "22-02-2025, 03:03:03",
44
"recorded-content": {
55
"create-table-result": {
66
"TableDescription": {
@@ -4459,5 +4459,22 @@
44594459
"version": "1.0"
44604460
}
44614461
}
4462+
},
4463+
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py::TestDynamoDBEventSourceMapping::test_esm_with_not_existing_dynamodb_stream": {
4464+
"recorded-date": "26-02-2025, 03:08:09",
4465+
"recorded-content": {
4466+
"error": {
4467+
"Error": {
4468+
"Code": "InvalidParameterValueException",
4469+
"Message": "Stream not found: arn:<partition>:dynamodb:<region>:111111111111:table/test-table-4a53f4e8/stream/2025-02-22T03:03:25.490"
4470+
},
4471+
"Type": "User",
4472+
"message": "Stream not found: arn:<partition>:dynamodb:<region>:111111111111:table/test-table-4a53f4e8/stream/2025-02-22T03:03:25.490",
4473+
"ResponseMetadata": {
4474+
"HTTPHeaders": {},
4475+
"HTTPStatusCode": 400
4476+
}
4477+
}
4478+
}
44624479
}
44634480
}

tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.validation.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
"last_validated_date": "2024-10-12T11:11:04+00:00"
3737
},
3838
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py::TestDynamoDBEventSourceMapping::test_dynamodb_event_source_mapping": {
39-
"last_validated_date": "2024-10-12T10:55:26+00:00"
39+
"last_validated_date": "2025-02-22T03:03:01+00:00"
4040
},
4141
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py::TestDynamoDBEventSourceMapping::test_dynamodb_event_source_mapping_with_on_failure_destination_config": {
4242
"last_validated_date": "2024-10-12T11:01:14+00:00"
@@ -88,5 +88,8 @@
8888
},
8989
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py::TestDynamoDBEventSourceMapping::test_dynamodb_report_batch_item_success_scenarios[null_success]": {
9090
"last_validated_date": "2024-10-12T11:42:04+00:00"
91+
},
92+
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py::TestDynamoDBEventSourceMapping::test_esm_with_not_existing_dynamodb_stream": {
93+
"last_validated_date": "2025-02-26T03:08:08+00:00"
9194
}
9295
}

tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,35 @@ def _snapshot_transformers(snapshot):
7070
],
7171
)
7272
class TestKinesisSource:
73+
@markers.snapshot.skip_snapshot_verify(
74+
paths=[
75+
"$..message",
76+
"$..Error.Message",
77+
]
78+
)
79+
@markers.aws.validated
80+
def test_esm_with_not_existing_kinesis_stream(
81+
self, aws_client, create_lambda_function, lambda_su_role, snapshot, account_id, region_name
82+
):
83+
function_name = f"simple-lambda-{short_uid()}"
84+
create_lambda_function(
85+
func_name=function_name,
86+
handler_file=TEST_LAMBDA_PYTHON_ECHO,
87+
runtime=Runtime.python3_12,
88+
role=lambda_su_role,
89+
timeout=5,
90+
)
91+
not_existing_stream_arn = (
92+
f"arn:aws:kinesis:{region_name}:{account_id}:stream/test-foobar-{short_uid()}"
93+
)
94+
with pytest.raises(ClientError) as e:
95+
aws_client.lambda_.create_event_source_mapping(
96+
EventSourceArn=not_existing_stream_arn,
97+
FunctionName=function_name,
98+
StartingPosition="LATEST",
99+
)
100+
snapshot.match("error", e.value.response)
101+
73102
@markers.aws.validated
74103
def test_create_kinesis_event_source_mapping(
75104
self,

tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.snapshot.json

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3156,5 +3156,22 @@
31563156
"version": "1.0"
31573157
}
31583158
}
3159+
},
3160+
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py::TestKinesisSource::test_esm_with_not_existing_kinesis_stream": {
3161+
"recorded-date": "26-02-2025, 03:05:30",
3162+
"recorded-content": {
3163+
"error": {
3164+
"Error": {
3165+
"Code": "InvalidParameterValueException",
3166+
"Message": "Stream not found: arn:<partition>:kinesis:<region>:111111111111:stream/test-foobar-81ded7e8"
3167+
},
3168+
"Type": "User",
3169+
"message": "Stream not found: arn:<partition>:kinesis:<region>:111111111111:stream/test-foobar-81ded7e8",
3170+
"ResponseMetadata": {
3171+
"HTTPHeaders": {},
3172+
"HTTPStatusCode": 400
3173+
}
3174+
}
3175+
}
31593176
}
31603177
}

tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.validation.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py::TestKinesisSource::test_duplicate_event_source_mappings": {
1515
"last_validated_date": "2024-12-13T14:03:01+00:00"
1616
},
17+
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py::TestKinesisSource::test_esm_with_not_existing_kinesis_stream": {
18+
"last_validated_date": "2025-02-26T03:05:29+00:00"
19+
},
1720
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py::TestKinesisSource::test_kinesis_empty_provided": {
1821
"last_validated_date": "2024-12-13T14:45:29+00:00"
1922
},

tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,36 @@ def _snapshot_transformers(snapshot):
5555
)
5656

5757

58+
@markers.snapshot.skip_snapshot_verify(
59+
paths=[
60+
"$..message",
61+
"$..Error.Message",
62+
]
63+
)
64+
@markers.aws.validated
65+
def test_esm_with_not_existing_sqs_queue(
66+
aws_client, account_id, region_name, create_lambda_function, lambda_su_role, snapshot
67+
):
68+
function_name = f"simple-lambda-{short_uid()}"
69+
create_lambda_function(
70+
func_name=function_name,
71+
handler_file=LAMBDA_SQS_INTEGRATION_FILE,
72+
runtime=Runtime.python3_12,
73+
role=lambda_su_role,
74+
timeout=5,
75+
)
76+
not_existing_queue_arn = (
77+
f"arn:aws:sqs:{region_name}:{account_id}:not-existing-queue-{short_uid()}"
78+
)
79+
with pytest.raises(ClientError) as e:
80+
aws_client.lambda_.create_event_source_mapping(
81+
EventSourceArn=not_existing_queue_arn,
82+
FunctionName=function_name,
83+
BatchSize=1,
84+
)
85+
snapshot.match("error", e.value.response)
86+
87+
5888
@markers.aws.validated
5989
def test_failing_lambda_retries_after_visibility_timeout(
6090
create_lambda_function,

tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.snapshot.json

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4559,5 +4559,22 @@
45594559
}
45604560
]
45614561
}
4562+
},
4563+
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py::test_esm_with_not_existing_sqs_queue": {
4564+
"recorded-date": "26-02-2025, 03:01:33",
4565+
"recorded-content": {
4566+
"error": {
4567+
"Error": {
4568+
"Code": "InvalidParameterValueException",
4569+
"Message": "Error occurred while ReceiveMessage. SQS Error Code: AWS.SimpleQueueService.NonExistentQueue. SQS Error Message: The specified queue does not exist."
4570+
},
4571+
"Type": "User",
4572+
"message": "Error occurred while ReceiveMessage. SQS Error Code: AWS.SimpleQueueService.NonExistentQueue. SQS Error Message: The specified queue does not exist.",
4573+
"ResponseMetadata": {
4574+
"HTTPHeaders": {},
4575+
"HTTPStatusCode": 400
4576+
}
4577+
}
4578+
}
45624579
}
45634580
}

tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.validation.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,9 @@
104104
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py::TestSQSEventSourceMapping::test_sqs_invalid_event_filter[simple string]": {
105105
"last_validated_date": "2024-10-12T13:43:40+00:00"
106106
},
107+
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py::test_esm_with_not_existing_sqs_queue": {
108+
"last_validated_date": "2025-02-26T03:01:32+00:00"
109+
},
107110
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py::test_failing_lambda_retries_after_visibility_timeout": {
108111
"last_validated_date": "2024-11-25T12:12:47+00:00"
109112
},

0 commit comments

Comments
 (0)