@@ -1428,3 +1428,114 @@ def _assert_messages():
1428
1428
assert len (messages ) > 0
1429
1429
1430
1430
retry (_assert_messages , ** retry_config )
1431
+
1432
+ @markers .aws .validated
1433
+ @pytest .mark .skipif (is_old_provider (), reason = "not supported by the old provider" )
1434
+ def test_put_events_with_target_statefunction_machine_and_input_transformer (
1435
+ self , infrastructure_setup , aws_client , snapshot
1436
+ ):
1437
+ infra = infrastructure_setup (namespace = "EventsTests" )
1438
+ stack_name = "stack-events-target-stepfunctions"
1439
+ stack = cdk .Stack (infra .cdk_app , stack_name = stack_name )
1440
+
1441
+ bus_name = "MyEventBus"
1442
+ bus = cdk .aws_events .EventBus (stack , "MyEventBus" , event_bus_name = bus_name )
1443
+
1444
+ queue = cdk .aws_sqs .Queue (stack , "MyQueue" , queue_name = "MyQueue" )
1445
+
1446
+ send_to_sqs_task = cdk .aws_stepfunctions_tasks .SqsSendMessage (
1447
+ stack ,
1448
+ "SendToQueue" ,
1449
+ queue = queue ,
1450
+ message_body = cdk .aws_stepfunctions .TaskInput .from_object (
1451
+ {"message" : cdk .aws_stepfunctions .JsonPath .entire_payload }
1452
+ ),
1453
+ )
1454
+
1455
+ state_machine = cdk .aws_stepfunctions .StateMachine (
1456
+ stack ,
1457
+ "MyStateMachine" ,
1458
+ definition = send_to_sqs_task ,
1459
+ state_machine_name = "MyStateMachine" ,
1460
+ )
1461
+
1462
+ detail_type = "myDetailType"
1463
+ rule = cdk .aws_events .Rule (
1464
+ stack ,
1465
+ "MyRule" ,
1466
+ event_bus = bus ,
1467
+ event_pattern = cdk .aws_events .EventPattern (detail_type = [detail_type ]),
1468
+ )
1469
+
1470
+ input_transformer_property = cdk .aws_events .CfnRule .InputTransformerProperty (
1471
+ input_template = "Message with key <detail-key>" ,
1472
+ input_paths_map = {"detail-key" : "$.detail.Key1" },
1473
+ )
1474
+ rule .add_target (
1475
+ cdk .aws_events_targets .SfnStateMachine (
1476
+ state_machine ,
1477
+ input = cdk .aws_events .RuleTargetInput .from_object (input_transformer_property ),
1478
+ )
1479
+ )
1480
+
1481
+ cdk .CfnOutput (stack , "MachineArn" , value = state_machine .state_machine_arn )
1482
+ cdk .CfnOutput (stack , "QueueUrl" , value = queue .queue_url )
1483
+
1484
+ with infra .provisioner () as prov :
1485
+ outputs = prov .get_stack_outputs (stack_name = stack_name )
1486
+
1487
+ entries = [
1488
+ {
1489
+ "Source" : "com.sample.resource" ,
1490
+ "DetailType" : detail_type ,
1491
+ "Detail" : json .dumps ({"Key1" : "Value" }),
1492
+ "EventBusName" : bus_name ,
1493
+ }
1494
+ for i in range (5 )
1495
+ ]
1496
+ put_events = aws_client .events .put_events (Entries = entries )
1497
+
1498
+ state_machine_arn = outputs ["MachineArn" ]
1499
+
1500
+ def _assert_executions ():
1501
+ executions = (
1502
+ aws_client .stepfunctions .get_paginator ("list_executions" )
1503
+ .paginate (stateMachineArn = state_machine_arn )
1504
+ .build_full_result ()
1505
+ )
1506
+ assert len (executions ["executions" ]) > 0
1507
+
1508
+ matched_executions = [
1509
+ e
1510
+ for e in executions ["executions" ]
1511
+ if e ["name" ].startswith (put_events ["Entries" ][0 ]["EventId" ])
1512
+ ]
1513
+ assert len (matched_executions ) > 0
1514
+
1515
+ retry_config = {
1516
+ "retries" : (20 if is_aws_cloud () else 5 ),
1517
+ "sleep" : (2 if is_aws_cloud () else 1 ),
1518
+ "sleep_before" : (2 if is_aws_cloud () else 0 ),
1519
+ }
1520
+ retry (_assert_executions , ** retry_config )
1521
+
1522
+ messages = []
1523
+ queue_url = outputs ["QueueUrl" ]
1524
+
1525
+ def _get_messages ():
1526
+ queue_msgs = aws_client .sqs .receive_message (QueueUrl = queue_url )
1527
+ for msg in queue_msgs .get ("Messages" , []):
1528
+ messages .append (msg )
1529
+
1530
+ assert len (messages ) > 0
1531
+ return messages
1532
+
1533
+ messages = retry (_get_messages , ** retry_config )
1534
+
1535
+ snapshot .add_transformers_list (
1536
+ [
1537
+ snapshot .transform .key_value ("ReceiptHandle" , reference_replacement = False ),
1538
+ snapshot .transform .key_value ("MD5OfBody" , reference_replacement = False ),
1539
+ ]
1540
+ )
1541
+ snapshot .match ("messages" , messages )
0 commit comments