Skip to content

Commit 564523d

Browse files
SB159dbasunagmwaykole
authored
tests(maas-billing): add token mint + utils and fixtures (#792)
* tests(maas-billing): add token mint + utils and fixtures Signed-off-by: Swati Mukund Bagal <sbagal@redhat.com> * tests(maas-billing): address review comments * maas-billing: updated review comments * maas-billing: Updated utils as per review comments * maas-billing: address review comments * maas-billing: Using existing llmd-utils-LLMInferenceService URL * maas-billing:Updated as per review comments * maas-billing: Updated conftest * maas-billing: Updated conftest --------- Signed-off-by: Swati Mukund Bagal <sbagal@redhat.com> Co-authored-by: Debarati Basu-Nag <dbasunag@redhat.com> Co-authored-by: Milind Waykole <mwaykole@redhat.com>
1 parent cbee0c5 commit 564523d

File tree

3 files changed

+131
-0
lines changed

3 files changed

+131
-0
lines changed
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
from typing import Generator
2+
3+
import pytest
4+
import requests
5+
from simple_logger.logger import get_logger
6+
7+
from tests.model_serving.model_server.maas_billing.utils import (
8+
detect_scheme_via_llmisvc,
9+
host_from_ingress_domain,
10+
mint_token,
11+
)
12+
13+
LOGGER = get_logger(name=__name__)
14+
15+
16+
@pytest.fixture(scope="session")
17+
def request_session_http() -> Generator[requests.Session, None, None]:
18+
session = requests.Session()
19+
session.headers.update({"User-Agent": "odh-maas-billing-tests/1"})
20+
session.verify = False
21+
yield session
22+
session.close()
23+
24+
25+
@pytest.fixture(scope="class")
26+
def minted_token(request_session_http, base_url: str, current_client_token: str) -> str:
27+
"""Mint a MaaS token once per test class and reuse it."""
28+
resp, body = mint_token(
29+
base_url=base_url,
30+
oc_user_token=current_client_token,
31+
minutes=30,
32+
http_session=request_session_http,
33+
)
34+
LOGGER.info("Mint token response status=%s", resp.status_code)
35+
assert resp.status_code in (200, 201), f"mint failed: {resp.status_code} {resp.text[:200]}"
36+
token = body.get("token", "")
37+
assert isinstance(token, str) and len(token) > 10, f"no usable token in response: {body}"
38+
LOGGER.info(f"Minted MaaS token len={len(token)}")
39+
return token
40+
41+
42+
@pytest.fixture(scope="module")
43+
def base_url(admin_client) -> str:
44+
scheme = detect_scheme_via_llmisvc(client=admin_client, namespace="llm")
45+
host = host_from_ingress_domain(client=admin_client)
46+
return f"{scheme}://{host}/maas-api"
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import json
2+
3+
from tests.model_serving.model_server.maas_billing.utils import b64url_decode
4+
5+
6+
class TestMintedToken:
7+
def test_minted_token_generated(self, minted_token: str) -> None:
8+
"""Smoke: a MaaS token can be minted."""
9+
assert isinstance(minted_token, str) and len(minted_token) > 10, "no usable token minted"
10+
11+
def test_minted_token_is_jwt(self, minted_token: str) -> None:
12+
"""Minted token looks like a JWT and has a JSON header."""
13+
parts = minted_token.split(".")
14+
assert len(parts) == 3, "not a JWT (expected header.payload.signature)"
15+
16+
header_json = b64url_decode(parts[0]).decode("utf-8")
17+
header = json.loads(header_json)
18+
assert isinstance(header, dict), "JWT header not a JSON object"
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
from typing import Dict
2+
3+
import base64
4+
import requests
5+
from json import JSONDecodeError
6+
from ocp_resources.ingress_config_openshift_io import Ingress as IngressConfig
7+
from requests import Response
8+
from urllib.parse import urlparse
9+
from ocp_resources.llm_inference_service import LLMInferenceService
10+
from utilities.llmd_utils import get_llm_inference_url
11+
12+
13+
def host_from_ingress_domain(client) -> str:
14+
"""Return 'maas.<ingress-domain>'"""
15+
ingress_config = IngressConfig(name="cluster", client=client, ensure_exists=True)
16+
domain = ingress_config.instance.spec.get("domain")
17+
assert domain, "Ingress 'cluster' missing spec.domain (ingresses.config.openshift.io)"
18+
return f"maas.{domain}"
19+
20+
21+
def detect_scheme_via_llmisvc(client, namespace: str = "llm") -> str:
22+
"""
23+
Using LLMInferenceService's URL to infer the scheme.
24+
"""
25+
for inference_service in LLMInferenceService.get(dyn_client=client, namespace=namespace):
26+
status_conditions = inference_service.instance.status.get("conditions", [])
27+
service_is_ready = any(
28+
condition_entry.get("type") == "Ready" and condition_entry.get("status") == "True"
29+
for condition_entry in status_conditions
30+
)
31+
if service_is_ready:
32+
url = get_llm_inference_url(llm_service=inference_service)
33+
scheme = (urlparse(url).scheme or "").lower()
34+
if scheme in ("http", "https"):
35+
return scheme
36+
return "http"
37+
38+
39+
def maas_auth_headers(token: str) -> Dict[str, str]:
40+
"""Build Authorization header for MaaS/Billing calls."""
41+
return {"Authorization": f"Bearer {token}"}
42+
43+
44+
def mint_token(
45+
base_url: str,
46+
oc_user_token: str,
47+
http_session: requests.Session,
48+
minutes: int = 10,
49+
) -> tuple[Response, dict]:
50+
"""Mint a MaaS token."""
51+
resp = http_session.post(
52+
f"{base_url}/v1/tokens",
53+
headers=maas_auth_headers(token=oc_user_token),
54+
json={"ttl": f"{minutes}m"},
55+
timeout=60,
56+
)
57+
try:
58+
body = resp.json()
59+
except (JSONDecodeError, ValueError):
60+
body = {}
61+
return resp, body
62+
63+
64+
def b64url_decode(encoded_str: str) -> bytes:
65+
padding = "=" * (-len(encoded_str) % 4)
66+
padded_bytes = (encoded_str + padding).encode(encoding="utf-8")
67+
return base64.urlsafe_b64decode(s=padded_bytes)

0 commit comments

Comments
 (0)