@@ -3921,6 +3921,173 @@ def _clean_headers(response_headers: dict):
3921
3921
snapshot .match ("http-message" , payload )
3922
3922
snapshot .match ("http-message-headers" , _clean_headers (notification_request .headers ))
3923
3923
3924
+ @markers .aws .validated
3925
+ def test_subscribe_external_http_endpoint_lambda_url_sig_validation (
3926
+ self ,
3927
+ create_sns_http_endpoint_and_queue ,
3928
+ sns_create_topic ,
3929
+ sns_subscription ,
3930
+ aws_client ,
3931
+ snapshot ,
3932
+ sqs_collect_messages ,
3933
+ ):
3934
+ def _get_snapshot_from_lambda_url_msg (events : list [dict ]) -> dict :
3935
+ formatted_events = []
3936
+
3937
+ def _filter_headers (headers : dict ) -> dict :
3938
+ filtered_headers = {}
3939
+ for key , value in headers .items ():
3940
+ l_key = key .lower ()
3941
+ if l_key .startswith ("x-amz-sns" ) or key in (
3942
+ "content-type" ,
3943
+ "accept-encoding" ,
3944
+ "user-agent" ,
3945
+ ):
3946
+ filtered_headers [key ] = value
3947
+
3948
+ return filtered_headers
3949
+
3950
+ for event in events :
3951
+ msg = json .loads (event ["Body" ])["event" ]
3952
+ formatted_events .append (
3953
+ {"headers" : _filter_headers (msg ["headers" ]), "body" : json .loads (msg ["body" ])}
3954
+ )
3955
+
3956
+ return {"events" : formatted_events }
3957
+
3958
+ def validate_message_signature (msg_event : dict , msg_type : str ):
3959
+ cert_url = msg_event ["SigningCertURL" ]
3960
+ get_cert_req = requests .get (cert_url )
3961
+ assert get_cert_req .ok
3962
+
3963
+ cert = x509 .load_pem_x509_certificate (get_cert_req .content )
3964
+ message_signature = msg_event ["Signature" ]
3965
+ # create the canonical string
3966
+ if msg_type == "Notification" :
3967
+ fields = ["Message" , "MessageId" , "Subject" , "Timestamp" , "TopicArn" , "Type" ]
3968
+ else :
3969
+ fields = [
3970
+ "Message" ,
3971
+ "MessageId" ,
3972
+ "SubscribeURL" ,
3973
+ "Timestamp" ,
3974
+ "Token" ,
3975
+ "TopicArn" ,
3976
+ "Type" ,
3977
+ ]
3978
+
3979
+ # Build the string to be signed.
3980
+ string_to_sign = "" .join (
3981
+ [f"{ field } \n { msg_event [field ]} \n " for field in fields if field in msg_event ]
3982
+ )
3983
+
3984
+ # decode the signature from base64.
3985
+ decoded_signature = base64 .b64decode (message_signature )
3986
+
3987
+ message_sig_version = msg_event ["SignatureVersion" ]
3988
+ # this is a bug on AWS side, assert our behaviour is the same for now, this might get fixed
3989
+ assert message_sig_version == "1"
3990
+ signature_hash = hashes .SHA1 () if message_sig_version == "1" else hashes .SHA256 ()
3991
+
3992
+ # calculate signature value with cert
3993
+ # if the signature is invalid, this will raise an exception
3994
+ cert .public_key ().verify (
3995
+ decoded_signature ,
3996
+ to_bytes (string_to_sign ),
3997
+ padding = padding .PKCS1v15 (),
3998
+ algorithm = signature_hash ,
3999
+ )
4000
+
4001
+ snapshot .add_transformer (
4002
+ [
4003
+ snapshot .transform .key_value ("RequestId" ),
4004
+ snapshot .transform .key_value ("Token" ),
4005
+ snapshot .transform .key_value ("Host" ),
4006
+ snapshot .transform .regex (
4007
+ r"(?i)(?<=SubscribeURL[\"|']:\s[\"|'])(https?.*?)(?=/\?Action=ConfirmSubscription)" ,
4008
+ replacement = "<subscribe-domain>" ,
4009
+ ),
4010
+ ]
4011
+ )
4012
+ http_endpoint_url , queue_url = create_sns_http_endpoint_and_queue ()
4013
+ topic_arn = sns_create_topic ()["TopicArn" ]
4014
+ sns_protocol = http_endpoint_url .split ("://" )[0 ]
4015
+ subscription = sns_subscription (
4016
+ TopicArn = topic_arn , Protocol = sns_protocol , Endpoint = http_endpoint_url
4017
+ )
4018
+ subscription_arn = subscription ["SubscriptionArn" ]
4019
+ delivery_policy = {
4020
+ "healthyRetryPolicy" : {
4021
+ "minDelayTarget" : 1 ,
4022
+ "maxDelayTarget" : 1 ,
4023
+ "numRetries" : 0 ,
4024
+ "numNoDelayRetries" : 0 ,
4025
+ "numMinDelayRetries" : 0 ,
4026
+ "numMaxDelayRetries" : 0 ,
4027
+ "backoffFunction" : "linear" ,
4028
+ },
4029
+ "sicklyRetryPolicy" : None ,
4030
+ "throttlePolicy" : {"maxReceivesPerSecond" : 1000 },
4031
+ "guaranteed" : False ,
4032
+ }
4033
+ aws_client .sns .set_subscription_attributes (
4034
+ SubscriptionArn = subscription_arn ,
4035
+ AttributeName = "DeliveryPolicy" ,
4036
+ AttributeValue = json .dumps (delivery_policy ),
4037
+ )
4038
+
4039
+ messages = sqs_collect_messages (queue_url , expected = 1 , timeout = 10 )
4040
+ subscribe_event = _get_snapshot_from_lambda_url_msg (messages )
4041
+ snapshot .match ("subscription-confirmation" , subscribe_event )
4042
+
4043
+ subscribe_payload = subscribe_event ["events" ][0 ]["body" ]
4044
+
4045
+ validate_message_signature (
4046
+ subscribe_payload ,
4047
+ msg_type = subscribe_event ["events" ][0 ]["headers" ]["x-amz-sns-message-type" ],
4048
+ )
4049
+
4050
+ token = subscribe_payload ["Token" ]
4051
+ subscribe_url = subscribe_payload ["SubscribeURL" ]
4052
+ service_url , subscribe_url_path = subscribe_url .rsplit ("/" , maxsplit = 1 )
4053
+ # we manually assert here to be sure the format is right, as it hard to verify with snapshots
4054
+ assert subscribe_url == (
4055
+ f"{ service_url } /?Action=ConfirmSubscription&TopicArn={ topic_arn } &Token={ token } "
4056
+ )
4057
+
4058
+ confirm_subscription = aws_client .sns .confirm_subscription (TopicArn = topic_arn , Token = token )
4059
+ snapshot .match ("confirm-subscription" , confirm_subscription )
4060
+
4061
+ subscription_attributes = aws_client .sns .get_subscription_attributes (
4062
+ SubscriptionArn = subscription_arn
4063
+ )
4064
+ assert subscription_attributes ["Attributes" ]["PendingConfirmation" ] == "false"
4065
+
4066
+ message = "test_external_http_endpoint"
4067
+ aws_client .sns .publish (TopicArn = topic_arn , Message = message )
4068
+
4069
+ messages = sqs_collect_messages (queue_url , expected = 1 , timeout = 10 )
4070
+ publish_event = _get_snapshot_from_lambda_url_msg (messages )
4071
+ snapshot .match ("publish-event" , publish_event )
4072
+ publish_payload = publish_event ["events" ][0 ]["body" ]
4073
+ validate_message_signature (
4074
+ publish_payload ,
4075
+ msg_type = publish_event ["events" ][0 ]["headers" ]["x-amz-sns-message-type" ],
4076
+ )
4077
+
4078
+ unsub_request = requests .get (publish_payload ["UnsubscribeURL" ])
4079
+ assert b"UnsubscribeResponse" in unsub_request .content
4080
+
4081
+ messages = sqs_collect_messages (queue_url , expected = 1 , timeout = 10 )
4082
+ unsubscribe_event = _get_snapshot_from_lambda_url_msg (messages )
4083
+ snapshot .match ("unsubscribe-event" , unsubscribe_event )
4084
+
4085
+ unsubscribe_payload = unsubscribe_event ["events" ][0 ]["body" ]
4086
+ validate_message_signature (
4087
+ unsubscribe_payload ,
4088
+ msg_type = unsubscribe_event ["events" ][0 ]["headers" ]["x-amz-sns-message-type" ],
4089
+ )
4090
+
3924
4091
3925
4092
class TestSNSSubscriptionFirehose :
3926
4093
@markers .aws .validated
0 commit comments