Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion tests/model_registry/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from ocp_resources.deployment import Deployment

from ocp_resources.model_registry import ModelRegistry
import schemathesis.schemas
from schemathesis.specs.openapi.schemas import BaseOpenAPISchema
from schemathesis.generation.stateful.state_machine import APIStateMachine
from schemathesis.core.transport import Response
Expand Down
172 changes: 172 additions & 0 deletions tests/model_registry/rbac/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
import pytest
import shlex
import subprocess
import os
from typing import Generator, List, Dict, Any
from ocp_resources.namespace import Namespace
from ocp_resources.service_account import ServiceAccount
from ocp_resources.role_binding import RoleBinding
from ocp_resources.role import Role
from kubernetes.dynamic import DynamicClient
from pyhelper_utils.shell import run_command
from tests.model_registry.utils import generate_random_name, generate_namespace_name
from simple_logger.logger import get_logger
from tests.model_registry.constants import MR_INSTANCE_NAME


LOGGER = get_logger(name=__name__)
DEFAULT_TOKEN_DURATION = "10m"


@pytest.fixture(scope="function")
def sa_namespace(request: pytest.FixtureRequest, admin_client: DynamicClient) -> Generator[Namespace, None, None]:
"""
Creates a temporary namespace using a context manager for automatic cleanup.
Function scope ensures a fresh namespace for each test needing it.
"""
test_file = os.path.relpath(request.fspath.strpath, start=os.path.dirname(__file__))
ns_name = generate_namespace_name(file_path=test_file)
LOGGER.info(f"Creating temporary namespace: {ns_name}")
with Namespace(client=admin_client, name=ns_name) as ns:
ns.wait_for_status(status=Namespace.Status.ACTIVE, timeout=120)
yield ns


@pytest.fixture(scope="function")
def service_account(admin_client: DynamicClient, sa_namespace: Namespace) -> Generator[ServiceAccount, None, None]:
"""
Creates a ServiceAccount within the temporary namespace using a context manager.
Function scope ensures it's tied to the lifetime of sa_namespace for that test.
"""
sa_name = generate_random_name(prefix="mr-test-user")
LOGGER.info(f"Creating ServiceAccount: {sa_name} in namespace {sa_namespace.name}")
with ServiceAccount(client=admin_client, name=sa_name, namespace=sa_namespace.name, wait_for_resource=True) as sa:
yield sa


@pytest.fixture(scope="function")
def sa_token(service_account: ServiceAccount) -> str:
"""
Retrieves a short-lived token for the ServiceAccount using 'oc create token'.
Function scope because token is temporary and tied to the SA for that test.
"""
sa_name = service_account.name
namespace = service_account.namespace
LOGGER.info(f"Retrieving token for ServiceAccount: {sa_name} in namespace {namespace}")
try:
cmd = f"oc create token {sa_name} -n {namespace} --duration={DEFAULT_TOKEN_DURATION}"
LOGGER.debug(f"Executing command: {cmd}")
res, out, err = run_command(command=shlex.split(cmd), verify_stderr=False, check=True, timeout=30)
token = out.strip()
if not token:
raise ValueError("Retrieved token is empty after successful command execution.")

LOGGER.info(f"Successfully retrieved token for SA '{sa_name}'")
return token

except Exception as e: # Catch all exceptions from the try block
error_type = type(e).__name__
log_message = (
f"Failed during token retrieval for SA '{sa_name}' in namespace '{namespace}'. "
f"Error Type: {error_type}, Message: {str(e)}"
)
if isinstance(e, subprocess.CalledProcessError):
# Add specific details for CalledProcessError
# run_command already logs the error if log_errors=True and returncode !=0,
# but we can add context here.
stderr_from_exception = e.stderr.strip() if e.stderr else "N/A"
log_message += f". Exit Code: {e.returncode}. Stderr from exception: {stderr_from_exception}"
elif isinstance(e, subprocess.TimeoutExpired):
timeout_value = getattr(e, "timeout", "N/A")
log_message += f". Command timed out after {timeout_value} seconds."
elif isinstance(e, FileNotFoundError):
# This occurs if 'oc' is not found.
# e.filename usually holds the name of the file that was not found.
command_not_found = e.filename if hasattr(e, "filename") and e.filename else shlex.split(cmd)[0]
log_message += f". Command '{command_not_found}' not found. Is it installed and in PATH?"

LOGGER.error(log_message, exc_info=True) # exc_info=True adds stack trace to the log
raise


# --- RBAC Fixtures ---


@pytest.fixture(scope="function")
def mr_access_role(
admin_client: DynamicClient,
model_registry_namespace: str,
sa_namespace: Namespace,
) -> Generator[Role, None, None]:
"""
Creates the MR Access Role using direct constructor parameters and a context manager.
"""
role_name = f"registry-user-{MR_INSTANCE_NAME}-{sa_namespace.name[:8]}"
LOGGER.info(f"Defining Role: {role_name} in namespace {model_registry_namespace}")

role_rules: List[Dict[str, Any]] = [
{
"apiGroups": [""],
"resources": ["services"],
"resourceNames": [MR_INSTANCE_NAME], # Grant access only to the specific MR service object
"verbs": ["get"],
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we pass the verbs as input param so to have more flexibility?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in our case we only really care about get on the MR Instance, so I don't think it's really needed. If we were to generalize this to create any Role then I'd agree with you

}
]
role_labels = {
"app.kubernetes.io/component": "model-registry-test-rbac",
"test.opendatahub.io/namespace": sa_namespace.name,
}

LOGGER.info(f"Attempting to create Role: {role_name} with rules and labels.")
with Role(
client=admin_client,
name=role_name,
namespace=model_registry_namespace,
rules=role_rules,
label=role_labels,
wait_for_resource=True,
) as role:
LOGGER.info(f"Role {role.name} created successfully.")
yield role
LOGGER.info(f"Role {role.name} deletion initiated by context manager.")


@pytest.fixture(scope="function")
def mr_access_role_binding(
admin_client: DynamicClient,
model_registry_namespace: str,
mr_access_role: Role,
sa_namespace: Namespace,
) -> Generator[RoleBinding, None, None]:
"""
Creates the MR Access RoleBinding using direct constructor parameters and a context manager.
"""
binding_name = f"{mr_access_role.name}-binding"

LOGGER.info(
f"Defining RoleBinding: {binding_name} linking Group 'system:serviceaccounts:{sa_namespace.name}' "
f"to Role '{mr_access_role.name}' in namespace {model_registry_namespace}"
)
binding_labels = {
"app.kubernetes.io/component": "model-registry-test-rbac",
"test.opendatahub.io/namespace": sa_namespace.name,
}

LOGGER.info(f"Attempting to create RoleBinding: {binding_name} with labels.")
with RoleBinding(
client=admin_client,
name=binding_name,
namespace=model_registry_namespace,
# Subject parameters
subjects_kind="Group",
subjects_name=f"system:serviceaccounts:{sa_namespace.name}",
subjects_api_group="rbac.authorization.k8s.io", # This is the default apiGroup for Group kind
# Role reference parameters
role_ref_kind="Role",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we are passing already mr_access_role we could use it mr_access_role.kind

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, good idea. if you rebase in your PR can you add it?

role_ref_name=mr_access_role.name,
label=binding_labels,
wait_for_resource=True,
) as binding:
LOGGER.info(f"RoleBinding {binding.name} created successfully.")
yield binding
LOGGER.info(f"RoleBinding {binding.name} deletion initiated by context manager.")
110 changes: 110 additions & 0 deletions tests/model_registry/rbac/test_mr_rbac_sa.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# AI Disclaimer: Google Gemini 2.5 pro has been used to generate a majority of this code, with human review and editing.
import pytest
from typing import Self, Dict, Any
from simple_logger.logger import get_logger
from model_registry import ModelRegistry as ModelRegistryClient
from mr_openapi.exceptions import ForbiddenException
from utilities.constants import DscComponents, Protocols
from tests.model_registry.constants import MR_NAMESPACE


LOGGER = get_logger(name=__name__)


def build_mr_client_args(rest_endpoint: str, token: str, author: str) -> Dict[str, Any]:
"""Builds arguments for ModelRegistryClient based on REST endpoint and token."""
server, port = rest_endpoint.split(":")
return {
"server_address": f"{Protocols.HTTPS}://{server}",
"port": port,
"user_token": token,
"is_secure": False,
"author": author,
}


@pytest.mark.parametrize(
"updated_dsc_component_state_scope_class",
[
pytest.param(
{
"component_patch": {
DscComponents.MODELREGISTRY: {
"managementState": DscComponents.ManagementState.MANAGED,
"registriesNamespace": MR_NAMESPACE,
},
}
},
id="enable_modelregistry_default_ns",
)
],
indirect=True,
scope="class",
)
@pytest.mark.usefixtures("updated_dsc_component_state_scope_class")
class TestModelRegistryRBAC:
"""
Tests RBAC for Model Registry REST endpoint using ServiceAccount tokens.
"""

@pytest.mark.sanity
@pytest.mark.usefixtures("sa_namespace", "service_account")
def test_service_account_access_denied(
self: Self,
sa_token: str,
model_registry_instance_rest_endpoint: str,
):
"""
Verifies SA access is DENIED (403 Forbidden) by default via REST.
Does NOT use mr_access_role or mr_access_role_binding fixtures.
"""
LOGGER.info("--- Starting RBAC Test: Access Denied ---")
LOGGER.info(f"Targeting Model Registry REST endpoint: {model_registry_instance_rest_endpoint}")
LOGGER.info("Expecting initial access DENIAL (403 Forbidden)")

client_args = build_mr_client_args(
rest_endpoint=model_registry_instance_rest_endpoint, token=sa_token, author="rbac-test-denied"
)
LOGGER.debug(f"Attempting client connection with args: {client_args}")

# Expect an exception related to HTTP 403
with pytest.raises(ForbiddenException) as exc_info:
_ = ModelRegistryClient(**client_args)

# Verify the status code from the caught exception
http_error = exc_info.value
assert http_error.body is not None, "HTTPError should have a response object"
LOGGER.info(f"Received expected HTTP error: Status Code {http_error.status}")
assert http_error.status == 403, f"Expected HTTP 403 Forbidden, but got {http_error.status}"
LOGGER.info("Successfully received expected HTTP 403 status code.")

@pytest.mark.sanity
# Use fixtures for SA/NS/Token AND the RBAC Role/Binding
@pytest.mark.usefixtures("sa_namespace", "service_account", "mr_access_role", "mr_access_role_binding")
def test_service_account_access_granted(
self: Self,
sa_token: str,
model_registry_instance_rest_endpoint: str,
):
"""
Verifies SA access is GRANTED via REST after applying Role and RoleBinding fixtures.
"""
LOGGER.info("--- Starting RBAC Test: Access Granted ---")
LOGGER.info(f"Targeting Model Registry REST endpoint: {model_registry_instance_rest_endpoint}")
LOGGER.info("Applied RBAC Role/Binding via fixtures. Expecting access GRANT.")

try:
client_args = build_mr_client_args(
rest_endpoint=model_registry_instance_rest_endpoint, token=sa_token, author="rbac-test-granted"
)
LOGGER.debug(f"Attempting client connection with args: {client_args}")
mr_client_success = ModelRegistryClient(**client_args)
assert mr_client_success is not None, "Client initialization failed after granting permissions"
LOGGER.info("Client instantiated successfully after granting permissions.")

except Exception as e:
# If we get an exception here, it's unexpected, especially 403
LOGGER.error(f"Received unexpected general error after granting access: {e}", exc_info=True)
raise

LOGGER.info("--- RBAC Test Completed Successfully ---")
35 changes: 35 additions & 0 deletions tests/model_registry/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import uuid
from typing import Any

from kubernetes.dynamic import DynamicClient
Expand Down Expand Up @@ -229,3 +230,37 @@ def wait_for_pods_running(
)
raise
return None


def generate_random_name(prefix: str, length: int = 8) -> str:
"""
Generates a name with a required prefix and a random suffix derived from a UUID.

The length of the random suffix can be controlled, defaulting to 8 characters.
The suffix is taken from the beginning of a V4 UUID's hex representation.

Args:
prefix (str): The required prefix for the generated name.
ength (int, optional): The desired length for the UUID-derived suffix.
Defaults to 8. Must be between 1 and 32.

Returns:
str: A string in the format "prefix-uuid_suffix".

Raises:
ValueError: If prefix is empty, or if length is not between 1 and 32.
"""
if not prefix:
raise ValueError("Prefix cannot be empty or None.")
if not isinstance(length, int) or not (1 <= length <= 32):
raise ValueError("suffix_length must be an integer between 1 and 32.")
# Generate a new random UUID (version 4)
random_uuid = uuid.uuid4()
# Use the first 'length' characters of the hexadecimal representation of the UUID as the suffix.
# random_uuid.hex is 32 characters long.
suffix = random_uuid.hex[:length]
return f"{prefix}-{suffix}"


def generate_namespace_name(file_path: str) -> str:
return (file_path.removesuffix(".py").replace("/", "-").replace("_", "-"))[-63:].split("-", 1)[-1]