diff --git a/localstack-core/localstack/services/secretsmanager/provider.py b/localstack-core/localstack/services/secretsmanager/provider.py index efefe6220819d..5838732f2c4b0 100644 --- a/localstack-core/localstack/services/secretsmanager/provider.py +++ b/localstack-core/localstack/services/secretsmanager/provider.py @@ -729,17 +729,28 @@ def backend_rotate_secret( if not self._is_valid_identifier(secret_id): raise SecretNotFoundException() - if self.secrets[secret_id].is_deleted(): + secret = self.secrets[secret_id] + if secret.is_deleted(): raise InvalidRequestException( "An error occurred (InvalidRequestException) when calling the RotateSecret operation: You tried to \ perform the operation on a secret that's currently marked deleted." ) + # Resolve rotation_lambda_arn and fallback to previous value if its missing + # from the current request + rotation_lambda_arn = rotation_lambda_arn or secret.rotation_lambda_arn + if not rotation_lambda_arn: + raise InvalidRequestException( + "No Lambda rotation function ARN is associated with this secret." + ) if rotation_lambda_arn: if len(rotation_lambda_arn) > 2048: msg = "RotationLambdaARN must <= 2048 characters long." raise InvalidParameterException(msg) + # In case rotation_period is not provided, resolve auto_rotate_after_days + # and fallback to previous value if its missing from the current request. + rotation_period = secret.auto_rotate_after_days or 0 if rotation_rules: if rotation_days in rotation_rules: rotation_period = rotation_rules[rotation_days] @@ -753,8 +764,6 @@ def backend_rotate_secret( except Exception: raise ResourceNotFoundException("Lambda does not exist or could not be accessed") - secret = self.secrets[secret_id] - # The rotation function must end with the versions of the secret in # one of two states: # @@ -782,7 +791,7 @@ def backend_rotate_secret( pass secret.rotation_lambda_arn = rotation_lambda_arn - secret.auto_rotate_after_days = rotation_rules.get(rotation_days, 0) + secret.auto_rotate_after_days = rotation_period if secret.auto_rotate_after_days > 0: wait_interval_s = int(rotation_period) * 86400 secret.next_rotation_date = int(time.time()) + wait_interval_s diff --git a/localstack-core/localstack/testing/snapshots/transformer_utility.py b/localstack-core/localstack/testing/snapshots/transformer_utility.py index 77a9bdfc6e0b5..7d2d73c844dbb 100644 --- a/localstack-core/localstack/testing/snapshots/transformer_utility.py +++ b/localstack-core/localstack/testing/snapshots/transformer_utility.py @@ -648,6 +648,19 @@ def secretsmanager_api(): ), "version_uuid", ), + KeyValueBasedTransformer( + lambda k, v: ( + v + if ( + isinstance(k, str) + and k == "RotationLambdaARN" + and isinstance(v, str) + and re.match(PATTERN_ARN, v) + ) + else None + ), + "lambda-arn", + ), SortingTransformer("VersionStages"), SortingTransformer("Versions", lambda e: e.get("CreatedDate")), ] diff --git a/tests/aws/services/secretsmanager/functions/lambda_rotate_secret.py b/tests/aws/services/secretsmanager/functions/lambda_rotate_secret.py index 97dcf17736256..ccfebb8621bf3 100644 --- a/tests/aws/services/secretsmanager/functions/lambda_rotate_secret.py +++ b/tests/aws/services/secretsmanager/functions/lambda_rotate_secret.py @@ -224,3 +224,14 @@ def finish_secret(service_client, arn, token): token, arn, ) + if "AWSPENDING" in metadata["VersionIdsToStages"].get(token, []): + service_client.update_secret_version_stage( + SecretId=arn, + VersionStage="AWSPENDING", + RemoveFromVersionId=token, + ) + logger.info( + "finishSecret: Successfully removed AWSPENDING stage from version %s for secret %s.", + token, + arn, + ) diff --git a/tests/aws/services/secretsmanager/test_secretsmanager.py b/tests/aws/services/secretsmanager/test_secretsmanager.py index e8c7456156047..7a91414c6879e 100644 --- a/tests/aws/services/secretsmanager/test_secretsmanager.py +++ b/tests/aws/services/secretsmanager/test_secretsmanager.py @@ -70,6 +70,62 @@ def sm_snapshot(self, snapshot): snapshot.add_transformers_list(snapshot.transform.secretsmanager_api()) return snapshot + @pytest.fixture + def setup_invalid_rotation_secret(self, secret_name, aws_client, account_id, sm_snapshot): + def _setup(invalid_arn: str | None): + create_secret = aws_client.secretsmanager.create_secret( + Name=secret_name, SecretString="init" + ) + sm_snapshot.add_transformer( + sm_snapshot.transform.secretsmanager_secret_id_arn(create_secret, 0) + ) + sm_snapshot.match("create_secret", create_secret) + rotation_config = { + "SecretId": secret_name, + "RotationRules": { + "AutomaticallyAfterDays": 1, + }, + } + if invalid_arn: + rotation_config["RotationLambdaARN"] = invalid_arn + aws_client.secretsmanager.rotate_secret(**rotation_config) + + return _setup + + @pytest.fixture + def setup_rotation_secret( + self, + sm_snapshot, + secret_name, + create_secret, + create_lambda_function, + aws_client, + ): + cre_res = create_secret( + Name=secret_name, + SecretString="my_secret", + Description="testing rotation of secrets", + ) + + sm_snapshot.add_transformers_list( + sm_snapshot.transform.secretsmanager_secret_id_arn(cre_res, 0) + ) + + function_name = f"s-{short_uid()}" + function_arn = create_lambda_function( + handler_file=TEST_LAMBDA_ROTATE_SECRET, + func_name=function_name, + runtime=Runtime.python3_12, + )["CreateFunctionResponse"]["FunctionArn"] + + aws_client.lambda_.add_permission( + FunctionName=function_name, + StatementId="secretsManagerPermission", + Action="lambda:InvokeFunction", + Principal="secretsmanager.amazonaws.com", + ) + return cre_res["VersionId"], function_arn + @staticmethod def _wait_created_is_listed(client, secret_id: str): def _is_secret_in_list(): @@ -527,49 +583,27 @@ def test_rotate_secret_with_lambda_success( create_secret, create_lambda_function, aws_client, + setup_rotation_secret, rotate_immediately, ): """ Tests secret rotation via a lambda function. Parametrization ensures we test the default behavior which is an immediate rotation. """ - cre_res = create_secret( - Name=secret_name, - SecretString="my_secret", - Description="testing rotation of secrets", - ) - - sm_snapshot.add_transformer( - sm_snapshot.transform.key_value("RotationLambdaARN", "lambda-arn") - ) - sm_snapshot.add_transformers_list( - sm_snapshot.transform.secretsmanager_secret_id_arn(cre_res, 0) - ) - - function_name = f"s-{short_uid()}" - function_arn = create_lambda_function( - handler_file=TEST_LAMBDA_ROTATE_SECRET, - func_name=function_name, - runtime=Runtime.python3_12, - )["CreateFunctionResponse"]["FunctionArn"] + rotation_config = { + "RotationRules": {"AutomaticallyAfterDays": 1}, + } + if rotate_immediately: + rotation_config["RotateImmediately"] = rotate_immediately + initial_secret_version, function_arn = setup_rotation_secret - aws_client.lambda_.add_permission( - FunctionName=function_name, - StatementId="secretsManagerPermission", - Action="lambda:InvokeFunction", - Principal="secretsmanager.amazonaws.com", - ) + rotation_config = rotation_config or {} + if function_arn: + rotation_config["RotationLambdaARN"] = function_arn - rotation_kwargs = {} - if rotate_immediately is not None: - rotation_kwargs["RotateImmediately"] = rotate_immediately rot_res = aws_client.secretsmanager.rotate_secret( SecretId=secret_name, - RotationLambdaARN=function_arn, - RotationRules={ - "AutomaticallyAfterDays": 1, - }, - **rotation_kwargs, + **rotation_config, ) sm_snapshot.match("rotate_secret_immediately", rot_res) @@ -585,31 +619,75 @@ def test_rotate_secret_with_lambda_success( sm_snapshot.match("list_secret_versions_rotated_1", list_secret_versions_1) + # As a result of the Lambda invocations. current version should be + # pointed to `AWSCURRENT` & previous version to `AWSPREVIOUS` + assert response["VersionIdsToStages"][initial_secret_version] == ["AWSPREVIOUS"] + assert response["VersionIdsToStages"][rot_res["VersionId"]] == ["AWSCURRENT"] + + @markers.snapshot.skip_snapshot_verify( + paths=["$..VersionIdsToStages", "$..Versions", "$..VersionId"] + ) + @markers.aws.validated + def test_rotate_secret_multiple_times_with_lambda_success( + self, + sm_snapshot, + secret_name, + create_secret, + create_lambda_function, + aws_client, + setup_rotation_secret, + ): + secret_initial_version, function_arn = setup_rotation_secret + runs_config = { + 1: { + "RotationRules": {"AutomaticallyAfterDays": 1}, + "RotateImmediately": True, + "RotationLambdaARN": function_arn, + }, + 2: {}, + } + + for index in range(1, 3): + rotation_config = runs_config[index] + + rot_res = aws_client.secretsmanager.rotate_secret( + SecretId=secret_name, + **rotation_config, + ) + + sm_snapshot.match(f"rotate_secret_immediately_{index}", rot_res) + + self._wait_rotation(aws_client.secretsmanager, secret_name, rot_res["VersionId"]) + + response = aws_client.secretsmanager.describe_secret(SecretId=secret_name) + sm_snapshot.match(f"describe_secret_rotated_{index}", response) + + list_secret_versions_1 = aws_client.secretsmanager.list_secret_version_ids( + SecretId=secret_name + ) + + sm_snapshot.match(f"list_secret_versions_rotated_1_{index}", list_secret_versions_1) + + # As a result of the Lambda invocations. current version should be + # pointed to `AWSCURRENT` & previous version to `AWSPREVIOUS` + assert response["VersionIdsToStages"][secret_initial_version] == ["AWSPREVIOUS"] + assert response["VersionIdsToStages"][rot_res["VersionId"]] == ["AWSCURRENT"] + + secret_initial_version = aws_client.secretsmanager.get_secret_value( + SecretId=secret_name + )["VersionId"] + @markers.snapshot.skip_snapshot_verify(paths=["$..Error", "$..Message"]) @markers.aws.validated def test_rotate_secret_invalid_lambda_arn( - self, secret_name, aws_client, account_id, sm_snapshot + self, setup_invalid_rotation_secret, aws_client, sm_snapshot, secret_name, account_id ): - create_secret = aws_client.secretsmanager.create_secret( - Name=secret_name, SecretString="init" - ) - sm_snapshot.add_transformer( - sm_snapshot.transform.secretsmanager_secret_id_arn(create_secret, 0) - ) - sm_snapshot.match("create_secret", create_secret) - region_name = aws_client.secretsmanager.meta.region_name invalid_arn = ( f"arn:aws:lambda:{region_name}:{account_id}:function:rotate_secret_invalid_lambda_arn" ) with pytest.raises(Exception) as e: - aws_client.secretsmanager.rotate_secret( - SecretId=secret_name, - RotationLambdaARN=invalid_arn, - RotationRules={ - "AutomaticallyAfterDays": 1, - }, - ) + setup_invalid_rotation_secret(invalid_arn) sm_snapshot.match("rotate_secret_invalid_arn_exc", e.value.response) describe_secret = aws_client.secretsmanager.describe_secret(SecretId=secret_name) @@ -618,6 +696,14 @@ def test_rotate_secret_invalid_lambda_arn( assert "RotationRules" not in describe_secret assert "RotationLambdaARN" not in describe_secret + @markers.aws.validated + def test_first_rotate_secret_with_missing_lambda_arn( + self, setup_invalid_rotation_secret, sm_snapshot + ): + with pytest.raises(Exception) as e: + setup_invalid_rotation_secret(None) + sm_snapshot.match("rotate_secret_no_arn_exc", e.value.response) + @markers.aws.validated def test_put_secret_value_with_version_stages(self, sm_snapshot, secret_name, aws_client): secret_string_v0: str = "secret_string_v0" diff --git a/tests/aws/services/secretsmanager/test_secretsmanager.snapshot.json b/tests/aws/services/secretsmanager/test_secretsmanager.snapshot.json index 003987e7c32e2..8e52ed68a419c 100644 --- a/tests/aws/services/secretsmanager/test_secretsmanager.snapshot.json +++ b/tests/aws/services/secretsmanager/test_secretsmanager.snapshot.json @@ -3687,12 +3687,12 @@ } }, "tests/aws/services/secretsmanager/test_secretsmanager.py::TestSecretsManager::test_rotate_secret_with_lambda_success[True]": { - "recorded-date": "28-03-2024, 06:58:46", + "recorded-date": "30-03-2025, 11:45:42", "recorded-content": { "rotate_secret_immediately": { "ARN": "arn::secretsmanager::111111111111:secret:", "Name": "", - "VersionId": "", + "VersionId": "", "ResponseMetadata": { "HTTPHeaders": {}, "HTTPStatusCode": 200 @@ -3714,11 +3714,10 @@ }, "VersionIdsToStages": { "": [ - "AWSCURRENT", - "AWSPENDING" + "AWSPREVIOUS" ], "": [ - "AWSPREVIOUS" + "AWSCURRENT" ] }, "ResponseMetadata": { @@ -3736,7 +3735,7 @@ "DefaultEncryptionKey" ], "LastAccessedDate": "datetime", - "VersionId": "", + "VersionId": "", "VersionStages": [ "AWSPREVIOUS" ] @@ -3746,10 +3745,9 @@ "KmsKeyIds": [ "DefaultEncryptionKey" ], - "VersionId": "", + "VersionId": "", "VersionStages": [ - "AWSCURRENT", - "AWSPENDING" + "AWSCURRENT" ] } ], @@ -3761,7 +3759,7 @@ } }, "tests/aws/services/secretsmanager/test_secretsmanager.py::TestSecretsManager::test_rotate_secret_with_lambda_success[None]": { - "recorded-date": "28-03-2024, 06:58:58", + "recorded-date": "30-03-2025, 11:45:54", "recorded-content": { "rotate_secret_immediately": { "ARN": "arn::secretsmanager::111111111111:secret:", @@ -3791,8 +3789,7 @@ "AWSPREVIOUS" ], "": [ - "AWSCURRENT", - "AWSPENDING" + "AWSCURRENT" ] }, "ResponseMetadata": { @@ -3822,8 +3819,7 @@ ], "VersionId": "", "VersionStages": [ - "AWSCURRENT", - "AWSPENDING" + "AWSCURRENT" ] } ], @@ -4586,5 +4582,184 @@ } } } + }, + "tests/aws/services/secretsmanager/test_secretsmanager.py::TestSecretsManager::test_first_rotate_secret_with_missing_lambda_arn": { + "recorded-date": "27-03-2025, 16:33:46", + "recorded-content": { + "create_secret": { + "ARN": "arn::secretsmanager::111111111111:secret:", + "Name": "", + "VersionId": "", + "ResponseMetadata": { + "HTTPHeaders": {}, + "HTTPStatusCode": 200 + } + }, + "rotate_secret_no_arn_exc": { + "Error": { + "Code": "InvalidRequestException", + "Message": "No Lambda rotation function ARN is associated with this secret." + }, + "Message": "No Lambda rotation function ARN is associated with this secret.", + "ResponseMetadata": { + "HTTPHeaders": {}, + "HTTPStatusCode": 400 + } + }, + "describe_secret": { + "ARN": "arn::secretsmanager::111111111111:secret:", + "CreatedDate": "datetime", + "LastChangedDate": "datetime", + "Name": "", + "VersionIdsToStages": { + "": [ + "AWSCURRENT" + ] + }, + "ResponseMetadata": { + "HTTPHeaders": {}, + "HTTPStatusCode": 200 + } + } + } + }, + "tests/aws/services/secretsmanager/test_secretsmanager.py::TestSecretsManager::test_rotate_secret_multiple_times_with_lambda_success": { + "recorded-date": "29-03-2025, 09:40:15", + "recorded-content": { + "rotate_secret_immediately_1": { + "ARN": "arn::secretsmanager::111111111111:secret:", + "Name": "", + "VersionId": "", + "ResponseMetadata": { + "HTTPHeaders": {}, + "HTTPStatusCode": 200 + } + }, + "describe_secret_rotated_1": { + "ARN": "arn::secretsmanager::111111111111:secret:", + "CreatedDate": "datetime", + "Description": "testing rotation of secrets", + "LastAccessedDate": "datetime", + "LastChangedDate": "datetime", + "LastRotatedDate": "datetime", + "Name": "", + "NextRotationDate": "datetime", + "RotationEnabled": true, + "RotationLambdaARN": "", + "RotationRules": { + "AutomaticallyAfterDays": 1 + }, + "VersionIdsToStages": { + "": [ + "AWSPREVIOUS" + ], + "": [ + "AWSCURRENT" + ] + }, + "ResponseMetadata": { + "HTTPHeaders": {}, + "HTTPStatusCode": 200 + } + }, + "list_secret_versions_rotated_1_1": { + "ARN": "arn::secretsmanager::111111111111:secret:", + "Name": "", + "Versions": [ + { + "CreatedDate": "datetime", + "KmsKeyIds": [ + "DefaultEncryptionKey" + ], + "LastAccessedDate": "datetime", + "VersionId": "", + "VersionStages": [ + "AWSPREVIOUS" + ] + }, + { + "CreatedDate": "datetime", + "KmsKeyIds": [ + "DefaultEncryptionKey" + ], + "VersionId": "", + "VersionStages": [ + "AWSCURRENT" + ] + } + ], + "ResponseMetadata": { + "HTTPHeaders": {}, + "HTTPStatusCode": 200 + } + }, + "rotate_secret_immediately_2": { + "ARN": "arn::secretsmanager::111111111111:secret:", + "Name": "", + "VersionId": "", + "ResponseMetadata": { + "HTTPHeaders": {}, + "HTTPStatusCode": 200 + } + }, + "describe_secret_rotated_2": { + "ARN": "arn::secretsmanager::111111111111:secret:", + "CreatedDate": "datetime", + "Description": "testing rotation of secrets", + "LastAccessedDate": "datetime", + "LastChangedDate": "datetime", + "LastRotatedDate": "datetime", + "Name": "", + "NextRotationDate": "datetime", + "RotationEnabled": true, + "RotationLambdaARN": "", + "RotationRules": { + "AutomaticallyAfterDays": 1 + }, + "VersionIdsToStages": { + "": [ + "AWSPREVIOUS" + ], + "": [ + "AWSCURRENT" + ] + }, + "ResponseMetadata": { + "HTTPHeaders": {}, + "HTTPStatusCode": 200 + } + }, + "list_secret_versions_rotated_1_2": { + "ARN": "arn::secretsmanager::111111111111:secret:", + "Name": "", + "Versions": [ + { + "CreatedDate": "datetime", + "KmsKeyIds": [ + "DefaultEncryptionKey" + ], + "LastAccessedDate": "datetime", + "VersionId": "", + "VersionStages": [ + "AWSPREVIOUS" + ] + }, + { + "CreatedDate": "datetime", + "KmsKeyIds": [ + "DefaultEncryptionKey" + ], + "VersionId": "", + "VersionStages": [ + "AWSCURRENT" + ] + } + ], + "ResponseMetadata": { + "HTTPHeaders": {}, + "HTTPStatusCode": 200 + } + } + } } } diff --git a/tests/aws/services/secretsmanager/test_secretsmanager.validation.json b/tests/aws/services/secretsmanager/test_secretsmanager.validation.json index a85ca0d9e3e4a..d44fb5cb56bc5 100644 --- a/tests/aws/services/secretsmanager/test_secretsmanager.validation.json +++ b/tests/aws/services/secretsmanager/test_secretsmanager.validation.json @@ -41,6 +41,9 @@ "tests/aws/services/secretsmanager/test_secretsmanager.py::TestSecretsManager::test_exp_raised_on_creation_of_secret_scheduled_for_deletion": { "last_validated_date": "2024-03-15T08:13:16+00:00" }, + "tests/aws/services/secretsmanager/test_secretsmanager.py::TestSecretsManager::test_first_rotate_secret_with_missing_lambda_arn": { + "last_validated_date": "2025-03-27T16:33:46+00:00" + }, "tests/aws/services/secretsmanager/test_secretsmanager.py::TestSecretsManager::test_force_delete_deleted_secret": { "last_validated_date": "2024-10-11T14:33:45+00:00" }, @@ -101,14 +104,17 @@ "tests/aws/services/secretsmanager/test_secretsmanager.py::TestSecretsManager::test_rotate_secret_invalid_lambda_arn": { "last_validated_date": "2024-03-15T10:11:13+00:00" }, + "tests/aws/services/secretsmanager/test_secretsmanager.py::TestSecretsManager::test_rotate_secret_multiple_times_with_lambda_success": { + "last_validated_date": "2025-03-29T09:40:15+00:00" + }, "tests/aws/services/secretsmanager/test_secretsmanager.py::TestSecretsManager::test_rotate_secret_with_lambda_success": { "last_validated_date": "2024-03-15T08:12:22+00:00" }, "tests/aws/services/secretsmanager/test_secretsmanager.py::TestSecretsManager::test_rotate_secret_with_lambda_success[None]": { - "last_validated_date": "2024-03-28T06:58:56+00:00" + "last_validated_date": "2025-03-30T11:45:54+00:00" }, "tests/aws/services/secretsmanager/test_secretsmanager.py::TestSecretsManager::test_rotate_secret_with_lambda_success[True]": { - "last_validated_date": "2024-03-28T06:58:44+00:00" + "last_validated_date": "2025-03-30T11:45:41+00:00" }, "tests/aws/services/secretsmanager/test_secretsmanager.py::TestSecretsManager::test_secret_exists": { "last_validated_date": "2024-03-15T08:14:33+00:00"