Skip to content

Commit 727155e

Browse files
authored
Secret Manager: Solve the issue for rotate secret after sub-sequent r… (#12391)
1 parent 8999cc4 commit 727155e

File tree

6 files changed

+369
-69
lines changed

6 files changed

+369
-69
lines changed

localstack-core/localstack/services/secretsmanager/provider.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -729,17 +729,28 @@ def backend_rotate_secret(
729729
if not self._is_valid_identifier(secret_id):
730730
raise SecretNotFoundException()
731731

732-
if self.secrets[secret_id].is_deleted():
732+
secret = self.secrets[secret_id]
733+
if secret.is_deleted():
733734
raise InvalidRequestException(
734735
"An error occurred (InvalidRequestException) when calling the RotateSecret operation: You tried to \
735736
perform the operation on a secret that's currently marked deleted."
736737
)
738+
# Resolve rotation_lambda_arn and fallback to previous value if its missing
739+
# from the current request
740+
rotation_lambda_arn = rotation_lambda_arn or secret.rotation_lambda_arn
741+
if not rotation_lambda_arn:
742+
raise InvalidRequestException(
743+
"No Lambda rotation function ARN is associated with this secret."
744+
)
737745

738746
if rotation_lambda_arn:
739747
if len(rotation_lambda_arn) > 2048:
740748
msg = "RotationLambdaARN must <= 2048 characters long."
741749
raise InvalidParameterException(msg)
742750

751+
# In case rotation_period is not provided, resolve auto_rotate_after_days
752+
# and fallback to previous value if its missing from the current request.
753+
rotation_period = secret.auto_rotate_after_days or 0
743754
if rotation_rules:
744755
if rotation_days in rotation_rules:
745756
rotation_period = rotation_rules[rotation_days]
@@ -753,8 +764,6 @@ def backend_rotate_secret(
753764
except Exception:
754765
raise ResourceNotFoundException("Lambda does not exist or could not be accessed")
755766

756-
secret = self.secrets[secret_id]
757-
758767
# The rotation function must end with the versions of the secret in
759768
# one of two states:
760769
#
@@ -782,7 +791,7 @@ def backend_rotate_secret(
782791
pass
783792

784793
secret.rotation_lambda_arn = rotation_lambda_arn
785-
secret.auto_rotate_after_days = rotation_rules.get(rotation_days, 0)
794+
secret.auto_rotate_after_days = rotation_period
786795
if secret.auto_rotate_after_days > 0:
787796
wait_interval_s = int(rotation_period) * 86400
788797
secret.next_rotation_date = int(time.time()) + wait_interval_s

localstack-core/localstack/testing/snapshots/transformer_utility.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -648,6 +648,19 @@ def secretsmanager_api():
648648
),
649649
"version_uuid",
650650
),
651+
KeyValueBasedTransformer(
652+
lambda k, v: (
653+
v
654+
if (
655+
isinstance(k, str)
656+
and k == "RotationLambdaARN"
657+
and isinstance(v, str)
658+
and re.match(PATTERN_ARN, v)
659+
)
660+
else None
661+
),
662+
"lambda-arn",
663+
),
651664
SortingTransformer("VersionStages"),
652665
SortingTransformer("Versions", lambda e: e.get("CreatedDate")),
653666
]

tests/aws/services/secretsmanager/functions/lambda_rotate_secret.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,3 +224,14 @@ def finish_secret(service_client, arn, token):
224224
token,
225225
arn,
226226
)
227+
if "AWSPENDING" in metadata["VersionIdsToStages"].get(token, []):
228+
service_client.update_secret_version_stage(
229+
SecretId=arn,
230+
VersionStage="AWSPENDING",
231+
RemoveFromVersionId=token,
232+
)
233+
logger.info(
234+
"finishSecret: Successfully removed AWSPENDING stage from version %s for secret %s.",
235+
token,
236+
arn,
237+
)

tests/aws/services/secretsmanager/test_secretsmanager.py

Lines changed: 135 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,62 @@ def sm_snapshot(self, snapshot):
7070
snapshot.add_transformers_list(snapshot.transform.secretsmanager_api())
7171
return snapshot
7272

73+
@pytest.fixture
74+
def setup_invalid_rotation_secret(self, secret_name, aws_client, account_id, sm_snapshot):
75+
def _setup(invalid_arn: str | None):
76+
create_secret = aws_client.secretsmanager.create_secret(
77+
Name=secret_name, SecretString="init"
78+
)
79+
sm_snapshot.add_transformer(
80+
sm_snapshot.transform.secretsmanager_secret_id_arn(create_secret, 0)
81+
)
82+
sm_snapshot.match("create_secret", create_secret)
83+
rotation_config = {
84+
"SecretId": secret_name,
85+
"RotationRules": {
86+
"AutomaticallyAfterDays": 1,
87+
},
88+
}
89+
if invalid_arn:
90+
rotation_config["RotationLambdaARN"] = invalid_arn
91+
aws_client.secretsmanager.rotate_secret(**rotation_config)
92+
93+
return _setup
94+
95+
@pytest.fixture
96+
def setup_rotation_secret(
97+
self,
98+
sm_snapshot,
99+
secret_name,
100+
create_secret,
101+
create_lambda_function,
102+
aws_client,
103+
):
104+
cre_res = create_secret(
105+
Name=secret_name,
106+
SecretString="my_secret",
107+
Description="testing rotation of secrets",
108+
)
109+
110+
sm_snapshot.add_transformers_list(
111+
sm_snapshot.transform.secretsmanager_secret_id_arn(cre_res, 0)
112+
)
113+
114+
function_name = f"s-{short_uid()}"
115+
function_arn = create_lambda_function(
116+
handler_file=TEST_LAMBDA_ROTATE_SECRET,
117+
func_name=function_name,
118+
runtime=Runtime.python3_12,
119+
)["CreateFunctionResponse"]["FunctionArn"]
120+
121+
aws_client.lambda_.add_permission(
122+
FunctionName=function_name,
123+
StatementId="secretsManagerPermission",
124+
Action="lambda:InvokeFunction",
125+
Principal="secretsmanager.amazonaws.com",
126+
)
127+
return cre_res["VersionId"], function_arn
128+
73129
@staticmethod
74130
def _wait_created_is_listed(client, secret_id: str):
75131
def _is_secret_in_list():
@@ -527,49 +583,27 @@ def test_rotate_secret_with_lambda_success(
527583
create_secret,
528584
create_lambda_function,
529585
aws_client,
586+
setup_rotation_secret,
530587
rotate_immediately,
531588
):
532589
"""
533590
Tests secret rotation via a lambda function.
534591
Parametrization ensures we test the default behavior which is an immediate rotation.
535592
"""
536-
cre_res = create_secret(
537-
Name=secret_name,
538-
SecretString="my_secret",
539-
Description="testing rotation of secrets",
540-
)
541-
542-
sm_snapshot.add_transformer(
543-
sm_snapshot.transform.key_value("RotationLambdaARN", "lambda-arn")
544-
)
545-
sm_snapshot.add_transformers_list(
546-
sm_snapshot.transform.secretsmanager_secret_id_arn(cre_res, 0)
547-
)
548-
549-
function_name = f"s-{short_uid()}"
550-
function_arn = create_lambda_function(
551-
handler_file=TEST_LAMBDA_ROTATE_SECRET,
552-
func_name=function_name,
553-
runtime=Runtime.python3_12,
554-
)["CreateFunctionResponse"]["FunctionArn"]
593+
rotation_config = {
594+
"RotationRules": {"AutomaticallyAfterDays": 1},
595+
}
596+
if rotate_immediately:
597+
rotation_config["RotateImmediately"] = rotate_immediately
598+
initial_secret_version, function_arn = setup_rotation_secret
555599

556-
aws_client.lambda_.add_permission(
557-
FunctionName=function_name,
558-
StatementId="secretsManagerPermission",
559-
Action="lambda:InvokeFunction",
560-
Principal="secretsmanager.amazonaws.com",
561-
)
600+
rotation_config = rotation_config or {}
601+
if function_arn:
602+
rotation_config["RotationLambdaARN"] = function_arn
562603

563-
rotation_kwargs = {}
564-
if rotate_immediately is not None:
565-
rotation_kwargs["RotateImmediately"] = rotate_immediately
566604
rot_res = aws_client.secretsmanager.rotate_secret(
567605
SecretId=secret_name,
568-
RotationLambdaARN=function_arn,
569-
RotationRules={
570-
"AutomaticallyAfterDays": 1,
571-
},
572-
**rotation_kwargs,
606+
**rotation_config,
573607
)
574608

575609
sm_snapshot.match("rotate_secret_immediately", rot_res)
@@ -585,31 +619,75 @@ def test_rotate_secret_with_lambda_success(
585619

586620
sm_snapshot.match("list_secret_versions_rotated_1", list_secret_versions_1)
587621

622+
# As a result of the Lambda invocations. current version should be
623+
# pointed to `AWSCURRENT` & previous version to `AWSPREVIOUS`
624+
assert response["VersionIdsToStages"][initial_secret_version] == ["AWSPREVIOUS"]
625+
assert response["VersionIdsToStages"][rot_res["VersionId"]] == ["AWSCURRENT"]
626+
627+
@markers.snapshot.skip_snapshot_verify(
628+
paths=["$..VersionIdsToStages", "$..Versions", "$..VersionId"]
629+
)
630+
@markers.aws.validated
631+
def test_rotate_secret_multiple_times_with_lambda_success(
632+
self,
633+
sm_snapshot,
634+
secret_name,
635+
create_secret,
636+
create_lambda_function,
637+
aws_client,
638+
setup_rotation_secret,
639+
):
640+
secret_initial_version, function_arn = setup_rotation_secret
641+
runs_config = {
642+
1: {
643+
"RotationRules": {"AutomaticallyAfterDays": 1},
644+
"RotateImmediately": True,
645+
"RotationLambdaARN": function_arn,
646+
},
647+
2: {},
648+
}
649+
650+
for index in range(1, 3):
651+
rotation_config = runs_config[index]
652+
653+
rot_res = aws_client.secretsmanager.rotate_secret(
654+
SecretId=secret_name,
655+
**rotation_config,
656+
)
657+
658+
sm_snapshot.match(f"rotate_secret_immediately_{index}", rot_res)
659+
660+
self._wait_rotation(aws_client.secretsmanager, secret_name, rot_res["VersionId"])
661+
662+
response = aws_client.secretsmanager.describe_secret(SecretId=secret_name)
663+
sm_snapshot.match(f"describe_secret_rotated_{index}", response)
664+
665+
list_secret_versions_1 = aws_client.secretsmanager.list_secret_version_ids(
666+
SecretId=secret_name
667+
)
668+
669+
sm_snapshot.match(f"list_secret_versions_rotated_1_{index}", list_secret_versions_1)
670+
671+
# As a result of the Lambda invocations. current version should be
672+
# pointed to `AWSCURRENT` & previous version to `AWSPREVIOUS`
673+
assert response["VersionIdsToStages"][secret_initial_version] == ["AWSPREVIOUS"]
674+
assert response["VersionIdsToStages"][rot_res["VersionId"]] == ["AWSCURRENT"]
675+
676+
secret_initial_version = aws_client.secretsmanager.get_secret_value(
677+
SecretId=secret_name
678+
)["VersionId"]
679+
588680
@markers.snapshot.skip_snapshot_verify(paths=["$..Error", "$..Message"])
589681
@markers.aws.validated
590682
def test_rotate_secret_invalid_lambda_arn(
591-
self, secret_name, aws_client, account_id, sm_snapshot
683+
self, setup_invalid_rotation_secret, aws_client, sm_snapshot, secret_name, account_id
592684
):
593-
create_secret = aws_client.secretsmanager.create_secret(
594-
Name=secret_name, SecretString="init"
595-
)
596-
sm_snapshot.add_transformer(
597-
sm_snapshot.transform.secretsmanager_secret_id_arn(create_secret, 0)
598-
)
599-
sm_snapshot.match("create_secret", create_secret)
600-
601685
region_name = aws_client.secretsmanager.meta.region_name
602686
invalid_arn = (
603687
f"arn:aws:lambda:{region_name}:{account_id}:function:rotate_secret_invalid_lambda_arn"
604688
)
605689
with pytest.raises(Exception) as e:
606-
aws_client.secretsmanager.rotate_secret(
607-
SecretId=secret_name,
608-
RotationLambdaARN=invalid_arn,
609-
RotationRules={
610-
"AutomaticallyAfterDays": 1,
611-
},
612-
)
690+
setup_invalid_rotation_secret(invalid_arn)
613691
sm_snapshot.match("rotate_secret_invalid_arn_exc", e.value.response)
614692

615693
describe_secret = aws_client.secretsmanager.describe_secret(SecretId=secret_name)
@@ -618,6 +696,14 @@ def test_rotate_secret_invalid_lambda_arn(
618696
assert "RotationRules" not in describe_secret
619697
assert "RotationLambdaARN" not in describe_secret
620698

699+
@markers.aws.validated
700+
def test_first_rotate_secret_with_missing_lambda_arn(
701+
self, setup_invalid_rotation_secret, sm_snapshot
702+
):
703+
with pytest.raises(Exception) as e:
704+
setup_invalid_rotation_secret(None)
705+
sm_snapshot.match("rotate_secret_no_arn_exc", e.value.response)
706+
621707
@markers.aws.validated
622708
def test_put_secret_value_with_version_stages(self, sm_snapshot, secret_name, aws_client):
623709
secret_string_v0: str = "secret_string_v0"

0 commit comments

Comments
 (0)