Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 24 additions & 61 deletions tests/model_serving/model_server/kserve/multi_node/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@
from ocp_resources.inference_service import InferenceService
from ocp_resources.namespace import Namespace
from ocp_resources.node import Node
from ocp_resources.persistent_volume_claim import PersistentVolumeClaim
from ocp_resources.pod import Pod
from ocp_resources.resource import ResourceEditor
from ocp_resources.secret import Secret
from ocp_resources.serving_runtime import ServingRuntime
from pytest_testconfig import config as py_config
from timeout_sampler import TimeoutSampler

from tests.model_serving.model_server.kserve.multi_node.utils import (
delete_multi_node_pod_by_role,
wait_for_vllm_health,
)
from timeout_sampler import TimeoutSampler

from utilities.constants import KServeDeploymentType, Labels, Protocols, Timeout, ModelCarImage
from utilities.general import download_model_data
from utilities.inference_utils import create_isvc
from utilities.infra import (
get_pods_by_isvc_label,
Expand All @@ -33,37 +33,20 @@ def nvidia_gpu_nodes(nodes: list[Node]) -> list[Node]:
return [node for node in nodes if "nvidia.com/gpu.present" in node.labels.keys()]


@pytest.fixture(scope="session")
def max_gpu_per_node(nvidia_gpu_nodes: list[Node]) -> int:
    """Return the largest allocatable GPU count reported by any GPU node.

    Reads ``status.allocatable["nvidia.com/gpu"]`` from each node in
    ``nvidia_gpu_nodes``; returns 0 when the list is empty or no node
    reports the resource.
    """
    gpu_counts = [
        int(node.instance.status.allocatable.get("nvidia.com/gpu", 0))
        for node in nvidia_gpu_nodes
    ]
    return max(gpu_counts) if gpu_counts else 0


@pytest.fixture(scope="session")
def skip_if_no_gpu_nodes(nvidia_gpu_nodes: list[Node]) -> None:
    """Skip the requesting test when fewer than two GPU worker nodes exist."""
    num_gpu_nodes = len(nvidia_gpu_nodes)
    if num_gpu_nodes <= 1:
        pytest.skip("Multi-node tests can only run on a Cluster with at least 2 GPU Worker nodes")


@pytest.fixture(scope="class")
def models_bucket_downloaded_model_data(
    request: FixtureRequest,
    admin_client: DynamicClient,
    unprivileged_model_namespace: Namespace,
    models_s3_bucket_name: str,
    model_pvc: PersistentVolumeClaim,
    aws_secret_access_key: str,
    aws_access_key_id: str,
    models_s3_bucket_endpoint: str,
    models_s3_bucket_region: str,
) -> str:
    """Download model data from the configured S3 bucket into the test PVC.

    Thin pass-through to ``download_model_data``; the model directory to
    fetch comes from the test's indirect parametrization
    (``request.param["model-dir"]``).

    Returns:
        str: whatever ``download_model_data`` returns — presumably the model
        path inside the PVC; confirm against its definition.
    """
    # NOTE(review): assumes the parametrizing test supplies a "model-dir"
    # key via indirect parametrization — a missing key raises KeyError here.
    return download_model_data(
        client=admin_client,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        model_namespace=unprivileged_model_namespace.name,
        model_pvc_name=model_pvc.name,
        bucket_name=models_s3_bucket_name,
        aws_endpoint_url=models_s3_bucket_endpoint,
        aws_default_region=models_s3_bucket_region,
        model_path=request.param["model-dir"],
    )


@pytest.fixture(scope="class")
def multi_node_serving_runtime(
request: FixtureRequest,
Expand All @@ -86,35 +69,6 @@ def multi_node_inference_service(
request: FixtureRequest,
unprivileged_client: DynamicClient,
multi_node_serving_runtime: ServingRuntime,
model_pvc: PersistentVolumeClaim,
models_bucket_downloaded_model_data: str,
) -> Generator[InferenceService, Any, Any]:
with create_isvc(
client=unprivileged_client,
name=request.param["name"],
namespace=multi_node_serving_runtime.namespace,
runtime=multi_node_serving_runtime.name,
storage_uri=f"pvc://{model_pvc.name}/{models_bucket_downloaded_model_data}",
model_format=multi_node_serving_runtime.instance.spec.supportedModelFormats[0].name,
deployment_mode=KServeDeploymentType.RAW_DEPLOYMENT,
autoscaler_mode="external",
multi_node_worker_spec={},
wait_for_predictor_pods=False,
) as isvc:
wait_for_inference_deployment_replicas(
client=unprivileged_client,
isvc=isvc,
expected_num_deployments=2,
runtime_name=multi_node_serving_runtime.name,
)
yield isvc


@pytest.fixture(scope="class")
def multi_node_oci_inference_service(
request: FixtureRequest,
unprivileged_client: DynamicClient,
multi_node_serving_runtime: ServingRuntime,
) -> Generator[InferenceService, Any, Any]:
resources = {
"requests": {
Expand All @@ -136,7 +90,6 @@ def multi_node_oci_inference_service(
]
}

# NOTE: In KServe v0.15, the autoscaler_mode needs to be updated to "none".
with create_isvc(
client=unprivileged_client,
name=request.param["name"],
Expand All @@ -145,11 +98,10 @@ def multi_node_oci_inference_service(
storage_uri=ModelCarImage.GRANITE_8B_CODE_INSTRUCT,
model_format=multi_node_serving_runtime.instance.spec.supportedModelFormats[0].name,
deployment_mode=KServeDeploymentType.RAW_DEPLOYMENT,
autoscaler_mode="external",
autoscaler_mode="none",
resources=resources,
multi_node_worker_spec=worker_resources,
wait_for_predictor_pods=False,
external_route=True,
timeout=Timeout.TIMEOUT_30MIN,
) as isvc:
wait_for_inference_deployment_replicas(
Expand Down Expand Up @@ -177,6 +129,12 @@ def multi_node_predictor_pods_scope_class(
def patched_multi_node_isvc_external_route(
multi_node_inference_service: InferenceService,
) -> Generator[InferenceService, Any, Any]:
multi_node_inference_service.wait_for_condition(
condition=multi_node_inference_service.Condition.READY,
status=multi_node_inference_service.Condition.Status.TRUE,
timeout=Timeout.TIMEOUT_10MIN,
)

with ResourceEditor(
patches={
multi_node_inference_service: {
Expand All @@ -185,7 +143,7 @@ def patched_multi_node_isvc_external_route(
}
):
for sample in TimeoutSampler(
wait_timeout=Timeout.TIMEOUT_1MIN,
wait_timeout=Timeout.TIMEOUT_5MIN,
sleep=1,
func=lambda: multi_node_inference_service.instance.status,
):
Expand Down Expand Up @@ -258,3 +216,8 @@ def deleted_multi_node_pod(
isvc=multi_node_inference_service,
timeout=Timeout.TIMEOUT_10MIN,
)

wait_for_vllm_health(
client=unprivileged_client,
isvc=multi_node_inference_service,
)
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
verify_ray_status,
)
from tests.model_serving.model_server.utils import verify_inference_response
from utilities.constants import Labels, Protocols, StorageClassName
from utilities.constants import Labels, Protocols
from utilities.manifests.vllm import VLLM_INFERENCE_CONFIG

pytestmark = [
pytest.mark.rawdeployment,
pytest.mark.usefixtures("skip_if_no_gpu_nodes", "skip_if_no_nfs_storage_class"),
pytest.mark.usefixtures("skip_if_no_gpu_nodes"),
pytest.mark.model_server_gpu,
pytest.mark.multinode,
pytest.mark.gpu,
Expand All @@ -31,16 +31,10 @@


@pytest.mark.parametrize(
"unprivileged_model_namespace, models_bucket_downloaded_model_data, model_pvc, multi_node_inference_service",
"unprivileged_model_namespace, multi_node_inference_service",
[
pytest.param(
{"name": "gpu-multi-node"},
{"model-dir": "granite-8b-code-base"},
{
"access-modes": "ReadWriteMany",
"storage-class-name": StorageClassName.NFS,
"pvc-size": "40Gi",
},
{"name": "multi-vllm"},
)
],
Expand All @@ -52,10 +46,10 @@ def test_multi_node_ray_status(self, multi_node_predictor_pods_scope_class):
verify_ray_status(pods=multi_node_predictor_pods_scope_class)

def test_multi_node_nvidia_gpu_status(self, multi_node_predictor_pods_scope_class):
"""Test multi node ray status"""
"""Test multi node nvidia gpu status"""
verify_nvidia_gpu_status(pod=multi_node_predictor_pods_scope_class[0])

def test_multi_node_default_config(self, multi_node_serving_runtime, multi_node_predictor_pods_scope_class):
def test_multi_node_default_config(self, multi_node_serving_runtime, multi_node_inference_service):
"""Test multi node inference service with default config"""
runtime_worker_spec = multi_node_serving_runtime.instance.spec.workerSpec

Expand Down Expand Up @@ -179,17 +173,22 @@ def test_multi_node_basic_external_inference(self, patched_multi_node_isvc_exter
"spec": {
"workerSpec": {
"pipelineParallelSize": 2,
"tensorParallelSize": 4,
"tensorParallelSize": 2,
}
}
})
],
indirect=True,
)
def test_multi_node_tensor_parallel_size_propagation(self, unprivileged_client, patched_multi_node_spec):
def test_multi_node_tensor_parallel_size_propagation(
self, unprivileged_client, patched_multi_node_spec, max_gpu_per_node
):
"""Test multi node tensor parallel size (number of GPUs per pod) propagation to pod config"""
isvc_parallel_size = str(patched_multi_node_spec.instance.spec.predictor.workerSpec.tensorParallelSize)

if int(isvc_parallel_size) > max_gpu_per_node:
pytest.skip(f"tensorParallelSize {isvc_parallel_size} exceeds max GPUs per node ({max_gpu_per_node})")

failed_pods: list[dict[str, Any]] = []

for pod in get_pods_by_isvc_generation(client=unprivileged_client, isvc=patched_multi_node_spec):
Expand Down

This file was deleted.

47 changes: 47 additions & 0 deletions tests/model_serving/model_server/kserve/multi_node/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,53 @@ def get_pods_by_isvc_generation(client: DynamicClient, isvc: InferenceService) -
raise ResourceNotFoundError(f"InferenceService {isvc.name} generation {isvc_generation} has no pods")


# Retried wholesale: any RuntimeError raised below triggers another attempt
# until TIMEOUT_10MIN elapses (retry semantics per the timeout_sampler decorator).
@retry(wait_timeout=Timeout.TIMEOUT_10MIN, sleep=10)
def wait_for_vllm_health(client: DynamicClient, isvc: InferenceService) -> bool:
    """Wait for vLLM to serve inference successfully on the head pod.

    After pod deletion and recovery, the vLLM /health endpoint and Ray
    node count may report healthy before the distributed inference
    pipeline is fully rebuilt. This function verifies the model can
    serve an actual inference request via pod exec on the head pod.

    Args:
        client: Dynamic client.
        isvc: InferenceService object.

    Returns:
        True when inference succeeds.

    Raises:
        RuntimeError: If inference check fails.
    """
    for pod in get_pods_by_isvc_label(client=client, isvc=isvc):
        # The head pod is identified by the absence of the worker-role marker
        # in its name; only the first such pod is probed.
        if WORKER_POD_ROLE not in pod.name:
            # curl writes only the HTTP status code to stdout:
            # -s silences progress, -o /dev/null discards the body,
            # -w '%{http_code}' emits the status code.
            result = pod.execute(
                command=[
                    "curl",
                    "-s",
                    "-o",
                    "/dev/null",
                    "-w",
                    "%{http_code}",
                    "-X",
                    "POST",
                    "http://localhost:8080/v1/completions",
                    "-H",
                    "Content-Type: application/json",
                    "-d",
                    f'{{"model":"{isvc.name}","prompt":"test","max_tokens":1}}',
                ]
            )
            # strip("'") — presumably pod.execute can wrap output in quotes;
            # TODO confirm against its exec implementation.
            if result.strip().strip("'") != "200":
                raise RuntimeError(f"vLLM inference check returned {result} on head pod {pod.name}")

            LOGGER.info(f"vLLM inference check passed on head pod {pod.name}")
            return True

    # No head pod at all — raise so the @retry decorator keeps polling.
    raise RuntimeError(f"No head pod found for InferenceService {isvc.name}")


def is_arg_in_model_spec(client: DynamicClient, isvc: InferenceService, arg: str) -> bool:
"""
Check if arg is in model spec; spec.model.args are only added to head pod
Expand Down
2 changes: 1 addition & 1 deletion utilities/manifests/vllm.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
VLLM_INFERENCE_CONFIG = {
"default_query_model": {
"query_input": '"prompt": "At what temperature does Nitrogen boil?", "max_tokens": 100, "temperature": 0',
"query_output": r'{"id":"cmpl-[a-z0-9]+","object":"text_completion","created":\d+,"model":"$model_name","choices":\[{"index":0,"text":".*Theboilingpointofnitrogenis77.4.*","logprobs":null,"finish_reason":"length","stop_reason":null,"prompt_logprobs":null}\],"usage":{"prompt_tokens":10,"total_tokens":110,"completion_tokens":100,"prompt_tokens_details":null}}',
"query_output": r'{"id":"cmpl-[a-z0-9]+","object":"text_completion","created":\d+,"model":"$model_name","choices":\[{"index":0,"text":".*Theboilingpointofnitrogenis77.4.*","logprobs":null,"finish_reason":"length","stop_reason":null(,"[a-z_]+":null)*}\](,"[a-z_]+":null)*,"usage":{"prompt_tokens":10,"total_tokens":110,"completion_tokens":100,"prompt_tokens_details":null}(,"[a-z_]+":null)*}',
"use_regex": True
},
"completions": {
Expand Down
Loading