Skip to content

ESM v2: Add EventSourceMappingArn field #11675

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,40 +5,50 @@
DestinationConfig,
EventSourceMappingConfiguration,
EventSourcePosition,
RequestContext,
)
from localstack.services.lambda_ import hooks as lambda_hooks
from localstack.services.lambda_.event_source_mapping.esm_worker import EsmState, EsmStateReason
from localstack.services.lambda_.event_source_mapping.pipe_utils import (
get_standardized_service_name,
)
from localstack.utils.aws.arns import parse_arn
from localstack.utils.aws.arns import lambda_event_source_mapping_arn, parse_arn
from localstack.utils.collections import merge_recursive
from localstack.utils.strings import long_uid


class EsmConfigFactory:
request: CreateEventSourceMappingRequest
context: RequestContext
function_arn: str

def __init__(self, request: CreateEventSourceMappingRequest, function_arn: str):
def __init__(
self, request: CreateEventSourceMappingRequest, context: RequestContext, function_arn: str
):
self.request = request
self.function_arn = function_arn
self.context = context

def get_esm_config(self) -> EventSourceMappingConfiguration:
"""Creates an Event Source Mapping (ESM) configuration based on a create ESM request.
* CreateEventSourceMapping API: https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html
* CreatePipe API: https://docs.aws.amazon.com/eventbridge/latest/pipes-reference/API_CreatePipe.html
The CreatePipe API covers largely the same parameters, but is better structured using hierarchical parameters.
"""

service = ""
if source_arn := self.request.get("EventSourceArn"):
parsed_arn = parse_arn(source_arn)
service = get_standardized_service_name(parsed_arn["service"])

uuid = long_uid()

default_source_parameters = {}
default_source_parameters["UUID"] = long_uid()
default_source_parameters["UUID"] = uuid
default_source_parameters["EventSourceMappingArn"] = lambda_event_source_mapping_arn(
uuid, self.context.account_id, self.context.region
)
default_source_parameters["StateTransitionReason"] = EsmStateReason.USER_ACTION

if service == "sqs":
default_source_parameters["BatchSize"] = 10
default_source_parameters["MaximumBatchingWindowInSeconds"] = 0
Expand Down
7 changes: 5 additions & 2 deletions localstack-core/localstack/services/lambda_/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,10 @@
from localstack.services.lambda_.urlrouter import FunctionUrlRouter
from localstack.services.plugins import ServiceLifecycleHook
from localstack.state import StateVisitor
from localstack.utils.aws.arns import extract_service_from_arn, get_partition
from localstack.utils.aws.arns import (
extract_service_from_arn,
get_partition,
)
from localstack.utils.bootstrap import is_api_enabled
from localstack.utils.collections import PaginatedList
from localstack.utils.files import load_file
Expand Down Expand Up @@ -1859,7 +1862,7 @@ def create_event_source_mapping_v2(
# Validations
function_arn, function_name, state = self.validate_event_source_mapping(context, request)

esm_config = EsmConfigFactory(request, function_arn).get_esm_config()
esm_config = EsmConfigFactory(request, context, function_arn).get_esm_config()

# Copy esm_config to avoid a race condition with potential async update in the store
state.event_source_mappings[esm_config["UUID"]] = esm_config.copy()
Expand Down
5 changes: 5 additions & 0 deletions localstack-core/localstack/utils/aws/arns.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,11 @@ def lambda_code_signing_arn(code_signing_id: str, account_id: str, region_name:
return _resource_arn(code_signing_id, pattern, account_id=account_id, region_name=region_name)


def lambda_event_source_mapping_arn(uuid: str, account_id: str, region_name: str) -> str:
pattern = "arn:%s:lambda:%s:%s:event-source-mapping:%s"
return _resource_arn(uuid, pattern, account_id=account_id, region_name=region_name)


def lambda_function_or_layer_arn(
type: str,
entity_name: str,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ def _assert_single_lambda_call():


@markers.snapshot.skip_snapshot_verify(
["$..EventSourceMappings..FunctionArn", "$..EventSourceMappings..LastProcessingResult"]
[
"$..EventSourceMappings..FunctionArn",
"$..EventSourceMappings..LastProcessingResult",
]
)
@markers.aws.validated
def test_lambda_w_dynamodb_event_filter_update(deploy_cfn_template, snapshot, aws_client):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,7 @@
}
},
"tests/aws/services/cloudformation/resources/test_lambda.py::TestCfnLambdaIntegrations::test_cfn_lambda_dynamodb_source": {
"recorded-date": "09-04-2024, 07:34:20",
"recorded-date": "12-10-2024, 10:46:17",
"recorded-content": {
"stack_resources": {
"StackResources": [
Expand All @@ -846,7 +846,7 @@
"StackResourceDriftStatus": "NOT_CHECKED"
},
"LogicalResourceId": "fnDynamoDBEventSourceLambdaDynamodbSourceStacktable153BBA79064FDF1D",
"PhysicalResourceId": "<uuid:1>",
"PhysicalResourceId": "<resource:6>",
"ResourceStatus": "CREATE_COMPLETE",
"ResourceType": "AWS::Lambda::EventSourceMapping",
"StackId": "arn:<partition>:cloudformation:<region>:111111111111:stack/<stack-name:1>/<resource:1>",
Expand Down Expand Up @@ -882,7 +882,7 @@
"StackResourceDriftStatus": "NOT_CHECKED"
},
"LogicalResourceId": "table8235A42E",
"PhysicalResourceId": "<resource:6>",
"PhysicalResourceId": "<resource:7>",
"ResourceStatus": "CREATE_COMPLETE",
"ResourceType": "AWS::DynamoDB::Table",
"StackId": "arn:<partition>:cloudformation:<region>:111111111111:stack/<stack-name:1>/<resource:1>",
Expand Down Expand Up @@ -912,7 +912,7 @@
"dynamodb:GetShardIterator"
],
"Effect": "Allow",
"Resource": "arn:<partition>:dynamodb:<region>:111111111111:table/<resource:6>/stream/<resource:2>"
"Resource": "arn:<partition>:dynamodb:<region>:111111111111:table/<resource:7>/stream/<resource:2>"
}
],
"Version": "2012-10-17"
Expand Down Expand Up @@ -949,7 +949,7 @@
},
"MemorySize": 128,
"PackageType": "Zip",
"RevisionId": "<uuid:2>",
"RevisionId": "<uuid:1>",
"Role": "arn:<partition>:iam::111111111111:role/<resource:4>",
"Runtime": "python3.9",
"RuntimeVersionConfig": {
Expand Down Expand Up @@ -982,7 +982,8 @@
"DestinationConfig": {
"OnFailure": {}
},
"EventSourceArn": "arn:<partition>:dynamodb:<region>:111111111111:table/<resource:6>/stream/<resource:2>",
"EventSourceArn": "arn:<partition>:dynamodb:<region>:111111111111:table/<resource:7>/stream/<resource:2>",
"EventSourceMappingArn": "arn:<partition>:lambda:<region>:111111111111:event-source-mapping:<resource:6>",
"FunctionArn": "arn:<partition>:lambda:<region>:111111111111:function:<resource:3>",
"FunctionResponseTypes": [],
"LastModified": "datetime",
Expand All @@ -995,7 +996,7 @@
"State": "Enabled",
"StateTransitionReason": "User action",
"TumblingWindowInSeconds": 0,
"UUID": "<uuid:1>",
"UUID": "<resource:6>",
"ResponseMetadata": {
"HTTPHeaders": {},
"HTTPStatusCode": 200
Expand All @@ -1018,7 +1019,7 @@
"KeyType": "HASH"
}
],
"LatestStreamArn": "arn:<partition>:dynamodb:<region>:111111111111:table/<resource:6>/stream/<resource:2>",
"LatestStreamArn": "arn:<partition>:dynamodb:<region>:111111111111:table/<resource:7>/stream/<resource:2>",
"LatestStreamLabel": "<resource:2>",
"ProvisionedThroughput": {
"NumberOfDecreasesToday": 0,
Expand All @@ -1029,9 +1030,9 @@
"StreamEnabled": true,
"StreamViewType": "NEW_AND_OLD_IMAGES"
},
"TableArn": "arn:<partition>:dynamodb:<region>:111111111111:table/<resource:6>",
"TableId": "<uuid:3>",
"TableName": "<resource:6>",
"TableArn": "arn:<partition>:dynamodb:<region>:111111111111:table/<resource:7>",
"TableId": "<uuid:2>",
"TableName": "<resource:7>",
"TableSizeBytes": 0,
"TableStatus": "ACTIVE"
},
Expand All @@ -1057,11 +1058,11 @@
"ShardId": "shard-id"
}
],
"StreamArn": "arn:<partition>:dynamodb:<region>:111111111111:table/<resource:6>/stream/<resource:2>",
"StreamArn": "arn:<partition>:dynamodb:<region>:111111111111:table/<resource:7>/stream/<resource:2>",
"StreamLabel": "<resource:2>",
"StreamStatus": "ENABLED",
"StreamViewType": "NEW_AND_OLD_IMAGES",
"TableName": "<resource:6>"
"TableName": "<resource:7>"
},
"ResponseMetadata": {
"HTTPHeaders": {},
Expand Down Expand Up @@ -1122,7 +1123,7 @@
}
},
"tests/aws/services/cloudformation/resources/test_lambda.py::TestCfnLambdaIntegrations::test_cfn_lambda_kinesis_source": {
"recorded-date": "09-04-2024, 07:37:55",
"recorded-date": "12-10-2024, 10:52:28",
"recorded-content": {
"stack_resources": {
"StackResources": [
Expand All @@ -1143,7 +1144,7 @@
"StackResourceDriftStatus": "NOT_CHECKED"
},
"LogicalResourceId": "fnKinesisEventSourceLambdaKinesisSourceStackstream996A3395ED86A30E",
"PhysicalResourceId": "<uuid:1>",
"PhysicalResourceId": "<resource:6>",
"ResourceStatus": "CREATE_COMPLETE",
"ResourceType": "AWS::Lambda::EventSourceMapping",
"StackId": "arn:<partition>:cloudformation:<region>:111111111111:stack/<stack-name:1>/<resource:1>",
Expand Down Expand Up @@ -1250,7 +1251,7 @@
},
"MemorySize": 128,
"PackageType": "Zip",
"RevisionId": "<uuid:2>",
"RevisionId": "<uuid:1>",
"Role": "arn:<partition>:iam::111111111111:role/<resource:4>",
"Runtime": "python3.9",
"RuntimeVersionConfig": {
Expand Down Expand Up @@ -1284,6 +1285,7 @@
"OnFailure": {}
},
"EventSourceArn": "arn:<partition>:kinesis:<region>:111111111111:stream/<resource:2>",
"EventSourceMappingArn": "arn:<partition>:lambda:<region>:111111111111:event-source-mapping:<resource:6>",
"FunctionArn": "arn:<partition>:lambda:<region>:111111111111:function:<resource:3>",
"FunctionResponseTypes": [],
"LastModified": "datetime",
Expand All @@ -1296,7 +1298,7 @@
"State": "Enabled",
"StateTransitionReason": "User action",
"TumblingWindowInSeconds": 0,
"UUID": "<uuid:1>",
"UUID": "<resource:6>",
"ResponseMetadata": {
"HTTPHeaders": {},
"HTTPStatusCode": 200
Expand Down Expand Up @@ -1446,7 +1448,7 @@
}
},
"tests/aws/services/cloudformation/resources/test_lambda.py::test_lambda_w_dynamodb_event_filter_update": {
"recorded-date": "11-04-2024, 18:29:12",
"recorded-date": "12-10-2024, 10:42:00",
"recorded-content": {
"source_mappings": {
"EventSourceMappings": [
Expand All @@ -1457,6 +1459,7 @@
"OnFailure": {}
},
"EventSourceArn": "arn:<partition>:dynamodb:<region>:111111111111:table/<table_name>/stream/<resource:1>",
"EventSourceMappingArn": "arn:<partition>:lambda:<region>:111111111111:event-source-mapping:<resource:2>",
"FilterCriteria": {
"Filters": [
{
Expand All @@ -1468,7 +1471,7 @@
}
]
},
"FunctionArn": "arn:<partition>:lambda:<region>:111111111111:function:<resource:2>",
"FunctionArn": "arn:<partition>:lambda:<region>:111111111111:function:<resource:3>",
"FunctionResponseTypes": [],
"LastModified": "datetime",
"LastProcessingResult": "No records processed",
Expand All @@ -1480,7 +1483,7 @@
"State": "Enabled",
"StateTransitionReason": "User action",
"TumblingWindowInSeconds": 0,
"UUID": "<uuid:1>"
"UUID": "<resource:2>"
}
],
"ResponseMetadata": {
Expand All @@ -1497,6 +1500,7 @@
"OnFailure": {}
},
"EventSourceArn": "arn:<partition>:dynamodb:<region>:111111111111:table/<table_name>/stream/<resource:1>",
"EventSourceMappingArn": "arn:<partition>:lambda:<region>:111111111111:event-source-mapping:<resource:2>",
"FilterCriteria": {
"Filters": [
{
Expand All @@ -1508,7 +1512,7 @@
}
]
},
"FunctionArn": "arn:<partition>:lambda:<region>:111111111111:function:<resource:2>",
"FunctionArn": "arn:<partition>:lambda:<region>:111111111111:function:<resource:3>",
"FunctionResponseTypes": [],
"LastModified": "datetime",
"LastProcessingResult": "No records processed",
Expand All @@ -1520,7 +1524,7 @@
"State": "Enabled",
"StateTransitionReason": "User action",
"TumblingWindowInSeconds": 0,
"UUID": "<uuid:1>"
"UUID": "<resource:2>"
}
],
"ResponseMetadata": {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
{
"tests/aws/services/cloudformation/resources/test_lambda.py::TestCfnLambdaIntegrations::test_cfn_lambda_dynamodb_source": {
"last_validated_date": "2024-04-09T07:34:20+00:00"
"last_validated_date": "2024-10-12T10:46:17+00:00"
},
"tests/aws/services/cloudformation/resources/test_lambda.py::TestCfnLambdaIntegrations::test_cfn_lambda_kinesis_source": {
"last_validated_date": "2024-04-09T07:37:55+00:00"
"last_validated_date": "2024-10-12T10:52:28+00:00"
},
"tests/aws/services/cloudformation/resources/test_lambda.py::TestCfnLambdaIntegrations::test_cfn_lambda_permissions": {
"last_validated_date": "2024-04-09T07:26:03+00:00"
Expand Down Expand Up @@ -39,7 +39,7 @@
"last_validated_date": "2024-04-09T07:21:37+00:00"
},
"tests/aws/services/cloudformation/resources/test_lambda.py::test_lambda_w_dynamodb_event_filter_update": {
"last_validated_date": "2024-04-11T18:29:12+00:00"
"last_validated_date": "2024-10-12T10:42:00+00:00"
},
"tests/aws/services/cloudformation/resources/test_lambda.py::test_multiple_lambda_permissions_for_singlefn": {
"last_validated_date": "2024-04-09T07:25:05+00:00"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ def _get_lambda_logs_event(function_name, expected_num_events, retries=30):
return _get_lambda_logs_event


@markers.snapshot.skip_snapshot_verify(
condition=is_old_esm,
# Only match EventSourceMappingArn field if ESM v2 and above
paths=["$..EventSourceMappingArn"],
)
@markers.snapshot.skip_snapshot_verify(
condition=is_v2_esm,
paths=[
Expand Down Expand Up @@ -979,7 +984,6 @@ def test_dynamodb_report_batch_item_failure_scenarios(
):
snapshot.add_transformer(snapshot.transform.key_value("MD5OfBody"))
snapshot.add_transformer(snapshot.transform.key_value("ReceiptHandle"))
snapshot.add_transformer(snapshot.transform.key_value("startSequenceNumber"))

function_name = f"lambda_func-{short_uid()}"
table_name = f"test-table-{short_uid()}"
Expand Down
Loading
Loading