Skip to content

Commit bdf6e1c

Browse files
bentskumacnev2013
authored andcommitted
SNS: implement Content-Type logic for http/https target (#11634)
1 parent 84cb016 commit bdf6e1c

File tree

6 files changed

+844
-5
lines changed

6 files changed

+844
-5
lines changed

localstack-core/localstack/services/sns/models.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ class SnsSubscription(TypedDict, total=False):
107107
RawMessageDelivery: Literal["true", "false"]
108108
ConfirmationWasAuthenticated: Literal["true", "false"]
109109
SubscriptionRoleArn: Optional[str]
110+
DeliveryPolicy: Optional[str]
110111

111112

112113
class SnsStore(BaseStore):

localstack-core/localstack/services/sns/provider.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -157,14 +157,16 @@ def get_topic_attributes(
157157
# TODO: very hacky way to get the attributes we need instead of a moto patch
158158
# see the attributes we need: https://docs.aws.amazon.com/sns/latest/dg/sns-topic-attributes.html
159159
# would need more work to have the proper format out of moto, maybe extract the model to our store
160+
attributes = moto_response["Attributes"]
160161
for attr in vars(moto_topic_model):
161162
if "_feedback" in attr:
162163
key = camelcase_to_pascal(underscores_to_camelcase(attr))
163-
moto_response["Attributes"][key] = getattr(moto_topic_model, attr)
164+
attributes[key] = getattr(moto_topic_model, attr)
164165
elif attr == "signature_version":
165-
moto_response["Attributes"]["SignatureVersion"] = moto_topic_model.signature_version
166+
attributes["SignatureVersion"] = moto_topic_model.signature_version
166167
elif attr == "archive_policy":
167-
moto_response["Attributes"]["ArchivePolicy"] = moto_topic_model.archive_policy
168+
attributes["ArchivePolicy"] = moto_topic_model.archive_policy
169+
168170
return moto_response
169171

170172
def publish_batch(

localstack-core/localstack/services/sns/publisher.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -489,8 +489,11 @@ def _publish(self, context: SnsPublishContext, subscriber: SnsSubscription):
489489
# When raw message delivery is enabled, x-amz-sns-rawdelivery needs to be set to 'true'
490490
# indicating that the message has been published without JSON formatting.
491491
# https://docs.aws.amazon.com/sns/latest/dg/sns-large-payload-raw-message-delivery.html
492-
if message_context.type == "Notification" and is_raw_message_delivery(subscriber):
493-
message_headers["x-amz-sns-rawdelivery"] = "true"
492+
if message_context.type == "Notification":
493+
if is_raw_message_delivery(subscriber):
494+
message_headers["x-amz-sns-rawdelivery"] = "true"
495+
if content_type := self._get_content_type(subscriber, context.topic_attributes):
496+
message_headers["Content-Type"] = content_type
494497

495498
response = requests.post(
496499
subscriber["Endpoint"],
@@ -526,6 +529,30 @@ def _publish(self, context: SnsPublishContext, subscriber: SnsSubscription):
526529
if message_context.type != "UnsubscribeConfirmation":
527530
sns_error_to_dead_letter_queue(subscriber, message_body, str(exc))
528531

532+
@staticmethod
533+
def _get_content_type(subscriber: SnsSubscription, topic_attributes: dict) -> str | None:
534+
# TODO: we need to load the DeliveryPolicy every time if there's one, we should probably save the loaded
535+
# policy on the subscription and dumps it when requested instead
536+
# to be much faster, once the logic is implemented in moto, we would only need to fetch EffectiveDeliveryPolicy,
537+
# which would already have the value from the topic
538+
if json_sub_delivery_policy := subscriber.get("DeliveryPolicy"):
539+
sub_delivery_policy = json.loads(json_sub_delivery_policy)
540+
if sub_content_type := sub_delivery_policy.get("requestPolicy", {}).get(
541+
"headerContentType"
542+
):
543+
return sub_content_type
544+
545+
if json_topic_delivery_policy := topic_attributes.get("delivery_policy"):
546+
topic_delivery_policy = json.loads(json_topic_delivery_policy)
547+
if not (
548+
topic_content_type := topic_delivery_policy.get(subscriber["Protocol"].lower())
549+
):
550+
return
551+
if content_type := topic_content_type.get("defaultRequestPolicy", {}).get(
552+
"headerContentType"
553+
):
554+
return content_type
555+
529556

530557
class EmailJsonTopicPublisher(TopicPublisher):
531558
"""

tests/aws/services/sns/test_sns.py

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,82 @@ def test_create_topic_after_delete_with_new_tags(self, sns_create_topic, snapsho
229229
topic1 = sns_create_topic(Name=topic_name, Tags=[{"Key": "Name", "Value": "abc"}])
230230
snapshot.match("topic-1", topic1)
231231

232+
@markers.aws.validated
233+
@pytest.mark.skip(reason="Not properly implemented in Moto, only mocked")
234+
def test_topic_delivery_policy_crud(self, sns_create_topic, snapshot, aws_client):
235+
# https://docs.aws.amazon.com/sns/latest/dg/sns-message-delivery-retries.html
236+
create_topic = sns_create_topic(
237+
Name="topictest.fifo",
238+
Attributes={
239+
"DisplayName": "TestTopic",
240+
"SignatureVersion": "2",
241+
"FifoTopic": "true",
242+
"DeliveryPolicy": json.dumps(
243+
{
244+
"http": {
245+
"defaultRequestPolicy": {"headerContentType": "application/json"},
246+
}
247+
}
248+
),
249+
},
250+
)
251+
topic_arn = create_topic["TopicArn"]
252+
253+
get_attrs = aws_client.sns.get_topic_attributes(
254+
TopicArn=topic_arn,
255+
)
256+
snapshot.match("get-topic-attrs", get_attrs)
257+
258+
set_attrs = aws_client.sns.set_topic_attributes(
259+
TopicArn=topic_arn,
260+
AttributeName="DeliveryPolicy",
261+
AttributeValue=json.dumps(
262+
{
263+
"http": {
264+
"defaultHealthyRetryPolicy": {
265+
"minDelayTarget": 5,
266+
"maxDelayTarget": 6,
267+
"numRetries": 1,
268+
}
269+
}
270+
}
271+
),
272+
)
273+
snapshot.match("set-topic-attrs", set_attrs)
274+
275+
get_attrs_updated = aws_client.sns.get_topic_attributes(
276+
TopicArn=topic_arn,
277+
)
278+
snapshot.match("get-topic-attrs-after-update", get_attrs_updated)
279+
280+
set_attrs_none = aws_client.sns.set_topic_attributes(
281+
TopicArn=topic_arn,
282+
AttributeName="DeliveryPolicy",
283+
AttributeValue=json.dumps(
284+
{
285+
"http": {"defaultHealthyRetryPolicy": None},
286+
}
287+
),
288+
)
289+
snapshot.match("set-topic-attrs-none", set_attrs_none)
290+
291+
get_attrs_updated = aws_client.sns.get_topic_attributes(
292+
TopicArn=topic_arn,
293+
)
294+
snapshot.match("get-topic-attrs-after-none", get_attrs_updated)
295+
296+
set_attrs_none_full = aws_client.sns.set_topic_attributes(
297+
TopicArn=topic_arn,
298+
AttributeName="DeliveryPolicy",
299+
AttributeValue="",
300+
)
301+
snapshot.match("set-topic-attrs-full-none", set_attrs_none_full)
302+
303+
get_attrs_updated = aws_client.sns.get_topic_attributes(
304+
TopicArn=topic_arn,
305+
)
306+
snapshot.match("get-topic-attrs-after-delete", get_attrs_updated)
307+
232308

233309
class TestSNSPublishCrud:
234310
"""
@@ -3656,6 +3732,116 @@ def test_dlq_external_http_endpoint(
36563732
# AWS doesn't send to the DLQ if the UnsubscribeConfirmation fails to be delivered
36573733
assert "Messages" not in response or response["Messages"] == []
36583734

3735+
@markers.aws.manual_setup_required
3736+
@pytest.mark.parametrize("raw_message_delivery", [True, False])
3737+
@markers.snapshot.skip_snapshot_verify(
3738+
paths=[
3739+
"$.http-message-headers.Accept", # requests adds the header but not SNS, not very important
3740+
"$.http-message-headers-raw.Accept",
3741+
"$.http-confirm-sub-headers.Accept",
3742+
# TODO: we need to fix this parity in Moto, in order to make the retrieval logic of those values easier
3743+
"$.sub-attrs.Attributes.ConfirmationWasAuthenticated",
3744+
"$.sub-attrs.Attributes.DeliveryPolicy",
3745+
"$.sub-attrs.Attributes.EffectiveDeliveryPolicy",
3746+
"$.topic-attrs.Attributes.DeliveryPolicy",
3747+
"$.topic-attrs.Attributes.EffectiveDeliveryPolicy",
3748+
"$.topic-attrs.Attributes.Policy.Statement..Action",
3749+
]
3750+
)
3751+
def test_subscribe_external_http_endpoint_content_type(
3752+
self,
3753+
sns_create_http_endpoint,
3754+
raw_message_delivery,
3755+
aws_client,
3756+
snapshot,
3757+
):
3758+
def _clean_headers(response_headers: dict):
3759+
return {key: val for key, val in response_headers.items() if "Forwarded" not in key}
3760+
3761+
snapshot.add_transformer(
3762+
[
3763+
snapshot.transform.key_value("RequestId"),
3764+
snapshot.transform.key_value("Token"),
3765+
snapshot.transform.key_value("Host"),
3766+
snapshot.transform.key_value(
3767+
"Content-Length", reference_replacement=False
3768+
), # might change depending on compression
3769+
snapshot.transform.key_value(
3770+
"Connection", reference_replacement=False
3771+
), # casing might change
3772+
snapshot.transform.regex(
3773+
r"(?i)(?<=SubscribeURL[\"|']:\s[\"|'])(https?.*?)(?=/\?Action=ConfirmSubscription)",
3774+
replacement="<subscribe-domain>",
3775+
),
3776+
]
3777+
)
3778+
3779+
# Necessitate manual set up to allow external access to endpoint, only in local testing
3780+
topic_arn, subscription_arn, endpoint_url, server = sns_create_http_endpoint(
3781+
raw_message_delivery
3782+
)
3783+
3784+
# try both setting the Topic attribute or Subscription attribute
3785+
# https://docs.aws.amazon.com/sns/latest/dg/sns-message-delivery-retries.html#creating-delivery-policy
3786+
if raw_message_delivery:
3787+
aws_client.sns.set_subscription_attributes(
3788+
SubscriptionArn=subscription_arn,
3789+
AttributeName="DeliveryPolicy",
3790+
AttributeValue=json.dumps(
3791+
{
3792+
"requestPolicy": {"headerContentType": "text/csv"},
3793+
}
3794+
),
3795+
)
3796+
else:
3797+
aws_client.sns.set_topic_attributes(
3798+
TopicArn=topic_arn,
3799+
AttributeName="DeliveryPolicy",
3800+
AttributeValue=json.dumps(
3801+
{
3802+
"http": {
3803+
"defaultRequestPolicy": {"headerContentType": "application/json"},
3804+
}
3805+
}
3806+
),
3807+
)
3808+
3809+
topic_attrs = aws_client.sns.get_topic_attributes(TopicArn=topic_arn)
3810+
snapshot.match("topic-attrs", topic_attrs)
3811+
3812+
sub_attrs = aws_client.sns.get_subscription_attributes(SubscriptionArn=subscription_arn)
3813+
snapshot.match("sub-attrs", sub_attrs)
3814+
3815+
assert poll_condition(
3816+
lambda: len(server.log) >= 1,
3817+
timeout=5,
3818+
)
3819+
sub_request, _ = server.log[0]
3820+
payload = sub_request.get_json(force=True)
3821+
snapshot.match("subscription-confirmation", payload)
3822+
snapshot.match("http-confirm-sub-headers", _clean_headers(sub_request.headers))
3823+
3824+
token = payload["Token"]
3825+
aws_client.sns.confirm_subscription(TopicArn=topic_arn, Token=token)
3826+
3827+
message = "test_external_http_endpoint"
3828+
aws_client.sns.publish(TopicArn=topic_arn, Message=message)
3829+
3830+
assert poll_condition(
3831+
lambda: len(server.log) >= 2,
3832+
timeout=5,
3833+
)
3834+
notification_request, _ = server.log[1]
3835+
assert notification_request.headers["x-amz-sns-message-type"] == "Notification"
3836+
if raw_message_delivery:
3837+
payload = notification_request.data.decode()
3838+
assert payload == message
3839+
snapshot.match("http-message-headers-raw", _clean_headers(notification_request.headers))
3840+
else:
3841+
payload = notification_request.get_json(force=True)
3842+
snapshot.match("http-message", payload)
3843+
snapshot.match("http-message-headers", _clean_headers(notification_request.headers))
3844+
36593845

36603846
class TestSNSSubscriptionFirehose:
36613847
@markers.aws.validated

0 commit comments

Comments
 (0)