Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 42 additions & 11 deletions localstack-core/localstack/services/iam/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import random
import re
import string
import uuid
from datetime import datetime
from typing import Any, Dict, List, TypeVar
from urllib.parse import quote
Expand Down Expand Up @@ -36,6 +37,7 @@
GetServiceLinkedRoleDeletionStatusResponse,
GetUserResponse,
IamApi,
InvalidInputException,
ListInstanceProfileTagsResponse,
ListRolesResponse,
ListServiceSpecificCredentialsResponse,
Expand Down Expand Up @@ -75,9 +77,9 @@
from localstack.aws.connect import connect_to
from localstack.constants import INTERNAL_AWS_SECRET_ACCESS_KEY
from localstack.services.iam.iam_patches import apply_iam_patches
from localstack.services.iam.resources.service_linked_roles import SERVICE_LINKED_ROLES
from localstack.services.moto import call_moto
from localstack.utils.aws.request_context import extract_access_key_id_from_auth_header
from localstack.utils.common import short_uid

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -311,8 +313,6 @@ def create_service_linked_role(
custom_suffix: customSuffixType = None,
**kwargs,
) -> CreateServiceLinkedRoleResponse:
# TODO: test
# TODO: how to support "CustomSuffix" API request parameter?
policy_doc = json.dumps(
{
"Version": "2012-10-17",
Expand All @@ -325,9 +325,28 @@ def create_service_linked_role(
],
}
)
path = f"{SERVICE_LINKED_ROLE_PATH_PREFIX}/{aws_service_name}"
role_name = f"r-{short_uid()}"
service_role_data = SERVICE_LINKED_ROLES.get(aws_service_name)

path = f"{SERVICE_LINKED_ROLE_PATH_PREFIX}/{aws_service_name}/"
if service_role_data:
if custom_suffix and not service_role_data["suffix_allowed"]:
raise InvalidInputException(f"Custom suffix is not allowed for {aws_service_name}")
role_name = service_role_data.get("role_name")
attached_policies = service_role_data["attached_policies"]
else:
role_name = f"AWSServiceRoleFor{aws_service_name.split('.')[0].capitalize()}"
attached_policies = []
if custom_suffix:
role_name = f"{role_name}_{custom_suffix}"
backend = get_iam_backend(context)

# check for role duplicates
for role in backend.roles.values():
if role.name == role_name:
raise InvalidInputException(
f"Service role name {role_name} has been taken in this account, please try a different suffix."
)

role = backend.create_role(
role_name=role_name,
assume_role_policy_document=policy_doc,
Expand All @@ -336,26 +355,38 @@ def create_service_linked_role(
description=description,
tags={},
max_session_duration=3600,
linked_service=aws_service_name,
)
role.service_linked_role_arn = "arn:{0}:iam::{1}:role/aws-service-role/{2}/{3}".format(
context.partition, context.account_id, aws_service_name, role.name
)
# attach policies
for policy in attached_policies:
try:
backend.attach_role_policy(policy, role_name)
except Exception as e:
LOG.warning(
"Policy %s for service linked role %s does not exist: %s",
policy,
aws_service_name,
e,
)

res_role = self.moto_role_to_role_type(role)
return CreateServiceLinkedRoleResponse(Role=res_role)

def delete_service_linked_role(
self, context: RequestContext, role_name: roleNameType, **kwargs
) -> DeleteServiceLinkedRoleResponse:
# TODO: test
backend = get_iam_backend(context)
role = backend.get_role(role_name=role_name)
role.managed_policies.clear()
backend.delete_role(role_name)
return DeleteServiceLinkedRoleResponse(DeletionTaskId=short_uid())
return DeleteServiceLinkedRoleResponse(
DeletionTaskId=f"task{role.path}{role.name}/{uuid.uuid4()}"
)

def get_service_linked_role_deletion_status(
self, context: RequestContext, deletion_task_id: DeletionTaskIdType, **kwargs
) -> GetServiceLinkedRoleDeletionStatusResponse:
# TODO: test
# TODO: check if task id is valid
return GetServiceLinkedRoleDeletionStatusResponse(Status=DeletionTaskStatusType.SUCCEEDED)

def put_user_permissions_boundary(
Expand Down
Loading
Loading