conftest.py

from collections.abc import Generator
from typing import Any

import pytest
from _pytest.fixtures import FixtureRequest
from kubernetes.client.rest import ApiException
from kubernetes.dynamic import DynamicClient
from ocp_resources.inference_service import InferenceService
from ocp_resources.namespace import Namespace
from ocp_resources.node import Node
from ocp_resources.persistent_volume_claim import PersistentVolumeClaim
from ocp_resources.pod import Pod
from ocp_resources.resource import ResourceEditor
from ocp_resources.secret import Secret
from ocp_resources.serving_runtime import ServingRuntime
from pytest_testconfig import config as py_config
from simple_logger.logger import get_logger
from timeout_sampler import TimeoutSampler

from tests.model_serving.model_server.kserve.multi_node.constants import WORKER_POD_ROLE
from tests.model_serving.model_server.kserve.multi_node.utils import (
delete_multi_node_pod_by_role,
get_pods_by_isvc_generation,
)
from utilities.constants import KServeDeploymentType, Labels, ModelCarImage, Protocols, Timeout
from utilities.general import download_model_data
from utilities.inference_utils import create_isvc
from utilities.infra import (
get_pods_by_isvc_label,
verify_no_failed_pods,
wait_for_inference_deployment_replicas,
)
from utilities.serving_runtime import ServingRuntimeFromTemplate

LOGGER = get_logger(name=__name__)


@pytest.fixture(scope="session")
def nvidia_gpu_nodes(nodes: list[Node]) -> list[Node]:
return [node for node in nodes if "nvidia.com/gpu.present" in node.labels]


@pytest.fixture(scope="session")
def skip_if_no_gpu_nodes(nvidia_gpu_nodes: list[Node]) -> None:
if len(nvidia_gpu_nodes) < 2:
pytest.skip("Multi-node tests can only run on a Cluster with at least 2 GPU Worker nodes")


@pytest.fixture(scope="class")
def models_bucket_downloaded_model_data(
request: FixtureRequest,
admin_client: DynamicClient,
unprivileged_model_namespace: Namespace,
models_s3_bucket_name: str,
model_pvc: PersistentVolumeClaim,
aws_secret_access_key: str,
aws_access_key_id: str,
models_s3_bucket_endpoint: str,
models_s3_bucket_region: str,
) -> str:
return download_model_data(
client=admin_client,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
model_namespace=unprivileged_model_namespace.name,
model_pvc_name=model_pvc.name,
bucket_name=models_s3_bucket_name,
aws_endpoint_url=models_s3_bucket_endpoint,
aws_default_region=models_s3_bucket_region,
model_path=request.param["model-dir"],
)
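
# Illustrative only: a test consuming the fixture above selects the model path
# via indirect parametrization; the "model-dir" value below is a placeholder,
# not a real path from this repo.
#
#   @pytest.mark.parametrize(
#       "models_bucket_downloaded_model_data",
#       [{"model-dir": "<path-to-model-in-bucket>"}],
#       indirect=True,
#   )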


@pytest.fixture(scope="class")
def multi_node_serving_runtime(
request: FixtureRequest,
unprivileged_client: DynamicClient,
unprivileged_model_namespace: Namespace,
) -> Generator[ServingRuntime, Any, Any]:
with ServingRuntimeFromTemplate(
client=unprivileged_client,
name="vllm-multinode-runtime", # TODO: rename servingruntime when RHOAIENG-16147 is resolved
namespace=unprivileged_model_namespace.name,
template_name="vllm-multinode-runtime-template",
multi_model=False,
enable_http=True,
) as model_runtime:
yield model_runtime


@pytest.fixture(scope="class")
def multi_node_inference_service(
request: FixtureRequest,
unprivileged_client: DynamicClient,
multi_node_serving_runtime: ServingRuntime,
model_pvc: PersistentVolumeClaim,
models_bucket_downloaded_model_data: str,
) -> Generator[InferenceService, Any, Any]:
with create_isvc(
client=unprivileged_client,
name=request.param["name"],
namespace=multi_node_serving_runtime.namespace,
runtime=multi_node_serving_runtime.name,
storage_uri=f"pvc://{model_pvc.name}/{models_bucket_downloaded_model_data}",
model_format=multi_node_serving_runtime.instance.spec.supportedModelFormats[0].name,
deployment_mode=KServeDeploymentType.RAW_DEPLOYMENT,
autoscaler_mode="none",
multi_node_worker_spec={},
wait_for_predictor_pods=False,
) as isvc:
wait_for_inference_deployment_replicas(
client=unprivileged_client,
isvc=isvc,
expected_num_deployments=2,
runtime_name=multi_node_serving_runtime.name,
)
yield isvc
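
# Sketch of test-side usage (name is illustrative): the fixture above reads the
# InferenceService name from request.param, so tests parametrize it indirectly.
#
#   @pytest.mark.parametrize(
#       "multi_node_inference_service",
#       [{"name": "<isvc-name>"}],
#       indirect=True,
#   )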


@pytest.fixture(scope="class")
def multi_node_oci_inference_service(
request: FixtureRequest,
unprivileged_client: DynamicClient,
multi_node_serving_runtime: ServingRuntime,
) -> Generator[InferenceService, Any, Any]:
resources = {
"requests": {
"cpu": "1",
"memory": "4G",
},
"limits": {
"cpu": "2",
"memory": "12G",
},
}
worker_resources = {
"containers": [
{
"name": "worker-container",
"resources": resources,
}
]
}
with create_isvc(
client=unprivileged_client,
name=request.param["name"],
namespace=multi_node_serving_runtime.namespace,
runtime=multi_node_serving_runtime.name,
storage_uri=ModelCarImage.GRANITE_8B_CODE_INSTRUCT,
model_format=multi_node_serving_runtime.instance.spec.supportedModelFormats[0].name,
deployment_mode=KServeDeploymentType.RAW_DEPLOYMENT,
autoscaler_mode="none",
resources=resources,
multi_node_worker_spec=worker_resources,
wait_for_predictor_pods=False,
external_route=True,
timeout=Timeout.TIMEOUT_30MIN,
) as isvc:
wait_for_inference_deployment_replicas(
client=unprivileged_client,
isvc=isvc,
expected_num_deployments=2,
runtime_name=multi_node_serving_runtime.name,
timeout=Timeout.TIMEOUT_15MIN,
)
yield isvc


@pytest.fixture(scope="class")
def multi_node_predictor_pods_scope_class(
unprivileged_client: DynamicClient,
multi_node_inference_service: InferenceService,
) -> list[Pod]:
return get_pods_by_isvc_label(
client=unprivileged_client,
isvc=multi_node_inference_service,
)


@pytest.fixture(scope="function")
def patched_multi_node_isvc_external_route(
multi_node_inference_service: InferenceService,
) -> Generator[InferenceService, Any, Any]:
with ResourceEditor(
patches={
multi_node_inference_service: {
"metadata": {"labels": {Labels.Kserve.NETWORKING_KSERVE_IO: Labels.Kserve.EXPOSED}},
}
}
):
for sample in TimeoutSampler(
wait_timeout=Timeout.TIMEOUT_1MIN,
sleep=1,
func=lambda: multi_node_inference_service.instance.status,
):
if sample and sample.get("url", "").startswith(Protocols.HTTPS):
break
yield multi_node_inference_service


@pytest.fixture(scope="function")
def patched_multi_node_spec(
request: FixtureRequest,
unprivileged_client: DynamicClient,
multi_node_inference_service: InferenceService,
) -> Generator[InferenceService, Any, Any]:
with ResourceEditor(
patches={
multi_node_inference_service: {
"spec": {
"predictor": request.param["spec"],
},
}
}
):
for sample in TimeoutSampler(
wait_timeout=Timeout.TIMEOUT_10MIN,
sleep=10,
func=get_pods_by_isvc_generation,
client=unprivileged_client,
isvc=multi_node_inference_service,
):
if sample:
break
yield multi_node_inference_service
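
# Hypothetical parametrization for the fixture above; the "spec" dict is
# patched onto spec.predictor, so a test could, for example, adjust the
# multi-node worker settings (field names assumed from the KServe predictor
# schema, not taken from this repo):
#
#   @pytest.mark.parametrize(
#       "patched_multi_node_spec",
#       [{"spec": {"workerSpec": {"pipelineParallelSize": 2}}}],
#       indirect=True,
#   )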


@pytest.fixture()
def ray_ca_tls_secret(admin_client: DynamicClient) -> Secret:
return Secret(
client=admin_client,
name="ray-ca-tls",
namespace=py_config["applications_namespace"],
)


@pytest.fixture()
def ray_tls_secret(unprivileged_client: DynamicClient, multi_node_inference_service: InferenceService) -> Secret:
return Secret(
client=unprivileged_client,
name="ray-tls",
namespace=multi_node_inference_service.namespace,
)


@pytest.fixture()
def deleted_serving_runtime(
    multi_node_serving_runtime: ServingRuntime,
) -> Generator[None, Any, Any]:
multi_node_serving_runtime.clean_up()
yield
multi_node_serving_runtime.deploy()


@pytest.fixture()
def deleted_multi_node_pod(
request: FixtureRequest,
unprivileged_client: DynamicClient,
multi_node_inference_service: InferenceService,
) -> None:
delete_multi_node_pod_by_role(
client=unprivileged_client,
isvc=multi_node_inference_service,
role=request.param["pod-role"],
)
verify_no_failed_pods(
client=unprivileged_client,
isvc=multi_node_inference_service,
timeout=Timeout.TIMEOUT_10MIN,
)
wait_for_inference_deployment_replicas(
client=unprivileged_client,
isvc=multi_node_inference_service,
expected_num_deployments=2,
)
_warmup_inference_and_wait_for_recovery(
client=unprivileged_client,
isvc=multi_node_inference_service,
)
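
# Example parametrization for the fixture above, using the worker role constant
# imported at the top of this module; the available role values are defined in
# tests.model_serving.model_server.kserve.multi_node.constants:
#
#   @pytest.mark.parametrize(
#       "deleted_multi_node_pod",
#       [{"pod-role": WORKER_POD_ROLE}],
#       indirect=True,
#   )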


def _warmup_inference_and_wait_for_recovery(
client: DynamicClient,
isvc: InferenceService,
) -> None:
"""Probe the head pod with a real inference request until it succeeds.
After a pod deletion+recovery cycle, the vLLM EngineCore may be in a corrupted
state that only crashes on the first real inference request. The /v1/models
endpoint returns 200 even when the engine is broken, so we must use
/v1/completions to exercise the distributed engine path. The liveness probe
then restarts the head pod, which takes several minutes. This function retries
until the completions endpoint responds with 200.
"""
probe_cmd = [
"curl",
"-s",
"--max-time",
"15",
"-o",
"/dev/null",
"-w",
"%{http_code}",
"-H",
"Content-type:application/json",
"-d",
f'{{"model":"{isvc.name}","prompt":"test","max_tokens":1}}',
"http://localhost:8080/v1/completions",
]
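    # For reference, the equivalent shell command the probe runs inside the head
    # pod (model name substituted from the InferenceService):
    #   curl -s --max-time 15 -o /dev/null -w "%{http_code}" \
    #     -H "Content-type:application/json" \
    #     -d '{"model":"<name>","prompt":"test","max_tokens":1}' \
    #     http://localhost:8080/v1/completions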
for sample in TimeoutSampler(
wait_timeout=Timeout.TIMEOUT_10MIN,
sleep=30,
func=_probe_inference_health,
client=client,
isvc=isvc,
cmd=probe_cmd,
):
if sample:
return


def _probe_inference_health(
client: DynamicClient,
isvc: InferenceService,
cmd: list[str],
) -> bool:
"""Return True if the head pod's completions endpoint returns 200.
Re-fetches pods each call to handle pod replacement during restarts.
"""
for pod in get_pods_by_isvc_label(client=client, isvc=isvc):
if WORKER_POD_ROLE in pod.name:
continue
for condition in pod.instance.status.conditions or []:
if condition.type == "Ready" and condition.status != "True":
LOGGER.info(f"Head pod {pod.name} not Ready yet")
return False
try:
result = pod.execute(command=cmd)
except (ApiException, OSError) as exc:
LOGGER.warning(f"Inference probe on {pod.name} failed: {exc}")
return False
else:
status = result.strip()
LOGGER.info(f"Inference probe on {pod.name}: HTTP {status}")
return status == "200"
LOGGER.warning("No head pod found")
return False