-
-
Notifications
You must be signed in to change notification settings - Fork 4.2k
Secret Manager: Solve the issue for rotate secret after sub-sequent r… #12391
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
63ea482
1203f02
2148b3d
8b3a1a3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -729,17 +729,28 @@ def backend_rotate_secret( | |||||
if not self._is_valid_identifier(secret_id): | ||||||
raise SecretNotFoundException() | ||||||
|
||||||
if self.secrets[secret_id].is_deleted(): | ||||||
secret = self.secrets[secret_id] | ||||||
if secret.is_deleted(): | ||||||
raise InvalidRequestException( | ||||||
"An error occurred (InvalidRequestException) when calling the RotateSecret operation: You tried to \ | ||||||
perform the operation on a secret that's currently marked deleted." | ||||||
) | ||||||
# Resolve rotation_lambda_arn and fallback to previous value if its missing | ||||||
# from the current request | ||||||
rotation_lambda_arn = rotation_lambda_arn or secret.rotation_lambda_arn | ||||||
if not rotation_lambda_arn: | ||||||
raise InvalidRequestException( | ||||||
"No Lambda rotation function ARN is associated with this secret." | ||||||
) | ||||||
|
||||||
if rotation_lambda_arn: | ||||||
if len(rotation_lambda_arn) > 2048: | ||||||
msg = "RotationLambdaARN must <= 2048 characters long." | ||||||
raise InvalidParameterException(msg) | ||||||
|
||||||
# In case rotation_period is not provided, resolve auto_rotate_after_days | ||||||
# and fallback to previous value if its missing from the current request. | ||||||
rotation_period = secret.auto_rotate_after_days or 0 | ||||||
if rotation_rules: | ||||||
if rotation_days in rotation_rules: | ||||||
rotation_period = rotation_rules[rotation_days] | ||||||
|
@@ -753,8 +764,6 @@ def backend_rotate_secret( | |||||
except Exception: | ||||||
raise ResourceNotFoundException("Lambda does not exist or could not be accessed") | ||||||
|
||||||
secret = self.secrets[secret_id] | ||||||
|
||||||
# The rotation function must end with the versions of the secret in | ||||||
# one of two states: | ||||||
# | ||||||
|
@@ -782,7 +791,7 @@ def backend_rotate_secret( | |||||
pass | ||||||
|
||||||
secret.rotation_lambda_arn = rotation_lambda_arn | ||||||
secret.auto_rotate_after_days = rotation_rules.get(rotation_days, 0) | ||||||
secret.auto_rotate_after_days = rotation_period | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. better to add default value in case
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added this in purpose without For extra check, we can still also fallback to 0 I will update this |
||||||
if secret.auto_rotate_after_days > 0: | ||||||
wait_interval_s = int(rotation_period) * 86400 | ||||||
secret.next_rotation_date = int(time.time()) + wait_interval_s | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -224,3 +224,14 @@ def finish_secret(service_client, arn, token): | |
token, | ||
arn, | ||
) | ||
if "AWSPENDING" in metadata["VersionIdsToStages"].get(token, []): | ||
service_client.update_secret_version_stage( | ||
SecretId=arn, | ||
VersionStage="AWSPENDING", | ||
RemoveFromVersionId=token, | ||
) | ||
logger.info( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: we can use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The reason why I choose the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is only in the lambda function body though right @macnev2013? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, that’s right @simonrw. This can be ignored. |
||
"finishSecret: Successfully removed AWSPENDING stage from version %s for secret %s.", | ||
token, | ||
arn, | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -70,6 +70,62 @@ def sm_snapshot(self, snapshot): | |
snapshot.add_transformers_list(snapshot.transform.secretsmanager_api()) | ||
return snapshot | ||
|
||
@pytest.fixture | ||
def setup_invalid_rotation_secret(self, secret_name, aws_client, account_id, sm_snapshot): | ||
def _setup(invalid_arn: str | None): | ||
create_secret = aws_client.secretsmanager.create_secret( | ||
Name=secret_name, SecretString="init" | ||
) | ||
sm_snapshot.add_transformer( | ||
sm_snapshot.transform.secretsmanager_secret_id_arn(create_secret, 0) | ||
) | ||
sm_snapshot.match("create_secret", create_secret) | ||
rotation_config = { | ||
"SecretId": secret_name, | ||
"RotationRules": { | ||
"AutomaticallyAfterDays": 1, | ||
}, | ||
} | ||
if invalid_arn: | ||
rotation_config["RotationLambdaARN"] = invalid_arn | ||
aws_client.secretsmanager.rotate_secret(**rotation_config) | ||
|
||
return _setup | ||
|
||
@pytest.fixture | ||
def setup_rotation_secret( | ||
self, | ||
sm_snapshot, | ||
secret_name, | ||
create_secret, | ||
create_lambda_function, | ||
aws_client, | ||
): | ||
cre_res = create_secret( | ||
Name=secret_name, | ||
SecretString="my_secret", | ||
Description="testing rotation of secrets", | ||
) | ||
|
||
sm_snapshot.add_transformers_list( | ||
sm_snapshot.transform.secretsmanager_secret_id_arn(cre_res, 0) | ||
) | ||
|
||
function_name = f"s-{short_uid()}" | ||
function_arn = create_lambda_function( | ||
handler_file=TEST_LAMBDA_ROTATE_SECRET, | ||
func_name=function_name, | ||
runtime=Runtime.python3_12, | ||
)["CreateFunctionResponse"]["FunctionArn"] | ||
|
||
aws_client.lambda_.add_permission( | ||
FunctionName=function_name, | ||
StatementId="secretsManagerPermission", | ||
Action="lambda:InvokeFunction", | ||
Principal="secretsmanager.amazonaws.com", | ||
) | ||
return cre_res["VersionId"], function_arn | ||
|
||
@staticmethod | ||
def _wait_created_is_listed(client, secret_id: str): | ||
def _is_secret_in_list(): | ||
|
@@ -527,49 +583,27 @@ def test_rotate_secret_with_lambda_success( | |
create_secret, | ||
create_lambda_function, | ||
aws_client, | ||
setup_rotation_secret, | ||
rotate_immediately, | ||
): | ||
""" | ||
Tests secret rotation via a lambda function. | ||
Parametrization ensures we test the default behavior which is an immediate rotation. | ||
""" | ||
cre_res = create_secret( | ||
Name=secret_name, | ||
SecretString="my_secret", | ||
Description="testing rotation of secrets", | ||
) | ||
|
||
sm_snapshot.add_transformer( | ||
sm_snapshot.transform.key_value("RotationLambdaARN", "lambda-arn") | ||
) | ||
sm_snapshot.add_transformers_list( | ||
sm_snapshot.transform.secretsmanager_secret_id_arn(cre_res, 0) | ||
) | ||
|
||
function_name = f"s-{short_uid()}" | ||
function_arn = create_lambda_function( | ||
handler_file=TEST_LAMBDA_ROTATE_SECRET, | ||
func_name=function_name, | ||
runtime=Runtime.python3_12, | ||
)["CreateFunctionResponse"]["FunctionArn"] | ||
rotation_config = { | ||
"RotationRules": {"AutomaticallyAfterDays": 1}, | ||
} | ||
if rotate_immediately: | ||
rotation_config["RotateImmediately"] = rotate_immediately | ||
initial_secret_version, function_arn = setup_rotation_secret | ||
|
||
aws_client.lambda_.add_permission( | ||
FunctionName=function_name, | ||
StatementId="secretsManagerPermission", | ||
Action="lambda:InvokeFunction", | ||
Principal="secretsmanager.amazonaws.com", | ||
) | ||
rotation_config = rotation_config or {} | ||
if function_arn: | ||
rotation_config["RotationLambdaARN"] = function_arn | ||
|
||
rotation_kwargs = {} | ||
if rotate_immediately is not None: | ||
rotation_kwargs["RotateImmediately"] = rotate_immediately | ||
rot_res = aws_client.secretsmanager.rotate_secret( | ||
SecretId=secret_name, | ||
RotationLambdaARN=function_arn, | ||
RotationRules={ | ||
"AutomaticallyAfterDays": 1, | ||
}, | ||
**rotation_kwargs, | ||
**rotation_config, | ||
) | ||
|
||
sm_snapshot.match("rotate_secret_immediately", rot_res) | ||
|
@@ -585,31 +619,75 @@ def test_rotate_secret_with_lambda_success( | |
|
||
sm_snapshot.match("list_secret_versions_rotated_1", list_secret_versions_1) | ||
|
||
# As a result of the Lambda invocations. current version should be | ||
# pointed to `AWSCURRENT` & previous version to `AWSPREVIOUS` | ||
assert response["VersionIdsToStages"][initial_secret_version] == ["AWSPREVIOUS"] | ||
assert response["VersionIdsToStages"][rot_res["VersionId"]] == ["AWSCURRENT"] | ||
|
||
@markers.snapshot.skip_snapshot_verify( | ||
paths=["$..VersionIdsToStages", "$..Versions", "$..VersionId"] | ||
) | ||
@markers.aws.validated | ||
def test_rotate_secret_multiple_times_with_lambda_success( | ||
self, | ||
sm_snapshot, | ||
secret_name, | ||
create_secret, | ||
create_lambda_function, | ||
aws_client, | ||
setup_rotation_secret, | ||
): | ||
secret_initial_version, function_arn = setup_rotation_secret | ||
runs_config = { | ||
1: { | ||
"RotationRules": {"AutomaticallyAfterDays": 1}, | ||
"RotateImmediately": True, | ||
"RotationLambdaARN": function_arn, | ||
}, | ||
2: {}, | ||
} | ||
|
||
for index in range(1, 3): | ||
rotation_config = runs_config[index] | ||
|
||
rot_res = aws_client.secretsmanager.rotate_secret( | ||
SecretId=secret_name, | ||
**rotation_config, | ||
) | ||
|
||
sm_snapshot.match(f"rotate_secret_immediately_{index}", rot_res) | ||
|
||
self._wait_rotation(aws_client.secretsmanager, secret_name, rot_res["VersionId"]) | ||
|
||
response = aws_client.secretsmanager.describe_secret(SecretId=secret_name) | ||
sm_snapshot.match(f"describe_secret_rotated_{index}", response) | ||
|
||
list_secret_versions_1 = aws_client.secretsmanager.list_secret_version_ids( | ||
SecretId=secret_name | ||
) | ||
|
||
sm_snapshot.match(f"list_secret_versions_rotated_1_{index}", list_secret_versions_1) | ||
|
||
# As a result of the Lambda invocations. current version should be | ||
# pointed to `AWSCURRENT` & previous version to `AWSPREVIOUS` | ||
assert response["VersionIdsToStages"][secret_initial_version] == ["AWSPREVIOUS"] | ||
assert response["VersionIdsToStages"][rot_res["VersionId"]] == ["AWSCURRENT"] | ||
|
||
secret_initial_version = aws_client.secretsmanager.get_secret_value( | ||
SecretId=secret_name | ||
)["VersionId"] | ||
|
||
@markers.snapshot.skip_snapshot_verify(paths=["$..Error", "$..Message"]) | ||
@markers.aws.validated | ||
def test_rotate_secret_invalid_lambda_arn( | ||
self, secret_name, aws_client, account_id, sm_snapshot | ||
self, setup_invalid_rotation_secret, aws_client, sm_snapshot, secret_name, account_id | ||
): | ||
create_secret = aws_client.secretsmanager.create_secret( | ||
Name=secret_name, SecretString="init" | ||
) | ||
sm_snapshot.add_transformer( | ||
sm_snapshot.transform.secretsmanager_secret_id_arn(create_secret, 0) | ||
) | ||
sm_snapshot.match("create_secret", create_secret) | ||
|
||
region_name = aws_client.secretsmanager.meta.region_name | ||
invalid_arn = ( | ||
f"arn:aws:lambda:{region_name}:{account_id}:function:rotate_secret_invalid_lambda_arn" | ||
) | ||
with pytest.raises(Exception) as e: | ||
aws_client.secretsmanager.rotate_secret( | ||
SecretId=secret_name, | ||
RotationLambdaARN=invalid_arn, | ||
RotationRules={ | ||
"AutomaticallyAfterDays": 1, | ||
}, | ||
) | ||
setup_invalid_rotation_secret(invalid_arn) | ||
sm_snapshot.match("rotate_secret_invalid_arn_exc", e.value.response) | ||
|
||
describe_secret = aws_client.secretsmanager.describe_secret(SecretId=secret_name) | ||
|
@@ -618,6 +696,14 @@ def test_rotate_secret_invalid_lambda_arn( | |
assert "RotationRules" not in describe_secret | ||
assert "RotationLambdaARN" not in describe_secret | ||
|
||
@markers.aws.validated | ||
def test_first_rotate_secret_with_missing_lambda_arn( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Here is a demonstration of the confusion that extracting these helper methods causes. I (incorrectly) believed that you hadn't tried to rotate the lambda function, but when I looked in the snapshot I saw that the I would find it much clearer if the test was more like:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now I have been more lean toward refactor this since it created this confusion. Thanks for raising this |
||
self, setup_invalid_rotation_secret, sm_snapshot | ||
): | ||
with pytest.raises(Exception) as e: | ||
setup_invalid_rotation_secret(None) | ||
sm_snapshot.match("rotate_secret_no_arn_exc", e.value.response) | ||
|
||
@markers.aws.validated | ||
def test_put_secret_value_with_version_stages(self, sm_snapshot, secret_name, aws_client): | ||
secret_string_v0: str = "secret_string_v0" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if the secret is being rotated first time and we haven't specified the
rotation_lambda_arn
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, this was an existing behavior even before I introduce my fix. In case for the first time rotation we did not specify
rotation_lambda_arn
then it will raise error, check this code hereThis will raise error whenever we invoked it from the cli
An error occurred (ResourceNotFoundException) when calling the RotateSecret operation: Lambda does not exist or could not be accessed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What error message does AWS return in this case? Does it match the error we raise in this circumstance? I suspect not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you are right. I will update the code because I need to add a check earlier to raise an error that match AWS one
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error mainly look like this from AWS