Skip to content

SES: fix infinite loop with SNS event destination #12972

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion localstack-core/localstack/services/ses/models.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from enum import StrEnum
from typing import TypedDict

from localstack.aws.api.ses import Address, Destination, Subject, TemplateData, TemplateName
Expand All @@ -8,7 +9,7 @@ class SentEmailBody(TypedDict):
text_part: str


class SentEmail(TypedDict):
class SentEmail(TypedDict, total=False):
Id: str
Region: str
Timestamp: str
Expand All @@ -19,3 +20,9 @@ class SentEmail(TypedDict):
Template: TemplateName
TemplateData: TemplateData
Body: SentEmailBody


class EmailType(StrEnum):
TEMPLATED = "templated"
RAW = "raw"
EMAIL = "email"
109 changes: 66 additions & 43 deletions localstack-core/localstack/services/ses/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
EventDestination,
EventDestinationDoesNotExistException,
EventDestinationName,
EventType,
GetIdentityVerificationAttributesResponse,
Identity,
IdentityList,
Expand Down Expand Up @@ -60,7 +61,7 @@
from localstack.http import Resource, Response
from localstack.services.moto import call_moto
from localstack.services.plugins import ServiceLifecycleHook
from localstack.services.ses.models import SentEmail, SentEmailBody
from localstack.services.ses.models import EmailType, SentEmail, SentEmailBody
from localstack.utils.aws import arns
from localstack.utils.files import mkdir
from localstack.utils.strings import long_uid, to_str
Expand Down Expand Up @@ -214,7 +215,9 @@ def create_configuration_set_event_destination(
emitter = SNSEmitter(context)
emitter.emit_create_configuration_set_event_destination_test_message(sns_topic_arn)

# only register the event destiation if emitting the message worked
# FIXME: Moto stores the Event Destinations as a single value when it should be a list
# it only considers the last Event Destination created, when AWS is able to store multiple configurations
# only register the event destination if emitting the message worked
try:
result = call_moto(context)
except CommonServiceException as e:
Expand Down Expand Up @@ -273,6 +276,9 @@ def delete_configuration_set_event_destination(
# FIXME: inconsistent state
LOGGER.warning("inconsistent state encountered in ses backend")

# FIXME: Moto stores the Event Destinations as a single value when it should be a list
# it only considers the last Event Destination created, when AWS is able to store multiple configurations
# don't pop the whole value which should be a list but is currently a dict
backend.config_set_event_destination.pop(configuration_set_name)

return DeleteConfigurationSetEventDestinationResponse()
Expand Down Expand Up @@ -362,25 +368,21 @@ def send_email(
response = call_moto(context)

backend = get_ses_backend(context)
emitter = SNSEmitter(context)
recipients = recipients_from_destination(destination)

for event_destination in backend.config_set_event_destination.values():
if not event_destination["Enabled"]:
continue

sns_destination_arn = event_destination.get("SNSDestination")
if not sns_destination_arn:
continue

payload = SNSPayload(
if event_destinations := backend.config_set_event_destination.get(configuration_set_name):
recipients = recipients_from_destination(destination)
payload = EventDestinationPayload(
message_id=response["MessageId"],
sender_email=source,
destination_addresses=recipients,
tags=tags,
)
emitter.emit_send_event(payload, sns_destination_arn)
emitter.emit_delivery_event(payload, sns_destination_arn)
notify_event_destinations(
context=context,
event_destinations=event_destinations,
payload=payload,
email_type=EmailType.EMAIL,
)

text_part = message["Body"].get("Text", {}).get("Data")
html_part = message["Body"].get("Html", {}).get("Data")
Expand Down Expand Up @@ -418,25 +420,21 @@ def send_templated_email(
response = call_moto(context)

backend = get_ses_backend(context)
emitter = SNSEmitter(context)
recipients = recipients_from_destination(destination)

for event_destination in backend.config_set_event_destination.values():
if not event_destination["Enabled"]:
continue

sns_destination_arn = event_destination.get("SNSDestination")
if not sns_destination_arn:
continue

payload = SNSPayload(
if event_destinations := backend.config_set_event_destination.get(configuration_set_name):
recipients = recipients_from_destination(destination)
payload = EventDestinationPayload(
message_id=response["MessageId"],
sender_email=source,
destination_addresses=recipients,
tags=tags,
)
emitter.emit_send_event(payload, sns_destination_arn, emit_source_arn=False)
emitter.emit_delivery_event(payload, sns_destination_arn)
notify_event_destinations(
context=context,
event_destinations=event_destinations,
payload=payload,
email_type=EmailType.TEMPLATED,
)

save_for_retrospection(
SentEmail(
Expand Down Expand Up @@ -481,23 +479,19 @@ def send_raw_email(
backend = get_ses_backend(context)
message = backend.send_raw_email(source, destinations, raw_data)

emitter = SNSEmitter(context)
for event_destination in backend.config_set_event_destination.values():
if not event_destination["Enabled"]:
continue

sns_destination_arn = event_destination.get("SNSDestination")
if not sns_destination_arn:
continue

payload = SNSPayload(
if event_destinations := backend.config_set_event_destination.get(configuration_set_name):
payload = EventDestinationPayload(
message_id=message.id,
sender_email=source,
destination_addresses=destinations,
tags=tags,
)
emitter.emit_send_event(payload, sns_destination_arn)
emitter.emit_delivery_event(payload, sns_destination_arn)
notify_event_destinations(
context=context,
event_destinations=event_destinations,
payload=payload,
email_type=EmailType.RAW,
)

save_for_retrospection(
SentEmail(
Expand Down Expand Up @@ -566,7 +560,7 @@ def set_identity_headers_in_notifications_enabled(


@dataclasses.dataclass(frozen=True)
class SNSPayload:
class EventDestinationPayload:
message_id: str
sender_email: Address
destination_addresses: AddressList
Expand Down Expand Up @@ -598,7 +592,7 @@ def emit_create_configuration_set_event_destination_test_message(
)

def emit_send_event(
self, payload: SNSPayload, sns_topic_arn: str, emit_source_arn: bool = True
self, payload: EventDestinationPayload, sns_topic_arn: str, emit_source_arn: bool = True
):
now = datetime.now(tz=UTC)

Expand Down Expand Up @@ -634,7 +628,7 @@ def emit_send_event(
except ClientError:
LOGGER.exception("sending SNS message")

def emit_delivery_event(self, payload: SNSPayload, sns_topic_arn: str):
def emit_delivery_event(self, payload: EventDestinationPayload, sns_topic_arn: str):
now = datetime.now(tz=UTC)

tags = defaultdict(list)
Expand Down Expand Up @@ -680,6 +674,35 @@ def _client_for_topic(topic_arn: str) -> "SNSClient":
).sns


def notify_event_destinations(
context: RequestContext,
# FIXME: Moto stores the Event Destinations as a single value when it should be a list
event_destinations: dict,
payload: EventDestinationPayload,
email_type: EmailType,
):
emitter = SNSEmitter(context)

if not isinstance(event_destinations, list):
event_destinations = [event_destinations]

for event_destination in event_destinations:
if not event_destination["Enabled"]:
continue

sns_destination_arn = event_destination.get("SNSDestination")
if not sns_destination_arn:
continue

matching_event_types = event_destination.get("EventMatchingTypes") or []
if EventType.send in matching_event_types:
emitter.emit_send_event(
payload, sns_destination_arn, emit_source_arn=email_type != EmailType.TEMPLATED
)
if EventType.delivery in matching_event_types:
emitter.emit_delivery_event(payload, sns_destination_arn)


class InvalidParameterValue(CommonServiceException):
def __init__(self, message=None):
super().__init__(
Expand Down
83 changes: 82 additions & 1 deletion tests/aws/services/ses/test_ses.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from localstack.testing.aws.util import is_aws_cloud
from localstack.testing.pytest import markers
from localstack.utils.strings import short_uid
from localstack.utils.sync import retry
from localstack.utils.sync import poll_condition, retry

SAMPLE_TEMPLATE = {
"TemplateName": "hello-world",
Expand Down Expand Up @@ -918,6 +918,87 @@ def test_special_tags_send_email(self, tag_name, tag_value, aws_client):

assert exc.match("MessageRejected")

# we cannot really introspect received emails in AWS
@markers.aws.only_localstack
def test_ses_sns_topic_integration_send_email_ses_destination(
self,
sns_topic,
sns_subscription,
ses_configuration_set,
ses_configuration_set_sns_event_destination,
setup_email_addresses,
aws_client,
):
"""
Validates that configure Event Destinations and sending an email does not trigger an infinite loop of sending
SNS notifications that sends an email that would trigger SNS.
"""

sender_email_address, recipient_email_address = setup_email_addresses()
config_set_name = f"config-set-{short_uid()}"

emails_url = config.internal_service_url() + EMAILS_ENDPOINT
response = requests.delete(emails_url)
assert response.status_code == 204

# create subscription to get notified about SES events
topic_arn = sns_topic["Attributes"]["TopicArn"]
sns_subscription(
TopicArn=topic_arn,
Protocol="email",
Endpoint=sender_email_address,
)

# create the config set
ses_configuration_set(config_set_name)
event_destination_name = f"config-set-event-destination-{short_uid()}"
ses_configuration_set_sns_event_destination(
config_set_name, event_destination_name, topic_arn
)

# send an email to trigger the SNS message and SES message
destination = {
"ToAddresses": [recipient_email_address],
}
send_email = aws_client.ses.send_email(
Destination=destination,
Message=SAMPLE_SIMPLE_EMAIL,
ConfigurationSetName=config_set_name,
Source=sender_email_address,
Tags=[
{
"Name": "custom-tag",
"Value": "tag-value",
}
],
)

def _get_emails():
_resp = requests.get(emails_url)
return _resp.json()["messages"]

poll_condition(lambda: len(_get_emails()) >= 4, timeout=3)
requests.delete(emails_url, params={"id": send_email["MessageId"]})

emails = _get_emails()
# we assert that we only received 3 emails
assert len(emails) == 3

emails = sorted(emails, key=lambda x: x["Body"]["text_part"])
# the first email is the validation of SNS confirming the SES subscription
ses_delivery_notification = emails[1]
ses_send_notification = emails[2]

assert ses_delivery_notification["Subject"] == "SNS-Subscriber-Endpoint"
delivery_payload = json.loads(ses_delivery_notification["Body"]["text_part"])
assert delivery_payload["eventType"] == "Delivery"
assert delivery_payload["mail"]["source"] == sender_email_address

assert ses_send_notification["Subject"] == "SNS-Subscriber-Endpoint"
send_payload = json.loads(ses_send_notification["Body"]["text_part"])
assert send_payload["eventType"] == "Send"
assert send_payload["mail"]["source"] == sender_email_address


@pytest.mark.usefixtures("openapi_validate")
class TestSESRetrospection:
Expand Down
Loading