forked from opendatahub-io/opendatahub-tests
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
266 lines (238 loc) · 10.1 KB
/
utils.py
File metadata and controls
266 lines (238 loc) · 10.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
import uuid
from typing import Any
from kubernetes.dynamic import DynamicClient
from ocp_resources.namespace import Namespace
from ocp_resources.pod import Pod
from ocp_resources.service import Service
from ocp_resources.model_registry import ModelRegistry
from kubernetes.dynamic.exceptions import ResourceNotFoundError
from simple_logger.logger import get_logger
from timeout_sampler import TimeoutExpiredError, TimeoutSampler
from kubernetes.dynamic.exceptions import NotFoundError
from tests.model_registry.constants import MR_DB_IMAGE_DIGEST
from utilities.exceptions import ProtocolNotSupportedError, TooManyServicesError
from utilities.constants import Protocols, Annotations
ADDRESS_ANNOTATION_PREFIX: str = "routing.opendatahub.io/external-address-"
LOGGER = get_logger(name=__name__)
def get_mr_service_by_label(client: DynamicClient, ns: Namespace, mr_instance: ModelRegistry) -> Service:
"""
Args:
client (DynamicClient): OCP Client to use.
ns (Namespace): Namespace object where to find the Service
mr_instance (ModelRegistry): Model Registry instance
Returns:
Service: The matching Service
Raises:
ResourceNotFoundError: if no service is found.
"""
if svc := [
svcs
for svcs in Service.get(
dyn_client=client,
namespace=ns.name,
label_selector=f"app={mr_instance.name},component=model-registry",
)
]:
if len(svc) == 1:
return svc[0]
raise TooManyServicesError(svc)
raise ResourceNotFoundError(f"{mr_instance.name} has no Service")
def get_endpoint_from_mr_service(svc: Service, protocol: str) -> str:
if protocol in (Protocols.REST, Protocols.GRPC):
return svc.instance.metadata.annotations[f"{ADDRESS_ANNOTATION_PREFIX}{protocol}"]
else:
raise ProtocolNotSupportedError(protocol)
def get_model_registry_deployment_template_dict(secret_name: str, resource_name: str) -> dict[str, Any]:
return {
"metadata": {
"labels": {
"name": resource_name,
"sidecar.istio.io/inject": "false",
}
},
"spec": {
"containers": [
{
"env": [
{
"name": "MYSQL_USER",
"valueFrom": {
"secretKeyRef": {
"key": "database-user",
"name": secret_name,
}
},
},
{
"name": "MYSQL_PASSWORD",
"valueFrom": {
"secretKeyRef": {
"key": "database-password",
"name": secret_name,
}
},
},
{
"name": "MYSQL_ROOT_PASSWORD",
"valueFrom": {
"secretKeyRef": {
"key": "database-password",
"name": secret_name,
}
},
},
{
"name": "MYSQL_DATABASE",
"valueFrom": {
"secretKeyRef": {
"key": "database-name",
"name": secret_name,
}
},
},
],
"args": [
"--datadir",
"/var/lib/mysql/datadir",
"--default-authentication-plugin=mysql_native_password",
],
"image": MR_DB_IMAGE_DIGEST,
"imagePullPolicy": "IfNotPresent",
"livenessProbe": {
"exec": {
"command": [
"/bin/bash",
"-c",
"mysqladmin -u${MYSQL_USER} -p${MYSQL_ROOT_PASSWORD} ping",
]
},
"initialDelaySeconds": 15,
"periodSeconds": 10,
"timeoutSeconds": 5,
},
"name": "mysql",
"ports": [{"containerPort": 3306, "protocol": "TCP"}],
"readinessProbe": {
"exec": {
"command": [
"/bin/bash",
"-c",
'mysql -D ${MYSQL_DATABASE} -u${MYSQL_USER} -p${MYSQL_ROOT_PASSWORD} -e "SELECT 1"',
]
},
"initialDelaySeconds": 10,
"timeoutSeconds": 5,
},
"securityContext": {"capabilities": {}, "privileged": False},
"terminationMessagePath": "/dev/termination-log",
"volumeMounts": [
{
"mountPath": "/var/lib/mysql",
"name": f"{resource_name}-data",
}
],
}
],
"dnsPolicy": "ClusterFirst",
"restartPolicy": "Always",
"volumes": [
{
"name": f"{resource_name}-data",
"persistentVolumeClaim": {"claimName": resource_name},
}
],
},
}
def get_model_registry_db_label_dict(db_resource_name: str) -> dict[str, str]:
return {
Annotations.KubernetesIo.NAME: db_resource_name,
Annotations.KubernetesIo.INSTANCE: db_resource_name,
Annotations.KubernetesIo.PART_OF: db_resource_name,
}
def get_pod_container_error_status(pod: Pod) -> str | None:
"""
Check container error status for a given pod and if any containers is in waiting state, return that information
"""
pod_instance_status = pod.instance.status
for container_status in pod_instance_status.get("containerStatuses", []):
if waiting_container := container_status.get("state", {}).get("waiting"):
return waiting_container["reason"] if waiting_container.get("reason") else waiting_container
return ""
def get_not_running_pods(pods: list[Pod]) -> list[dict[str, Any]]:
# Gets all the non-running pods from a given namespace.
# Note: We need to keep track of pods marked for deletion as not running. This would ensure any
# pod that was spun up in place of pod marked for deletion, are not ignored
pods_not_running = []
try:
for pod in pods:
pod_instance = pod.instance
if container_status_error := get_pod_container_error_status(pod=pod):
pods_not_running.append({pod.name: container_status_error})
if pod_instance.metadata.get("deletionTimestamp") or pod_instance.status.phase not in (
pod.Status.RUNNING,
pod.Status.SUCCEEDED,
):
pods_not_running.append({pod.name: pod.status})
except (ResourceNotFoundError, NotFoundError) as exc:
LOGGER.warning("Ignoring pod that disappeared during cluster sanity check: %s", exc)
return pods_not_running
def wait_for_pods_running(
admin_client: DynamicClient,
namespace_name: str,
number_of_consecutive_checks: int = 1,
) -> bool | None:
"""
Waits for all pods in a given namespace to reach Running/Completed state. To avoid catching all pods in running
state too soon, use number_of_consecutive_checks with appropriate values.
"""
samples = TimeoutSampler(
wait_timeout=180,
sleep=5,
func=get_not_running_pods,
pods=list(Pod.get(dyn_client=admin_client, namespace=namespace_name)),
exceptions_dict={NotFoundError: [], ResourceNotFoundError: []},
)
sample = None
try:
current_check = 0
for sample in samples:
if not sample:
current_check += 1
if current_check >= number_of_consecutive_checks:
return True
else:
current_check = 0
except TimeoutExpiredError:
if sample:
LOGGER.error(
f"timeout waiting for all pods in namespace {namespace_name} to reach "
f"running state, following pods are in not running state: {sample}"
)
raise
return None
def generate_random_name(prefix: str, length: int = 8) -> str:
"""
Generates a name with a required prefix and a random suffix derived from a UUID.
The length of the random suffix can be controlled, defaulting to 8 characters.
The suffix is taken from the beginning of a V4 UUID's hex representation.
Args:
prefix (str): The required prefix for the generated name.
ength (int, optional): The desired length for the UUID-derived suffix.
Defaults to 8. Must be between 1 and 32.
Returns:
str: A string in the format "prefix-uuid_suffix".
Raises:
ValueError: If prefix is empty, or if length is not between 1 and 32.
"""
if not prefix:
raise ValueError("Prefix cannot be empty or None.")
if not isinstance(length, int) or not (1 <= length <= 32):
raise ValueError("suffix_length must be an integer between 1 and 32.")
# Generate a new random UUID (version 4)
random_uuid = uuid.uuid4()
# Use the first 'length' characters of the hexadecimal representation of the UUID as the suffix.
# random_uuid.hex is 32 characters long.
suffix = random_uuid.hex[:length]
return f"{prefix}-{suffix}"
def generate_namespace_name(file_path: str) -> str:
return (file_path.removesuffix(".py").replace("/", "-").replace("_", "-"))[-63:].split("-", 1)[-1]