Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions localstack-core/localstack/services/secretsmanager/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -729,17 +729,28 @@ def backend_rotate_secret(
if not self._is_valid_identifier(secret_id):
raise SecretNotFoundException()

if self.secrets[secret_id].is_deleted():
secret = self.secrets[secret_id]
if secret.is_deleted():
raise InvalidRequestException(
"An error occurred (InvalidRequestException) when calling the RotateSecret operation: You tried to \
perform the operation on a secret that's currently marked deleted."
)
# Resolve rotation_lambda_arn and fallback to previous value if its missing
# from the current request
rotation_lambda_arn = rotation_lambda_arn or secret.rotation_lambda_arn
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the secret is being rotated first time and we haven't specified the rotation_lambda_arn?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, this was an existing behavior even before I introduce my fix. In case for the first time rotation we did not specify rotation_lambda_arn then it will raise error, check this code here


    try:
        lm_client = connect_to(region_name=self.region_name).lambda_
        lm_client.get_function(FunctionName=rotation_lambda_arn)
    except Exception:
        raise ResourceNotFoundException("Lambda does not exist or could not be accessed")

This will raise error whenever we invoked it from the cli

An error occurred (ResourceNotFoundException) when calling the RotateSecret operation: Lambda does not exist or could not be accessed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What error message does AWS return in this case? Does it match the error we raise in this circumstance? I suspect not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are right. I will update the code because I need to add a check earlier to raise an error that match AWS one

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error mainly look like this from AWS

        "Error": {
          "Code": "InvalidRequestException",
          "Message": "No Lambda rotation function ARN is associated with this secret."
        },

if not rotation_lambda_arn:
raise InvalidRequestException(
"No Lambda rotation function ARN is associated with this secret."
)

if rotation_lambda_arn:
if len(rotation_lambda_arn) > 2048:
msg = "RotationLambdaARN must <= 2048 characters long."
raise InvalidParameterException(msg)

# In case rotation_period is not provided, resolve auto_rotate_after_days
# and fallback to previous value if its missing from the current request.
rotation_period = secret.auto_rotate_after_days or 0
if rotation_rules:
if rotation_days in rotation_rules:
rotation_period = rotation_rules[rotation_days]
Expand All @@ -753,8 +764,6 @@ def backend_rotate_secret(
except Exception:
raise ResourceNotFoundException("Lambda does not exist or could not be accessed")

secret = self.secrets[secret_id]

# The rotation function must end with the versions of the secret in
# one of two states:
#
Expand Down Expand Up @@ -782,7 +791,7 @@ def backend_rotate_secret(
pass

secret.rotation_lambda_arn = rotation_lambda_arn
secret.auto_rotate_after_days = rotation_rules.get(rotation_days, 0)
secret.auto_rotate_after_days = rotation_period
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better to add default value in case rotation_period is not set

Suggested change
secret.auto_rotate_after_days = rotation_period
secret.auto_rotate_after_days = rotation_period or 0

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this in purpose without or 0 because usually the default value for https://github.com/localstack/moto/blob/localstack/moto/secretsmanager/models.py#L118 based on this logic usually is 0 in case its not set.

For extra check, we can still also fallback to 0
secret.auto_rotate_after_days = rotation_period or 0

I will update this

if secret.auto_rotate_after_days > 0:
wait_interval_s = int(rotation_period) * 86400
secret.next_rotation_date = int(time.time()) + wait_interval_s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,19 @@ def secretsmanager_api():
),
"version_uuid",
),
KeyValueBasedTransformer(
lambda k, v: (
v
if (
isinstance(k, str)
and k == "RotationLambdaARN"
and isinstance(v, str)
and re.match(PATTERN_ARN, v)
)
else None
),
"lambda-arn",
),
SortingTransformer("VersionStages"),
SortingTransformer("Versions", lambda e: e.get("CreatedDate")),
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,14 @@ def finish_secret(service_client, arn, token):
token,
arn,
)
if "AWSPENDING" in metadata["VersionIdsToStages"].get(token, []):
service_client.update_secret_version_stage(
SecretId=arn,
VersionStage="AWSPENDING",
RemoveFromVersionId=token,
)
logger.info(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we can use logger.debug instead of logger.info here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason why I choose the info is because in the same function for AWSCURRENT and others log levels we are also using info so I thought this was added in purpose, so I stick to use info in this case

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only in the lambda function body though right @macnev2013?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that’s right @simonrw. This can be ignored.

"finishSecret: Successfully removed AWSPENDING stage from version %s for secret %s.",
token,
arn,
)
184 changes: 135 additions & 49 deletions tests/aws/services/secretsmanager/test_secretsmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,62 @@ def sm_snapshot(self, snapshot):
snapshot.add_transformers_list(snapshot.transform.secretsmanager_api())
return snapshot

@pytest.fixture
def setup_invalid_rotation_secret(self, secret_name, aws_client, account_id, sm_snapshot):
def _setup(invalid_arn: str | None):
create_secret = aws_client.secretsmanager.create_secret(
Name=secret_name, SecretString="init"
)
sm_snapshot.add_transformer(
sm_snapshot.transform.secretsmanager_secret_id_arn(create_secret, 0)
)
sm_snapshot.match("create_secret", create_secret)
rotation_config = {
"SecretId": secret_name,
"RotationRules": {
"AutomaticallyAfterDays": 1,
},
}
if invalid_arn:
rotation_config["RotationLambdaARN"] = invalid_arn
aws_client.secretsmanager.rotate_secret(**rotation_config)

return _setup

@pytest.fixture
def setup_rotation_secret(
self,
sm_snapshot,
secret_name,
create_secret,
create_lambda_function,
aws_client,
):
cre_res = create_secret(
Name=secret_name,
SecretString="my_secret",
Description="testing rotation of secrets",
)

sm_snapshot.add_transformers_list(
sm_snapshot.transform.secretsmanager_secret_id_arn(cre_res, 0)
)

function_name = f"s-{short_uid()}"
function_arn = create_lambda_function(
handler_file=TEST_LAMBDA_ROTATE_SECRET,
func_name=function_name,
runtime=Runtime.python3_12,
)["CreateFunctionResponse"]["FunctionArn"]

aws_client.lambda_.add_permission(
FunctionName=function_name,
StatementId="secretsManagerPermission",
Action="lambda:InvokeFunction",
Principal="secretsmanager.amazonaws.com",
)
return cre_res["VersionId"], function_arn

@staticmethod
def _wait_created_is_listed(client, secret_id: str):
def _is_secret_in_list():
Expand Down Expand Up @@ -527,49 +583,27 @@ def test_rotate_secret_with_lambda_success(
create_secret,
create_lambda_function,
aws_client,
setup_rotation_secret,
rotate_immediately,
):
"""
Tests secret rotation via a lambda function.
Parametrization ensures we test the default behavior which is an immediate rotation.
"""
cre_res = create_secret(
Name=secret_name,
SecretString="my_secret",
Description="testing rotation of secrets",
)

sm_snapshot.add_transformer(
sm_snapshot.transform.key_value("RotationLambdaARN", "lambda-arn")
)
sm_snapshot.add_transformers_list(
sm_snapshot.transform.secretsmanager_secret_id_arn(cre_res, 0)
)

function_name = f"s-{short_uid()}"
function_arn = create_lambda_function(
handler_file=TEST_LAMBDA_ROTATE_SECRET,
func_name=function_name,
runtime=Runtime.python3_12,
)["CreateFunctionResponse"]["FunctionArn"]
rotation_config = {
"RotationRules": {"AutomaticallyAfterDays": 1},
}
if rotate_immediately:
rotation_config["RotateImmediately"] = rotate_immediately
initial_secret_version, function_arn = setup_rotation_secret

aws_client.lambda_.add_permission(
FunctionName=function_name,
StatementId="secretsManagerPermission",
Action="lambda:InvokeFunction",
Principal="secretsmanager.amazonaws.com",
)
rotation_config = rotation_config or {}
if function_arn:
rotation_config["RotationLambdaARN"] = function_arn

rotation_kwargs = {}
if rotate_immediately is not None:
rotation_kwargs["RotateImmediately"] = rotate_immediately
rot_res = aws_client.secretsmanager.rotate_secret(
SecretId=secret_name,
RotationLambdaARN=function_arn,
RotationRules={
"AutomaticallyAfterDays": 1,
},
**rotation_kwargs,
**rotation_config,
)

sm_snapshot.match("rotate_secret_immediately", rot_res)
Expand All @@ -585,31 +619,75 @@ def test_rotate_secret_with_lambda_success(

sm_snapshot.match("list_secret_versions_rotated_1", list_secret_versions_1)

# As a result of the Lambda invocations. current version should be
# pointed to `AWSCURRENT` & previous version to `AWSPREVIOUS`
assert response["VersionIdsToStages"][initial_secret_version] == ["AWSPREVIOUS"]
assert response["VersionIdsToStages"][rot_res["VersionId"]] == ["AWSCURRENT"]

@markers.snapshot.skip_snapshot_verify(
paths=["$..VersionIdsToStages", "$..Versions", "$..VersionId"]
)
@markers.aws.validated
def test_rotate_secret_multiple_times_with_lambda_success(
self,
sm_snapshot,
secret_name,
create_secret,
create_lambda_function,
aws_client,
setup_rotation_secret,
):
secret_initial_version, function_arn = setup_rotation_secret
runs_config = {
1: {
"RotationRules": {"AutomaticallyAfterDays": 1},
"RotateImmediately": True,
"RotationLambdaARN": function_arn,
},
2: {},
}

for index in range(1, 3):
rotation_config = runs_config[index]

rot_res = aws_client.secretsmanager.rotate_secret(
SecretId=secret_name,
**rotation_config,
)

sm_snapshot.match(f"rotate_secret_immediately_{index}", rot_res)

self._wait_rotation(aws_client.secretsmanager, secret_name, rot_res["VersionId"])

response = aws_client.secretsmanager.describe_secret(SecretId=secret_name)
sm_snapshot.match(f"describe_secret_rotated_{index}", response)

list_secret_versions_1 = aws_client.secretsmanager.list_secret_version_ids(
SecretId=secret_name
)

sm_snapshot.match(f"list_secret_versions_rotated_1_{index}", list_secret_versions_1)

# As a result of the Lambda invocations. current version should be
# pointed to `AWSCURRENT` & previous version to `AWSPREVIOUS`
assert response["VersionIdsToStages"][secret_initial_version] == ["AWSPREVIOUS"]
assert response["VersionIdsToStages"][rot_res["VersionId"]] == ["AWSCURRENT"]

secret_initial_version = aws_client.secretsmanager.get_secret_value(
SecretId=secret_name
)["VersionId"]

@markers.snapshot.skip_snapshot_verify(paths=["$..Error", "$..Message"])
@markers.aws.validated
def test_rotate_secret_invalid_lambda_arn(
self, secret_name, aws_client, account_id, sm_snapshot
self, setup_invalid_rotation_secret, aws_client, sm_snapshot, secret_name, account_id
):
create_secret = aws_client.secretsmanager.create_secret(
Name=secret_name, SecretString="init"
)
sm_snapshot.add_transformer(
sm_snapshot.transform.secretsmanager_secret_id_arn(create_secret, 0)
)
sm_snapshot.match("create_secret", create_secret)

region_name = aws_client.secretsmanager.meta.region_name
invalid_arn = (
f"arn:aws:lambda:{region_name}:{account_id}:function:rotate_secret_invalid_lambda_arn"
)
with pytest.raises(Exception) as e:
aws_client.secretsmanager.rotate_secret(
SecretId=secret_name,
RotationLambdaARN=invalid_arn,
RotationRules={
"AutomaticallyAfterDays": 1,
},
)
setup_invalid_rotation_secret(invalid_arn)
sm_snapshot.match("rotate_secret_invalid_arn_exc", e.value.response)

describe_secret = aws_client.secretsmanager.describe_secret(SecretId=secret_name)
Expand All @@ -618,6 +696,14 @@ def test_rotate_secret_invalid_lambda_arn(
assert "RotationRules" not in describe_secret
assert "RotationLambdaARN" not in describe_secret

@markers.aws.validated
def test_first_rotate_secret_with_missing_lambda_arn(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is a start, however it does not execute the rotation itself, therefore testing the error message given by AWS if a lambda function arn is not specified. This will help clarify the error handling here.

Here is a demonstration of the confusion that extracting these helper methods causes. I (incorrectly) believed that you hadn't tried to rotate the lambda function, but when I looked in the snapshot I saw that the _setup_invalid_rotation_secret method tries to rotate a secret without specifying a function ARN.

I would find it much clearer if the test was more like:

  • create secret (using our create_secret fixture)
  • try to rotate secret, wrapped in pytest.raises(...)
  • capturing a snapshot of the error message
  • (optional) a describe secret call, however this is covered by other more specific tests

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now I have been more lean toward refactor this since it created this confusion. Thanks for raising this

self, setup_invalid_rotation_secret, sm_snapshot
):
with pytest.raises(Exception) as e:
setup_invalid_rotation_secret(None)
sm_snapshot.match("rotate_secret_no_arn_exc", e.value.response)

@markers.aws.validated
def test_put_secret_value_with_version_stages(self, sm_snapshot, secret_name, aws_client):
secret_string_v0: str = "secret_string_v0"
Expand Down
Loading