Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 28 additions & 17 deletions localstack-core/localstack/services/route53resolver/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,12 @@ def create_firewall_rule_group(
) -> CreateFirewallRuleGroupResponse:
"""Create a Firewall Rule Group."""
store = self.get_store(context.account_id, context.region)
id = get_route53_resolver_firewall_rule_group_id()
arn = arns.route53_resolver_firewall_rule_group_arn(id, context.account_id, context.region)
firewall_rule_group_id = get_route53_resolver_firewall_rule_group_id()
arn = arns.route53_resolver_firewall_rule_group_arn(
firewall_rule_group_id, context.account_id, context.region
)
firewall_rule_group = FirewallRuleGroup(
Id=id,
Id=firewall_rule_group_id,
Arn=arn,
Name=name,
RuleCount=0,
Expand All @@ -136,7 +138,8 @@ def create_firewall_rule_group(
CreationTime=datetime.now(timezone.utc).isoformat(),
ModificationTime=datetime.now(timezone.utc).isoformat(),
)
store.firewall_rule_groups[id] = firewall_rule_group
store.firewall_rule_groups[firewall_rule_group_id] = firewall_rule_group
store.firewall_rules[firewall_rule_group_id] = {}
route53resolver_backends[context.account_id][context.region].tagger.tag_resource(
arn, tags or []
)
Expand Down Expand Up @@ -342,11 +345,9 @@ def create_firewall_rule(
FirewallDomainRedirectionAction=firewall_domain_redirection_action,
Qtype=qtype,
)
if store.firewall_rules.get(firewall_rule_group_id):
store.firewall_rules[firewall_rule_group_id][firewall_domain_list_id] = firewall_rule
else:
store.firewall_rules[firewall_rule_group_id] = {}
if firewall_rule_group_id in store.firewall_rules:
store.firewall_rules[firewall_rule_group_id][firewall_domain_list_id] = firewall_rule
# TODO: handle missing firewall-rule-group-id
return CreateFirewallRuleResponse(FirewallRule=firewall_rule)

def delete_firewall_rule(
Expand Down Expand Up @@ -377,19 +378,29 @@ def list_firewall_rules(
next_token: NextToken = None,
**kwargs,
) -> ListFirewallRulesResponse:
"""List all the firewall rules in a firewall rule group."""
# TODO: implement priority and action filtering
"""List firewall rules in a firewall rule group.

Rules will be filtered by priority and action if values for these params are provided.

Raises:
ResourceNotFound: If a firewall group by the provided id does not exist.
"""
store = self.get_store(context.account_id, context.region)
firewall_rules = []
for firewall_rule in store.firewall_rules.get(firewall_rule_group_id, {}).values():
firewall_rules.append(FirewallRule(firewall_rule))
if len(firewall_rules) == 0:
firewall_rule_group = store.firewall_rules.get(firewall_rule_group_id)
if firewall_rule_group is None:
raise ResourceNotFoundException(
f"Can't find the resource with ID '{firewall_rule_group_id}'. Trace Id: '{localstack.services.route53resolver.utils.get_trace_id()}'"
)
return ListFirewallRulesResponse(
FirewallRules=firewall_rules,
)

firewall_rules = [
FirewallRule(rule)
for rule in firewall_rule_group.values()
if (action is None or action == rule["Action"])
and (priority is None or priority == rule["Priority"])
]

# TODO: implement max_results filtering and next_token handling
return ListFirewallRulesResponse(FirewallRules=firewall_rules)

def update_firewall_rule(
self,
Expand Down
144 changes: 144 additions & 0 deletions tests/aws/services/route53resolver/test_route53resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pytest

from localstack.aws.api.route53resolver import (
Action,
ListResolverEndpointsResponse,
ListResolverQueryLogConfigsResponse,
ListResolverRuleAssociationsResponse,
Expand All @@ -23,6 +24,27 @@ def route53resolver_api_snapshot_transformer(snapshot):
snapshot.add_transformer(snapshot.transform.route53resolver_api())


@pytest.fixture
def create_firewall_rule(aws_client: ServiceLevelClientFactory):
rules = []

def inner(**kwargs):
kwargs.setdefault("Name", f"rule-name-{short_uid()}")
rule_group_id = kwargs["FirewallRuleGroupId"]
domain_list_id = kwargs["FirewallDomainListId"]
response = aws_client.route53resolver.create_firewall_rule(**kwargs)
rules.append((rule_group_id, domain_list_id))
return response

yield inner

for rule_group_id, domain_list_id in rules[::-1]:
aws_client.route53resolver.delete_firewall_rule(
FirewallRuleGroupId=rule_group_id,
FirewallDomainListId=domain_list_id,
)


# TODO: extract this somewhere so that we can reuse it in other places
def _cleanup_vpc(aws_client: ServiceLevelClientFactory, vpc_id: str):
"""
Expand Down Expand Up @@ -721,3 +743,125 @@ def test_list_firewall_domain_lists(self, cleanups, snapshot, aws_client):

tag_result = aws_client.route53resolver.list_tags_for_resource(ResourceArn=arn)
snapshot.match("list-tags-for-resource", tag_result)

@markers.aws.validated
@markers.snapshot.skip_snapshot_verify(paths=["$..Message"])
def test_list_firewall_rules_for_missing_rule_group(self, snapshot, aws_client):
"""Test listing firewall rules for a non-existing rule-group."""
with pytest.raises(
aws_client.route53resolver.exceptions.ResourceNotFoundException
) as resource_not_found:
aws_client.route53resolver.list_firewall_rules(FirewallRuleGroupId="missing-id")

snapshot.add_transformer(
snapshot.transform.regex(r"\d{1}-[a-f0-9]{8}-[a-f0-9]{24}", "trace-id")
)
snapshot.match("missing-firewall-rule-group-id", resource_not_found.value.response)

@markers.aws.validated
def test_list_firewall_rules_for_empty_rule_group(self, cleanups, snapshot, aws_client):
snapshot.add_transformer(snapshot.transform.key_value("Name"))

rule_group_response = aws_client.route53resolver.create_firewall_rule_group(
Name=f"empty-{short_uid()}"
)
cleanups.append(
lambda: aws_client.route53resolver.delete_firewall_rule_group(
FirewallRuleGroupId=rule_group_response["FirewallRuleGroup"]["Id"]
)
)
snapshot.match("create-firewall-rule-group", rule_group_response)

response = aws_client.route53resolver.list_firewall_rules(
FirewallRuleGroupId=rule_group_response["FirewallRuleGroup"]["Id"]
)
snapshot.match("empty-firewall-rule-group", response)

@markers.aws.validated
@markers.snapshot.skip_snapshot_verify(paths=["$..FirewallDomainRedirectionAction"])
def test_list_firewall_rules(
self,
cleanups,
snapshot,
aws_client,
create_firewall_rule,
):
"""Test listing firewall rules.

We test listing:
- all rules in the rule-group
- rules filtered by priority
- rules filtered by action
- rules filtered by priority and action
"""

snapshot.add_transformer(
[
snapshot.transform.key_value("Name"),
snapshot.transform.key_value("FirewallRuleGroupId"),
snapshot.transform.key_value("FirewallDomainListId"),
]
)

firewall_rule_group_name = f"fw-rule-group-{short_uid()}"
rule_group_response = aws_client.route53resolver.create_firewall_rule_group(
Name=firewall_rule_group_name
)
cleanups.append(
lambda rule_group_id=rule_group_response["FirewallRuleGroup"][
"Id"
]: aws_client.route53resolver.delete_firewall_rule_group(
FirewallRuleGroupId=rule_group_id
)
)
# Parameters for creating resources
priorities = [1, 2, 3, 4]
actions = [Action.ALLOW, Action.ALERT, Action.ALERT, Action.ALLOW]

for action, priority in zip(actions, priorities):
domain_list_response = aws_client.route53resolver.create_firewall_domain_list(
Name=f"fw-domain-list-{short_uid()}"
)
cleanups.append(
lambda domain_list_id=domain_list_response["FirewallDomainList"][
"Id"
]: aws_client.route53resolver.delete_firewall_domain_list(
FirewallDomainListId=domain_list_id
)
)
create_firewall_rule(
FirewallRuleGroupId=rule_group_response["FirewallRuleGroup"]["Id"],
FirewallDomainListId=domain_list_response["FirewallDomainList"]["Id"],
Priority=priority,
Action=action,
)

# Check list filtering
list_all_response = aws_client.route53resolver.list_firewall_rules(
FirewallRuleGroupId=rule_group_response["FirewallRuleGroup"]["Id"]
)
snapshot.match("firewall-rules-list-all", list_all_response)

filter_by_priority_response = aws_client.route53resolver.list_firewall_rules(
FirewallRuleGroupId=rule_group_response["FirewallRuleGroup"]["Id"], Priority=1
)
snapshot.match("firewall-rules-list-by-priority", filter_by_priority_response)

filter_by_action_response = aws_client.route53resolver.list_firewall_rules(
FirewallRuleGroupId=rule_group_response["FirewallRuleGroup"]["Id"], Action=Action.ALLOW
)
snapshot.match("firewall-rules-list-by-action", filter_by_action_response)

action_and_priority_response = aws_client.route53resolver.list_firewall_rules(
FirewallRuleGroupId=rule_group_response["FirewallRuleGroup"]["Id"],
Action=Action.ALLOW,
Priority=4,
)
snapshot.match("firewall-rules-list-by-action-and-priority", action_and_priority_response)

filter_empty_response = aws_client.route53resolver.list_firewall_rules(
FirewallRuleGroupId=rule_group_response["FirewallRuleGroup"]["Id"],
Action=Action.ALLOW,
Priority=0, # 0 catches cases when integers pose as booleans
)
snapshot.match("firewall-rules-list-no-match", filter_empty_response)
Loading
Loading