Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions localstack-core/localstack/services/events/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -1772,7 +1772,7 @@ def create_target_sender(
target_sender = TargetSenderFactory(
target, rule_arn, rule_name, region, account_id
).get_target_sender()
self._target_sender_store[target_sender.arn] = target_sender
self._target_sender_store[target_sender.unique_id] = target_sender
return target_sender

def create_archive_service(
Expand Down Expand Up @@ -1835,11 +1835,11 @@ def _delete_rule_services(self, rules: RuleDict | Rule) -> None:
def _delete_target_sender(self, ids: TargetIdList, rule) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can see this is only used when deleting targets. However, from everything I can see in this PR, Targets are living inside a Rule. Shouldn't we delete all TargetSender when deleting a rule? As we're caching the clients inside the sender, it could lead to quite some memory being taken and never released.

I think that we should delete all the TargetSender when deleting a rule. This is somewhat outside of the scope of this PR, but also somewhat in it, because you chose to go the "nested" route in the dictionary key, when in reality, it seems the structure is more rule_arn -> target_id -> TargetSender for the dict (self._target_sender_store[rule_arn][target_id]), so it should have another level of nesting, so that when you delete the rule, you can do del self._target_sender_store[rule_arn] instead of having to iterate over all target and create the "nested key" every time. Both works, you just need to manipulate the key every time.

Copy link
Member Author

@maxhoheiser maxhoheiser Jan 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I like your approach for nesting targets under rules. Regarding deleting them - aws does not allow the deleting of a rule that still has targets - first all targets must be manually deleted - this takes care of the above-mentioned problem.

for target_id in ids:
if target := rule.targets.get(target_id):
target_arn = target["Arn"]
target_unique_id = f"{rule.arn}-{target_id}"
try:
del self._target_sender_store[target_arn]
del self._target_sender_store[target_unique_id]
except KeyError:
LOG.error("Error deleting target service %s.", target_arn)
LOG.error("Error deleting target service %s.", target["Arn"])

def _get_limited_dict_and_next_token(
self, input_dict: dict, next_token: NextToken | None, limit: LimitMax100 | None
Expand Down Expand Up @@ -1889,8 +1889,8 @@ def func(*args, **kwargs):
"resources": [rule.arn],
"detail": {},
}

target_sender = self._target_sender_store[target["Arn"]]
target_unique_id = f"{rule.arn}-{target['Id']}"
target_sender = self._target_sender_store[target_unique_id]
try:
target_sender.process_event(event.copy())
except Exception as e:
Expand Down Expand Up @@ -2178,16 +2178,17 @@ def _process_rules(
return

for target in rule.targets.values():
target_arn = target["Arn"]
if is_archive_arn(target_arn):
target_id = target["Id"]
if is_archive_arn(target["Arn"]):
self._put_to_archive(
region,
account_id,
archive_target_id=target["Id"],
archive_target_id=target_id,
event=event_formatted,
)
else:
target_sender = self._target_sender_store[target_arn]
target_unique_id = f"{rule.arn}-{target_id}"
target_sender = self._target_sender_store[target_unique_id]
try:
target_sender.process_event(event_formatted.copy())
rule_invocation.record(target_sender.service)
Expand All @@ -2198,7 +2199,7 @@ def _process_rules(
json.dumps(
{
"ErrorCode": "TargetDeliveryFailure",
"ErrorMessage": f"Failed to deliver to target {target['Id']}: {str(error)}",
"ErrorMessage": f"Failed to deliver to target {target_id}: {str(error)}",
}
)
)
Expand Down
14 changes: 13 additions & 1 deletion localstack-core/localstack/services/events/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,18 @@ def __init__(
def arn(self):
return self.target["Arn"]

@property
def target_id(self):
return self.target["Id"]

@property
def unique_id(self):
"""Necessary to distinguish between targets with the same ARN but for different rules.
The unique_id is a combination of the rule ARN and the Target Id.
This is necessary since input path and input transformer can be different for the same target ARN,
attached to different rules."""
return f"{self.rule_arn}-{self.target_id}"

@property
def client(self):
"""Lazy initialization of internal botoclient factory."""
Expand Down Expand Up @@ -263,7 +275,7 @@ def _get_predefined_template_replacements(self, event: FormattedEvent) -> dict[s
return predefined_template_replacements


TargetSenderDict = dict[Arn, TargetSender]
TargetSenderDict = dict[str, TargetSender] # rule_arn-target_id as global unique id

# Target Senders are ordered alphabetically by service name

Expand Down
6 changes: 3 additions & 3 deletions tests/aws/services/events/test_archive_and_replay.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
wait_for_replay_in_state,
)
from tests.aws.services.events.test_events import (
EVENT_DETAIL,
TEST_EVENT_DETAIL,
TEST_EVENT_PATTERN,
TEST_EVENT_PATTERN_NO_DETAIL,
)
Expand Down Expand Up @@ -219,7 +219,7 @@ def test_list_archive_with_events(
entry = {
"Source": TEST_EVENT_PATTERN["source"][0],
"DetailType": TEST_EVENT_PATTERN["detail-type"][0],
"Detail": json.dumps(EVENT_DETAIL),
"Detail": json.dumps(TEST_EVENT_DETAIL),
}
entries.append(entry)

Expand Down Expand Up @@ -412,7 +412,7 @@ def test_start_list_describe_canceled_replay(
entry = {
"Source": TEST_EVENT_PATTERN["source"][0],
"DetailType": TEST_EVENT_PATTERN["detail-type"][0],
"Detail": json.dumps(EVENT_DETAIL),
"Detail": json.dumps(TEST_EVENT_DETAIL),
"EventBusName": event_bus_name,
}
entries.append(entry)
Expand Down
Loading
Loading