Skip to content

Commit 24ea26b

Browse files
committed
test: add API key authorization tests
Signed-off-by: Swati Mukund Bagal <[email protected]>
1 parent f26ce5e commit 24ea26b

File tree

3 files changed

+285
-9
lines changed

3 files changed

+285
-9
lines changed

tests/model_serving/maas_billing/maas_subscription/conftest.py

Lines changed: 77 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import pytest
88
import requests
99
from kubernetes.dynamic import DynamicClient
10+
from ocp_resources.cluster_role_binding import ClusterRoleBinding
1011
from ocp_resources.data_science_cluster import DataScienceCluster
1112
from ocp_resources.deployment import Deployment
1213
from ocp_resources.llm_inference_service import LLMInferenceService
@@ -24,10 +25,12 @@
2425
from tests.model_serving.maas_billing.maas_subscription.utils import (
2526
MAAS_DB_NAMESPACE,
2627
MAAS_SUBSCRIPTION_NAMESPACE,
28+
create_and_yield_api_key_id,
2729
create_api_key,
2830
create_maas_subscription,
2931
get_maas_postgres_resources,
3032
patch_llmisvc_with_maas_router_and_tiers,
33+
resolve_api_key_username,
3134
revoke_api_key,
3235
wait_for_postgres_connection_log,
3336
wait_for_postgres_deployment_ready,
@@ -662,22 +665,87 @@ def active_api_key_id(
662665
"""
663666
Create a single active API key and return its ID for revoke tests.
664667
"""
665-
key_name = f"e2e-fixture-key-{generate_random_name()}"
666-
_, body = create_api_key(
668+
yield from create_and_yield_api_key_id(
669+
request_session_http=request_session_http,
667670
base_url=base_url,
668671
ocp_user_token=ocp_token_for_actor,
669-
request_session_http=request_session_http,
670-
api_key_name=key_name,
672+
key_name_prefix="e2e-fixture-key",
671673
)
672-
LOGGER.info(f"active_api_key_id: created key id={body['id']}")
673-
yield body["id"]
674-
LOGGER.info(f"Fixture teardown: revoking key {body['id']}")
675-
revoke_api_key(
674+
675+
676+
@pytest.fixture(scope="function")
677+
def free_user_username(
678+
request_session_http: requests.Session,
679+
base_url: str,
680+
ocp_token_for_actor: str,
681+
active_api_key_id: str,
682+
) -> str:
683+
"""Resolve and return the free (non-admin) actor's username from their active API key."""
684+
username = resolve_api_key_username(
676685
request_session_http=request_session_http,
677686
base_url=base_url,
678-
key_id=body["id"],
687+
key_id=active_api_key_id,
679688
ocp_user_token=ocp_token_for_actor,
680689
)
690+
LOGGER.info(f"free_user_username: resolved username='{username}' from key id={active_api_key_id}")
691+
return username
692+
693+
694+
@pytest.fixture(scope="function")
695+
def admin_username(
696+
request_session_http: requests.Session,
697+
base_url: str,
698+
admin_ocp_token: str,
699+
admin_active_api_key_id: str,
700+
) -> str:
701+
"""Resolve and return the admin actor's username from their active API key."""
702+
username = resolve_api_key_username(
703+
request_session_http=request_session_http,
704+
base_url=base_url,
705+
key_id=admin_active_api_key_id,
706+
ocp_user_token=admin_ocp_token,
707+
)
708+
LOGGER.info(f"admin_username: resolved username='{username}' from key id={admin_active_api_key_id}")
709+
return username
710+
711+
712+
@pytest.fixture(scope="function")
713+
def admin_active_api_key_id(
714+
request_session_http: requests.Session,
715+
base_url: str,
716+
admin_ocp_token: str,
717+
) -> Generator[str, Any, Any]:
718+
"""Create an active API key as the admin user, yield its ID, and revoke on teardown."""
719+
yield from create_and_yield_api_key_id(
720+
request_session_http=request_session_http,
721+
base_url=base_url,
722+
ocp_user_token=admin_ocp_token,
723+
key_name_prefix="e2e-authz-admin",
724+
)
725+
726+
727+
@pytest.fixture(scope="class")
728+
def admin_ocp_token(admin_client: DynamicClient) -> Generator[str, Any, Any]:
729+
"""OCP bearer token for a dedicated SA with cluster-admin ClusterRole, recognised as admin by MaaS API."""
730+
applications_namespace = py_config["applications_namespace"]
731+
sa_name = f"maas-e2e-admin-{generate_random_name()}"
732+
733+
with ServiceAccount(
734+
client=admin_client,
735+
namespace=applications_namespace,
736+
name=sa_name,
737+
teardown=True,
738+
) as sa:
739+
sa.wait(timeout=60)
740+
741+
with ClusterRoleBinding(
742+
client=admin_client,
743+
name=sa_name,
744+
cluster_role="cluster-admin",
745+
subjects=[{"kind": "ServiceAccount", "name": sa_name, "namespace": applications_namespace}],
746+
teardown=True,
747+
):
748+
yield create_inference_token(model_service_account=sa)
681749

682750

683751
@pytest.fixture(scope="function")
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
from __future__ import annotations
2+
3+
import pytest
4+
import requests
5+
from simple_logger.logger import get_logger
6+
7+
from tests.model_serving.maas_billing.maas_subscription.utils import (
8+
list_api_keys,
9+
get_api_key,
10+
revoke_api_key,
11+
)
12+
13+
LOGGER = get_logger(name=__name__)
14+
15+
16+
@pytest.mark.parametrize("ocp_token_for_actor", [{"type": "free"}], indirect=True)
17+
@pytest.mark.usefixtures(
18+
"maas_subscription_controller_enabled_latest",
19+
"maas_gateway_api",
20+
"maas_api_gateway_reachable",
21+
)
22+
class TestAPIKeyAuthorization:
23+
"""Tests for MaaS API key admin and non-admin access control."""
24+
25+
@pytest.mark.tier1
26+
def test_admin_manage_other_users_keys(
27+
self,
28+
request_session_http: requests.Session,
29+
base_url: str,
30+
admin_ocp_token: str,
31+
active_api_key_id: str,
32+
free_user_username: str,
33+
) -> None:
34+
"""Verify an admin can search for another user's keys by username and revoke them."""
35+
list_resp, list_body = list_api_keys(
36+
request_session_http=request_session_http,
37+
base_url=base_url,
38+
ocp_user_token=admin_ocp_token,
39+
filters={"username": free_user_username, "status": ["active"]},
40+
sort={"by": "created_at", "order": "desc"},
41+
pagination={"limit": 50, "offset": 0},
42+
)
43+
assert list_resp.status_code == 200, (
44+
f"Expected 200 on admin search for username='{free_user_username}', "
45+
f"got {list_resp.status_code}: {list_resp.text[:200]}"
46+
)
47+
items: list[dict] = list_body.get("items") or list_body.get("data") or []
48+
key_ids = [item["id"] for item in items]
49+
assert active_api_key_id in key_ids, (
50+
f"Expected free user's key id={active_api_key_id} in admin search results "
51+
f"for username='{free_user_username}', found ids={key_ids}"
52+
)
53+
LOGGER.info(f"[authz] Admin found {len(items)} active key(s) for user='{free_user_username}'")
54+
55+
revoke_resp, revoke_body = revoke_api_key(
56+
request_session_http=request_session_http,
57+
base_url=base_url,
58+
key_id=active_api_key_id,
59+
ocp_user_token=admin_ocp_token,
60+
)
61+
assert revoke_resp.status_code == 200, (
62+
f"Expected 200 on admin DELETE /v1/api-keys/{active_api_key_id}, "
63+
f"got {revoke_resp.status_code}: {revoke_resp.text[:200]}"
64+
)
65+
assert revoke_body.get("status") == "revoked", (
66+
f"Expected status='revoked' in admin revoke response, got: {revoke_body}"
67+
)
68+
LOGGER.info(f"[authz] Admin successfully revoked free user's key id={active_api_key_id}")
69+
70+
@pytest.mark.tier1
71+
def test_non_admin_cannot_access_other_users_keys(
72+
self,
73+
request_session_http: requests.Session,
74+
base_url: str,
75+
ocp_token_for_actor: str,
76+
admin_active_api_key_id: str,
77+
) -> None:
78+
"""Verify a non-admin user gets 404 when accessing another user's API key."""
79+
get_resp, _ = get_api_key(
80+
request_session_http=request_session_http,
81+
base_url=base_url,
82+
key_id=admin_active_api_key_id,
83+
ocp_user_token=ocp_token_for_actor,
84+
)
85+
assert get_resp.status_code == 404, (
86+
f"Expected 404 (IDOR protection) on free user GET of admin's key id={admin_active_api_key_id}, "
87+
f"got {get_resp.status_code}: {get_resp.text[:200]}"
88+
)
89+
LOGGER.info(f"[authz] Free user correctly received 404 on GET of admin's key id={admin_active_api_key_id}")
90+
91+
revoke_resp, _ = revoke_api_key(
92+
request_session_http=request_session_http,
93+
base_url=base_url,
94+
key_id=admin_active_api_key_id,
95+
ocp_user_token=ocp_token_for_actor,
96+
)
97+
assert revoke_resp.status_code == 404, (
98+
f"Expected 404 (IDOR protection) on free user DELETE of admin's key id={admin_active_api_key_id}, "
99+
f"got {revoke_resp.status_code}: {revoke_resp.text[:200]}"
100+
)
101+
LOGGER.info(f"[authz] Free user correctly received 404 on DELETE of admin's key id={admin_active_api_key_id}")
102+
103+
@pytest.mark.tier1
104+
def test_non_admin_search_only_returns_own_keys(
105+
self,
106+
request_session_http: requests.Session,
107+
base_url: str,
108+
ocp_token_for_actor: str,
109+
active_api_key_id: str,
110+
admin_active_api_key_id: str,
111+
) -> None:
112+
"""Verify a non-admin user's search results contain only their own keys."""
113+
list_resp, list_body = list_api_keys(
114+
request_session_http=request_session_http,
115+
base_url=base_url,
116+
ocp_user_token=ocp_token_for_actor,
117+
filters={"status": ["active"]},
118+
sort={"by": "created_at", "order": "desc"},
119+
pagination={"limit": 50, "offset": 0},
120+
)
121+
assert list_resp.status_code == 200, (
122+
f"Expected 200 on free user search, got {list_resp.status_code}: {list_resp.text[:200]}"
123+
)
124+
items: list[dict] = list_body.get("items") or list_body.get("data") or []
125+
key_ids = [item["id"] for item in items]
126+
127+
assert active_api_key_id in key_ids, (
128+
f"Expected free user's own key id={active_api_key_id} in results, found ids={key_ids}"
129+
)
130+
assert admin_active_api_key_id not in key_ids, (
131+
f"Admin's key id={admin_active_api_key_id} must NOT appear in free user's search results"
132+
)
133+
LOGGER.info(
134+
f"[authz] Free user search returned {len(items)} key(s) — "
135+
f"own key present, admin's key correctly excluded"
136+
)
137+
138+
@pytest.mark.tier1
139+
def test_non_admin_cannot_search_by_other_username(
140+
self,
141+
request_session_http: requests.Session,
142+
base_url: str,
143+
ocp_token_for_actor: str,
144+
admin_username: str,
145+
) -> None:
146+
"""Verify a non-admin user gets 403 when searching with another user's username as a filter."""
147+
list_resp, _ = list_api_keys(
148+
request_session_http=request_session_http,
149+
base_url=base_url,
150+
ocp_user_token=ocp_token_for_actor,
151+
filters={"username": admin_username, "status": ["active"]},
152+
sort={"by": "created_at", "order": "desc"},
153+
pagination={"limit": 50, "offset": 0},
154+
)
155+
assert list_resp.status_code == 403, (
156+
f"Expected 403 when free user searches by admin username='{admin_username}', "
157+
f"got {list_resp.status_code}: {list_resp.text[:200]}"
158+
)
159+
LOGGER.info(
160+
f"[authz] Free user correctly received 403 when searching by admin username='{admin_username}'"
161+
)

tests/model_serving/maas_billing/maas_subscription/utils.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
MAAS_GATEWAY_NAMESPACE,
2626
ApiGroups,
2727
)
28+
from utilities.general import generate_random_name
2829

2930
LOGGER = get_logger(name=__name__)
3031
MAAS_SUBSCRIPTION_NAMESPACE = "models-as-a-service"
@@ -282,6 +283,52 @@ def list_api_keys(
282283
return response, parsed_body
283284

284285

286+
def resolve_api_key_username(
287+
request_session_http: requests.Session,
288+
base_url: str,
289+
key_id: str,
290+
ocp_user_token: str,
291+
) -> str:
292+
"""Fetch an API key by ID and return the owner's username."""
293+
get_resp, get_body = get_api_key(
294+
request_session_http=request_session_http,
295+
base_url=base_url,
296+
key_id=key_id,
297+
ocp_user_token=ocp_user_token,
298+
)
299+
assert get_resp.status_code == 200, (
300+
f"Expected 200 on GET /v1/api-keys/{key_id}, got {get_resp.status_code}: {get_resp.text[:200]}"
301+
)
302+
username = get_body.get("username") or get_body.get("owner")
303+
assert username, f"Expected 'username' or 'owner' field in GET response: {get_body}"
304+
return username
305+
306+
307+
def create_and_yield_api_key_id(
308+
request_session_http: requests.Session,
309+
base_url: str,
310+
ocp_user_token: str,
311+
key_name_prefix: str,
312+
) -> Generator[str]:
313+
"""Create an API key, yield its ID, and revoke it on teardown."""
314+
key_name = f"{key_name_prefix}-{generate_random_name()}"
315+
_, body = create_api_key(
316+
base_url=base_url,
317+
ocp_user_token=ocp_user_token,
318+
request_session_http=request_session_http,
319+
api_key_name=key_name,
320+
)
321+
LOGGER.info(f"create_and_yield_api_key_id: created key id={body['id']} name={key_name}")
322+
yield body["id"]
323+
LOGGER.info(f"create_and_yield_api_key_id: teardown revoking key id={body['id']}")
324+
revoke_api_key(
325+
request_session_http=request_session_http,
326+
base_url=base_url,
327+
key_id=body["id"],
328+
ocp_user_token=ocp_user_token,
329+
)
330+
331+
285332
def revoke_api_key(
286333
request_session_http: requests.Session,
287334
base_url: str,

0 commit comments

Comments
 (0)