Skip to content

Commit 0238cb9

Browse files
committed
tests(maas-billing): add token mint + utils and fixtures
Signed-off-by: Swati Mukund Bagal <sbagal@redhat.com>
1 parent f96d0ac commit 0238cb9

File tree

3 files changed

+189
-0
lines changed

3 files changed

+189
-0
lines changed
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
from __future__ import annotations
2+
3+
from typing import Generator
4+
5+
import pytest
6+
import requests
7+
8+
from tests.model_serving.model_server.maas_billing.utils import (
9+
choose_scheme_via_gateway,
10+
host_from_ingress_domain,
11+
current_user_bearer_via_oc,
12+
)
13+
14+
15+
@pytest.fixture(scope="session")
16+
def request_session_http() -> Generator[requests.Session, None, None]:
17+
s = requests.Session()
18+
s.verify = False
19+
s.headers.update({"User-Agent": "odh-maas-billing-tests/1"})
20+
try:
21+
yield s
22+
finally:
23+
s.close()
24+
25+
26+
@pytest.fixture(scope="session")
27+
def http(request_session_http: requests.Session) -> requests.Session:
28+
return request_session_http
29+
30+
31+
@pytest.fixture(scope="session", name="maas_user_token_for_api_calls")
32+
def maas_user_token_for_api_calls() -> str:
33+
token = current_user_bearer_via_oc()
34+
assert token, "Empty token from 'oc whoami -t'"
35+
return token
36+
37+
38+
@pytest.fixture(scope="module")
39+
def base_url(admin_client) -> str:
40+
scheme = choose_scheme_via_gateway(client=admin_client)
41+
host = host_from_ingress_domain(client=admin_client)
42+
return f"{scheme}://{host}/maas-api"
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
from __future__ import annotations
2+
3+
import json
4+
5+
from tests.model_serving.model_server.maas_billing.utils import (
6+
mint_token,
7+
b64url_decode,
8+
)
9+
10+
11+
def test_minted_token_generated(
12+
http,
13+
base_url: str,
14+
maas_user_token_for_api_calls: str,
15+
) -> None:
16+
"""Smoke: a MaaS token can be minted (no JWT shape checks)."""
17+
resp, body = mint_token(
18+
base_url=base_url,
19+
oc_user_token=maas_user_token_for_api_calls,
20+
minutes=10,
21+
http=http,
22+
)
23+
assert resp.status_code in (200, 201), f"mint failed: {resp.status_code} {resp.text[:200]}"
24+
tok = body.get("token", "")
25+
print(f"[debug] MaaS token (truncated): {tok[:12]}...{tok[-12:]}")
26+
assert isinstance(tok, str) and len(tok) > 10, f"no usable token in response: {body}"
27+
28+
29+
def test_minted_token_is_jwt(http, base_url: str, oc_user_token: str) -> None:
30+
resp, body = mint_token(
31+
base_url=base_url,
32+
oc_user_token=oc_user_token,
33+
minutes=10,
34+
http=http,
35+
)
36+
assert resp.status_code in (200, 201), f"mint failed: {resp.status_code} {resp.text[:200]}"
37+
38+
token = body.get("token", "")
39+
parts = token.split(".")
40+
assert len(parts) == 3, "not a JWT (expected header.payload.signature)"
41+
42+
header_json = b64url_decode(parts[0]).decode("utf-8")
43+
header = json.loads(header_json)
44+
assert isinstance(header, dict), "JWT header not a JSON object"
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
from __future__ import annotations
2+
3+
from typing import Dict, Optional
4+
5+
import base64
6+
import requests
7+
from ocp_resources.gateway_gateway_networking_k8s_io import Gateway
8+
from ocp_resources.ingress_config_openshift_io import Ingress as IngressConfig
9+
from pyhelper_utils.shell import run_command
10+
from requests import Response
11+
12+
13+
def host_from_ingress_domain(client) -> str:
14+
"""
15+
Return 'maas.<ingress-domain>' using the wrapper (no 'oc get' calls).
16+
"""
17+
ing = IngressConfig(name="cluster", client=client)
18+
assert ing.exists, "ingresses.config.openshift.io/cluster not found"
19+
20+
spec = ing.instance.spec
21+
domain = spec.get("domain") if isinstance(spec, dict) else getattr(spec, "domain", None)
22+
assert domain, "spec.domain is missing on Ingress 'cluster'"
23+
24+
return f"maas.{domain}"
25+
26+
27+
def scheme_from_gateway(gw: Gateway) -> str:
28+
"""
29+
Decide 'http' or 'https' from Gateway listeners.
30+
Rule: if any listener has protocol HTTPS or tls set, return 'https'; else 'http'.
31+
listeners is a list[dict] per the Gateway wrapper.
32+
"""
33+
listeners = gw.instance.spec.get("listeners", [])
34+
for listener in listeners:
35+
protocol = (listener.get("protocol") or "").upper()
36+
if protocol == "HTTPS" or listener.get("tls"):
37+
return "https"
38+
return "http"
39+
40+
41+
def choose_scheme_via_gateway(client) -> str:
42+
"""Cluster-wide; prefer maas-default-gateway if present, else first."""
43+
gateways = list(Gateway.get(client=client))
44+
if not gateways:
45+
return "http"
46+
47+
gw = next(
48+
(g for g in gateways if (g.instance.metadata.name or "") == "maas-default-gateway"),
49+
gateways[0],
50+
)
51+
return scheme_from_gateway(gw=gw)
52+
53+
54+
def current_user_bearer_via_oc() -> str:
55+
"""
56+
Return the current oc login token from `oc whoami -t`.
57+
"""
58+
rc, out, err = run_command(command=["oc", "whoami", "-t"])
59+
assert (rc is True) or (rc == 0), f"failed to get token via 'oc whoami -t': rc={rc} err={err}"
60+
token = (out or "").strip()
61+
assert token, "empty token from 'oc whoami -t'"
62+
return token
63+
64+
65+
def maas_auth_headers(token: str) -> Dict[str, str]:
66+
"""Build Authorization header for MaaS/Billing calls."""
67+
return {"Authorization": f"Bearer {token}"}
68+
69+
70+
def mint_token(
71+
base_url: str,
72+
oc_user_token: str,
73+
minutes: int = 10,
74+
http: Optional[requests.Session] = None,
75+
) -> tuple[Response, dict]:
76+
"""Mint a MaaS token.
77+
78+
Args:
79+
base_url: MaaS API base, e.g. "https://maas.apps.../maas-api".
80+
oc_user_token: Bearer used to mint the token (usually `oc whoami -t`).
81+
minutes: Token TTL in minutes.
82+
83+
Returns:
84+
(raw requests.Response, parsed_json_or_empty_dict)
85+
"""
86+
assert http is not None, "HTTP session is required (pass the fixture)"
87+
resp = http.post(
88+
f"{base_url}/v1/tokens",
89+
headers=maas_auth_headers(token=oc_user_token),
90+
json={"ttl": f"{minutes}m"},
91+
timeout=60,
92+
)
93+
try:
94+
body = resp.json()
95+
except Exception:
96+
body = {}
97+
return resp, body
98+
99+
100+
def b64url_decode(s: str) -> bytes:
101+
pad = "=" * (-len(s) % 4)
102+
# keyword-arg to satisfy FCN001 rule:
103+
return base64.urlsafe_b64decode(s=(s + pad).encode("utf-8"))

0 commit comments

Comments
 (0)