Skip to content

Commit 4b8f454

Browse files
rnetserdbasunag
authored and committed
[model server] add multi node args check (opendatahub-io#276)
* feat: add multi node args * feat: add multi node args * fix: add wait on delete * fix: update new test
1 parent 15d9017 commit 4b8f454

File tree

3 files changed

+78
-14
lines changed

3 files changed

+78
-14
lines changed

tests/model_serving/model_server/multi_node/conftest.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,15 +144,15 @@ def patched_multi_node_isvc_external_route(
144144

145145

146146
@pytest.fixture(scope="function")
147-
def patched_multi_node_worker_spec(
147+
def patched_multi_node_spec(
148148
request: FixtureRequest,
149149
multi_node_inference_service: InferenceService,
150150
) -> Generator[InferenceService, Any, Any]:
151151
with ResourceEditor(
152152
patches={
153153
multi_node_inference_service: {
154154
"spec": {
155-
"predictor": {"workerSpec": request.param["worker-spec"]},
155+
"predictor": request.param["spec"],
156156
},
157157
}
158158
}

tests/model_serving/model_server/multi_node/test_nvidia_multi_node.py

Lines changed: 51 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
)
1010
from tests.model_serving.model_server.multi_node.utils import (
1111
get_pods_by_isvc_generation,
12+
is_arg_in_model_spec,
1213
verify_nvidia_gpu_status,
1314
verify_ray_status,
1415
)
@@ -23,6 +24,7 @@
2324

2425

2526
LOGGER = get_logger(name=__name__)
27+
MAX_NUM_BATCHED_TOKENS_ARG: str = "--max-num-batched-tokens=256"
2628

2729

2830
@pytest.mark.parametrize(
@@ -168,17 +170,26 @@ def test_multi_node_basic_external_inference(self, patched_multi_node_isvc_exter
168170
)
169171

170172
@pytest.mark.parametrize(
171-
"patched_multi_node_worker_spec",
172-
[pytest.param({"worker-spec": {"pipelineParallelSize": 2, "tensorParallelSize": 4}})],
173+
"patched_multi_node_spec",
174+
[
175+
pytest.param({
176+
"spec": {
177+
"workerSpec": {
178+
"pipelineParallelSize": 2,
179+
"tensorParallelSize": 4,
180+
}
181+
}
182+
})
183+
],
173184
indirect=True,
174185
)
175-
def test_multi_node_tensor_parallel_size_propagation(self, unprivileged_client, patched_multi_node_worker_spec):
186+
def test_multi_node_tensor_parallel_size_propagation(self, unprivileged_client, patched_multi_node_spec):
176187
"""Test multi node tensor parallel size (number of GPUs per pod) propagation to pod config"""
177-
isvc_parallel_size = str(patched_multi_node_worker_spec.instance.spec.predictor.workerSpec.tensorParallelSize)
188+
isvc_parallel_size = str(patched_multi_node_spec.instance.spec.predictor.workerSpec.tensorParallelSize)
178189

179190
failed_pods: list[dict[str, Any]] = []
180191

181-
for pod in get_pods_by_isvc_generation(client=unprivileged_client, isvc=patched_multi_node_worker_spec):
192+
for pod in get_pods_by_isvc_generation(client=unprivileged_client, isvc=patched_multi_node_spec):
182193
pod_resources = pod.instance.spec.containers[0].resources
183194
if not (
184195
isvc_parallel_size
@@ -191,17 +202,47 @@ def test_multi_node_tensor_parallel_size_propagation(self, unprivileged_client,
191202
pytest.fail(f"Failed pods resources : {failed_pods}, expected tensor parallel size {isvc_parallel_size}")
192203

193204
@pytest.mark.parametrize(
194-
"patched_multi_node_worker_spec",
195-
[pytest.param({"worker-spec": {"pipelineParallelSize": 2, "tensorParallelSize": 1}})],
205+
"patched_multi_node_spec",
206+
[
207+
pytest.param({
208+
"spec": {
209+
"workerSpec": {
210+
"pipelineParallelSize": 2,
211+
"tensorParallelSize": 1,
212+
}
213+
}
214+
})
215+
],
196216
indirect=True,
197217
)
198-
def test_multi_node_pipeline_parallel_size_propagation(self, unprivileged_client, patched_multi_node_worker_spec):
218+
def test_multi_node_pipeline_parallel_size_propagation(self, unprivileged_client, patched_multi_node_spec):
199219
"""Test multi node pipeline parallel size (number of pods) propagation to pod config"""
200-
isvc_parallel_size = patched_multi_node_worker_spec.instance.spec.predictor.workerSpec.pipelineParallelSize
201-
isvc_num_pods = get_pods_by_isvc_generation(client=unprivileged_client, isvc=patched_multi_node_worker_spec)
220+
isvc_parallel_size = patched_multi_node_spec.instance.spec.predictor.workerSpec.pipelineParallelSize
221+
isvc_num_pods = get_pods_by_isvc_generation(client=unprivileged_client, isvc=patched_multi_node_spec)
202222

203223
if isvc_parallel_size != len(isvc_num_pods):
204224
pytest.fail(
205225
f"Expected pipeline parallel size {isvc_parallel_size} "
206226
f"does not match number of pods {len(isvc_num_pods)}"
207227
)
228+
229+
@pytest.mark.parametrize(
230+
"patched_multi_node_spec",
231+
[pytest.param({"spec": {"model": {"args": [MAX_NUM_BATCHED_TOKENS_ARG]}}})],
232+
indirect=True,
233+
)
234+
@pytest.mark.dependency(name="test_model_args_added_to_vllm_command")
235+
def test_model_args_added_to_model_spec(self, unprivileged_client, patched_multi_node_spec):
236+
"""Test model args added to vllm command"""
237+
if not is_arg_in_model_spec(
238+
client=unprivileged_client, isvc=patched_multi_node_spec, arg=MAX_NUM_BATCHED_TOKENS_ARG
239+
):
240+
pytest.fail(f"{MAX_NUM_BATCHED_TOKENS_ARG} model args is not set in spec")
241+
242+
@pytest.mark.dependency(depends=["test_model_args_added_to_vllm_command"])
243+
def test_model_args_removed_from_model_spec(self, unprivileged_client, multi_node_inference_service):
244+
"""Test model args removed from vllm command"""
245+
if is_arg_in_model_spec(
246+
client=unprivileged_client, isvc=multi_node_inference_service, arg=MAX_NUM_BATCHED_TOKENS_ARG
247+
):
248+
pytest.fail(f"{MAX_NUM_BATCHED_TOKENS_ARG} model args is not removed from spec")

tests/model_serving/model_server/multi_node/utils.py

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,10 +90,10 @@ def delete_multi_node_pod_by_role(client: DynamicClient, isvc: InferenceService,
9090

9191
for pod in pods:
9292
if role == WORKER_POD_ROLE and WORKER_POD_ROLE in pod.name:
93-
pod.delete()
93+
pod.delete(wait=True)
9494

9595
elif role == HEAD_POD_ROLE and WORKER_POD_ROLE not in pod.name:
96-
pod.delete()
96+
pod.delete(wait=True)
9797

9898

9999
@retry(wait_timeout=Timeout.TIMEOUT_2MIN, sleep=5)
@@ -122,3 +122,26 @@ def get_pods_by_isvc_generation(client: DynamicClient, isvc: InferenceService) -
122122
return pods
123123

124124
raise ResourceNotFoundError(f"InferenceService {isvc.name} generation {isvc_generation} has no pods")
125+
126+
127+
def is_arg_in_model_spec(client: DynamicClient, isvc: InferenceService, arg: str) -> bool:
128+
"""
129+
Check if arg is in model spec; spec.model.args are only added to head pod
130+
131+
Args:
132+
client (DynamicClient): OCP Client to use.
133+
isvc (InferenceService): InferenceService object.
134+
arg (str): arg to check
135+
136+
Returns:
137+
bool: True if arg is in model spec, False otherwise
138+
139+
Raises:
140+
ResourceNotFoundError if no head pod found.
141+
142+
"""
143+
for pod in get_pods_by_isvc_generation(client=client, isvc=isvc):
144+
if WORKER_POD_ROLE not in pod.name:
145+
return arg in pod.instance.spec.containers[0].args
146+
147+
raise ResourceNotFoundError(f"InferenceService {isvc.name} has no head pod")

0 commit comments

Comments
 (0)