@@ -1951,6 +1951,7 @@ def create_event_source_mapping_v1(
1951
1951
1952
1952
def validate_event_source_mapping (self , context , request ):
1953
1953
# TODO: test whether stream ARNs are valid sources for Pipes or ESM or whether only DynamoDB table ARNs work
1954
+ is_create_esm_request = context .operation .name == self .create_event_source_mapping .operation
1954
1955
1955
1956
service = None
1956
1957
if "SelfManagedEventSource" in request :
@@ -1963,12 +1964,22 @@ def validate_event_source_mapping(self, context, request):
1963
1964
raise InvalidParameterValueException ("Unrecognized event source." , Type = "User" )
1964
1965
if service is None :
1965
1966
service = extract_service_from_arn (request ["EventSourceArn" ])
1967
+
1968
+ batch_size = api_utils .validate_and_set_batch_size (service , request .get ("BatchSize" ))
1966
1969
if service in ["dynamodb" , "kinesis" ] and "StartingPosition" not in request :
1967
1970
raise InvalidParameterValueException (
1968
1971
"1 validation error detected: Value null at 'startingPosition' failed to satisfy constraint: Member must not be null." ,
1969
1972
Type = "User" ,
1970
1973
)
1971
- request_function_name = request ["FunctionName" ]
1974
+ if service in ["sqs" , "sqs-fifo" ]:
1975
+ if batch_size > 10 and request .get ("MaximumBatchingWindowInSeconds" , 0 ) == 0 :
1976
+ raise InvalidParameterValueException (
1977
+ "Maximum batch window in seconds must be greater than 0 if maximum batch size is greater than 10" ,
1978
+ Type = "User" ,
1979
+ )
1980
+ # Can either have a FunctionName (i.e CreateEventSourceMapping request) or
1981
+ # an internal EventSourceMappingConfiguration representation
1982
+ request_function_name = request .get ("FunctionName" ) or request .get ("FunctionArn" )
1972
1983
# can be either a partial arn or a full arn for the version/alias
1973
1984
function_name , qualifier , account , region = function_locators_from_arn (
1974
1985
request_function_name
@@ -1999,48 +2010,51 @@ def validate_event_source_mapping(self, context, request):
1999
2010
else :
2000
2011
fn_arn = api_utils .unqualified_lambda_arn (function_name , account , region )
2001
2012
2002
- def _get_mapping_sources (mapping : dict [str , Any ]) -> list [str ]:
2003
- if event_source_arn := mapping .get ("EventSourceArn" ):
2004
- return [event_source_arn ]
2005
- return (
2006
- mapping .get ("SelfManagedEventSource" , {})
2007
- .get ("Endpoints" , {})
2008
- .get ("KAFKA_BOOTSTRAP_SERVERS" , [])
2009
- )
2010
-
2011
- # check for event source duplicates
2012
- # TODO: currently validated for sqs, kinesis, and dynamodb
2013
- service_id = load_service (service ).service_id
2014
- for uuid , mapping in state .event_source_mappings .items ():
2015
- mapping_sources = _get_mapping_sources (mapping )
2016
- request_sources = _get_mapping_sources (request )
2017
- if mapping ["FunctionArn" ] == fn_arn and (
2018
- set (mapping_sources ).intersection (request_sources )
2019
- ):
2020
- if service == "sqs" :
2021
- # *shakes fist at SQS*
2022
- raise ResourceConflictException (
2023
- f'An event source mapping with { service_id } arn (" { mapping ["EventSourceArn" ]} ") '
2024
- f'and function (" { function_name } ") already exists. Please update or delete the '
2025
- f"existing mapping with UUID { uuid } " ,
2026
- Type = "User" ,
2027
- )
2028
- elif service == "kafka" :
2029
- if set (mapping ["Topics" ]).intersection (request ["Topics" ]):
2013
+ # Check we are validating a CreateEventSourceMapping request
2014
+ if is_create_esm_request :
2015
+
2016
+ def _get_mapping_sources (mapping : dict [str , Any ]) -> list [str ]:
2017
+ if event_source_arn := mapping .get ("EventSourceArn" ):
2018
+ return [event_source_arn ]
2019
+ return (
2020
+ mapping .get ("SelfManagedEventSource" , {})
2021
+ .get ("Endpoints" , {})
2022
+ .get ("KAFKA_BOOTSTRAP_SERVERS" , [])
2023
+ )
2024
+
2025
+ # check for event source duplicates
2026
+ # TODO: currently validated for sqs, kinesis, and dynamodb
2027
+ service_id = load_service (service ).service_id
2028
+ for uuid , mapping in state .event_source_mappings .items ():
2029
+ mapping_sources = _get_mapping_sources (mapping )
2030
+ request_sources = _get_mapping_sources (request )
2031
+ if mapping ["FunctionArn" ] == fn_arn and (
2032
+ set (mapping_sources ).intersection (request_sources )
2033
+ ):
2034
+ if service == "sqs" :
2035
+ # *shakes fist at SQS*
2030
2036
raise ResourceConflictException (
2031
- f'An event source mapping with event source ("{ "," .join (request_sources )} "), '
2032
- f'function ("{ fn_arn } "), '
2033
- f'topics ("{ "," .join (request ["Topics" ])} ") already exists. Please update or delete the '
2037
+ f'An event source mapping with { service_id } arn (" { mapping ["EventSourceArn" ]} ") '
2038
+ f'and function (" { function_name } ") already exists. Please update or delete the '
2039
+ f"existing mapping with UUID { uuid } " ,
2040
+ Type = "User" ,
2041
+ )
2042
+ elif service == "kafka" :
2043
+ if set (mapping ["Topics" ]).intersection (request ["Topics" ]):
2044
+ raise ResourceConflictException (
2045
+ f'An event source mapping with event source ("{ "," .join (request_sources )} "), '
2046
+ f'function ("{ fn_arn } "), '
2047
+ f'topics ("{ "," .join (request ["Topics" ])} ") already exists. Please update or delete the '
2048
+ f"existing mapping with UUID { uuid } " ,
2049
+ Type = "User" ,
2050
+ )
2051
+ else :
2052
+ raise ResourceConflictException (
2053
+ f'The event source arn (" { mapping ["EventSourceArn" ]} ") and function '
2054
+ f'(" { function_name } ") provided mapping already exists. Please update or delete the '
2034
2055
f"existing mapping with UUID { uuid } " ,
2035
2056
Type = "User" ,
2036
2057
)
2037
- else :
2038
- raise ResourceConflictException (
2039
- f'The event source arn (" { mapping ["EventSourceArn" ]} ") and function '
2040
- f'(" { function_name } ") provided mapping already exists. Please update or delete the '
2041
- f"existing mapping with UUID { uuid } " ,
2042
- Type = "User" ,
2043
- )
2044
2058
return fn_arn , function_name , state
2045
2059
2046
2060
@handler ("UpdateEventSourceMapping" , expand = False )
@@ -2141,37 +2155,53 @@ def update_event_source_mapping_v2(
2141
2155
"The resource you requested does not exist." , Type = "User"
2142
2156
) # TODO: test?
2143
2157
2144
- # remove the FunctionName field
2145
- function_name_or_arn = request_data .pop ("FunctionName" , None )
2146
-
2147
2158
# normalize values to overwrite
2148
2159
event_source_mapping = old_event_source_mapping | request_data
2149
2160
2150
- if function_name_or_arn :
2151
- # if the FunctionName field was present, update the FunctionArn of the EventSourceMapping
2152
- account_id , region = api_utils .get_account_and_region (function_name_or_arn , context )
2153
- function_name , qualifier = api_utils .get_name_and_qualifier (
2154
- function_name_or_arn , None , context
2155
- )
2156
- event_source_mapping ["FunctionArn" ] = api_utils .qualified_lambda_arn (
2157
- function_name , qualifier , account_id , region
2158
- )
2159
-
2160
2161
temp_params = {} # values only set for the returned response, not saved internally (e.g. transient state)
2161
2162
2163
+ # Validate the newly updated ESM object. We ignore the output here since we only care whether an Exception is raised.
2164
+ function_arn , _ , _ = self .validate_event_source_mapping (context , event_source_mapping )
2165
+
2166
+ # remove the FunctionName field
2167
+ event_source_mapping .pop ("FunctionName" , None )
2168
+
2169
+ if function_arn :
2170
+ event_source_mapping ["FunctionArn" ] = function_arn
2171
+
2162
2172
esm_worker = self .esm_workers [uuid ]
2163
2173
# Only apply update if the desired state differs
2164
2174
enabled = request .get ("Enabled" )
2165
2175
if enabled is not None :
2166
2176
if enabled and old_event_source_mapping ["State" ] != EsmState .ENABLED :
2167
- esm_worker .start ()
2168
2177
event_source_mapping ["State" ] = EsmState .ENABLING
2169
2178
# TODO: What happens when trying to update during an update or failed state?!
2170
2179
elif not enabled and old_event_source_mapping ["State" ] == EsmState .ENABLED :
2171
- esm_worker .stop ()
2172
2180
event_source_mapping ["State" ] = EsmState .DISABLING
2181
+ else :
2182
+ event_source_mapping ["State" ] = EsmState .UPDATING
2183
+
2184
+ # To ensure parity, certain responses need to be immediately returned
2185
+ temp_params ["State" ] = event_source_mapping ["State" ]
2173
2186
2174
2187
state .event_source_mappings [uuid ] = event_source_mapping
2188
+
2189
+ # TODO: Currently, we re-create the entire ESM worker. Look into approach with better performance.
2190
+ function_version = get_function_version_from_arn (function_arn )
2191
+ function_role = function_version .config .role
2192
+ worker_factory = EsmWorkerFactory (
2193
+ event_source_mapping , function_role , request .get ("Enabled" , esm_worker .enabled )
2194
+ )
2195
+
2196
+ # Get a new ESM worker object but do not active it, since the factory holds all logic for creating new worker from configuration.
2197
+ updated_esm_worker = worker_factory .get_esm_worker ()
2198
+ self .esm_workers [uuid ] = updated_esm_worker
2199
+
2200
+ # We should stop() the worker since the delete() will remove the ESM from the state mapping.
2201
+ esm_worker .stop ()
2202
+ # This will either create an EsmWorker in the CREATING state if enabled. Otherwise, the DISABLING state is set.
2203
+ updated_esm_worker .create ()
2204
+
2175
2205
return {** event_source_mapping , ** temp_params }
2176
2206
2177
2207
def delete_event_source_mapping (
0 commit comments