105 changes: 102 additions & 3 deletions tests/model_serving/model_server/kserve/multi_node/conftest.py
@@ -3,6 +3,7 @@

import pytest
from _pytest.fixtures import FixtureRequest
from kubernetes.client.rest import ApiException
from kubernetes.dynamic import DynamicClient
from ocp_resources.inference_service import InferenceService
from ocp_resources.namespace import Namespace
@@ -13,10 +14,13 @@
from ocp_resources.secret import Secret
from ocp_resources.serving_runtime import ServingRuntime
from pytest_testconfig import config as py_config
from simple_logger.logger import get_logger
from timeout_sampler import TimeoutSampler

from tests.model_serving.model_server.kserve.multi_node.constants import WORKER_POD_ROLE
from tests.model_serving.model_server.kserve.multi_node.utils import (
delete_multi_node_pod_by_role,
get_pods_by_isvc_generation,
)
from utilities.constants import KServeDeploymentType, Labels, ModelCarImage, Protocols, Timeout
from utilities.general import download_model_data
@@ -28,6 +32,8 @@
)
from utilities.serving_runtime import ServingRuntimeFromTemplate

LOGGER = get_logger(name=__name__)


@pytest.fixture(scope="session")
def nvidia_gpu_nodes(nodes: list[Node]) -> list[Node]:
@@ -98,7 +104,7 @@ def multi_node_inference_service(
storage_uri=f"pvc://{model_pvc.name}/{models_bucket_downloaded_model_data}",
model_format=multi_node_serving_runtime.instance.spec.supportedModelFormats[0].name,
deployment_mode=KServeDeploymentType.RAW_DEPLOYMENT,
autoscaler_mode="external",
autoscaler_mode="none",
multi_node_worker_spec={},
wait_for_predictor_pods=False,
) as isvc:
@@ -137,7 +143,6 @@ def multi_node_oci_inference_service(
]
}

# NOTE: In KServe v0.15, the autoscaler_mode needs to be updated to "none".
with create_isvc(
client=unprivileged_client,
name=request.param["name"],
Expand All @@ -146,7 +151,7 @@ def multi_node_oci_inference_service(
storage_uri=ModelCarImage.GRANITE_8B_CODE_INSTRUCT,
model_format=multi_node_serving_runtime.instance.spec.supportedModelFormats[0].name,
deployment_mode=KServeDeploymentType.RAW_DEPLOYMENT,
autoscaler_mode="external",
autoscaler_mode="none",
resources=resources,
multi_node_worker_spec=worker_resources,
wait_for_predictor_pods=False,
@@ -199,6 +204,7 @@ def patched_multi_node_isvc_external_route(
@pytest.fixture(scope="function")
def patched_multi_node_spec(
request: FixtureRequest,
unprivileged_client: DynamicClient,
multi_node_inference_service: InferenceService,
) -> Generator[InferenceService, Any, Any]:
with ResourceEditor(
@@ -210,6 +216,15 @@ def patched_multi_node_spec(
}
}
):
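# Wait until pods from the new InferenceService generation are scheduled after the spec patch.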
for sample in TimeoutSampler(
wait_timeout=Timeout.TIMEOUT_10MIN,
sleep=10,
func=get_pods_by_isvc_generation,
client=unprivileged_client,
isvc=multi_node_inference_service,
):
if sample:
break
yield multi_node_inference_service


@@ -259,3 +274,87 @@ def deleted_multi_node_pod(
isvc=multi_node_inference_service,
timeout=Timeout.TIMEOUT_10MIN,
)

wait_for_inference_deployment_replicas(
client=unprivileged_client,
isvc=multi_node_inference_service,
expected_num_deployments=2,
)

_warmup_inference_and_wait_for_recovery(
client=unprivileged_client,
isvc=multi_node_inference_service,
)


def _warmup_inference_and_wait_for_recovery(
client: DynamicClient,
isvc: InferenceService,
) -> None:
"""Probe the head pod with a real inference request until it succeeds.

After a pod deletion+recovery cycle, the vLLM EngineCore may be in a corrupted
state that only crashes on the first real inference request. The /v1/models
endpoint returns 200 even when the engine is broken, so we must use
/v1/completions to exercise the distributed engine path. The liveness probe
then restarts the head pod, which takes several minutes. This function retries
until the completions endpoint responds with 200.
"""
probe_cmd = [
"curl",
"-s",
"--max-time",
"15",
"-o",
"/dev/null",
"-w",
"%{http_code}",
"-H",
"Content-type:application/json",
"-d",
f'{{"model":"{isvc.name}","prompt":"test","max_tokens":1}}',
"http://localhost:8080/v1/completions",
]

for sample in TimeoutSampler(
wait_timeout=Timeout.TIMEOUT_10MIN,
sleep=30,
func=_probe_inference_health,
client=client,
isvc=isvc,
cmd=probe_cmd,
):
if sample:
return


def _probe_inference_health(
client: DynamicClient,
isvc: InferenceService,
cmd: list[str],
) -> bool:
"""Return True if the head pod's completions endpoint returns 200.

Re-fetches pods each call to handle pod replacement during restarts.
"""
for pod in get_pods_by_isvc_label(client=client, isvc=isvc):
if WORKER_POD_ROLE in pod.name:
continue

for condition in pod.instance.status.conditions or []:
if condition.type == "Ready" and condition.status != "True":
LOGGER.info(f"Head pod {pod.name} not Ready yet")
return False

try:
result = pod.execute(command=cmd)
except (ApiException, OSError) as exc:
LOGGER.warning(f"Inference probe on {pod.name} failed: {exc}")
return False
else:
status = result.strip()
LOGGER.info(f"Inference probe on {pod.name}: HTTP {status}")
return status == "200"

LOGGER.warning("No head pod found")
return False
6 changes: 6 additions & 0 deletions utilities/inference_utils.py
@@ -475,6 +475,12 @@ def run_inference(
f"Got {HTTPStatus.SERVICE_UNAVAILABLE} error."
)

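# `out` is lowercased before matching, so an HTTP 500 status line appears as e.g. "http/1.1 500".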
if re.search(rf"http/1\.\d\s+{HTTPStatus.INTERNAL_SERVER_ERROR.value}\b", out.lower()):
raise InferenceResponseError(
f"Inference service at {self.get_inference_url()} returned "
f"{HTTPStatus.INTERNAL_SERVER_ERROR} error."
)

else:
sanitized_cmd = re.sub(r"('Authorization: Bearer ).*?(')", r"\1***REDACTED***\2", cmd)
raise ValueError(f"Inference failed with error: {err}\nOutput: {out}\nCommand: {sanitized_cmd}")
82 changes: 43 additions & 39 deletions utilities/infra.py
@@ -757,6 +757,16 @@ def verify_no_failed_pods(
"""
wait_for_isvc_pods(client=client, isvc=isvc, runtime_name=runtime_name)

container_wait_base_errors = ["InvalidImageName"]
container_terminated_base_errors = [Resource.Status.ERROR]

# For Model Mesh, if image pulling takes longer, pod may be in CrashLoopBackOff state but recover with retries.
if (
deployment_mode := isvc.instance.metadata.annotations.get("serving.kserve.io/deploymentMode")
) and deployment_mode != KServeDeploymentType.MODEL_MESH:
container_wait_base_errors.append(Resource.Status.CRASH_LOOPBACK_OFF)
container_terminated_base_errors.append(Resource.Status.CRASH_LOOPBACK_OFF)

LOGGER.info("Verifying no failed pods")
for pods in TimeoutSampler(
wait_timeout=timeout,
@@ -766,54 +776,48 @@
isvc=isvc,
runtime_name=runtime_name,
):
ready_pods = 0
if not pods:
continue

failed_pods: dict[str, Any] = {}

container_wait_base_errors = ["InvalidImageName"]
container_terminated_base_errors = [Resource.Status.ERROR]
for pod in pods:
pod_status = pod.instance.status

# For Model Mesh, if image pulling takes longer, pod may be in CrashLoopBackOff state but recover with retries.
if (
deployment_mode := isvc.instance.metadata.annotations.get("serving.kserve.io/deploymentMode")
) and deployment_mode != KServeDeploymentType.MODEL_MESH:
container_wait_base_errors.append(Resource.Status.CRASH_LOOPBACK_OFF)
container_terminated_base_errors.append(Resource.Status.CRASH_LOOPBACK_OFF)

if pods:
for pod in pods:
for condition in pod.instance.status.conditions:
if condition.type == pod.Status.READY and condition.status == pod.Condition.Status.TRUE:
ready_pods += 1

if ready_pods == len(pods):
return
all_container_statuses = list(pod_status.get("initContainerStatuses", []) or []) + list(
pod_status.get("containerStatuses", []) or []
)

for pod in pods:
pod_status = pod.instance.status
for container_status in all_container_statuses:
is_waiting_error = (
wait_state := container_status.state.waiting
) and wait_state.reason in container_wait_base_errors

if pod_status.containerStatuses:
for container_status in pod_status.get("containerStatuses", []) + pod_status.get(
"initContainerStatuses", []
):
is_waiting_pull_back_off = (
wait_state := container_status.state.waiting
) and wait_state.reason in container_wait_base_errors
is_terminated_error = (
terminate_state := container_status.state.terminated
) and terminate_state.reason in container_terminated_base_errors

is_terminated_error = (
terminate_state := container_status.state.terminated
) and terminate_state.reason in container_terminated_base_errors
if is_waiting_error or is_terminated_error:
failed_pods[pod.name] = pod_status
break

if is_waiting_pull_back_off or is_terminated_error:
failed_pods[pod.name] = pod_status
if pod_status.phase in (pod.Status.CRASH_LOOPBACK_OFF, pod.Status.FAILED):
failed_pods[pod.name] = pod_status

elif pod_status.phase in (
pod.Status.CRASH_LOOPBACK_OFF,
pod.Status.FAILED,
):
failed_pods[pod.name] = pod_status
if failed_pods:
raise FailedPodsError(pods=failed_pods)

ready_pods = sum(
1
for pod in pods
if any(
c.type == pod.Status.READY and c.status == pod.Condition.Status.TRUE
for c in (pod.instance.status.conditions or [])
)
)

if failed_pods:
raise FailedPodsError(pods=failed_pods)
if ready_pods == len(pods):
return


def check_pod_status_in_time(pod: Pod, status: set[str], duration: int = Timeout.TIMEOUT_2MIN, wait: int = 1) -> None:
2 changes: 1 addition & 1 deletion utilities/manifests/vllm.py
@@ -1,7 +1,7 @@
VLLM_INFERENCE_CONFIG = {
"default_query_model": {
"query_input": '"prompt": "At what temperature does Nitrogen boil?", "max_tokens": 100, "temperature": 0',
"query_output": r'{"id":"cmpl-[a-z0-9]+","object":"text_completion","created":\d+,"model":"$model_name","choices":\[{"index":0,"text":".*Theboilingpointofnitrogenis77.4.*","logprobs":null,"finish_reason":"length","stop_reason":null,"prompt_logprobs":null}\],"usage":{"prompt_tokens":10,"total_tokens":110,"completion_tokens":100,"prompt_tokens_details":null}}',
"query_output": r'{"id":"cmpl-[a-z0-9]+","object":"text_completion","created":\d+,"model":"$model_name","choices":\[{"index":0,"text":".*Theboilingpointofnitrogenis77.4.*","logprobs":null,"finish_reason":"length","stop_reason":null.*"prompt_logprobs":null.*}\].*"usage":{"prompt_tokens":10,"total_tokens":110,"completion_tokens":100,"prompt_tokens_details":null}.*}',
"use_regex": True
},
"completions": {