Skip to content

Commit f27739f

Browse files
authored
Add MessageAttributes to SQS messages when executed via Step Function (#11405)
1 parent e183f75 commit f27739f

File tree

7 files changed

+315
-3
lines changed

7 files changed

+315
-3
lines changed

localstack-core/localstack/services/stepfunctions/asl/component/state/state_execution/state_task/service/state_task_service_sqs.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
_SUPPORTED_API_PARAM_BINDINGS: Final[dict[str, set[str]]] = {
3030
"sendmessage": {
3131
"DelaySeconds",
32-
"MessageAttribute",
32+
"MessageAttributes",
3333
"MessageBody",
3434
"MessageDeduplicationId",
3535
"MessageGroupId",
@@ -75,11 +75,15 @@ def _normalise_response(
7575
boto_service_name=boto_service_name,
7676
service_action_name=service_action_name,
7777
)
78-
# Normalise output value key to SFN standard for Md5OfMessageBody.
78+
# Normalise output value keys to SFN standard for Md5OfMessageBody and Md5OfMessageAttributes
7979
if response and "Md5OfMessageBody" in response:
8080
md5_message_body = response.pop("Md5OfMessageBody")
8181
response["MD5OfMessageBody"] = md5_message_body
8282

83+
if response and "Md5OfMessageAttributes" in response:
84+
md5_message_attributes = response.pop("Md5OfMessageAttributes")
85+
response["MD5OfMessageAttributes"] = md5_message_attributes
86+
8387
def _eval_service_task(
8488
self,
8589
env: Environment,

localstack-core/localstack/testing/snapshots/transformer_utility.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -705,6 +705,7 @@ def sfn_sqs_integration():
705705
# Transform MD5OfMessageBody value bindings as in StepFunctions these are not deterministic
706706
# about the input message.
707707
TransformerUtility.key_value("MD5OfMessageBody"),
708+
TransformerUtility.key_value("MD5OfMessageAttributes"),
708709
]
709710

710711
@staticmethod

tests/aws/services/stepfunctions/templates/services/services_templates.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ class ServicesTemplates(TemplateLoader):
5454
SQS_SEND_MESSAGE_AND_WAIT: Final[str] = os.path.join(
5555
_THIS_FOLDER, "statemachines/sqs_send_msg_and_wait.json5"
5656
)
57+
SQS_SEND_MESSAGE_ATTRIBUTES: Final[str] = os.path.join(
58+
_THIS_FOLDER, "statemachines/sqs_send_msg_attributes.json5"
59+
)
5760
SNS_PUBLISH: Final[str] = os.path.join(_THIS_FOLDER, "statemachines/sns_publish.json5")
5861
SNS_FIFO_PUBLISH: Final[str] = os.path.join(
5962
_THIS_FOLDER, "statemachines/sns_fifo_publish.json5"
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
{
2+
"Comment": "SQS_SEND_MESSAGE_ATTRIBUTES",
3+
"StartAt": "SendSQS",
4+
"States": {
5+
"SendSQS": {
6+
"Type": "Task",
7+
"Resource": "arn:aws:states:::sqs:sendMessage",
8+
"Parameters": {
9+
"QueueUrl.$": "$.QueueUrl",
10+
"MessageBody.$": "$.Message",
11+
"MessageAttributes": {
12+
"my_attribute_no_1": {
13+
"DataType": "String",
14+
"StringValue.$": "$.MessageAttributeValue1"
15+
},
16+
"my_attribute_no_2": {
17+
"DataType": "String",
18+
"StringValue.$": "$.MessageAttributeValue2"
19+
}
20+
}
21+
},
22+
"End": true
23+
}
24+
}
25+
}

tests/aws/services/stepfunctions/v2/services/test_sqs_task_service.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,3 +95,60 @@ def test_send_message_unsupported_parameters(
9595
definition,
9696
exec_input,
9797
)
98+
99+
@markers.aws.validated
100+
def test_send_message_attributes(
101+
self,
102+
aws_client,
103+
create_iam_role_for_sfn,
104+
create_state_machine,
105+
sqs_create_queue,
106+
sfn_snapshot,
107+
):
108+
sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sqs_integration())
109+
110+
queue_name = f"queue-{short_uid()}"
111+
queue_url = sqs_create_queue(QueueName=queue_name)
112+
sfn_snapshot.add_transformer(RegexTransformer(queue_url, "<sqs_queue_url>"))
113+
sfn_snapshot.add_transformer(RegexTransformer(queue_name, "<sqs_queue_name>"))
114+
115+
template = ST.load_sfn_template(ST.SQS_SEND_MESSAGE_ATTRIBUTES)
116+
definition = json.dumps(template)
117+
118+
message_body = "test_message_body"
119+
message_attr_1 = "Hello"
120+
message_attr_2 = "World"
121+
122+
exec_input = json.dumps(
123+
{
124+
"QueueUrl": queue_url,
125+
"Message": message_body,
126+
"MessageAttributeValue1": message_attr_1,
127+
"MessageAttributeValue2": message_attr_2,
128+
}
129+
)
130+
create_and_record_execution(
131+
aws_client.stepfunctions,
132+
create_iam_role_for_sfn,
133+
create_state_machine,
134+
sfn_snapshot,
135+
definition,
136+
exec_input,
137+
)
138+
139+
receive_message_res = aws_client.sqs.receive_message(
140+
QueueUrl=queue_url, MessageAttributeNames=["All"]
141+
)
142+
assert len(receive_message_res["Messages"]) == 1
143+
144+
sqs_message = receive_message_res["Messages"][0]
145+
assert sqs_message["Body"] == message_body
146+
147+
sqs_message_attributes = sqs_message["MessageAttributes"]
148+
assert len(sqs_message_attributes) == 2
149+
150+
assert sqs_message_attributes["my_attribute_no_1"]["StringValue"] == message_attr_1
151+
assert sqs_message_attributes["my_attribute_no_1"]["DataType"] == "String"
152+
153+
assert sqs_message_attributes["my_attribute_no_2"]["StringValue"] == message_attr_2
154+
assert sqs_message_attributes["my_attribute_no_2"]["DataType"] == "String"

tests/aws/services/stepfunctions/v2/services/test_sqs_task_service.snapshot.json

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,5 +408,224 @@
408408
}
409409
}
410410
}
411+
},
412+
"tests/aws/services/stepfunctions/v2/services/test_sqs_task_service.py::TestTaskServiceSqs::test_send_message_attributes": {
413+
"recorded-date": "22-08-2024, 10:04:56",
414+
"recorded-content": {
415+
"get_execution_history": {
416+
"events": [
417+
{
418+
"executionStartedEventDetails": {
419+
"input": {
420+
"QueueUrl": "<sqs_queue_url>",
421+
"Message": "test_message_body",
422+
"MessageAttributeValue1": "Hello",
423+
"MessageAttributeValue2": "World"
424+
},
425+
"inputDetails": {
426+
"truncated": false
427+
},
428+
"roleArn": "arn:<partition>:iam::111111111111:role/<resource:1>"
429+
},
430+
"id": 1,
431+
"previousEventId": 0,
432+
"timestamp": "timestamp",
433+
"type": "ExecutionStarted"
434+
},
435+
{
436+
"id": 2,
437+
"previousEventId": 0,
438+
"stateEnteredEventDetails": {
439+
"input": {
440+
"QueueUrl": "<sqs_queue_url>",
441+
"Message": "test_message_body",
442+
"MessageAttributeValue1": "Hello",
443+
"MessageAttributeValue2": "World"
444+
},
445+
"inputDetails": {
446+
"truncated": false
447+
},
448+
"name": "SendSQS"
449+
},
450+
"timestamp": "timestamp",
451+
"type": "TaskStateEntered"
452+
},
453+
{
454+
"id": 3,
455+
"previousEventId": 2,
456+
"taskScheduledEventDetails": {
457+
"parameters": {
458+
"MessageAttributes": {
459+
"my_attribute_no_1": {
460+
"DataType": "String",
461+
"StringValue": "Hello"
462+
},
463+
"my_attribute_no_2": {
464+
"DataType": "String",
465+
"StringValue": "World"
466+
}
467+
},
468+
"QueueUrl": "<sqs_queue_url>",
469+
"MessageBody": "test_message_body"
470+
},
471+
"region": "<region>",
472+
"resource": "sendMessage",
473+
"resourceType": "sqs"
474+
},
475+
"timestamp": "timestamp",
476+
"type": "TaskScheduled"
477+
},
478+
{
479+
"id": 4,
480+
"previousEventId": 3,
481+
"taskStartedEventDetails": {
482+
"resource": "sendMessage",
483+
"resourceType": "sqs"
484+
},
485+
"timestamp": "timestamp",
486+
"type": "TaskStarted"
487+
},
488+
{
489+
"id": 5,
490+
"previousEventId": 4,
491+
"taskSucceededEventDetails": {
492+
"output": {
493+
"MD5OfMessageAttributes": "<m-d5-of-message-attributes:1>",
494+
"MD5OfMessageBody": "<m-d5-of-message-body:1>",
495+
"MessageId": "<uuid:1>",
496+
"SdkHttpMetadata": {
497+
"AllHttpHeaders": {
498+
"x-amzn-RequestId": [
499+
"<uuid:2>"
500+
],
501+
"connection": [
502+
"keep-alive"
503+
],
504+
"Content-Length": [
505+
"166"
506+
],
507+
"Date": "date",
508+
"Content-Type": [
509+
"application/x-amz-json-1.0"
510+
]
511+
},
512+
"HttpHeaders": {
513+
"connection": "keep-alive",
514+
"Content-Length": "166",
515+
"Content-Type": "application/x-amz-json-1.0",
516+
"Date": "date",
517+
"x-amzn-RequestId": "<uuid:2>"
518+
},
519+
"HttpStatusCode": 200
520+
},
521+
"SdkResponseMetadata": {
522+
"RequestId": "<uuid:2>"
523+
}
524+
},
525+
"outputDetails": {
526+
"truncated": false
527+
},
528+
"resource": "sendMessage",
529+
"resourceType": "sqs"
530+
},
531+
"timestamp": "timestamp",
532+
"type": "TaskSucceeded"
533+
},
534+
{
535+
"id": 6,
536+
"previousEventId": 5,
537+
"stateExitedEventDetails": {
538+
"name": "SendSQS",
539+
"output": {
540+
"MD5OfMessageAttributes": "<m-d5-of-message-attributes:1>",
541+
"MD5OfMessageBody": "<m-d5-of-message-body:1>",
542+
"MessageId": "<uuid:1>",
543+
"SdkHttpMetadata": {
544+
"AllHttpHeaders": {
545+
"x-amzn-RequestId": [
546+
"<uuid:2>"
547+
],
548+
"connection": [
549+
"keep-alive"
550+
],
551+
"Content-Length": [
552+
"166"
553+
],
554+
"Date": "date",
555+
"Content-Type": [
556+
"application/x-amz-json-1.0"
557+
]
558+
},
559+
"HttpHeaders": {
560+
"connection": "keep-alive",
561+
"Content-Length": "166",
562+
"Content-Type": "application/x-amz-json-1.0",
563+
"Date": "date",
564+
"x-amzn-RequestId": "<uuid:2>"
565+
},
566+
"HttpStatusCode": 200
567+
},
568+
"SdkResponseMetadata": {
569+
"RequestId": "<uuid:2>"
570+
}
571+
},
572+
"outputDetails": {
573+
"truncated": false
574+
}
575+
},
576+
"timestamp": "timestamp",
577+
"type": "TaskStateExited"
578+
},
579+
{
580+
"executionSucceededEventDetails": {
581+
"output": {
582+
"MD5OfMessageAttributes": "<m-d5-of-message-attributes:1>",
583+
"MD5OfMessageBody": "<m-d5-of-message-body:1>",
584+
"MessageId": "<uuid:1>",
585+
"SdkHttpMetadata": {
586+
"AllHttpHeaders": {
587+
"x-amzn-RequestId": [
588+
"<uuid:2>"
589+
],
590+
"connection": [
591+
"keep-alive"
592+
],
593+
"Content-Length": [
594+
"166"
595+
],
596+
"Date": "date",
597+
"Content-Type": [
598+
"application/x-amz-json-1.0"
599+
]
600+
},
601+
"HttpHeaders": {
602+
"connection": "keep-alive",
603+
"Content-Length": "166",
604+
"Content-Type": "application/x-amz-json-1.0",
605+
"Date": "date",
606+
"x-amzn-RequestId": "<uuid:2>"
607+
},
608+
"HttpStatusCode": 200
609+
},
610+
"SdkResponseMetadata": {
611+
"RequestId": "<uuid:2>"
612+
}
613+
},
614+
"outputDetails": {
615+
"truncated": false
616+
}
617+
},
618+
"id": 7,
619+
"previousEventId": 6,
620+
"timestamp": "timestamp",
621+
"type": "ExecutionSucceeded"
622+
}
623+
],
624+
"ResponseMetadata": {
625+
"HTTPHeaders": {},
626+
"HTTPStatusCode": 200
627+
}
628+
}
629+
}
411630
}
412631
}

tests/aws/services/stepfunctions/v2/services/test_sqs_task_service.validation.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
{
22
"tests/aws/services/stepfunctions/v2/services/test_sqs_task_service.py::TestTaskServiceSqs::test_send_message": {
3-
"last_validated_date": "2024-04-18T06:37:09+00:00"
3+
"last_validated_date": "2024-08-21T15:47:00+00:00"
4+
},
5+
"tests/aws/services/stepfunctions/v2/services/test_sqs_task_service.py::TestTaskServiceSqs::test_send_message_attributes": {
6+
"last_validated_date": "2024-08-22T10:04:56+00:00"
47
},
58
"tests/aws/services/stepfunctions/v2/services/test_sqs_task_service.py::TestTaskServiceSqs::test_send_message_unsupported_parameters": {
69
"last_validated_date": "2024-04-18T06:37:24+00:00"

0 commit comments

Comments
 (0)