Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions tests/model_serving/maas_billing/maas_subscription/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,36 @@ def two_active_api_key_ids(
)


@pytest.fixture(scope="function")
def three_active_api_key_ids(
request_session_http: requests.Session,
base_url: str,
ocp_token_for_actor: str,
) -> Generator[list[str], Any, Any]:
"""Create three active API keys and yield their IDs for bulk-revoke tests."""
key_ids = [
create_api_key(
base_url=base_url,
ocp_user_token=ocp_token_for_actor,
request_session_http=request_session_http,
api_key_name=f"e2e-bulk-key-{index}-{generate_random_name()}",
)[1]["id"]
for index in range(1, 4)
]
LOGGER.info(f"three_active_api_key_ids: created keys {key_ids}")
yield key_ids
for key_id in key_ids:
LOGGER.info(f"three_active_api_key_ids: teardown revoking key {key_id}")
revoke_resp, _ = revoke_api_key(
request_session_http=request_session_http,
base_url=base_url,
key_id=key_id,
ocp_user_token=ocp_token_for_actor,
)
if revoke_resp.status_code not in (200, 404):
raise AssertionError(f"Unexpected teardown status for key id={key_id}: {revoke_resp.status_code}")


@pytest.fixture(scope="function")
def active_api_key_id(
request_session_http: requests.Session,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
from __future__ import annotations

import pytest
import requests
from simple_logger.logger import get_logger

from tests.model_serving.maas_billing.maas_subscription.utils import (
assert_bulk_revoke_success,
bulk_revoke_api_keys,
get_api_key,
resolve_api_key_username,
)

LOGGER = get_logger(name=__name__)


@pytest.mark.usefixtures(
"maas_subscription_controller_enabled_latest",
"maas_gateway_api",
"maas_api_gateway_reachable",
)
class TestAPIKeyBulkOperations:
"""Tests for MaaS API key bulk revoke operations."""

@pytest.mark.tier1
@pytest.mark.parametrize("ocp_token_for_actor", [{"type": "admin"}], indirect=True)
def test_bulk_revoke_own_keys(
self,
request_session_http: requests.Session,
base_url: str,
ocp_token_for_actor: str,
three_active_api_key_ids: list[str],
) -> None:
"""Verify a user can bulk revoke all their own active API keys."""
username = resolve_api_key_username(
request_session_http=request_session_http,
base_url=base_url,
key_id=three_active_api_key_ids[0],
ocp_user_token=ocp_token_for_actor,
)

revoked_count = assert_bulk_revoke_success(
request_session_http=request_session_http,
base_url=base_url,
ocp_user_token=ocp_token_for_actor,
username=username,
min_revoked_count=3,
)
LOGGER.info(f"[bulk-revoke] User {username} bulk revoked {revoked_count} key(s)")

for key_id in three_active_api_key_ids:
get_resp, get_body = get_api_key(
request_session_http=request_session_http,
base_url=base_url,
key_id=key_id,
ocp_user_token=ocp_token_for_actor,
)
assert get_resp.status_code == 200, (
f"Expected 200 on GET /v1/api-keys/{key_id} after bulk-revoke, "
f"got {get_resp.status_code}: {get_resp.text[:200]}"
)
assert get_body.get("status") == "revoked", (
f"Expected key id={key_id} to have status='revoked', got: {get_body.get('status')}"
)
LOGGER.info(f"[bulk-revoke] All {len(three_active_api_key_ids)} key(s) confirmed revoked")

@pytest.mark.tier1
@pytest.mark.parametrize("ocp_token_for_actor", [{"type": "free"}], indirect=True)
def test_bulk_revoke_other_user_forbidden(
self,
request_session_http: requests.Session,
base_url: str,
ocp_token_for_actor: str,
) -> None:
"""Verify a non-admin user gets 403 when attempting to bulk revoke another user's keys."""
bulk_resp, _ = bulk_revoke_api_keys(
request_session_http=request_session_http,
base_url=base_url,
ocp_user_token=ocp_token_for_actor,
username="someotheruser",
)
assert bulk_resp.status_code == 403, (
f"Expected 403 (non-admin cannot bulk revoke other users), "
f"got {bulk_resp.status_code}: {bulk_resp.text[:200]}"
)
LOGGER.info("[bulk-revoke] Non-admin correctly received 403 when attempting to bulk revoke another user's keys")

@pytest.mark.tier1
@pytest.mark.parametrize("ocp_token_for_actor", [{"type": "free"}], indirect=True)
def test_bulk_revoke_admin_can_revoke_any_user(
self,
request_session_http: requests.Session,
base_url: str,
active_api_key_id: str,
free_user_username: str,
admin_ocp_token: str,
) -> None:
"""Verify an admin can bulk revoke any user's active API keys."""
revoked_count = assert_bulk_revoke_success(
request_session_http=request_session_http,
base_url=base_url,
ocp_user_token=admin_ocp_token,
username=free_user_username,
min_revoked_count=1,
)
LOGGER.info(f"[bulk-revoke] Admin successfully revoked {revoked_count} key(s) for user {free_user_username}")
55 changes: 55 additions & 0 deletions tests/model_serving/maas_billing/maas_subscription/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,61 @@ def revoke_api_key(
return response, parsed_body


def bulk_revoke_api_keys(
request_session_http: requests.Session,
base_url: str,
ocp_user_token: str,
username: str,
request_timeout_seconds: int = 60,
) -> tuple[Response, dict[str, Any]]:
"""
Bulk revoke all active API keys for a given user via MaaS API (POST /v1/api-keys/bulk-revoke).

"""
url = f"{base_url}/v1/api-keys/bulk-revoke"
response = request_session_http.post(
url=url,
headers={
"Authorization": f"Bearer {ocp_user_token}",
"Content-Type": "application/json",
},
json={"username": username},
timeout=request_timeout_seconds,
)
LOGGER.info(f"bulk_revoke_api_keys: url={url} username={username} status={response.status_code}")
try:
parsed_body: dict[str, Any] = json.loads(response.text)
except json.JSONDecodeError as error:
raise AssertionError(
f"bulk_revoke_api_keys returned non-JSON response: status={response.status_code} body={response.text[:200]}"
) from error
return response, parsed_body


def assert_bulk_revoke_success(
request_session_http: requests.Session,
base_url: str,
ocp_user_token: str,
username: str,
min_revoked_count: int = 1,
) -> int:
"""Bulk revoke API keys for a user and assert the operation succeeded."""
bulk_resp, bulk_body = bulk_revoke_api_keys(
request_session_http=request_session_http,
base_url=base_url,
ocp_user_token=ocp_user_token,
username=username,
)
assert bulk_resp.status_code == 200, (
f"Expected 200 on bulk-revoke for user {username}, got {bulk_resp.status_code}: {bulk_resp.text[:200]}"
)
revoked_count: int = bulk_body.get("revokedCount", 0)
assert revoked_count >= min_revoked_count, (
f"Expected at least {min_revoked_count} revoked key(s), got revokedCount={revoked_count}"
)
return revoked_count


def get_maas_postgres_labels() -> dict[str, str]:
return {
"app": "postgres",
Expand Down
Loading