Skip to content

Commit 9b5871d

Browse files
committed
feat: add RBAC test for SA token
Signed-off-by: lugi0 <lgiorgi@redhat.com>
1 parent f1d3546 commit 9b5871d

File tree

2 files changed

+346
-1
lines changed

2 files changed

+346
-1
lines changed

tests/model_registry/conftest.py

Lines changed: 201 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
import pytest
22
import re
3+
import random
4+
import string
5+
import subprocess
36
import schemathesis
4-
from typing import Generator, Any
7+
import shlex
8+
from typing import Generator, Any, List, Dict
59
from kubernetes.dynamic.exceptions import ResourceNotFoundError
610
from ocp_resources.pod import Pod
711
from ocp_resources.secret import Secret
@@ -10,6 +14,9 @@
1014
from ocp_resources.persistent_volume_claim import PersistentVolumeClaim
1115
from ocp_resources.data_science_cluster import DataScienceCluster
1216
from ocp_resources.deployment import Deployment
17+
from ocp_resources.service_account import ServiceAccount
18+
from ocp_resources.role import Role
19+
from ocp_resources.role_binding import RoleBinding
1320

1421
from ocp_resources.model_registry import ModelRegistry
1522
import schemathesis.schemas
@@ -23,6 +30,7 @@
2330
from simple_logger.logger import get_logger
2431
from kubernetes.dynamic import DynamicClient
2532
from pytest_testconfig import config as py_config
33+
from pyhelper_utils.shell import run_command
2634
from model_registry.types import RegisteredModel
2735
from tests.model_registry.constants import (
2836
MR_OPERATOR_NAME,
@@ -44,6 +52,7 @@
4452

4553

4654
LOGGER = get_logger(name=__name__)
55+
DEFAULT_TOKEN_DURATION = "10m"
4756

4857

4958
@pytest.fixture(scope="class")
@@ -293,3 +302,194 @@ def model_registry_operator_pod(admin_client: DynamicClient) -> Pod:
293302
if not model_registry_operator_pods:
294303
raise ResourceNotFoundError("Model registry operator pod not found")
295304
return model_registry_operator_pods[0]
305+
306+
307+
# --- Fixture Helper Function ---
308+
309+
310+
def generate_random_name(prefix: str = "test", length: int = 8) -> str:
311+
"""Generates a random string for resource names."""
312+
suffix = "".join(random.choices(string.ascii_lowercase + string.digits, k=length))
313+
return f"{prefix}-{suffix}"
314+
315+
316+
# --- Service Account and Namespace Fixtures (Function Scoped for Isolation) ---
317+
318+
319+
@pytest.fixture(scope="function")
320+
def sa_namespace(admin_client: DynamicClient) -> Generator[Namespace, None, None]:
321+
"""
322+
Creates a temporary namespace using a context manager for automatic cleanup.
323+
Function scope ensures a fresh namespace for each test needing it.
324+
"""
325+
ns_name = generate_random_name(prefix="mr-rbac-test-ns")
326+
LOGGER.info(f"Creating temporary namespace: {ns_name}")
327+
# Use context manager for creation and deletion
328+
with Namespace(client=admin_client, name=ns_name) as ns:
329+
try:
330+
ns.wait_for_status(status=Namespace.Status.ACTIVE, timeout=120)
331+
LOGGER.info(f"Namespace {ns_name} is active.")
332+
yield ns
333+
# Cleanup happens automatically when exiting 'with' block
334+
LOGGER.info(f"Namespace {ns_name} deletion initiated by context manager.")
335+
# Add a final wait within the fixture if immediate confirmation is needed,
336+
# but context manager handles the delete call. Let's rely on the manager.
337+
# Consider adding ns.wait_deleted(timeout=180) here if needed AFTER yield returns.
338+
except Exception:
339+
LOGGER.error(f"Timeout waiting for namespace {ns_name} to become active.")
340+
pytest.fail(f"Namespace {ns_name} failed to become active.")
341+
342+
343+
@pytest.fixture(scope="function")
344+
def service_account(admin_client: DynamicClient, sa_namespace: Namespace) -> Generator[ServiceAccount, None, None]:
345+
"""
346+
Creates a ServiceAccount within the temporary namespace using a context manager.
347+
Function scope ensures it's tied to the lifetime of sa_namespace for that test.
348+
"""
349+
sa_name = generate_random_name(prefix="mr-test-user")
350+
LOGGER.info(f"Creating ServiceAccount: {sa_name} in namespace {sa_namespace.name}")
351+
# Use context manager for creation and deletion
352+
with ServiceAccount(client=admin_client, name=sa_name, namespace=sa_namespace.name) as sa:
353+
try:
354+
sa.wait(timeout=60) # Wait for SA object to exist
355+
LOGGER.info(f"ServiceAccount {sa_name} created.")
356+
yield sa
357+
# Cleanup happens automatically when exiting 'with' block
358+
LOGGER.info(f"ServiceAccount {sa_name} deletion initiated by context manager.")
359+
except Exception:
360+
LOGGER.error(f"Timeout waiting for ServiceAccount {sa_name} to be created.")
361+
pytest.fail(f"ServiceAccount {sa_name} failed to be created.")
362+
363+
364+
@pytest.fixture(scope="function")
365+
def sa_token(service_account: ServiceAccount) -> str: # type: ignore[return]
366+
"""
367+
Retrieves a short-lived token for the ServiceAccount using 'oc create token'.
368+
Function scope because token is temporary and tied to the SA for that test.
369+
"""
370+
sa_name = service_account.name
371+
namespace = service_account.namespace # Get namespace name from SA object
372+
LOGGER.info(f"Retrieving token for ServiceAccount: {sa_name} in namespace {namespace}")
373+
# (Keep the subprocess logic from previous version - it's appropriate here)
374+
try:
375+
cmd = f"oc create token {sa_name} -n {namespace} --duration={DEFAULT_TOKEN_DURATION}"
376+
LOGGER.debug(f"Executing command: {cmd}")
377+
res, out, err = run_command(command=shlex.split(cmd), verify_stderr=False, check=True)
378+
token = out.strip()
379+
if not token:
380+
pytest.fail(f"Retrieved token is empty for SA {sa_name} in {namespace}.")
381+
LOGGER.info(f"Successfully retrieved token for SA {sa_name}")
382+
return token
383+
except subprocess.CalledProcessError as e:
384+
LOGGER.error(f"Failed to create token for SA {sa_name} ns {namespace}: {e.stderr}")
385+
pytest.fail(f"Failed to create token for SA {sa_name}: {e.stderr}")
386+
except subprocess.TimeoutExpired:
387+
LOGGER.error(f"Timeout creating token for SA {sa_name} ns {namespace}")
388+
pytest.fail(f"Timeout creating token for SA {sa_name}")
389+
except Exception as e:
390+
LOGGER.error(
391+
f"An unexpected error occurred during token retrieval for SA {sa_name} ns {namespace}: {e}", exc_info=True
392+
)
393+
pytest.fail(f"Unexpected error getting token for SA {sa_name}: {e}")
394+
395+
396+
# --- RBAC Fixtures (Using Context Managers, Function Scoped) ---
397+
398+
399+
@pytest.fixture(scope="function")
400+
def mr_access_role(
401+
admin_client: DynamicClient,
402+
model_registry_namespace: str, # Existing fixture from main conftest
403+
sa_namespace: Namespace, # Used for unique naming
404+
) -> Generator[Role, None, None]:
405+
"""
406+
Creates the MR Access Role using direct constructor parameters and a context manager.
407+
"""
408+
role_name = f"registry-user-{MR_INSTANCE_NAME}-{sa_namespace.name[:8]}"
409+
LOGGER.info(f"Defining Role: {role_name} in namespace {model_registry_namespace}")
410+
411+
# Define rules directly as required by the Role constructor's 'rules' parameter
412+
role_rules: List[Dict[str, Any]] = [
413+
{
414+
"apiGroups": [""], # Core API group
415+
"resources": ["services"], # As per last refinement for REST access
416+
"resourceNames": [MR_INSTANCE_NAME], # Grant access only to the specific MR service object
417+
"verbs": ["get"],
418+
}
419+
]
420+
421+
# Define labels, to be passed via **kwargs
422+
role_labels = {
423+
"app.kubernetes.io/component": "model-registry-test-rbac",
424+
"test.opendatahub.io/namespace": sa_namespace.name,
425+
}
426+
427+
LOGGER.info(f"Attempting to create Role: {role_name} with rules and labels.")
428+
# Use context manager for creation and deletion
429+
# Pass rules and labels directly
430+
with Role(
431+
client=admin_client,
432+
name=role_name,
433+
namespace=model_registry_namespace,
434+
rules=role_rules,
435+
label=role_labels, # Pass labels via kwargs
436+
) as role:
437+
try:
438+
role.wait(timeout=60) # Wait for role object to exist
439+
LOGGER.info(f"Role {role.name} created successfully.")
440+
yield role
441+
LOGGER.info(f"Role {role.name} deletion initiated by context manager.")
442+
except Exception as e: # Catch other potential errors during Role instantiation or wait
443+
LOGGER.error(f"Error during Role {role_name} creation or wait: {e}", exc_info=True)
444+
pytest.fail(f"Failed during Role {role_name} creation: {e}")
445+
446+
447+
@pytest.fixture(scope="function")
448+
def mr_access_role_binding(
449+
admin_client: DynamicClient,
450+
model_registry_namespace: str, # Existing fixture from main conftest
451+
mr_access_role: Role, # Depend on the role fixture to get its name
452+
sa_namespace: Namespace, # The namespace containing the test SA
453+
) -> Generator[RoleBinding, None, None]:
454+
"""
455+
Creates the MR Access RoleBinding using direct constructor parameters and a context manager.
456+
"""
457+
binding_name = f"{mr_access_role.name}-binding" # Simplify name slightly, role name is already unique
458+
role_name_ref = mr_access_role.name # Get the actual name from the created Role object
459+
460+
LOGGER.info(
461+
f"Defining RoleBinding: {binding_name} linking Group 'system:serviceaccounts:{sa_namespace.name}' "
462+
f"to Role '{role_name_ref}' in namespace {model_registry_namespace}"
463+
)
464+
465+
# Define labels, to be passed via **kwargs
466+
binding_labels = {
467+
"app.kubernetes.io/component": "model-registry-test-rbac",
468+
"test.opendatahub.io/namespace": sa_namespace.name,
469+
}
470+
471+
LOGGER.info(f"Attempting to create RoleBinding: {binding_name} with labels.")
472+
# Use context manager for creation and deletion
473+
# Pass subject and role_ref details directly to constructor
474+
with RoleBinding(
475+
client=admin_client,
476+
name=binding_name,
477+
namespace=model_registry_namespace,
478+
# Subject parameters
479+
subjects_kind="Group",
480+
subjects_name=f"system:serviceaccounts:{sa_namespace.name}",
481+
subjects_api_group="rbac.authorization.k8s.io", # This is the default apiGroup for Group kind
482+
# Role reference parameters
483+
role_ref_kind="Role",
484+
role_ref_name=role_name_ref,
485+
# role_ref_api_group="rbac.authorization.k8s.io", # This is automatically set by the class
486+
label=binding_labels, # Pass labels via kwargs
487+
) as binding:
488+
try:
489+
binding.wait(timeout=60) # Wait for binding object to exist
490+
LOGGER.info(f"RoleBinding {binding.name} created successfully.")
491+
yield binding
492+
LOGGER.info(f"RoleBinding {binding.name} deletion initiated by context manager.")
493+
except Exception as e: # Catch other potential errors
494+
LOGGER.error(f"Error during RoleBinding {binding_name} creation or wait: {e}", exc_info=True)
495+
pytest.fail(f"Failed during RoleBinding {binding_name} creation: {e}")
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
# AI Disclaimer: Google Gemini 2.5 pro has been used to generate a majority of this code, with human review and editing.
2+
import pytest
3+
from typing import Self, Dict, Any
4+
from simple_logger.logger import get_logger
5+
6+
# Framework / Client specific imports
7+
from model_registry import ModelRegistry as ModelRegistryClient
8+
from mr_openapi.exceptions import ForbiddenException
9+
10+
from utilities.constants import DscComponents, Protocols # Need Protocols.HTTPS
11+
from tests.model_registry.constants import MR_NAMESPACE
12+
13+
14+
LOGGER = get_logger(name=__name__)
15+
16+
17+
# --- Helper to build client args ---
18+
# Based on the 'model_registry_client' fixture
19+
def build_mr_client_args(rest_endpoint: str, token: str, author: str) -> Dict[str, Any]:
20+
"""Builds arguments for ModelRegistryClient based on REST endpoint and token."""
21+
try:
22+
server, port = rest_endpoint.split(":")
23+
24+
# Conftest example uses is_secure=False - this implies TLS verification might be off
25+
# Adjust if your environment requires strict TLS verification
26+
return {
27+
"server_address": f"{Protocols.HTTPS}://{server}",
28+
"port": port,
29+
"user_token": token,
30+
"is_secure": False, # Match conftest example (disable TLS verification)
31+
"author": author,
32+
}
33+
except Exception as e:
34+
LOGGER.error(f"Error parsing REST endpoint '{rest_endpoint}': {e}")
35+
raise ValueError(f"Could not parse REST endpoint: {rest_endpoint}") from e
36+
37+
38+
# --- Test Class ---
39+
40+
41+
# Parametrize to ensure Model Registry component is enabled via DSC fixture
42+
@pytest.mark.parametrize(
43+
"updated_dsc_component_state_scope_class",
44+
[
45+
pytest.param(
46+
{
47+
"component_patch": {
48+
DscComponents.MODELREGISTRY: {
49+
"managementState": DscComponents.ManagementState.MANAGED,
50+
"registriesNamespace": MR_NAMESPACE,
51+
},
52+
}
53+
},
54+
id="enable_modelregistry_default_ns",
55+
)
56+
],
57+
indirect=True,
58+
scope="class",
59+
)
60+
@pytest.mark.usefixtures("updated_dsc_component_state_scope_class")
61+
class TestModelRegistryRBAC:
62+
"""
63+
Tests RBAC for Model Registry REST endpoint using ServiceAccount tokens.
64+
"""
65+
66+
@pytest.mark.smoke
67+
@pytest.mark.usefixtures("sa_namespace", "service_account") # Need SA and NS for token
68+
def test_service_account_access_denied(
69+
self: Self,
70+
sa_token: str, # Get token for the test SA
71+
model_registry_instance_rest_endpoint: str, # REST endpoint fixture
72+
):
73+
"""
74+
Verifies SA access is DENIED (403 Forbidden) by default via REST.
75+
Does NOT use mr_access_role or mr_access_role_binding fixtures.
76+
"""
77+
LOGGER.info("--- Starting RBAC Test: Access Denied ---")
78+
LOGGER.info(f"Targeting Model Registry REST endpoint: {model_registry_instance_rest_endpoint}")
79+
LOGGER.info("Expecting initial access DENIAL (403 Forbidden)")
80+
81+
try:
82+
client_args = build_mr_client_args(
83+
rest_endpoint=model_registry_instance_rest_endpoint, token=sa_token, author="rbac-test-denied"
84+
)
85+
LOGGER.debug(f"Attempting client connection with args: {client_args}")
86+
87+
# Expect an exception related to HTTP 403
88+
# Adjust exception type based on ModelRegistryClient's behavior
89+
with pytest.raises(ForbiddenException) as exc_info: # Or client's custom error e.g. ApiClientError
90+
_ = ModelRegistryClient(**client_args)
91+
92+
# Verify the status code from the caught exception
93+
http_error = exc_info.value
94+
assert http_error.body is not None, "HTTPError should have a response object"
95+
LOGGER.info(f"Received expected HTTP error: Status Code {http_error.status}")
96+
assert http_error.status == 403, f"Expected HTTP 403 Forbidden, but got {http_error.status}"
97+
LOGGER.info("Successfully received expected HTTP 403 status code.")
98+
99+
except Exception as e:
100+
LOGGER.error(f"Received unexpected error during 'access denied' check: {e}", exc_info=True)
101+
pytest.fail(f"'Access denied' check failed unexpectedly: {e}")
102+
103+
@pytest.mark.smoke
104+
# Use fixtures for SA/NS/Token AND the RBAC Role/Binding
105+
@pytest.mark.usefixtures("sa_namespace", "service_account", "mr_access_role", "mr_access_role_binding")
106+
def test_service_account_access_granted(
107+
self: Self,
108+
sa_token: str, # Get token for the test SA
109+
model_registry_instance_rest_endpoint: str, # REST endpoint fixture
110+
# mr_access_role and mr_access_role_binding are activated by @usefixtures
111+
):
112+
"""
113+
Verifies SA access is GRANTED via REST after applying Role and RoleBinding fixtures.
114+
"""
115+
LOGGER.info("--- Starting RBAC Test: Access Granted ---")
116+
LOGGER.info(f"Targeting Model Registry REST endpoint: {model_registry_instance_rest_endpoint}")
117+
LOGGER.info("Applied RBAC Role/Binding via fixtures. Expecting access GRANT.")
118+
119+
try:
120+
client_args = build_mr_client_args(
121+
rest_endpoint=model_registry_instance_rest_endpoint, token=sa_token, author="rbac-test-granted"
122+
)
123+
LOGGER.debug(f"Attempting client connection with args: {client_args}")
124+
125+
# Instantiate the client - this should now succeed or fail for non-auth reasons
126+
mr_client_success = ModelRegistryClient(**client_args)
127+
assert mr_client_success is not None, "Client initialization failed after granting permissions"
128+
LOGGER.info("Client instantiated successfully after granting permissions.")
129+
130+
except ForbiddenException as e:
131+
# If we get an HTTP error here, it's unexpected, especially 403
132+
LOGGER.error(
133+
f"Received unexpected HTTP error after granting access: {e.status if e.body else 'No Response'} - {e}",
134+
exc_info=True,
135+
)
136+
if e.body is not None and e.status == 403:
137+
pytest.fail(f"Still received HTTP 403 Forbidden after applying Role/RoleBinding: {e}")
138+
else:
139+
pytest.fail(f"Client interaction failed with HTTP error after granting permissions: {e}")
140+
141+
except Exception as e:
142+
LOGGER.error(f"Received unexpected general error after granting access: {e}", exc_info=True)
143+
pytest.fail(f"Client interaction failed unexpectedly after granting permissions: {e}")
144+
145+
LOGGER.info("--- RBAC Test Completed Successfully ---")

0 commit comments

Comments
 (0)