Skip to content

Commit ceef8f6

Browse files
cloutierMatbentsku
authored andcommitted
added unit tests
1 parent e6e0eec commit ceef8f6

File tree

1 file changed

+240
-0
lines changed

1 file changed

+240
-0
lines changed
Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
import pytest
2+
from moto.apigateway.models import APIGatewayBackend, apigateway_backends
3+
from werkzeug.datastructures.headers import Headers
4+
5+
from localstack.aws.api.apigateway import ApiKeySourceType, Method
6+
from localstack.http import Request, Response
7+
from localstack.services.apigateway.models import MergedRestApi, RestApiDeployment
8+
from localstack.services.apigateway.next_gen.execute_api.api import RestApiGatewayHandlerChain
9+
from localstack.services.apigateway.next_gen.execute_api.context import (
10+
ContextVariables,
11+
IdentityContext,
12+
InvocationRequest,
13+
RestApiInvocationContext,
14+
)
15+
from localstack.services.apigateway.next_gen.execute_api.gateway_response import InvalidAPIKeyError
16+
from localstack.services.apigateway.next_gen.execute_api.handlers import ApiKeyValidationHandler
17+
from localstack.testing.config import TEST_AWS_ACCOUNT_ID, TEST_AWS_REGION_NAME
18+
19+
TEST_API_ID = "testapi"
20+
TEST_API_STAGE = "dev"
21+
22+
23+
@pytest.fixture
24+
def moto_backend():
25+
"""
26+
because we depend on Moto here, we have to use the backend because the keys and usage plans
27+
are fetched at runtime in the store directly. We should avoid using this fixture directly in
28+
the tests and favor reusable fixture that could later on be replaced to populate the Localstack
29+
store instead without impacting the tests
30+
"""
31+
moto_backend: APIGatewayBackend = apigateway_backends[TEST_AWS_ACCOUNT_ID][TEST_AWS_REGION_NAME]
32+
yield moto_backend
33+
moto_backend.reset()
34+
35+
36+
@pytest.fixture
37+
def create_context():
38+
"""
39+
Create a context populated with what we would expect to receive from the chain at runtime.
40+
We assume that the parser and other handler have successfully populated the context to this point.
41+
"""
42+
43+
def _create_context(
44+
method: Method = None,
45+
api_key_source: ApiKeySourceType = None,
46+
headers: dict[str, str] = None,
47+
api_key: str = None,
48+
):
49+
context = RestApiInvocationContext(Request())
50+
51+
# The api key validator only relies on the raw headers from the invocation requests
52+
context.invocation_request = InvocationRequest(raw_headers=Headers(headers))
53+
54+
# Frozen deployment populated by the router
55+
context.deployment = RestApiDeployment(
56+
account_id=TEST_AWS_ACCOUNT_ID,
57+
region=TEST_AWS_REGION_NAME,
58+
rest_api=MergedRestApi(
59+
# TODO validate that this value is always populated by localstack. AWS defaults to HEADERS on all new apis
60+
rest_api={"apiKeySource": api_key_source or ApiKeySourceType.HEADER}
61+
),
62+
)
63+
64+
# Context populated by parser handler
65+
context.region = TEST_AWS_REGION_NAME
66+
context.account_id = TEST_AWS_ACCOUNT_ID
67+
context.stage = TEST_API_STAGE
68+
context.api_id = TEST_API_ID
69+
context.resource_method = method or Method()
70+
context.context_variables = ContextVariables()
71+
72+
# Context populated by a Lambda Authorizer
73+
if api_key is not None:
74+
context.context_variables["identity"] = IdentityContext(apiKey=api_key)
75+
return context
76+
77+
return _create_context
78+
79+
80+
@pytest.fixture
81+
def api_key_validation_handler():
82+
"""Returns a dummy api key validation handler invoker for testing."""
83+
84+
def _handler_invoker(context: RestApiInvocationContext):
85+
return ApiKeyValidationHandler()(RestApiGatewayHandlerChain(), context, Response())
86+
87+
return _handler_invoker
88+
89+
90+
@pytest.fixture
91+
def create_usage_plan(moto_backend):
92+
def _create_usage_plan(attach_stage: bool, attach_key_id: str = None, backend=None):
93+
backend = backend or moto_backend
94+
stage_config = {}
95+
if attach_stage:
96+
stage_config = {"apiStages": [{"apiId": TEST_API_ID, "stage": TEST_API_STAGE}]}
97+
usage_plan = backend.create_usage_plan(stage_config)
98+
if attach_key_id:
99+
backend.create_usage_plan_key(
100+
usage_plan_id=usage_plan.id, payload={"keyId": attach_key_id, "keyType": "API_KEY"}
101+
)
102+
return usage_plan
103+
104+
return _create_usage_plan
105+
106+
107+
@pytest.fixture
108+
def create_api_key(moto_backend):
109+
def _create_api_key(key_value: str, enabled: bool = True, backend=None):
110+
backend = backend or moto_backend
111+
return backend.create_api_key({"enabled": enabled, "value": key_value})
112+
113+
return _create_api_key
114+
115+
116+
class TestHandlerApiKeyValidation:
117+
def test_no_api_key_required(self, create_context, api_key_validation_handler):
118+
api_key_validation_handler(create_context())
119+
120+
def test_api_key_headers_valid(
121+
self, create_context, api_key_validation_handler, create_usage_plan, create_api_key
122+
):
123+
method = Method(apiKeyRequired=True)
124+
api_key_value = "01234567890123456789"
125+
126+
# create api key
127+
api_key = create_api_key(api_key_value)
128+
# create usage plan and attach key
129+
create_usage_plan(attach_stage=True, attach_key_id=api_key.id)
130+
# pass the key in the request headers
131+
ctx = create_context(method=method, headers={"x-api-key": api_key_value})
132+
133+
# Call handler
134+
api_key_validation_handler(context=ctx)
135+
136+
assert ctx.context_variables["identity"]["apiKey"] == api_key_value
137+
assert ctx.context_variables["identity"]["apiKeyId"] == api_key.id
138+
139+
def test_api_key_headers_absent(
140+
self, create_context, api_key_validation_handler, create_api_key, create_usage_plan
141+
):
142+
method = Method(apiKeyRequired=True)
143+
api_key_value = "01234567890123456789"
144+
145+
# create api key
146+
api_key = create_api_key(api_key_value)
147+
# create usage plan and attach key
148+
create_usage_plan(attach_stage=True, attach_key_id=api_key.id)
149+
150+
with pytest.raises(InvalidAPIKeyError) as e:
151+
api_key_validation_handler(
152+
# missing headers will raise error
153+
context=create_context(method=method, headers={})
154+
)
155+
assert e.value.message == "Forbidden"
156+
157+
def test_api_key_no_api_key(
158+
self, create_context, api_key_validation_handler, create_usage_plan
159+
):
160+
method = Method(apiKeyRequired=True)
161+
api_key_value = "01234567890123456789"
162+
163+
# Create usage plan with no keys
164+
create_usage_plan(attach_stage=True)
165+
166+
with pytest.raises(InvalidAPIKeyError) as e:
167+
api_key_validation_handler(
168+
context=create_context(method=method, headers={"x-api-key": api_key_value})
169+
)
170+
assert e.value.message == "Forbidden"
171+
172+
def test_api_key_no_usage_plan_key(
173+
self, create_context, api_key_validation_handler, create_api_key, create_usage_plan
174+
):
175+
method = Method(apiKeyRequired=True)
176+
api_key_value = "01234567890123456789"
177+
178+
# create api key
179+
create_api_key(api_key_value)
180+
# Create usage plan but the key won't be associated
181+
create_usage_plan(attach_stage=True)
182+
183+
with pytest.raises(InvalidAPIKeyError) as e:
184+
api_key_validation_handler(
185+
context=create_context(method=method, headers={"x-api-key": api_key_value})
186+
)
187+
assert e.value.message == "Forbidden"
188+
189+
def test_api_key_disabled(
190+
self, create_context, api_key_validation_handler, create_api_key, create_usage_plan
191+
):
192+
method = Method(apiKeyRequired=True)
193+
api_key_value = "01234567890123456789"
194+
195+
# Create api key but set `Enabled` to False
196+
api_key = create_api_key(api_key_value, enabled=False)
197+
# create usage plan and attach key
198+
create_usage_plan(attach_stage=True, attach_key_id=api_key.id)
199+
200+
with pytest.raises(InvalidAPIKeyError) as e:
201+
api_key_validation_handler(
202+
context=create_context(method=method, headers={"x-api-key": api_key_value})
203+
)
204+
assert e.value.message == "Forbidden"
205+
206+
def test_api_key_in_identity_context(
207+
self, create_context, api_key_validation_handler, create_api_key, create_usage_plan
208+
):
209+
method = Method(apiKeyRequired=True)
210+
api_key_value = "01234567890123456789"
211+
212+
# create api key
213+
api_key = create_api_key(api_key_value)
214+
# create usage plan and attach key
215+
create_usage_plan(attach_stage=True, attach_key_id=api_key.id)
216+
217+
api_key_validation_handler(
218+
context=create_context(
219+
# The frozen api has key source set to AUTHORIZER and the api_key was populated by the Authorizer
220+
method=method,
221+
api_key=api_key_value,
222+
api_key_source=ApiKeySourceType.AUTHORIZER,
223+
)
224+
)
225+
226+
def test_api_key_in_identity_context_api_not_configured(
227+
self, create_context, api_key_validation_handler, create_api_key, create_usage_plan
228+
):
229+
method = Method(apiKeyRequired=True)
230+
api_key_value = "01234567890123456789"
231+
232+
# create api key
233+
api_key = create_api_key(api_key_value)
234+
# create usage plan and attach key
235+
create_usage_plan(attach_stage=True, attach_key_id=api_key.id)
236+
237+
with pytest.raises(InvalidAPIKeyError) as e:
238+
# The api_key was populated by the Authorizer, but missing frozen api configuration
239+
api_key_validation_handler(context=create_context(method=method, api_key=api_key_value))
240+
assert e.value.message == "Forbidden"

0 commit comments

Comments
 (0)