Skip to content

Commit 2ab5135

Browse files
gregfurmanmacnev2013
authored andcommitted
ESMv2: Fix UpdateEventSourceMapping request and batch size check for SQS (#11637)
1 parent c117651 commit 2ab5135

File tree

4 files changed

+120
-99
lines changed

4 files changed

+120
-99
lines changed

localstack-core/localstack/services/lambda_/api_utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -637,7 +637,7 @@ def validate_and_set_batch_size(service: str, batch_size: Optional[int] = None)
637637
BATCH_SIZE_RANGES = {
638638
"kafka": (100, 10_000),
639639
"kinesis": (100, 10_000),
640-
"dynamodb": (100, 1_000),
640+
"dynamodb": (100, 10_000),
641641
"sqs-fifo": (10, 10),
642642
"sqs": (10, 10_000),
643643
"mq": (100, 10_000),

localstack-core/localstack/services/lambda_/event_source_mapping/esm_worker.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,12 +107,14 @@ def stop(self):
107107
with self._state_lock:
108108
self.enabled = False
109109
self.current_state = EsmState.DISABLING
110+
self.update_esm_state_in_store(EsmState.DISABLING)
110111
self.state_transition_reason = self.user_state_reason
111112
self._shutdown_event.set()
112113

113114
def delete(self):
114115
with self._state_lock:
115116
self.current_state = EsmState.DELETING
117+
self.update_esm_state_in_store(EsmState.DELETING)
116118
self.state_transition_reason = self.user_state_reason
117119
self._shutdown_event.set()
118120

localstack-core/localstack/services/lambda_/provider.py

Lines changed: 84 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1951,6 +1951,7 @@ def create_event_source_mapping_v1(
19511951

19521952
def validate_event_source_mapping(self, context, request):
19531953
# TODO: test whether stream ARNs are valid sources for Pipes or ESM or whether only DynamoDB table ARNs work
1954+
is_create_esm_request = context.operation.name == self.create_event_source_mapping.operation
19541955

19551956
service = None
19561957
if "SelfManagedEventSource" in request:
@@ -1963,12 +1964,22 @@ def validate_event_source_mapping(self, context, request):
19631964
raise InvalidParameterValueException("Unrecognized event source.", Type="User")
19641965
if service is None:
19651966
service = extract_service_from_arn(request["EventSourceArn"])
1967+
1968+
batch_size = api_utils.validate_and_set_batch_size(service, request.get("BatchSize"))
19661969
if service in ["dynamodb", "kinesis"] and "StartingPosition" not in request:
19671970
raise InvalidParameterValueException(
19681971
"1 validation error detected: Value null at 'startingPosition' failed to satisfy constraint: Member must not be null.",
19691972
Type="User",
19701973
)
1971-
request_function_name = request["FunctionName"]
1974+
if service in ["sqs", "sqs-fifo"]:
1975+
if batch_size > 10 and request.get("MaximumBatchingWindowInSeconds", 0) == 0:
1976+
raise InvalidParameterValueException(
1977+
"Maximum batch window in seconds must be greater than 0 if maximum batch size is greater than 10",
1978+
Type="User",
1979+
)
1980+
# Can either have a FunctionName (i.e CreateEventSourceMapping request) or
1981+
# an internal EventSourceMappingConfiguration representation
1982+
request_function_name = request.get("FunctionName") or request.get("FunctionArn")
19721983
# can be either a partial arn or a full arn for the version/alias
19731984
function_name, qualifier, account, region = function_locators_from_arn(
19741985
request_function_name
@@ -1999,48 +2010,51 @@ def validate_event_source_mapping(self, context, request):
19992010
else:
20002011
fn_arn = api_utils.unqualified_lambda_arn(function_name, account, region)
20012012

2002-
def _get_mapping_sources(mapping: dict[str, Any]) -> list[str]:
2003-
if event_source_arn := mapping.get("EventSourceArn"):
2004-
return [event_source_arn]
2005-
return (
2006-
mapping.get("SelfManagedEventSource", {})
2007-
.get("Endpoints", {})
2008-
.get("KAFKA_BOOTSTRAP_SERVERS", [])
2009-
)
2010-
2011-
# check for event source duplicates
2012-
# TODO: currently validated for sqs, kinesis, and dynamodb
2013-
service_id = load_service(service).service_id
2014-
for uuid, mapping in state.event_source_mappings.items():
2015-
mapping_sources = _get_mapping_sources(mapping)
2016-
request_sources = _get_mapping_sources(request)
2017-
if mapping["FunctionArn"] == fn_arn and (
2018-
set(mapping_sources).intersection(request_sources)
2019-
):
2020-
if service == "sqs":
2021-
# *shakes fist at SQS*
2022-
raise ResourceConflictException(
2023-
f'An event source mapping with {service_id} arn (" {mapping["EventSourceArn"]} ") '
2024-
f'and function (" {function_name} ") already exists. Please update or delete the '
2025-
f"existing mapping with UUID {uuid}",
2026-
Type="User",
2027-
)
2028-
elif service == "kafka":
2029-
if set(mapping["Topics"]).intersection(request["Topics"]):
2013+
# Check we are validating a CreateEventSourceMapping request
2014+
if is_create_esm_request:
2015+
2016+
def _get_mapping_sources(mapping: dict[str, Any]) -> list[str]:
2017+
if event_source_arn := mapping.get("EventSourceArn"):
2018+
return [event_source_arn]
2019+
return (
2020+
mapping.get("SelfManagedEventSource", {})
2021+
.get("Endpoints", {})
2022+
.get("KAFKA_BOOTSTRAP_SERVERS", [])
2023+
)
2024+
2025+
# check for event source duplicates
2026+
# TODO: currently validated for sqs, kinesis, and dynamodb
2027+
service_id = load_service(service).service_id
2028+
for uuid, mapping in state.event_source_mappings.items():
2029+
mapping_sources = _get_mapping_sources(mapping)
2030+
request_sources = _get_mapping_sources(request)
2031+
if mapping["FunctionArn"] == fn_arn and (
2032+
set(mapping_sources).intersection(request_sources)
2033+
):
2034+
if service == "sqs":
2035+
# *shakes fist at SQS*
20302036
raise ResourceConflictException(
2031-
f'An event source mapping with event source ("{",".join(request_sources)}"), '
2032-
f'function ("{fn_arn}"), '
2033-
f'topics ("{",".join(request["Topics"])}") already exists. Please update or delete the '
2037+
f'An event source mapping with {service_id} arn (" {mapping["EventSourceArn"]} ") '
2038+
f'and function (" {function_name} ") already exists. Please update or delete the '
2039+
f"existing mapping with UUID {uuid}",
2040+
Type="User",
2041+
)
2042+
elif service == "kafka":
2043+
if set(mapping["Topics"]).intersection(request["Topics"]):
2044+
raise ResourceConflictException(
2045+
f'An event source mapping with event source ("{",".join(request_sources)}"), '
2046+
f'function ("{fn_arn}"), '
2047+
f'topics ("{",".join(request["Topics"])}") already exists. Please update or delete the '
2048+
f"existing mapping with UUID {uuid}",
2049+
Type="User",
2050+
)
2051+
else:
2052+
raise ResourceConflictException(
2053+
f'The event source arn (" {mapping["EventSourceArn"]} ") and function '
2054+
f'(" {function_name} ") provided mapping already exists. Please update or delete the '
20342055
f"existing mapping with UUID {uuid}",
20352056
Type="User",
20362057
)
2037-
else:
2038-
raise ResourceConflictException(
2039-
f'The event source arn (" {mapping["EventSourceArn"]} ") and function '
2040-
f'(" {function_name} ") provided mapping already exists. Please update or delete the '
2041-
f"existing mapping with UUID {uuid}",
2042-
Type="User",
2043-
)
20442058
return fn_arn, function_name, state
20452059

20462060
@handler("UpdateEventSourceMapping", expand=False)
@@ -2141,37 +2155,53 @@ def update_event_source_mapping_v2(
21412155
"The resource you requested does not exist.", Type="User"
21422156
) # TODO: test?
21432157

2144-
# remove the FunctionName field
2145-
function_name_or_arn = request_data.pop("FunctionName", None)
2146-
21472158
# normalize values to overwrite
21482159
event_source_mapping = old_event_source_mapping | request_data
21492160

2150-
if function_name_or_arn:
2151-
# if the FunctionName field was present, update the FunctionArn of the EventSourceMapping
2152-
account_id, region = api_utils.get_account_and_region(function_name_or_arn, context)
2153-
function_name, qualifier = api_utils.get_name_and_qualifier(
2154-
function_name_or_arn, None, context
2155-
)
2156-
event_source_mapping["FunctionArn"] = api_utils.qualified_lambda_arn(
2157-
function_name, qualifier, account_id, region
2158-
)
2159-
21602161
temp_params = {} # values only set for the returned response, not saved internally (e.g. transient state)
21612162

2163+
# Validate the newly updated ESM object. We ignore the output here since we only care whether an Exception is raised.
2164+
function_arn, _, _ = self.validate_event_source_mapping(context, event_source_mapping)
2165+
2166+
# remove the FunctionName field
2167+
event_source_mapping.pop("FunctionName", None)
2168+
2169+
if function_arn:
2170+
event_source_mapping["FunctionArn"] = function_arn
2171+
21622172
esm_worker = self.esm_workers[uuid]
21632173
# Only apply update if the desired state differs
21642174
enabled = request.get("Enabled")
21652175
if enabled is not None:
21662176
if enabled and old_event_source_mapping["State"] != EsmState.ENABLED:
2167-
esm_worker.start()
21682177
event_source_mapping["State"] = EsmState.ENABLING
21692178
# TODO: What happens when trying to update during an update or failed state?!
21702179
elif not enabled and old_event_source_mapping["State"] == EsmState.ENABLED:
2171-
esm_worker.stop()
21722180
event_source_mapping["State"] = EsmState.DISABLING
2181+
else:
2182+
event_source_mapping["State"] = EsmState.UPDATING
2183+
2184+
# To ensure parity, certain responses need to be immediately returned
2185+
temp_params["State"] = event_source_mapping["State"]
21732186

21742187
state.event_source_mappings[uuid] = event_source_mapping
2188+
2189+
# TODO: Currently, we re-create the entire ESM worker. Look into approach with better performance.
2190+
function_version = get_function_version_from_arn(function_arn)
2191+
function_role = function_version.config.role
2192+
worker_factory = EsmWorkerFactory(
2193+
event_source_mapping, function_role, request.get("Enabled", esm_worker.enabled)
2194+
)
2195+
2196+
# Get a new ESM worker object but do not active it, since the factory holds all logic for creating new worker from configuration.
2197+
updated_esm_worker = worker_factory.get_esm_worker()
2198+
self.esm_workers[uuid] = updated_esm_worker
2199+
2200+
# We should stop() the worker since the delete() will remove the ESM from the state mapping.
2201+
esm_worker.stop()
2202+
# This will either create an EsmWorker in the CREATING state if enabled. Otherwise, the DISABLING state is set.
2203+
updated_esm_worker.create()
2204+
21752205
return {**event_source_mapping, **temp_params}
21762206

21772207
def delete_event_source_mapping(

tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py

Lines changed: 33 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -952,14 +952,11 @@ def test_fifo_message_group_parallelism(
952952
],
953953
)
954954
class TestSQSEventSourceMapping:
955-
# TODO refactor
956-
@pytest.mark.skipif(
957-
is_v2_esm(), reason="ESM v2 does not implement create and update validations yet"
958-
)
959955
@markers.aws.validated
960956
def test_event_source_mapping_default_batch_size(
961957
self,
962958
create_lambda_function,
959+
create_event_source_mapping,
963960
sqs_create_queue,
964961
sqs_get_queue_arn,
965962
lambda_su_role,
@@ -973,47 +970,42 @@ def test_event_source_mapping_default_batch_size(
973970
queue_url_1 = sqs_create_queue(QueueName=queue_name_1)
974971
queue_arn_1 = sqs_get_queue_arn(queue_url_1)
975972

976-
try:
977-
create_lambda_function(
978-
func_name=function_name,
979-
handler_file=TEST_LAMBDA_PYTHON_ECHO,
980-
runtime=Runtime.python3_12,
981-
role=lambda_su_role,
973+
create_lambda_function(
974+
func_name=function_name,
975+
handler_file=TEST_LAMBDA_PYTHON_ECHO,
976+
runtime=Runtime.python3_12,
977+
role=lambda_su_role,
978+
)
979+
980+
rs = create_event_source_mapping(EventSourceArn=queue_arn_1, FunctionName=function_name)
981+
snapshot.match("create-event-source-mapping", rs)
982+
983+
uuid = rs["UUID"]
984+
assert DEFAULT_SQS_BATCH_SIZE == rs["BatchSize"]
985+
_await_event_source_mapping_enabled(aws_client.lambda_, uuid)
986+
987+
with pytest.raises(ClientError) as e:
988+
# Update batch size with invalid value
989+
rs = aws_client.lambda_.update_event_source_mapping(
990+
UUID=uuid,
991+
FunctionName=function_name,
992+
BatchSize=MAX_SQS_BATCH_SIZE_FIFO + 1,
982993
)
994+
snapshot.match("invalid-update-event-source-mapping", e.value.response)
995+
e.match(InvalidParameterValueException.code)
996+
997+
queue_url_2 = sqs_create_queue(QueueName=queue_name_2)
998+
queue_arn_2 = sqs_get_queue_arn(queue_url_2)
983999

1000+
with pytest.raises(ClientError) as e:
1001+
# Create event source mapping with invalid batch size value
9841002
rs = aws_client.lambda_.create_event_source_mapping(
985-
EventSourceArn=queue_arn_1, FunctionName=function_name
1003+
EventSourceArn=queue_arn_2,
1004+
FunctionName=function_name,
1005+
BatchSize=MAX_SQS_BATCH_SIZE_FIFO + 1,
9861006
)
987-
snapshot.match("create-event-source-mapping", rs)
988-
989-
uuid = rs["UUID"]
990-
assert DEFAULT_SQS_BATCH_SIZE == rs["BatchSize"]
991-
_await_event_source_mapping_enabled(aws_client.lambda_, uuid)
992-
993-
with pytest.raises(ClientError) as e:
994-
# Update batch size with invalid value
995-
rs = aws_client.lambda_.update_event_source_mapping(
996-
UUID=uuid,
997-
FunctionName=function_name,
998-
BatchSize=MAX_SQS_BATCH_SIZE_FIFO + 1,
999-
)
1000-
snapshot.match("invalid-update-event-source-mapping", e.value.response)
1001-
e.match(InvalidParameterValueException.code)
1002-
1003-
queue_url_2 = sqs_create_queue(QueueName=queue_name_2)
1004-
queue_arn_2 = sqs_get_queue_arn(queue_url_2)
1005-
1006-
with pytest.raises(ClientError) as e:
1007-
# Create event source mapping with invalid batch size value
1008-
rs = aws_client.lambda_.create_event_source_mapping(
1009-
EventSourceArn=queue_arn_2,
1010-
FunctionName=function_name,
1011-
BatchSize=MAX_SQS_BATCH_SIZE_FIFO + 1,
1012-
)
1013-
snapshot.match("invalid-create-event-source-mapping", e.value.response)
1014-
e.match(InvalidParameterValueException.code)
1015-
finally:
1016-
aws_client.lambda_.delete_event_source_mapping(UUID=uuid)
1007+
snapshot.match("invalid-create-event-source-mapping", e.value.response)
1008+
e.match(InvalidParameterValueException.code)
10171009

10181010
@markers.aws.validated
10191011
def test_sqs_event_source_mapping(
@@ -1244,9 +1236,6 @@ def test_sqs_invalid_event_filter(
12441236
snapshot.match("create_event_source_mapping_exception", expected.value.response)
12451237
expected.match(InvalidParameterValueException.code)
12461238

1247-
@pytest.mark.skipif(
1248-
is_v2_esm(), reason="ESM v2 does not yet implement update_event_source_mapping properly"
1249-
)
12501239
@markers.aws.validated
12511240
def test_sqs_event_source_mapping_update(
12521241
self,

0 commit comments

Comments
 (0)