Skip to content

Commit 67a427c

Browse files
[RHOAIENG-25148] Created kueue integration tests with isvcs
Created kueue isvc tests --- **Summary by CodeRabbit** * New Features * Introduced integration tests for Kueue admission control with InferenceService in both raw and serverless deployment modes. * Added utilities and fixtures for creating and managing Kueue Kubernetes resources (ResourceFlavor, LocalQueue, ClusterQueue) to support model serving tests. * Enhanced namespace creation to support automatic Kueue management labeling. * Extended InferenceService creation to support custom labels and max replica settings. * Bug Fixes * Improved handling of labels and replica specifications when creating InferenceService resources. * Tests * Added comprehensive tests to verify that Kueue enforces resource quotas, limiting deployment scaling as expected. * Chores * Added a new pytest marker for Kueue-related tests. * Introduced timeout constant for test utilities. --------- Signed-off-by: Andres Llausas <allausas@redhat.com> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent af4eee1 commit 67a427c

File tree

8 files changed

+723
-1
lines changed

8 files changed

+723
-1
lines changed

pytest.ini

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ markers =
2626
minio: Mark tests which are using MinIO storage
2727
tls: Mark tests which are testing TLS
2828
metrics: Mark tests which are testing metrics
29+
kueue: Mark tests which are testing Kueue
2930

3031
addopts =
3132
-s
Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
from typing import Generator, Any, Dict
2+
3+
import pytest
4+
from kubernetes.dynamic import DynamicClient
5+
from _pytest.fixtures import FixtureRequest
6+
from utilities.kueue_utils import (
7+
create_local_queue,
8+
create_cluster_queue,
9+
create_resource_flavor,
10+
LocalQueue,
11+
ClusterQueue,
12+
ResourceFlavor,
13+
)
14+
from ocp_resources.namespace import Namespace
15+
from utilities.constants import ModelAndFormat, KServeDeploymentType
16+
from utilities.inference_utils import create_isvc
17+
from utilities.serving_runtime import ServingRuntimeFromTemplate
18+
from ocp_resources.secret import Secret
19+
from ocp_resources.inference_service import InferenceService
20+
from ocp_resources.serving_runtime import ServingRuntime
21+
from utilities.constants import RuntimeTemplates, ModelFormat
22+
import logging
23+
24+
BASIC_LOGGER = logging.getLogger(name="basic")
25+
26+
27+
def kueue_resource_groups(
28+
flavor_name: str,
29+
cpu_quota: int,
30+
memory_quota: str,
31+
) -> list[Dict[str, Any]]:
32+
return [
33+
{
34+
"coveredResources": ["cpu", "memory"],
35+
"flavors": [
36+
{
37+
"name": flavor_name,
38+
"resources": [
39+
{"name": "cpu", "nominalQuota": cpu_quota},
40+
{"name": "memory", "nominalQuota": memory_quota},
41+
],
42+
}
43+
],
44+
}
45+
]
46+
47+
48+
@pytest.fixture(scope="class")
49+
def kueue_cluster_queue_from_template(
50+
request: FixtureRequest,
51+
admin_client: DynamicClient,
52+
) -> Generator[ClusterQueue, Any, Any]:
53+
if request.param.get("name") is None:
54+
raise ValueError("name is required")
55+
with create_cluster_queue(
56+
name=request.param.get("name"),
57+
client=admin_client,
58+
resource_groups=kueue_resource_groups(
59+
request.param.get("resource_flavor_name"), request.param.get("cpu_quota"), request.param.get("memory_quota")
60+
),
61+
namespace_selector=request.param.get("namespace_selector", {}),
62+
) as cluster_queue:
63+
yield cluster_queue
64+
65+
66+
@pytest.fixture(scope="class")
67+
def kueue_resource_flavor_from_template(
68+
request: FixtureRequest,
69+
admin_client: DynamicClient,
70+
) -> Generator[ResourceFlavor, Any, Any]:
71+
if request.param.get("name") is None:
72+
raise ValueError("name is required")
73+
with create_resource_flavor(
74+
name=request.param.get("name"),
75+
client=admin_client,
76+
) as resource_flavor:
77+
yield resource_flavor
78+
79+
80+
@pytest.fixture(scope="class")
81+
def kueue_local_queue_from_template(
82+
request: FixtureRequest,
83+
unprivileged_model_namespace: Namespace,
84+
admin_client: DynamicClient,
85+
) -> Generator[LocalQueue, Any, Any]:
86+
if request.param.get("name") is None:
87+
raise ValueError("name is required")
88+
if request.param.get("cluster_queue") is None:
89+
raise ValueError("cluster_queue is required")
90+
with create_local_queue(
91+
name=request.param.get("name"),
92+
namespace=unprivileged_model_namespace.name,
93+
cluster_queue=request.param.get("cluster_queue"),
94+
client=admin_client,
95+
) as local_queue:
96+
yield local_queue
97+
98+
99+
@pytest.fixture(scope="class")
100+
def kueue_raw_inference_service(
101+
request: FixtureRequest,
102+
admin_client: DynamicClient,
103+
unprivileged_model_namespace: Namespace,
104+
kueue_kserve_serving_runtime: ServingRuntime,
105+
ci_endpoint_s3_secret: Secret,
106+
) -> Generator[InferenceService, Any, Any]:
107+
with create_isvc(
108+
client=admin_client,
109+
name=f"{request.param['name']}-raw",
110+
namespace=unprivileged_model_namespace.name,
111+
external_route=True,
112+
runtime=kueue_kserve_serving_runtime.name,
113+
storage_path=request.param["model-dir"],
114+
storage_key=ci_endpoint_s3_secret.name,
115+
model_format=ModelAndFormat.OPENVINO_IR,
116+
deployment_mode=KServeDeploymentType.RAW_DEPLOYMENT,
117+
model_version=request.param["model-version"],
118+
labels=request.param.get("labels", {}),
119+
resources=request.param.get(
120+
"resources", {"requests": {"cpu": "1", "memory": "8Gi"}, "limits": {"cpu": "2", "memory": "10Gi"}}
121+
),
122+
min_replicas=request.param.get("min-replicas", 1),
123+
max_replicas=request.param.get("max-replicas", 2),
124+
) as isvc:
125+
yield isvc
126+
127+
128+
@pytest.fixture(scope="class")
129+
def kueue_kserve_inference_service(
130+
request: FixtureRequest,
131+
admin_client: DynamicClient,
132+
unprivileged_model_namespace: Namespace,
133+
kueue_kserve_serving_runtime: ServingRuntime,
134+
ci_endpoint_s3_secret: Secret,
135+
) -> Generator[InferenceService, Any, Any]:
136+
deployment_mode = request.param["deployment-mode"]
137+
isvc_kwargs = {
138+
"client": admin_client,
139+
"name": f"{request.param['name']}-{deployment_mode.lower()}",
140+
"namespace": unprivileged_model_namespace.name,
141+
"runtime": kueue_kserve_serving_runtime.name,
142+
"storage_path": request.param["model-dir"],
143+
"storage_key": ci_endpoint_s3_secret.name,
144+
"model_format": ModelAndFormat.OPENVINO_IR,
145+
"deployment_mode": deployment_mode,
146+
"model_version": request.param["model-version"],
147+
"labels": request.param.get("labels", {}),
148+
"resources": request.param.get(
149+
"resources", {"requests": {"cpu": "1", "memory": "8Gi"}, "limits": {"cpu": "2", "memory": "10Gi"}}
150+
),
151+
"min_replicas": request.param.get("min-replicas", 1),
152+
"max_replicas": request.param.get("max-replicas", 2),
153+
}
154+
155+
if env_vars := request.param.get("env-vars"):
156+
isvc_kwargs["model_env_variables"] = env_vars
157+
158+
if (min_replicas := request.param.get("min-replicas")) is not None:
159+
isvc_kwargs["min_replicas"] = min_replicas
160+
if min_replicas == 0:
161+
isvc_kwargs["wait_for_predictor_pods"] = False
162+
163+
if scale_metric := request.param.get("scale-metric"):
164+
isvc_kwargs["scale_metric"] = scale_metric
165+
166+
if (scale_target := request.param.get("scale-target")) is not None:
167+
isvc_kwargs["scale_target"] = scale_target
168+
169+
if (resources := request.param.get("resources")) is not None:
170+
isvc_kwargs["resources"] = resources
171+
172+
print("isvc_kwargs before create_isvc", isvc_kwargs)
173+
with create_isvc(**isvc_kwargs) as isvc:
174+
yield isvc
175+
176+
177+
@pytest.fixture(scope="class")
178+
def kueue_kserve_serving_runtime(
179+
request: FixtureRequest,
180+
unprivileged_client: DynamicClient,
181+
unprivileged_model_namespace: Namespace,
182+
) -> Generator[ServingRuntime, Any, Any]:
183+
runtime_kwargs = {
184+
"client": unprivileged_client,
185+
"namespace": unprivileged_model_namespace.name,
186+
"name": request.param["runtime-name"],
187+
"template_name": RuntimeTemplates.OVMS_KSERVE,
188+
"multi_model": False,
189+
"resources": {
190+
ModelFormat.OVMS: {
191+
"requests": {"cpu": "1", "memory": "8Gi"},
192+
"limits": {"cpu": "2", "memory": "10Gi"},
193+
}
194+
},
195+
}
196+
197+
if model_format_name := request.param.get("model-format"):
198+
runtime_kwargs["model_format_name"] = model_format_name
199+
200+
if supported_model_formats := request.param.get("supported-model-formats"):
201+
runtime_kwargs["supported_model_formats"] = supported_model_formats
202+
203+
if runtime_image := request.param.get("runtime-image"):
204+
runtime_kwargs["runtime_image"] = runtime_image
205+
206+
with ServingRuntimeFromTemplate(**runtime_kwargs) as model_runtime:
207+
yield model_runtime
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
"""
2+
Integration test for Kueue and InferenceService admission control.
3+
This test imports the reusable test logic from utilities.kueue_utils.
4+
"""
5+
6+
import pytest
7+
from ocp_resources.deployment import Deployment
8+
from timeout_sampler import TimeoutExpiredError, TimeoutSampler
9+
from utilities.constants import RunTimeConfigs, KServeDeploymentType, ModelVersion
10+
from utilities.general import create_isvc_label_selector_str
11+
from utilities.kueue_utils import check_gated_pods_and_running_pods
12+
13+
pytestmark = [
14+
pytest.mark.rawdeployment,
15+
pytest.mark.sanity,
16+
pytest.mark.usefixtures("valid_aws_config"),
17+
pytest.mark.kueue,
18+
pytest.mark.smoke,
19+
]
20+
21+
NAMESPACE_NAME = "kueue-isvc-raw-test"
22+
LOCAL_QUEUE_NAME = "local-queue-raw"
23+
CLUSTER_QUEUE_NAME = "cluster-queue-raw"
24+
RESOURCE_FLAVOR_NAME = "default-flavor-raw"
25+
CPU_QUOTA = 2
26+
MEMORY_QUOTA = "10Gi"
27+
ISVC_RESOURCES = {"requests": {"cpu": "1", "memory": "8Gi"}, "limits": {"cpu": CPU_QUOTA, "memory": MEMORY_QUOTA}}
28+
# min_replicas needs to be 1 or you need to change the test to check for the number of
29+
# available replicas
30+
MIN_REPLICAS = 1
31+
MAX_REPLICAS = 2
32+
EXPECTED_RUNNING_PODS = 1
33+
EXPECTED_GATED_PODS = 1
34+
EXPECTED_DEPLOYMENTS = 1
35+
EXPECTED_INITIAL_REPLICAS = 1
36+
EXPECTED_UPDATED_REPLICAS = 2
37+
38+
39+
@pytest.mark.rawdeployment
40+
@pytest.mark.parametrize(
41+
"unprivileged_model_namespace, kueue_kserve_serving_runtime, kueue_raw_inference_service, "
42+
"kueue_cluster_queue_from_template, kueue_resource_flavor_from_template, kueue_local_queue_from_template",
43+
[
44+
pytest.param(
45+
{"name": NAMESPACE_NAME, "add-kueue-label": True},
46+
RunTimeConfigs.ONNX_OPSET13_RUNTIME_CONFIG,
47+
{
48+
"name": "kueue-isvc-raw",
49+
"min-replicas": MIN_REPLICAS,
50+
"max-replicas": MAX_REPLICAS,
51+
"labels": {"kueue.x-k8s.io/queue-name": LOCAL_QUEUE_NAME},
52+
"deployment-mode": KServeDeploymentType.RAW_DEPLOYMENT,
53+
"model-dir": "test-dir",
54+
"model-version": ModelVersion.OPSET13,
55+
"resources": ISVC_RESOURCES,
56+
},
57+
{
58+
"name": CLUSTER_QUEUE_NAME,
59+
"resource_flavor_name": RESOURCE_FLAVOR_NAME,
60+
"cpu_quota": CPU_QUOTA,
61+
"memory_quota": MEMORY_QUOTA,
62+
# "namespace_selector": {"matchLabels": {"kubernetes.io/metadata.name": NAMESPACE_NAME}},
63+
"namespace_selector": {},
64+
},
65+
{"name": RESOURCE_FLAVOR_NAME},
66+
{"name": LOCAL_QUEUE_NAME, "cluster_queue": CLUSTER_QUEUE_NAME},
67+
)
68+
],
69+
indirect=True,
70+
)
71+
class TestKueueInferenceServiceRaw:
72+
"""Test inference service with raw deployment"""
73+
74+
def _get_deployment_status_replicas(self, deployment: Deployment) -> int:
75+
deployment.get()
76+
return deployment.instance.status.replicas
77+
78+
def test_kueue_inference_service_raw(
79+
self,
80+
admin_client,
81+
kueue_resource_flavor_from_template,
82+
kueue_cluster_queue_from_template,
83+
kueue_local_queue_from_template,
84+
kueue_raw_inference_service,
85+
kueue_kserve_serving_runtime,
86+
):
87+
"""Test inference service with raw deployment"""
88+
deployment_labels = [
89+
create_isvc_label_selector_str(
90+
isvc=kueue_raw_inference_service,
91+
resource_type="deployment",
92+
runtime_name=kueue_kserve_serving_runtime.name,
93+
)
94+
]
95+
pod_labels = [
96+
create_isvc_label_selector_str(
97+
isvc=kueue_raw_inference_service,
98+
resource_type="pod",
99+
runtime_name=kueue_kserve_serving_runtime.name,
100+
)
101+
]
102+
deployments = list(
103+
Deployment.get(
104+
label_selector=",".join(deployment_labels),
105+
namespace=kueue_raw_inference_service.namespace,
106+
dyn_client=admin_client,
107+
)
108+
)
109+
assert len(deployments) == EXPECTED_DEPLOYMENTS, (
110+
f"Expected {EXPECTED_DEPLOYMENTS} deployment, got {len(deployments)}"
111+
)
112+
113+
deployment = deployments[0]
114+
deployment.wait_for_replicas(deployed=True)
115+
replicas = deployment.instance.spec.replicas
116+
assert replicas == EXPECTED_INITIAL_REPLICAS, (
117+
f"Deployment should have {EXPECTED_INITIAL_REPLICAS} replica, got {replicas}"
118+
)
119+
120+
# Update inference service to request 2 replicas
121+
isvc_to_update = kueue_raw_inference_service.instance.to_dict()
122+
isvc_to_update["spec"]["predictor"]["minReplicas"] = EXPECTED_UPDATED_REPLICAS
123+
kueue_raw_inference_service.update(isvc_to_update)
124+
125+
# Check the deployment until it has 2 replicas, which means it's been updated
126+
for replicas in TimeoutSampler(
127+
wait_timeout=30,
128+
sleep=2,
129+
func=lambda: self._get_deployment_status_replicas(deployment),
130+
):
131+
if replicas == EXPECTED_UPDATED_REPLICAS:
132+
break
133+
134+
# Verify only 1 pod is running due to Kueue admission control, 1 pod is pending due to Kueue admission control
135+
try:
136+
for running_pods, gated_pods in TimeoutSampler(
137+
wait_timeout=30,
138+
sleep=2,
139+
func=lambda: check_gated_pods_and_running_pods(
140+
pod_labels, kueue_raw_inference_service.namespace, admin_client
141+
),
142+
):
143+
if running_pods == EXPECTED_RUNNING_PODS and gated_pods == EXPECTED_GATED_PODS:
144+
break
145+
except TimeoutExpiredError:
146+
assert False, (
147+
f"Timeout waiting for {EXPECTED_RUNNING_PODS} running pods and "
148+
f"{EXPECTED_GATED_PODS} gated pods, got {running_pods} running pods and {gated_pods} gated pods"
149+
)
150+
151+
# Refresh the isvc instance to get latest status
152+
kueue_raw_inference_service.get()
153+
isvc = kueue_raw_inference_service.instance
154+
total_copies = isvc.status.modelStatus.copies.totalCopies
155+
assert total_copies == EXPECTED_RUNNING_PODS, (
156+
f"InferenceService should have {EXPECTED_RUNNING_PODS} total model copy, got {total_copies}"
157+
)

0 commit comments

Comments
 (0)