forked from opendatahub-io/opendatahub-tests
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
61 lines (51 loc) · 2.28 KB
/
utils.py
File metadata and controls
61 lines (51 loc) · 2.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
from typing import Any
from simple_logger.logger import get_logger
from ocp_resources.pod import Pod
from ocp_resources.resource import NamespacedResource
# securityContext keys that validate_containers_pod_security_context checks on every container.
KEYS_TO_VALIDATE = ["runAsGroup", "runAsUser"]
# Logger shared by the helpers in this module, named after the module itself.
LOGGER = get_logger(name=__name__)
def get_uid_from_namespace(namespace_scc: dict[str, str]) -> str:
    """
    Return the leading part of the namespace "uid-range" value, i.e. everything before the first "/"

    Args:
        namespace_scc: Mapping that holds a "uid-range" entry.

    Returns:
        str: The text preceding the first "/" (the whole value when it contains no "/").

    Raises:
        KeyError: If "uid-range" is missing from namespace_scc.
    """
    uid_start, _, _ = namespace_scc["uid-range"].partition("/")
    return uid_start
def validate_pod_security_context(
    pod_security_context: dict[str, Any],
    namespace_scc: dict[str, str],
    model_registry_pod: NamespacedResource,
    ns_uid: str,
) -> list[str]:
    """
    Compare the pod-level security context of the model registry pod with the namespace values

    Two checks are made: the pod's seLinuxOptions level against namespace_scc["seLinuxOptions"],
    and the pod's fsGroup against the integer value of ns_uid.

    Args:
        pod_security_context: Security context mapping taken from the pod.
        namespace_scc: Namespace values; must contain a "seLinuxOptions" entry.
        model_registry_pod: Pod under validation; only its name is used, in the messages.
        ns_uid: Namespace UID as a string; it is converted with int() for the comparison.

    Returns:
        list[str]: One message per mismatch, empty when both checks pass.
    """
    mismatches = []

    # A context without seLinuxOptions yields a level of None, which is then compared as-is.
    actual_level = pod_security_context.get("seLinuxOptions", {}).get("level")
    expected_level = namespace_scc["seLinuxOptions"]
    if actual_level != expected_level:
        mismatches.append(
            f"selinux option from pod {model_registry_pod.name} {actual_level}, namespace: {expected_level}"
        )

    actual_fs_group = pod_security_context.get("fsGroup")
    if actual_fs_group != int(ns_uid):
        mismatches.append(f"UID-range from pod {model_registry_pod.name} {actual_fs_group}, namespace: {ns_uid}")

    return mismatches
def validate_containers_pod_security_context(model_registry_pod: Pod, namespace_uid: str) -> list[str]:
    """
    Check all the containers of model registry pod, ensure the security context values are being set by openshift

    For every container the keys listed in KEYS_TO_VALIDATE are compared with the expected values:
      * a container whose args contain "sidecar" must have runAsUser and runAsGroup equal to
        namespace_uid + 1
      * any other container must have runAsUser equal to namespace_uid and no runAsGroup (None)

    Args:
        model_registry_pod: Pod whose spec.containers are inspected.
        namespace_uid: Namespace UID as a string; it is converted with int().

    Returns:
        list[str]: One message per mismatching key, empty when every container matches.

    Raises:
        ValueError: If namespace_uid is not a valid integer string.
    """
    errors = []
    base_uid = int(namespace_uid)
    for container in model_registry_pod.instance.spec.containers:
        # args and securityContext are optional in a container spec and may come back as None when
        # unset; fall back to empty values so such a container is reported instead of crashing here.
        container_args = container.args or []
        security_context = container.securityContext or {}
        is_sidecar = "sidecar" in container_args
        expected_value = {
            "runAsUser": base_uid + 1 if is_sidecar else base_uid,
            "runAsGroup": base_uid + 1 if is_sidecar else None,
        }
        for key in KEYS_TO_VALIDATE:
            actual_value = security_context.get(key)
            if actual_value == expected_value[key]:
                LOGGER.info(
                    f"For container: {container.name}, {key} validation: {expected_value[key]} completed successfully"
                )
            else:
                errors.append(
                    f"For {container.name}, expected key {key} value: {expected_value[key]}, actual: {actual_value}"
                )
    return errors