Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions tests/model_serving/maas_billing/maas_subscription/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,49 @@
CHAT_COMPLETIONS = OpenAIEnpoints.CHAT_COMPLETIONS


# # TEMPORARY: patches maas-controller to pr-498 to fix /v1/models 500 response.
# # Remove once the upstream maas-controller image is fixed (Konflux onboarding complete).
# @pytest.fixture(scope="session")
# def maas_controller_pr498_image(
# admin_client: DynamicClient,
# maas_subscription_controller_enabled_latest: DataScienceCluster,
# ) -> Generator[None, Any, Any]:
# """Patch maas-controller image to pr-498 after MaaS is enabled.

# The maas-controller:latest image has a broken /v1/models endpoint (returns 500).
# This fixture temporarily overrides the image after the operator creates the deployment.
# """
# deployment = Deployment(
# client=admin_client,
# name="maas-controller",
# namespace=MAAS_DB_NAMESPACE,
# )
# deployment.wait_for_condition(condition="Available", status="True", timeout=120)

# with ResourceEditor(
# patches={
# deployment: {
# "metadata": {"annotations": {"opendatahub.io/managed": "false"}},
# "spec": {
# "template": {
# "spec": {
# "containers": [
# {
# "name": "manager",
# "image": "quay.io/opendatahub/maas-controller:pr-498",
# }
# ]
# }
# }
# },
# }
# }
# ):
# deployment.wait_for_condition(condition="Available", status="True", timeout=180)
# LOGGER.info("[TEMPORARY] maas-controller patched to pr-498 image")
# yield


@pytest.fixture(scope="session")
def maas_subscription_controller_enabled_latest(
dsc_resource: DataScienceCluster,
Expand Down Expand Up @@ -809,3 +852,19 @@ def revoked_api_key_id(
assert revoke_body.get("status") == "revoked", f"Expected status='revoked' in DELETE response, got: {revoke_body}"
LOGGER.info(f"revoked_api_key_id: revoked key id={active_api_key_id}")
return active_api_key_id


@pytest.fixture(scope="function")
def short_expiration_api_key_id(
request_session_http: requests.Session,
base_url: str,
ocp_token_for_actor: str,
) -> Generator[str, Any, Any]:
"""Create an API key with 1-hour expiration, yield its ID, and revoke on teardown."""
yield from create_and_yield_api_key_id(
request_session_http=request_session_http,
base_url=base_url,
ocp_user_token=ocp_token_for_actor,
key_name_prefix="e2e-exp-short",
expires_in="1h",
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
from __future__ import annotations

import pytest
import requests

from tests.model_serving.maas_billing.maas_subscription.utils import (
assert_api_key_created_ok,
create_api_key,
get_api_key,
)
from utilities.general import generate_random_name
from utilities.opendatahub_logger import get_logger

LOGGER = get_logger(name=__name__)

MAAS_API_KEY_MAX_EXPIRATION_DAYS = 30


@pytest.mark.parametrize("ocp_token_for_actor", [{"type": "admin"}], indirect=True)
@pytest.mark.usefixtures(
"maas_subscription_controller_enabled_latest",
"maas_gateway_api",
"maas_api_gateway_reachable",
)
class TestAPIKeyExpiration:
"""Tests for API key expiration policy enforcement."""

@pytest.mark.tier1
def test_create_key_within_expiration_limit(
self,
request_session_http: requests.Session,
base_url: str,
ocp_token_for_actor: str,
) -> None:
"""Verify creating an API key with expiration below the limit succeeds."""
expires_in_hours = max((MAAS_API_KEY_MAX_EXPIRATION_DAYS // 2) * 24, 24)
key_name = f"e2e-exp-within-{generate_random_name()}"

create_resp, create_body = create_api_key(
base_url=base_url,
ocp_user_token=ocp_token_for_actor,
request_session_http=request_session_http,
api_key_name=key_name,
expires_in=f"{expires_in_hours}h",
raise_on_error=False,
)
assert_api_key_created_ok(create_resp, create_body, required_fields=("key", "expiresAt"))
LOGGER.info(
f"[expiration] Created key within limit: expires_in={expires_in_hours}h, "
f"expiresAt={create_body.get('expiresAt')}"
)

@pytest.mark.tier1
def test_create_key_at_expiration_limit(
self,
request_session_http: requests.Session,
base_url: str,
ocp_token_for_actor: str,
) -> None:
"""Verify creating an API key with expiration exactly at the limit succeeds."""
expires_in_hours = MAAS_API_KEY_MAX_EXPIRATION_DAYS * 24
key_name = f"e2e-exp-at-limit-{generate_random_name()}"

create_resp, create_body = create_api_key(
base_url=base_url,
ocp_user_token=ocp_token_for_actor,
request_session_http=request_session_http,
api_key_name=key_name,
expires_in=f"{expires_in_hours}h",
raise_on_error=False,
)
assert_api_key_created_ok(create_resp, create_body, required_fields=("key", "expiresAt"))
LOGGER.info(
f"[expiration] Created key at limit: expires_in={expires_in_hours}h "
f"({MAAS_API_KEY_MAX_EXPIRATION_DAYS} days)"
)

@pytest.mark.tier1
def test_create_key_exceeds_expiration_limit(
self,
request_session_http: requests.Session,
base_url: str,
ocp_token_for_actor: str,
) -> None:
"""Verify creating an API key with expiration beyond the limit returns 400."""
exceeds_days = MAAS_API_KEY_MAX_EXPIRATION_DAYS * 2
key_name = f"e2e-exp-exceeds-{generate_random_name()}"

create_resp, _ = create_api_key(
base_url=base_url,
ocp_user_token=ocp_token_for_actor,
request_session_http=request_session_http,
api_key_name=key_name,
expires_in=f"{exceeds_days * 24}h",
raise_on_error=False,
)
assert create_resp.status_code == 400, (
f"Expected 400 for expiration exceeding limit "
f"({exceeds_days} days > {MAAS_API_KEY_MAX_EXPIRATION_DAYS} days limit), "
f"got {create_resp.status_code}: {create_resp.text[:200]}"
)
error_text = create_resp.text.lower()
assert "exceed" in error_text or "maximum" in error_text, (
f"Expected error body to mention 'exceed' or 'maximum': {create_resp.text[:200]}"
)
LOGGER.info(
f"[expiration] Correctly rejected key: {exceeds_days} days > {MAAS_API_KEY_MAX_EXPIRATION_DAYS} days limit"
)

@pytest.mark.tier1
def test_create_key_without_expiration(
self,
request_session_http: requests.Session,
base_url: str,
ocp_token_for_actor: str,
active_api_key_id: str,
) -> None:
"""Verify a key created without an expiration field has no expiresAt value."""
get_resp, get_body = get_api_key(
request_session_http=request_session_http,
base_url=base_url,
key_id=active_api_key_id,
ocp_user_token=ocp_token_for_actor,
)
assert get_resp.status_code == 200, (
f"Expected 200 on GET /v1/api-keys/{active_api_key_id}, got {get_resp.status_code}: {get_resp.text[:200]}"
)
expires_at = get_body.get("expiresAt")
LOGGER.info(f"[expiration] Key without expiration field: expiresAt={expires_at!r}")

@pytest.mark.tier2
def test_create_key_with_short_expiration(
self,
request_session_http: requests.Session,
base_url: str,
ocp_token_for_actor: str,
short_expiration_api_key_id: str,
) -> None:
"""Verify a key created with a 1-hour expiration has a non-null expiresAt value."""
get_resp, get_body = get_api_key(
request_session_http=request_session_http,
base_url=base_url,
key_id=short_expiration_api_key_id,
ocp_user_token=ocp_token_for_actor,
)
assert get_resp.status_code == 200, (
f"Expected 200 on GET /v1/api-keys/{short_expiration_api_key_id}, "
f"got {get_resp.status_code}: {get_resp.text[:200]}"
)
assert get_body.get("expirationDate"), (
f"Expected non-null 'expirationDate' for 1h key, got: {get_body.get('expirationDate')!r}"
)
LOGGER.info(f"[expiration] 1h key expirationDate={get_body['expirationDate']}")
33 changes: 31 additions & 2 deletions tests/model_serving/maas_billing/maas_subscription/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,28 +185,42 @@ def create_api_key(
request_session_http: requests.Session,
api_key_name: str,
request_timeout_seconds: int = 60,
expires_in: str | None = None,
raise_on_error: bool = True,
) -> tuple[Response, dict[str, Any]]:
"""
Create an API key via MaaS API and return (response, parsed_body).

Uses ocp_user_token for auth against maas-api.
Expects plaintext key in body["key"] (sk-...).

Args:
expires_in: Optional expiration duration string (e.g. "24h", "720h").
When None, no expiresIn field is sent and the key does not expire.
raise_on_error: When True (default), raises AssertionError for non-200/201
responses. Set to False when testing error cases (e.g. 400 rejection).
"""
api_keys_url = f"{base_url}/v1/api-keys"

payload: dict[str, Any] = {"name": api_key_name}
if expires_in is not None:
payload["expiresIn"] = expires_in

response = request_session_http.post(
url=api_keys_url,
headers={
"Authorization": f"Bearer {ocp_user_token}",
"Content-Type": "application/json",
},
json={"name": api_key_name},
json=payload,
timeout=request_timeout_seconds,
)

LOGGER.info(f"create_api_key: url={api_keys_url} status={response.status_code}")
if response.status_code not in (200, 201):
raise AssertionError(f"api-key create failed: status={response.status_code}")
if raise_on_error:
raise AssertionError(f"api-key create failed: status={response.status_code}")
return response, {}

try:
parsed_body: dict[str, Any] = json.loads(response.text)
Expand Down Expand Up @@ -326,6 +340,7 @@ def create_and_yield_api_key_id(
base_url: str,
ocp_user_token: str,
key_name_prefix: str,
expires_in: str | None = None,
) -> Generator[str]:
"""Create an API key, yield its ID, and revoke it on teardown."""
key_name = f"{key_name_prefix}-{generate_random_name()}"
Expand All @@ -334,6 +349,7 @@ def create_and_yield_api_key_id(
ocp_user_token=ocp_user_token,
request_session_http=request_session_http,
api_key_name=key_name,
expires_in=expires_in,
)
LOGGER.info(f"create_and_yield_api_key_id: created key id={body['id']} name={key_name}")
yield body["id"]
Expand Down Expand Up @@ -431,6 +447,19 @@ def assert_bulk_revoke_success(
return revoked_count


def assert_api_key_created_ok(
resp: Response,
body: dict[str, Any],
required_fields: tuple[str, ...] = ("key",),
) -> None:
"""Assert an API key creation response has a success status and expected fields."""
assert resp.status_code in (200, 201), (
f"Expected 200/201 for API key creation, got {resp.status_code}: {resp.text[:200]}"
)
for field in required_fields:
assert field in body, f"Response must contain '{field}'"


def get_maas_postgres_labels() -> dict[str, str]:
return {
"app": "postgres",
Expand Down