Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tests/model_registry/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class ModelRegistryEndpoints:
"authProvider": "redhat-ods-applications-auth-provider",
"gateway": {"grpc": {"tls": {}}, "rest": {"tls": {}}},
}
DB_RESOURCES_NAME: str = "model-registry-db"
DB_RESOURCES_NAME: str = "db-model-registry"
MR_DB_IMAGE_DIGEST: str = (
"public.ecr.aws/docker/library/mysql@sha256:9de9d54fecee6253130e65154b930978b1fcc336bcc86dfd06e89b72a2588ebe"
)
Empty file.
130 changes: 130 additions & 0 deletions tests/model_registry/scc/test_model_registry_scc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import pytest
from typing import Self
from simple_logger.logger import get_logger
from _pytest.fixtures import FixtureRequest

from ocp_resources.namespace import Namespace
from ocp_resources.pod import Pod
from ocp_resources.deployment import Deployment
from ocp_resources.resource import NamespacedResource
from tests.model_registry.scc.utils import (
get_uid_from_namespace,
validate_pod_security_context,
KEYS_TO_VALIDATE,
validate_containers_pod_security_context,
)
from utilities.constants import DscComponents
from tests.model_registry.constants import MR_NAMESPACE, MODEL_DICT, MR_INSTANCE_NAME

from kubernetes.dynamic import DynamicClient
from ocp_utilities.infra import get_pods_by_name_prefix

LOGGER = get_logger(name=__name__)


@pytest.fixture(scope="class")
def model_registry_scc_namespace(model_registry_namespace: Namespace):
mr_annotations = model_registry_namespace.instance.metadata.annotations
return {
"seLinuxOptions": mr_annotations.get("openshift.io/sa.scc.mcs"),
"uid-range": mr_annotations.get("openshift.io/sa.scc.uid-range"),
}


@pytest.fixture(scope="class")
def model_registry_resource(
request: FixtureRequest, admin_client: DynamicClient, model_registry_namespace: Namespace
) -> NamespacedResource:
if request.param["kind"] == Deployment:
return Deployment(name=MR_INSTANCE_NAME, namespace=model_registry_namespace.name, ensure_exists=True)
elif request.param["kind"] == Pod:
pods = get_pods_by_name_prefix(
client=admin_client, pod_prefix=MR_INSTANCE_NAME, namespace=model_registry_namespace.name
)
if len(pods) != 1:
pytest.fail(
"Expected one model registry pod. Found: {[{pod.name: pod.status} for pod in pods] if pods else None}"
)
return pods[0]
else:
raise AssertionError(f"Invalid resource: {request.param['kind']}. Valid options: Deployment and Pod")


@pytest.mark.parametrize(
"model_registry_namespace, updated_dsc_component_state_scope_class, registered_model",
[
pytest.param(
{"namespace_name": MR_NAMESPACE},
{
"component_patch": {
DscComponents.MODELREGISTRY: {
"managementState": DscComponents.ManagementState.MANAGED,
"registriesNamespace": MR_NAMESPACE,
},
},
},
MODEL_DICT,
),
],
indirect=True,
)
@pytest.mark.usefixtures("model_registry_namespace", "updated_dsc_component_state_scope_class", "registered_model")
class TestModelRegistrySecurityContextValidation:
@pytest.mark.parametrize(
"model_registry_resource",
[
pytest.param({"kind": Deployment}),
],
indirect=["model_registry_resource"],
)
def test_model_registry_deployment_security_context_validation(
self: Self,
model_registry_resource: NamespacedResource,
model_registry_namespace: Namespace,
):
"""
Validate that model registry deployment does not set runAsUser/runAsGroup
"""
error = []
for container in model_registry_resource.instance.spec.template.spec.containers:
if not all([True for key in KEYS_TO_VALIDATE if not container.get(key)]):
error.append({container.name: container.securityContext})

if error:
pytest.fail(
f"{model_registry_resource.name} {model_registry_resource.kind} containers expected to not "
f"set {KEYS_TO_VALIDATE}, actual: {error}"
)

@pytest.mark.parametrize(
"model_registry_resource",
[
pytest.param({"kind": Pod}),
],
indirect=["model_registry_resource"],
)
def test_model_registry_pod_security_context_validation(
self: Self,
model_registry_resource: NamespacedResource,
model_registry_scc_namespace: dict[str, str],
):
"""
Validate that model registry pod gets runAsUser/runAsGroup from openshift and the values matches namespace
annotations
"""
ns_uid = get_uid_from_namespace(namespace_scc=model_registry_scc_namespace)
pod_spec = model_registry_resource.instance.spec
errors = validate_pod_security_context(
pod_security_context=pod_spec.securityContext,
namespace_scc=model_registry_scc_namespace,
model_registry_pod=model_registry_resource,
ns_uid=ns_uid,
)
errors.extend(
validate_containers_pod_security_context(model_registry_pod=model_registry_resource, namespace_uid=ns_uid)
)
if errors:
pytest.fail(
f"{model_registry_resource.name} {model_registry_resource.kind} pod security context validation failed"
f" with error: {errors}"
)
61 changes: 61 additions & 0 deletions tests/model_registry/scc/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from typing import Any
from simple_logger.logger import get_logger

from ocp_resources.pod import Pod
from ocp_resources.resource import NamespacedResource

KEYS_TO_VALIDATE = ["runAsGroup", "runAsUser"]

LOGGER = get_logger(name=__name__)


def get_uid_from_namespace(namespace_scc: dict[str, str]) -> str:
return namespace_scc["uid-range"].split("/")[0]


def validate_pod_security_context(
pod_security_context: dict[str, Any],
namespace_scc: dict[str, str],
model_registry_pod: NamespacedResource,
ns_uid: str,
) -> list[str]:
"""
Check model registry pod, ensure the security context values are being set by openshift
"""
errors = []
pod_selinux_option = pod_security_context.get("seLinuxOptions", {}).get("level")
if pod_selinux_option != namespace_scc["seLinuxOptions"]:
errors.append(
f"selinux option from pod {model_registry_pod.name} {pod_selinux_option},"
f" namespace: {namespace_scc['seLinuxOptions']}"
)
if pod_security_context.get("fsGroup") != int(ns_uid):
errors.append(
f"UID-range from pod {model_registry_pod.name} {pod_security_context.get('fsGroup')}, namespace: {ns_uid}"
)
return errors


def validate_containers_pod_security_context(model_registry_pod: Pod, namespace_uid: str) -> list[str]:
"""
Check all the containers of model registry pod, ensure the security context values are being set by openshift
"""
errors = []
containers = model_registry_pod.instance.spec.containers
for container in containers:
expected_value = {
"runAsUser": int(namespace_uid) + 1 if "sidecar" in container.args else int(namespace_uid),
"runAsGroup": int(namespace_uid) + 1 if "sidecar" in container.args else None,
}

for key in KEYS_TO_VALIDATE:
if container.securityContext.get(key) == expected_value[key]:
LOGGER.info(
f"For container: {container.name}, {key} validation: {expected_value[key]} completed successfully"
)
else:
errors.append(
f"For {container.name}, expected key {key} value: {expected_value[key]},"
f" actual: {container.securityContext.get(key)}"
)
return errors