Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 63 additions & 7 deletions localstack-core/localstack/services/lambda_/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import time
from typing import IO, Any, Optional, Tuple

from botocore.exceptions import ClientError

from localstack import config
from localstack.aws.api import RequestContext, ServiceException, handler
from localstack.aws.api.lambda_ import (
Expand Down Expand Up @@ -160,6 +162,7 @@
from localstack.services.lambda_.event_source_mapping.esm_worker_factory import (
EsmWorkerFactory,
)
from localstack.services.lambda_.event_source_mapping.pipe_utils import get_internal_client
from localstack.services.lambda_.invocation import AccessDeniedException
from localstack.services.lambda_.invocation.execution_environment import (
EnvironmentStartupTimeoutException,
Expand Down Expand Up @@ -224,6 +227,7 @@
lambda_event_source_mapping_arn,
parse_arn,
)
from localstack.utils.aws.client_types import ServicePrincipal
from localstack.utils.bootstrap import is_api_enabled
from localstack.utils.collections import PaginatedList
from localstack.utils.event_matcher import validate_event_pattern
Expand Down Expand Up @@ -1866,6 +1870,52 @@ def update_alias(
# =======================================
# ======= EVENT SOURCE MAPPINGS =========
# =======================================
def check_service_resource_exists(
self, service: str, resource_arn: str, function_arn: str, function_role_arn: str
):
"""
Check if the service resource exists and if the function has access to it.

Raises:
InvalidParameterValueException: If the service resource does not exist or the function does not have access to it.
"""
arn = parse_arn(resource_arn)
source_client = get_internal_client(
arn=resource_arn,
role_arn=function_role_arn,
service_principal=ServicePrincipal.lambda_,
source_arn=function_arn,
)
if service in ["sqs", "sqs-fifo"]:
try:
source_client.get_queue_attributes(QueueUrl=arn["resource"])
except ClientError as e:
if e.response["Error"]["Code"] == "AWS.SimpleQueueService.NonExistentQueue":
raise InvalidParameterValueException(
f"Error occurred while ReceiveMessage. SQS Error Code: {e.response['Error']['Code']}. SQS Error Message: {e.response['Error']['Message']}",
Type="User",
)
raise e
elif service in ["kinesis"]:
try:
source_client.describe_stream(StreamARN=resource_arn)
except ClientError as e:
if e.response["Error"]["Code"] == "ResourceNotFoundException":
raise InvalidParameterValueException(
f"Stream not found: {resource_arn}",
Type="User",
)
raise e
elif service in ["dynamodb"]:
try:
source_client.describe_stream(StreamArn=resource_arn)
except ClientError as e:
if e.response["Error"]["Code"] == "ResourceNotFoundException":
raise InvalidParameterValueException(
f"Stream not found: {resource_arn}",
Type="User",
)
raise e

@handler("CreateEventSourceMapping", expand=False)
def create_event_source_mapping(
Expand All @@ -1881,14 +1931,14 @@ def create_event_source_mapping_v2(
request: CreateEventSourceMappingRequest,
) -> EventSourceMappingConfiguration:
# Validations
function_arn, function_name, state = self.validate_event_source_mapping(context, request)
function_arn, function_name, state, function_version, function_role = (
self.validate_event_source_mapping(context, request)
)

esm_config = EsmConfigFactory(request, context, function_arn).get_esm_config()

# Copy esm_config to avoid a race condition with potential async update in the store
state.event_source_mappings[esm_config["UUID"]] = esm_config.copy()
function_version = get_function_version_from_arn(function_arn)
function_role = function_version.config.role
enabled = request.get("Enabled", True)
# TODO: check for potential async race condition update -> think about locking
esm_worker = EsmWorkerFactory(esm_config, function_role, enabled).get_esm_worker()
Expand Down Expand Up @@ -1962,6 +2012,7 @@ def validate_event_source_mapping(self, context, request):
fn = state.functions.get(function_name)
if not fn:
raise InvalidParameterValueException("Function does not exist", Type="User")

if qualifier:
# make sure the function version/alias exists
if api_utils.qualifier_is_alias(qualifier):
Expand All @@ -1981,6 +2032,11 @@ def validate_event_source_mapping(self, context, request):
else:
fn_arn = api_utils.unqualified_lambda_arn(function_name, account, region)

function_version = get_function_version_from_arn(fn_arn)
function_role = function_version.config.role

if source_arn := request.get("EventSourceArn"):
self.check_service_resource_exists(service, source_arn, fn_arn, function_role)
# Check we are validating a CreateEventSourceMapping request
if is_create_esm_request:

Expand Down Expand Up @@ -2026,7 +2082,7 @@ def _get_mapping_sources(mapping: dict[str, Any]) -> list[str]:
f"existing mapping with UUID {uuid}",
Type="User",
)
return fn_arn, function_name, state
return fn_arn, function_name, state, function_version, function_role

@handler("UpdateEventSourceMapping", expand=False)
def update_event_source_mapping(
Expand Down Expand Up @@ -2065,7 +2121,9 @@ def update_event_source_mapping_v2(
temp_params = {} # values only set for the returned response, not saved internally (e.g. transient state)

# Validate the newly updated ESM object. We ignore the output here since we only care whether an Exception is raised.
function_arn, _, _ = self.validate_event_source_mapping(context, event_source_mapping)
function_arn, _, _, function_version, function_role = self.validate_event_source_mapping(
context, event_source_mapping
)

# remove the FunctionName field
event_source_mapping.pop("FunctionName", None)
Expand All @@ -2090,8 +2148,6 @@ def update_event_source_mapping_v2(
state.event_source_mappings[uuid] = event_source_mapping

# TODO: Currently, we re-create the entire ESM worker. Look into approach with better performance.
function_version = get_function_version_from_arn(function_arn)
function_role = function_version.config.role
worker_factory = EsmWorkerFactory(
event_source_mapping, function_role, request.get("Enabled", esm_worker.enabled)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,30 @@ def _get_lambda_logs_event(function_name, expected_num_events, retries=30):
],
)
class TestDynamoDBEventSourceMapping:
@markers.aws.validated
def test_esm_with_not_existing_dynamodb_stream(
self, aws_client, create_lambda_function, lambda_su_role, account_id, region_name, snapshot
):
function_name = f"simple-lambda-{short_uid()}"
create_lambda_function(
func_name=function_name,
handler_file=TEST_LAMBDA_PYTHON_ECHO,
runtime=Runtime.python3_12,
role=lambda_su_role,
timeout=5,
)
not_existing_dynamodb_stream_arn = f"arn:aws:dynamodb:{region_name}:{account_id}:table/test-table-4a53f4e8/stream/2025-02-22T03:03:25.490"
with pytest.raises(ClientError) as e:
aws_client.lambda_.create_event_source_mapping(
FunctionName=function_name,
BatchSize=1,
StartingPosition="TRIM_HORIZON",
EventSourceArn=not_existing_dynamodb_stream_arn,
MaximumBatchingWindowInSeconds=1,
MaximumRetryAttempts=1,
)
snapshot.match("error", e.value.response)

@markers.aws.validated
def test_dynamodb_event_source_mapping(
self,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py::TestDynamoDBEventSourceMapping::test_dynamodb_event_source_mapping": {
"recorded-date": "12-10-2024, 10:55:29",
"recorded-date": "22-02-2025, 03:03:03",
"recorded-content": {
"create-table-result": {
"TableDescription": {
Expand Down Expand Up @@ -4459,5 +4459,22 @@
"version": "1.0"
}
}
},
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py::TestDynamoDBEventSourceMapping::test_esm_with_not_existing_dynamodb_stream": {
"recorded-date": "26-02-2025, 03:08:09",
"recorded-content": {
"error": {
"Error": {
"Code": "InvalidParameterValueException",
"Message": "Stream not found: arn:<partition>:dynamodb:<region>:111111111111:table/test-table-4a53f4e8/stream/2025-02-22T03:03:25.490"
},
"Type": "User",
"message": "Stream not found: arn:<partition>:dynamodb:<region>:111111111111:table/test-table-4a53f4e8/stream/2025-02-22T03:03:25.490",
"ResponseMetadata": {
"HTTPHeaders": {},
"HTTPStatusCode": 400
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
"last_validated_date": "2024-10-12T11:11:04+00:00"
},
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py::TestDynamoDBEventSourceMapping::test_dynamodb_event_source_mapping": {
"last_validated_date": "2024-10-12T10:55:26+00:00"
"last_validated_date": "2025-02-22T03:03:01+00:00"
},
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py::TestDynamoDBEventSourceMapping::test_dynamodb_event_source_mapping_with_on_failure_destination_config": {
"last_validated_date": "2024-10-12T11:01:14+00:00"
Expand Down Expand Up @@ -88,5 +88,8 @@
},
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py::TestDynamoDBEventSourceMapping::test_dynamodb_report_batch_item_success_scenarios[null_success]": {
"last_validated_date": "2024-10-12T11:42:04+00:00"
},
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py::TestDynamoDBEventSourceMapping::test_esm_with_not_existing_dynamodb_stream": {
"last_validated_date": "2025-02-26T03:08:08+00:00"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,29 @@ def _snapshot_transformers(snapshot):
],
)
class TestKinesisSource:
@markers.aws.validated
def test_esm_with_not_existing_kinesis_stream(
self, aws_client, create_lambda_function, lambda_su_role, snapshot, account_id, region_name
):
function_name = f"simple-lambda-{short_uid()}"
create_lambda_function(
func_name=function_name,
handler_file=TEST_LAMBDA_PYTHON_ECHO,
runtime=Runtime.python3_12,
role=lambda_su_role,
timeout=5,
)
not_existing_stream_arn = (
f"arn:aws:kinesis:{region_name}:{account_id}:stream/test-foobar-81ded7e8"
)
with pytest.raises(ClientError) as e:
aws_client.lambda_.create_event_source_mapping(
EventSourceArn=not_existing_stream_arn,
FunctionName=function_name,
StartingPosition="LATEST",
)
snapshot.match("error", e.value.response)

@markers.aws.validated
def test_create_kinesis_event_source_mapping(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3156,5 +3156,22 @@
"version": "1.0"
}
}
},
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py::TestKinesisSource::test_esm_with_not_existing_kinesis_stream": {
"recorded-date": "26-02-2025, 03:05:30",
"recorded-content": {
"error": {
"Error": {
"Code": "InvalidParameterValueException",
"Message": "Stream not found: arn:<partition>:kinesis:<region>:111111111111:stream/test-foobar-81ded7e8"
},
"Type": "User",
"message": "Stream not found: arn:<partition>:kinesis:<region>:111111111111:stream/test-foobar-81ded7e8",
"ResponseMetadata": {
"HTTPHeaders": {},
"HTTPStatusCode": 400
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py::TestKinesisSource::test_duplicate_event_source_mappings": {
"last_validated_date": "2024-12-13T14:03:01+00:00"
},
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py::TestKinesisSource::test_esm_with_not_existing_kinesis_stream": {
"last_validated_date": "2025-02-26T03:05:29+00:00"
},
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py::TestKinesisSource::test_kinesis_empty_provided": {
"last_validated_date": "2024-12-13T14:45:29+00:00"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,30 @@ def _snapshot_transformers(snapshot):
)


@markers.aws.validated
def test_esm_with_not_existing_sqs_queue(
aws_client, account_id, region_name, create_lambda_function, lambda_su_role, snapshot
):
function_name = f"simple-lambda-{short_uid()}"
create_lambda_function(
func_name=function_name,
handler_file=LAMBDA_SQS_INTEGRATION_FILE,
runtime=Runtime.python3_12,
role=lambda_su_role,
timeout=5,
)
not_existing_queue_arn = (
f"arn:aws:sqs:{region_name}:{account_id}:not-existing-queue-{short_uid()}"
)
with pytest.raises(ClientError) as e:
aws_client.lambda_.create_event_source_mapping(
EventSourceArn=not_existing_queue_arn,
FunctionName=function_name,
BatchSize=1,
)
snapshot.match("error", e.value.response)


@markers.aws.validated
def test_failing_lambda_retries_after_visibility_timeout(
create_lambda_function,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4559,5 +4559,22 @@
}
]
}
},
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py::test_esm_with_not_existing_sqs_queue": {
"recorded-date": "26-02-2025, 03:01:33",
"recorded-content": {
"error": {
"Error": {
"Code": "InvalidParameterValueException",
"Message": "Error occurred while ReceiveMessage. SQS Error Code: AWS.SimpleQueueService.NonExistentQueue. SQS Error Message: The specified queue does not exist."
},
"Type": "User",
"message": "Error occurred while ReceiveMessage. SQS Error Code: AWS.SimpleQueueService.NonExistentQueue. SQS Error Message: The specified queue does not exist.",
"ResponseMetadata": {
"HTTPHeaders": {},
"HTTPStatusCode": 400
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py::TestSQSEventSourceMapping::test_sqs_invalid_event_filter[simple string]": {
"last_validated_date": "2024-10-12T13:43:40+00:00"
},
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py::test_esm_with_not_existing_sqs_queue": {
"last_validated_date": "2025-02-26T03:01:32+00:00"
},
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py::test_failing_lambda_retries_after_visibility_timeout": {
"last_validated_date": "2024-11-25T12:12:47+00:00"
},
Expand Down