-
Notifications
You must be signed in to change notification settings - Fork 325
feat: Add trust boundary support for service accounts and impersonation. #1778
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
feat: Add trust boundary support for service accounts and impersonation. #1778
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you handle feature gating the same way we do for pluggable auth? We should align this across all languages.
Also, please add the following test cases:
-
Please add a test scenario for both service_account.Credentials and impersonated_credentials.Credentials where a valid (non-noop) trust boundary is successfully fetched and cached on the first refresh. A second refresh should then mock a RefreshError from the lookup endpoint. The test should assert that no exception is raised and that the initially cached trust boundary data is preserved on the credentials object. This would validate the fallback logic in _refresh_trust_boundary.
-
Include explicit tests for both service_account.Credentials in tests/oauth2/test_service_account.py and impersonated_credentials.Credentials in tests/test_impersonated_credentials.py to confirm that the trust boundary lookup is skipped when the credential is instantiated with a non-default universe_domain. This ensures the check is correctly triggered by the concrete classes.
|
||
request = google_auth_requests.Request() | ||
try: | ||
info = _metadata.get_service_account_info(request, "default") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@harkamaljot you had worked on optimizing this call somewhere else in the code recently. Can you PTAL at this method and see if there are any issues in doing this call or something can be optimized.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I optimized this call when getting token for the service account, however for getting the trust boundary looks like we need service account email. Based on the documentation(http://cloud/iam/docs/reference/credentials/rest/v1/projects.serviceAccounts/getAllowedLocations) and also trying it out manually as well, I get 400 error when querying this endpoint using default.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is looking good! The main thing is to refactor this, if possible, so that the burden of refreshing trust boundaries is handled by the base class.
For better test coverage, we should add a few test cases for some edge scenarios.
-
For the new
with_trust_boundary()
method, it would be great to have tests for each credential type confirming that it returns a new instance and correctly copies all other attributes, modifying only the trust boundary itself. -
What is the expected behavior if
impersonated_credentials
are created withsource_credentials
that don't have trust boundary support? We should probably add a test to ensure thex-allowed-locations
header is not sent in that case. -
For
service_account.Credentials
withalways_use_jwt_access=True
, does the trust boundary lookup still work as expected? -
A test case for when
_build_trust_boundary_lookup_url
is called whenservice_account_email
isNone
would be valuable. The code seems to handle this by raising aValueError
, and it's worth having a test to confirm this behavior.
google/auth/external_account.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets revert these changes and keep them for a future PR. We should only add this with the changes for supporting the feature
|
||
def _metric_header_for_usage(self): | ||
return metrics.CRED_TYPE_SA_IMPERSONATE | ||
|
||
@_helpers.copy_docstring(credentials.Credentials) | ||
def refresh(self, request): | ||
self._update_token(request) | ||
self._refresh_trust_boundary(request) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One suggestion for a potential refactoring to improve the design:
Currently, each class that inherits from CredentialsWithTrustBoundary
(e.g., compute_engine.Credentials
, impersonated_credentials.Credentials
, service_account.Credentials
) is required to explicitly call self._refresh_trust_boundary(request)
within its own refresh
method.
To enhance encapsulation and reduce the burden on subclasses, it would be preferable if this call was handled automatically by the base class. Could this be refactored?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a great suggestion!
I've refactored the CredentialsWithTrustBoundary base class to handle this. The base refresh() method now handles the entire refresh process. It calls a new abstract method, _refresh_token(), which subclasses must implement, and then it calls _refresh_trust_boundary() itself.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How does this work for self signed JWTs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've updated service_account.Credentials to handle self signed JWTs. When a credential that uses a self-signed JWT also needs to perform a trust boundary lookup, the refresh() method now follows a two-step process:
-
IAM-Specific JWT for Lookup: It first generates a temporary self-signed JWT where the audience is specifically set to the IAM Credentials API. This token is used exclusively to authenticate the request that fetches the trust boundary information.
-
Final JWT for Target API: After the trust boundary is successfully retrieved, the method then proceeds to generate the final self-signed JWT with the audience required by the application's target API (e.g., Pub/Sub, Storage).
This approach ensures that the trust boundary lookup is authenticated correctly using a token the IAM service expects, while the final token used by the application is properly configured for its intended service.
I agree that the header shouldn't be sent in those cases. Added a unit test to cover this scenario.
Also implemented the suggested refactoring and test scenarios. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, great job!
""" | ||
raise NotImplementedError("_refresh_token must be implemented") | ||
|
||
def with_trust_boundary(self, trust_boundary): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this method necessary? When would it be used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The with_* method is part of a design pattern used for credential objects throughout this library.
We treat credential objects as mostly immutable. You'll see similar factory methods like with_scopes(), with_quota_project(), and with_universe_domain().
@@ -920,7 +920,9 @@ def test_refresh_impersonation_without_client_auth_success( | |||
"Content-Type": "application/json", | |||
"authorization": "Bearer {}".format(token_response["access_token"]), | |||
"x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, | |||
"x-allowed-locations": "0x0", | |||
# TODO(negarb): Uncomment and update when trust boundary is supported |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would prefer to not have commented placeholders here and just add in another PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the feedback—I agree it's best to avoid placeholders.
The challenge here is that the commented-out value ("x-allowed-locations": "0x0") is from two years ago and is incorrect under the new design.. Since support for external accounts is out of scope for this particular PR, uncommenting this line would cause tests to fail with the wrong assertion.
I've kept it commented to preserve the test structure for the follow-up PR that will correctly implement this feature for external accounts.
Given this context, would you be okay with keeping these placeholders for this PR, or would you prefer I remove the them and create a separate tracking issue?
@@ -2057,7 +2059,9 @@ def test_refresh_success_with_impersonation_ignore_default_scopes( | |||
"authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), | |||
"x-goog-user-project": QUOTA_PROJECT_ID, | |||
"x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, | |||
"x-allowed-locations": "0x0", | |||
# TODO(negarb): Uncomment and update when trust boundary is supported |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here
@@ -570,6 +602,7 @@ def refresh(self, request): | |||
"Content-Type": "application/json", | |||
metrics.API_CLIENT_HEADER: metrics.token_request_id_token_impersonate(), | |||
} | |||
headers.update(self._target_credentials._get_trust_boundary_header()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we have to pass trust boundary header for id token request? _get_trust_boundary_header
might not give the headers because an access token is not available in this credential.
# We temporarily set self.token for the base lookup method. | ||
# The base lookup method will call self.apply() which adds the | ||
# authorization header. | ||
original_token = self.token |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems to be welcoming race conditions. What happens if one thread is updating trust boundary and set self.token
to iam audience and another thread tries to read self.token
@@ -806,13 +920,17 @@ def _refresh_with_iam_endpoint(self, request): | |||
additional_claims={"scope": "https://www.googleapis.com/auth/iam"}, | |||
) | |||
jwt_credentials.refresh(request) | |||
|
|||
headers = self._get_trust_boundary_header() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you will get NotImplementedError here, thrown from ln 863?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, there is no access token in this cred to call trust boundary endpoint
design: go/trust-boundaries-auth-sdk-v2