Skip to content

Commit e5e5bc1

Browse files
committed
feat: add test put events target step functions with input transformer
1 parent 31b4c29 commit e5e5bc1

File tree

3 files changed

+141
-0
lines changed

3 files changed

+141
-0
lines changed

tests/aws/services/events/test_events_targets.py

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1428,3 +1428,114 @@ def _assert_messages():
14281428
assert len(messages) > 0
14291429

14301430
retry(_assert_messages, **retry_config)
1431+
1432+
@markers.aws.validated
1433+
@pytest.mark.skipif(is_old_provider(), reason="not supported by the old provider")
1434+
def test_put_events_with_target_statefunction_machine_and_input_transformer(
1435+
self, infrastructure_setup, aws_client, snapshot
1436+
):
1437+
infra = infrastructure_setup(namespace="EventsTests")
1438+
stack_name = "stack-events-target-stepfunctions"
1439+
stack = cdk.Stack(infra.cdk_app, stack_name=stack_name)
1440+
1441+
bus_name = "MyEventBus"
1442+
bus = cdk.aws_events.EventBus(stack, "MyEventBus", event_bus_name=bus_name)
1443+
1444+
queue = cdk.aws_sqs.Queue(stack, "MyQueue", queue_name="MyQueue")
1445+
1446+
send_to_sqs_task = cdk.aws_stepfunctions_tasks.SqsSendMessage(
1447+
stack,
1448+
"SendToQueue",
1449+
queue=queue,
1450+
message_body=cdk.aws_stepfunctions.TaskInput.from_object(
1451+
{"message": cdk.aws_stepfunctions.JsonPath.entire_payload}
1452+
),
1453+
)
1454+
1455+
state_machine = cdk.aws_stepfunctions.StateMachine(
1456+
stack,
1457+
"MyStateMachine",
1458+
definition=send_to_sqs_task,
1459+
state_machine_name="MyStateMachine",
1460+
)
1461+
1462+
detail_type = "myDetailType"
1463+
rule = cdk.aws_events.Rule(
1464+
stack,
1465+
"MyRule",
1466+
event_bus=bus,
1467+
event_pattern=cdk.aws_events.EventPattern(detail_type=[detail_type]),
1468+
)
1469+
1470+
input_transformer_property = cdk.aws_events.CfnRule.InputTransformerProperty(
1471+
input_template="Message with key <detail-key>",
1472+
input_paths_map={"detail-key": "$.detail.Key1"},
1473+
)
1474+
rule.add_target(
1475+
cdk.aws_events_targets.SfnStateMachine(
1476+
state_machine,
1477+
input=cdk.aws_events.RuleTargetInput.from_object(input_transformer_property),
1478+
)
1479+
)
1480+
1481+
cdk.CfnOutput(stack, "MachineArn", value=state_machine.state_machine_arn)
1482+
cdk.CfnOutput(stack, "QueueUrl", value=queue.queue_url)
1483+
1484+
with infra.provisioner() as prov:
1485+
outputs = prov.get_stack_outputs(stack_name=stack_name)
1486+
1487+
entries = [
1488+
{
1489+
"Source": "com.sample.resource",
1490+
"DetailType": detail_type,
1491+
"Detail": json.dumps({"Key1": "Value"}),
1492+
"EventBusName": bus_name,
1493+
}
1494+
for i in range(5)
1495+
]
1496+
put_events = aws_client.events.put_events(Entries=entries)
1497+
1498+
state_machine_arn = outputs["MachineArn"]
1499+
1500+
def _assert_executions():
1501+
executions = (
1502+
aws_client.stepfunctions.get_paginator("list_executions")
1503+
.paginate(stateMachineArn=state_machine_arn)
1504+
.build_full_result()
1505+
)
1506+
assert len(executions["executions"]) > 0
1507+
1508+
matched_executions = [
1509+
e
1510+
for e in executions["executions"]
1511+
if e["name"].startswith(put_events["Entries"][0]["EventId"])
1512+
]
1513+
assert len(matched_executions) > 0
1514+
1515+
retry_config = {
1516+
"retries": (20 if is_aws_cloud() else 5),
1517+
"sleep": (2 if is_aws_cloud() else 1),
1518+
"sleep_before": (2 if is_aws_cloud() else 0),
1519+
}
1520+
retry(_assert_executions, **retry_config)
1521+
1522+
messages = []
1523+
queue_url = outputs["QueueUrl"]
1524+
1525+
def _get_messages():
1526+
queue_msgs = aws_client.sqs.receive_message(QueueUrl=queue_url)
1527+
for msg in queue_msgs.get("Messages", []):
1528+
messages.append(msg)
1529+
1530+
assert len(messages) > 0
1531+
return messages
1532+
1533+
messages = retry(_get_messages, **retry_config)
1534+
1535+
snapshot.add_transformers_list(
1536+
[
1537+
snapshot.transform.key_value("ReceiptHandle", reference_replacement=False),
1538+
snapshot.transform.key_value("MD5OfBody", reference_replacement=False),
1539+
]
1540+
)
1541+
snapshot.match("messages", messages)

tests/aws/services/events/test_events_targets.snapshot.json

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -932,5 +932,32 @@
932932
}
933933
}
934934
}
935+
},
936+
"tests/aws/services/events/test_events_targets.py::TestEventsTargetStepFunctions::test_put_events_with_target_statefunction_machine_and_input_transformer": {
937+
"recorded-date": "03-01-2025, 15:19:29",
938+
"recorded-content": {
939+
"messages": [
940+
{
941+
"MessageId": "<uuid:1>",
942+
"ReceiptHandle": "receipt-handle",
943+
"MD5OfBody": "m-d5-of-body",
944+
"Body": {
945+
"message": {
946+
"version": "0",
947+
"id": "<uuid:2>",
948+
"detail-type": "myDetailType",
949+
"source": "com.sample.resource",
950+
"account": "111111111111",
951+
"time": "date",
952+
"region": "<region>",
953+
"resources": [],
954+
"detail": {
955+
"Key1": "Value"
956+
}
957+
}
958+
}
959+
}
960+
]
961+
}
935962
}
936963
}

tests/aws/services/events/test_events_targets.validation.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@
5353
"tests/aws/services/events/test_events_targets.py::TestEventsTargetStepFunctions::test_put_events_with_target_statefunction_machine": {
5454
"last_validated_date": "2024-08-29T18:06:56+00:00"
5555
},
56+
"tests/aws/services/events/test_events_targets.py::TestEventsTargetStepFunctions::test_put_events_with_target_statefunction_machine_and_input_transformer": {
57+
"last_validated_date": "2025-01-03T15:19:29+00:00"
58+
},
5659
"tests/aws/services/events/test_events_targets.py::test_put_events_with_target_lambda_list_entries_partial_match": {
5760
"last_validated_date": "2024-04-08T17:36:24+00:00"
5861
},

0 commit comments

Comments
 (0)