4 changes: 2 additions & 2 deletions tests/model_serving/model_server/multi_node/conftest.py
@@ -144,15 +144,15 @@ def patched_multi_node_isvc_external_route(


@pytest.fixture(scope="function")
-def patched_multi_node_worker_spec(
+def patched_multi_node_spec(
    request: FixtureRequest,
    multi_node_inference_service: InferenceService,
) -> Generator[InferenceService, Any, Any]:
    with ResourceEditor(
        patches={
            multi_node_inference_service: {
                "spec": {
-                    "predictor": {"workerSpec": request.param["worker-spec"]},
+                    "predictor": request.param["spec"],
                },
            }
        }
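Note: the renamed fixture now patches an arbitrary `predictor` sub-tree rather than only `workerSpec`, so callers pass the whole patch under a `"spec"` key. A minimal usage sketch, assuming a test consumes the fixture via indirect parametrization (the `--some-flag` arg and test name are purely illustrative):

```python
import pytest

@pytest.mark.parametrize(
    "patched_multi_node_spec",
    # any predictor sub-tree can be patched now, not only workerSpec
    [pytest.param({"spec": {"model": {"args": ["--some-flag"]}}})],
    indirect=True,  # routes the dict into the fixture's request.param
)
def test_with_patched_spec(patched_multi_node_spec):
    # the fixture yields the InferenceService with the patch applied;
    # ResourceEditor reverts it on teardown
    assert patched_multi_node_spec.instance.spec.predictor is not None
```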
@@ -9,6 +9,7 @@
)
from tests.model_serving.model_server.multi_node.utils import (
    get_pods_by_isvc_generation,
+    is_arg_in_model_spec,
    verify_nvidia_gpu_status,
    verify_ray_status,
)
@@ -23,6 +24,7 @@


LOGGER = get_logger(name=__name__)
+MAX_NUM_BATCHED_TOKENS_ARG: str = "--max-num-batched-tokens=256"


@pytest.mark.parametrize(
@@ -168,17 +170,26 @@ def test_multi_node_basic_external_inference(self, patched_multi_node_isvc_exter
)

    @pytest.mark.parametrize(
-        "patched_multi_node_worker_spec",
-        [pytest.param({"worker-spec": {"pipelineParallelSize": 2, "tensorParallelSize": 4}})],
+        "patched_multi_node_spec",
+        [
+            pytest.param({
+                "spec": {
+                    "workerSpec": {
+                        "pipelineParallelSize": 2,
+                        "tensorParallelSize": 4,
+                    }
+                }
+            })
+        ],
        indirect=True,
    )
-    def test_multi_node_tensor_parallel_size_propagation(self, admin_client, patched_multi_node_worker_spec):
+    def test_multi_node_tensor_parallel_size_propagation(self, admin_client, patched_multi_node_spec):
        """Test multi node tensor parallel size (number of GPUs per pod) propagation to pod config"""
-        isvc_parallel_size = str(patched_multi_node_worker_spec.instance.spec.predictor.workerSpec.tensorParallelSize)
+        isvc_parallel_size = str(patched_multi_node_spec.instance.spec.predictor.workerSpec.tensorParallelSize)

        failed_pods: list[dict[str, Any]] = []

-        for pod in get_pods_by_isvc_generation(client=admin_client, isvc=patched_multi_node_worker_spec):
+        for pod in get_pods_by_isvc_generation(client=admin_client, isvc=patched_multi_node_spec):
            pod_resources = pod.instance.spec.containers[0].resources
            if not (
                isvc_parallel_size
@@ -191,17 +202,43 @@ def test_multi_node_tensor_parallel_size_propagation(self, admin_client, patched
pytest.fail(f"Failed pods resources : {failed_pods}, expected tesnor parallel size {isvc_parallel_size}")

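Note: the collapsed condition above compares the InferenceService's `tensorParallelSize` to each pod's GPU resources. A hedged sketch of what such a check could look like (the `nvidia.com/gpu` requests/limits keys are an assumption, since those lines are not expanded in this diff):

```python
def gpu_count_matches(pod, expected: str) -> bool:
    # assumed shape of the collapsed check: GPU request and limit
    # on the first container both equal tensorParallelSize
    resources = pod.instance.spec.containers[0].resources
    return (
        resources.requests["nvidia.com/gpu"] == expected
        and resources.limits["nvidia.com/gpu"] == expected
    )
```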
    @pytest.mark.parametrize(
-        "patched_multi_node_worker_spec",
-        [pytest.param({"worker-spec": {"pipelineParallelSize": 2, "tensorParallelSize": 1}})],
+        "patched_multi_node_spec",
+        [
+            pytest.param({
+                "spec": {
+                    "workerSpec": {
+                        "pipelineParallelSize": 2,
+                        "tensorParallelSize": 1,
+                    }
+                }
+            })
+        ],
        indirect=True,
    )
-    def test_multi_node_pipeline_parallel_size_propagation(self, admin_client, patched_multi_node_worker_spec):
+    def test_multi_node_pipeline_parallel_size_propagation(self, admin_client, patched_multi_node_spec):
        """Test multi node pipeline parallel size (number of pods) propagation to pod config"""
-        isvc_parallel_size = patched_multi_node_worker_spec.instance.spec.predictor.workerSpec.pipelineParallelSize
-        isvc_num_pods = get_pods_by_isvc_generation(client=admin_client, isvc=patched_multi_node_worker_spec)
+        isvc_parallel_size = patched_multi_node_spec.instance.spec.predictor.workerSpec.pipelineParallelSize
+        isvc_num_pods = get_pods_by_isvc_generation(client=admin_client, isvc=patched_multi_node_spec)

        if isvc_parallel_size != len(isvc_num_pods):
            pytest.fail(
                f"Expected pipeline parallel size {isvc_parallel_size} "
                f"does not match number of pods {len(isvc_num_pods)}"
            )

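Note: in these tests `pipelineParallelSize` equals the total pod count for the generation, i.e. the head pod plus its workers. The assertion above reduces to the sketch below (helper and fixture names from this PR):

```python
pods = get_pods_by_isvc_generation(client=admin_client, isvc=patched_multi_node_spec)
# e.g. pipelineParallelSize=2 -> one head pod + one worker pod
assert len(pods) == patched_multi_node_spec.instance.spec.predictor.workerSpec.pipelineParallelSize
```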
+    @pytest.mark.parametrize(
+        "patched_multi_node_spec",
+        [pytest.param({"spec": {"model": {"args": [MAX_NUM_BATCHED_TOKENS_ARG]}}})],
+        indirect=True,
+    )
+    @pytest.mark.dependency(name="test_model_args_added_to_model_spec")
+    def test_model_args_added_to_model_spec(self, admin_client, patched_multi_node_spec):
+        """Test model args are added to the model spec (vLLM command)"""
+        if not is_arg_in_model_spec(client=admin_client, isvc=patched_multi_node_spec, arg=MAX_NUM_BATCHED_TOKENS_ARG):
+            pytest.fail(f"{MAX_NUM_BATCHED_TOKENS_ARG} model arg is not set in spec")
+
+    @pytest.mark.dependency(depends=["test_model_args_added_to_model_spec"])
+    def test_model_args_removed_from_model_spec(self, admin_client, multi_node_inference_service):
+        """Test model args are removed from the model spec after the patch is reverted"""
+        if is_arg_in_model_spec(client=admin_client, isvc=multi_node_inference_service, arg=MAX_NUM_BATCHED_TOKENS_ARG):
+            pytest.fail(f"{MAX_NUM_BATCHED_TOKENS_ARG} model arg was not removed from spec")
27 changes: 25 additions & 2 deletions tests/model_serving/model_server/multi_node/utils.py
@@ -90,10 +90,10 @@ def delete_multi_node_pod_by_role(client: DynamicClient, isvc: InferenceService,

    for pod in pods:
        if role == WORKER_POD_ROLE and WORKER_POD_ROLE in pod.name:
-            pod.delete()
+            pod.delete(wait=True)

        elif role == HEAD_POD_ROLE and WORKER_POD_ROLE not in pod.name:
-            pod.delete()
+            pod.delete(wait=True)
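Note: `wait=True` makes `delete()` block until the pod is actually gone (the call otherwise returns as soon as the API server accepts the request), which keeps later "pod recreated" checks from racing a still-terminating pod:

```python
# before: deletion only requested; the pod may still be terminating
pod.delete()
# after: blocks until the resource is removed from the cluster
pod.delete(wait=True)
```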


@retry(wait_timeout=Timeout.TIMEOUT_2MIN, sleep=5)
@@ -122,3 +122,26 @@ def get_pods_by_isvc_generation(client: DynamicClient, isvc: InferenceService) -
            return pods

    raise ResourceNotFoundError(f"InferenceService {isvc.name} generation {isvc_generation} has no pods")
+
+
+def is_arg_in_model_spec(client: DynamicClient, isvc: InferenceService, arg: str) -> bool:
+    """
+    Check if arg is in the model spec; spec.model.args are only added to the head pod.
+
+    Args:
+        client (DynamicClient): OCP client to use.
+        isvc (InferenceService): InferenceService object.
+        arg (str): arg to check.
+
+    Returns:
+        bool: True if arg is in the model spec, False otherwise.
+
+    Raises:
+        ResourceNotFoundError: if no head pod is found.
+    """
+    for pod in get_pods_by_isvc_generation(client=client, isvc=isvc):
+        if WORKER_POD_ROLE not in pod.name:
+            return arg in pod.instance.spec.containers[0].args
+
+    raise ResourceNotFoundError(f"InferenceService {isvc.name} has no head pod")
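For reference, a usage sketch of the new helper (assumes `admin_client` and a deployed `isvc` come from existing fixtures):

```python
from tests.model_serving.model_server.multi_node.utils import is_arg_in_model_spec

# True only if the head pod's first container carries the arg;
# raises ResourceNotFoundError when no head pod exists
if is_arg_in_model_spec(client=admin_client, isvc=isvc, arg="--max-num-batched-tokens=256"):
    print("model arg propagated to the head pod")
```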