1
1
import base64
2
+ import copy
2
3
import functools
3
4
import json
4
5
import logging
@@ -696,26 +697,29 @@ def subscribe(
696
697
"Invalid parameter: Invalid parameter: Endpoint Reason: FIFO SQS Queues can not be subscribed to standard SNS topics"
697
698
)
698
699
699
- if attributes :
700
- for attr_name , attr_value in attributes .items ():
700
+ sub_attributes = copy .deepcopy (attributes ) if attributes else None
701
+ if sub_attributes :
702
+ for attr_name , attr_value in sub_attributes .items ():
701
703
validate_subscription_attribute (
702
704
attribute_name = attr_name ,
703
705
attribute_value = attr_value ,
704
706
topic_arn = topic_arn ,
705
707
endpoint = endpoint ,
706
708
is_subscribe_call = True ,
707
709
)
710
+ if raw_msg_delivery := sub_attributes .get ("RawMessageDelivery" ):
711
+ sub_attributes ["RawMessageDelivery" ] = raw_msg_delivery .lower ()
708
712
709
713
# An endpoint may only be subscribed to a topic once. Subsequent
710
714
# subscribe calls do nothing (subscribe is idempotent), except if its attributes are different.
711
715
for existing_topic_subscription in store .topic_subscriptions .get (topic_arn , []):
712
716
sub = store .subscriptions .get (existing_topic_subscription , {})
713
717
if sub .get ("Endpoint" ) == endpoint :
714
- if attributes :
718
+ if sub_attributes :
715
719
# validate the subscription attributes aren't different
716
720
for attr in sns_constants .VALID_SUBSCRIPTION_ATTR_NAME :
717
721
# if a new attribute is present and different from an existent one, raise
718
- if (new_attr := attributes .get (attr )) and sub .get (attr ) != new_attr :
722
+ if (new_attr := sub_attributes .get (attr )) and sub .get (attr ) != new_attr :
719
723
raise InvalidParameterException (
720
724
"Invalid parameter: Attributes Reason: Subscription already exists with different attributes"
721
725
)
@@ -738,25 +742,22 @@ def subscribe(
738
742
FilterPolicyScope = "MessageAttributes" , # default value, will be overridden if set
739
743
SubscriptionPrincipal = principal , # dummy value, could be fetched with a call to STS?
740
744
)
741
- if attributes :
742
- subscription .update (attributes )
743
- if "FilterPolicy" in attributes :
745
+ if sub_attributes :
746
+ subscription .update (sub_attributes )
747
+ if "FilterPolicy" in sub_attributes :
744
748
filter_policy = (
745
- json .loads (attributes ["FilterPolicy" ]) if attributes ["FilterPolicy" ] else None
749
+ json .loads (sub_attributes ["FilterPolicy" ])
750
+ if sub_attributes ["FilterPolicy" ]
751
+ else None
746
752
)
747
753
if filter_policy :
748
754
validator = FilterPolicyValidator (
749
- scope = attributes .get ("FilterPolicyScope" , "MessageAttributes" ),
755
+ scope = subscription .get ("FilterPolicyScope" , "MessageAttributes" ),
750
756
is_subscribe_call = True ,
751
757
)
752
758
validator .validate_filter_policy (filter_policy )
753
759
754
- store .subscription_filter_policy [subscription_arn ] = (
755
- json .loads (attributes ["FilterPolicy" ]) if attributes ["FilterPolicy" ] else None
756
- )
757
-
758
- if raw_msg_delivery := attributes .get ("RawMessageDelivery" ):
759
- subscription ["RawMessageDelivery" ] = raw_msg_delivery .lower ()
760
+ store .subscription_filter_policy [subscription_arn ] = filter_policy
760
761
761
762
store .subscriptions [subscription_arn ] = subscription
762
763
0 commit comments