forked from opendatahub-io/opendatahub-tests
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_kueue_isvc_raw.py
More file actions
155 lines (143 loc) · 6.05 KB
/
test_kueue_isvc_raw.py
File metadata and controls
155 lines (143 loc) · 6.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
"""
Integration test for Kueue and InferenceService admission control.
This test imports the reusable test logic from utilities.kueue_utils.
"""
import pytest
from ocp_resources.deployment import Deployment
from timeout_sampler import TimeoutExpiredError, TimeoutSampler
from utilities.constants import RunTimeConfigs, KServeDeploymentType, ModelVersion
from utilities.general import create_isvc_label_selector_str
from utilities.kueue_utils import check_gated_pods_and_running_pods
# Module-level pytest marks: pytest applies every entry of `pytestmark` to each
# test collected from this module. `usefixtures("valid_aws_config")` requests
# that fixture for every test without the test having to name it as an argument.
pytestmark = [
    pytest.mark.rawdeployment,
    pytest.mark.usefixtures("valid_aws_config"),
    pytest.mark.kueue,
]
# Names of the namespace and of the Kueue objects used by the parametrized fixtures.
NAMESPACE_NAME = "test-kueue-isvc-raw"
LOCAL_QUEUE_NAME = "local-queue-raw"
CLUSTER_QUEUE_NAME = "cluster-queue-raw"
RESOURCE_FLAVOR_NAME = "default-flavor-raw"

# Quota values; they are also reused as the container limits below.
CPU_QUOTA = 2
MEMORY_QUOTA = "10Gi"

# Resources for the InferenceService predictor.
# NOTE(review): one replica requests 8Gi of the 10Gi memory quota, so presumably
# a second replica cannot be admitted and is what the test expects to see gated.
ISVC_RESOURCES = {
    "requests": {"cpu": "1", "memory": "8Gi"},
    "limits": {"cpu": CPU_QUOTA, "memory": MEMORY_QUOTA},
}

# MIN_REPLICAS must stay at 1; otherwise the test has to be changed to check
# the number of available replicas instead.
MIN_REPLICAS = 1
MAX_REPLICAS = 2

# Expected cluster state asserted by the test.
EXPECTED_RUNNING_PODS = 1
EXPECTED_GATED_PODS = 1
EXPECTED_DEPLOYMENTS = 1
EXPECTED_INITIAL_REPLICAS = 1
EXPECTED_UPDATED_REPLICAS = 2
@pytest.mark.rawdeployment
@pytest.mark.parametrize(
    "unprivileged_model_namespace, kueue_kserve_serving_runtime, kueue_raw_inference_service, "
    "kueue_cluster_queue_from_template, kueue_resource_flavor_from_template, kueue_local_queue_from_template",
    [
        pytest.param(
            {"name": NAMESPACE_NAME, "add-kueue-label": True},
            RunTimeConfigs.ONNX_OPSET13_RUNTIME_CONFIG,
            {
                "name": "kueue-isvc-raw",
                "min-replicas": MIN_REPLICAS,
                "max-replicas": MAX_REPLICAS,
                "labels": {"kueue.x-k8s.io/queue-name": LOCAL_QUEUE_NAME},
                "deployment-mode": KServeDeploymentType.RAW_DEPLOYMENT,
                "model-dir": "test-dir",
                "model-version": ModelVersion.OPSET13,
                "resources": ISVC_RESOURCES,
            },
            {
                "name": CLUSTER_QUEUE_NAME,
                "resource_flavor_name": RESOURCE_FLAVOR_NAME,
                "cpu_quota": CPU_QUOTA,
                "memory_quota": MEMORY_QUOTA,
                # Alternative selector restricted to the test namespace:
                # {"matchLabels": {"kubernetes.io/metadata.name": NAMESPACE_NAME}}
                "namespace_selector": {},
            },
            {"name": RESOURCE_FLAVOR_NAME},
            {"name": LOCAL_QUEUE_NAME, "cluster_queue": CLUSTER_QUEUE_NAME},
        )
    ],
    indirect=True,
)
class TestKueueInferenceServiceRaw:
    """Check that a raw-deployment InferenceService scaled to two replicas ends up
    with one running pod and one gated pod."""

    def _get_deployment_status_replicas(self, deployment: Deployment) -> int:
        """Return the current ``status.replicas`` value of ``deployment``."""
        # NOTE(review): in openshift-python-wrapper `get` looks like a class-level
        # lookup returning a lazy generator, so this call may be a no-op and
        # `.instance` is what actually re-reads the object. Confirm before removing.
        deployment.get()
        return deployment.instance.status.replicas

    def test_kueue_inference_service_raw(
        self,
        admin_client,
        kueue_resource_flavor_from_template,
        kueue_cluster_queue_from_template,
        kueue_local_queue_from_template,
        kueue_raw_inference_service,
        kueue_kserve_serving_runtime,
    ):
        """Scale the InferenceService from one to two replicas and verify gating.

        Steps:
          1. Find the single Deployment of the InferenceService and check that it
             starts with ``EXPECTED_INITIAL_REPLICAS`` replicas.
          2. Raise ``minReplicas`` to ``EXPECTED_UPDATED_REPLICAS`` and wait for the
             Deployment status to report that many replicas.
          3. Wait until exactly ``EXPECTED_RUNNING_PODS`` pods run and
             ``EXPECTED_GATED_PODS`` pods are gated.
          4. Check that the InferenceService reports ``EXPECTED_RUNNING_PODS``
             total model copies.
        """
        namespace = kueue_raw_inference_service.namespace
        runtime_name = kueue_kserve_serving_runtime.name

        deployment_selector = create_isvc_label_selector_str(
            isvc=kueue_raw_inference_service,
            resource_type="deployment",
            runtime_name=runtime_name,
        )
        # The pod-counting helper is called with a list of selector strings.
        pod_labels = [
            create_isvc_label_selector_str(
                isvc=kueue_raw_inference_service,
                resource_type="pod",
                runtime_name=runtime_name,
            )
        ]

        deployments = list(
            Deployment.get(
                label_selector=deployment_selector,
                namespace=namespace,
                dyn_client=admin_client,
            )
        )
        assert len(deployments) == EXPECTED_DEPLOYMENTS, (
            f"Expected {EXPECTED_DEPLOYMENTS} deployment, got {len(deployments)}"
        )

        deployment = deployments[0]
        deployment.wait_for_replicas(deployed=True)
        initial_replicas = deployment.instance.spec.replicas
        assert initial_replicas == EXPECTED_INITIAL_REPLICAS, (
            f"Deployment should have {EXPECTED_INITIAL_REPLICAS} replica, got {initial_replicas}"
        )

        # Update inference service to request 2 replicas
        isvc_to_update = kueue_raw_inference_service.instance.to_dict()
        isvc_to_update["spec"]["predictor"]["minReplicas"] = EXPECTED_UPDATED_REPLICAS
        kueue_raw_inference_service.update(isvc_to_update)

        # Check the deployment until it has 2 replicas, which means it's been updated.
        # The last sampled value is pre-initialised so the failure message can be
        # built even if the sampler never yielded a value.
        status_replicas = None
        try:
            for status_replicas in TimeoutSampler(
                wait_timeout=30,
                sleep=2,
                func=lambda: self._get_deployment_status_replicas(deployment),
            ):
                if status_replicas == EXPECTED_UPDATED_REPLICAS:
                    break
        except TimeoutExpiredError:
            pytest.fail(
                f"Timeout waiting for deployment to report {EXPECTED_UPDATED_REPLICAS} replicas, "
                f"got {status_replicas}"
            )

        # Verify only 1 pod is running and 1 pod is pending due to Kueue admission control.
        # Pre-initialise both counters: if every poll raised, the loop variables would
        # otherwise be unbound in the handler and hide the timeout behind a NameError.
        running_pods = gated_pods = None
        try:
            for running_pods, gated_pods in TimeoutSampler(
                wait_timeout=30,
                sleep=2,
                func=lambda: check_gated_pods_and_running_pods(pod_labels, namespace, admin_client),
            ):
                if running_pods == EXPECTED_RUNNING_PODS and gated_pods == EXPECTED_GATED_PODS:
                    break
        except TimeoutExpiredError:
            pytest.fail(
                f"Timeout waiting for {EXPECTED_RUNNING_PODS} running pods and "
                f"{EXPECTED_GATED_PODS} gated pods, got {running_pods} running pods and {gated_pods} gated pods"
            )

        # Refresh the isvc instance to get latest status
        # NOTE(review): same caveat as in _get_deployment_status_replicas; `.instance`
        # is likely what performs the actual refresh. Confirm.
        kueue_raw_inference_service.get()
        isvc = kueue_raw_inference_service.instance
        total_copies = isvc.status.modelStatus.copies.totalCopies
        assert total_copies == EXPECTED_RUNNING_PODS, (
            f"InferenceService should have {EXPECTED_RUNNING_PODS} total model copy, got {total_copies}"
        )