diff --git a/tests/model_serving/model_server/kserve/multi_node/conftest.py b/tests/model_serving/model_server/kserve/multi_node/conftest.py
index 8765dadae..4e39ae7a4 100644
--- a/tests/model_serving/model_server/kserve/multi_node/conftest.py
+++ b/tests/model_serving/model_server/kserve/multi_node/conftest.py
@@ -3,6 +3,7 @@
 import pytest
 from _pytest.fixtures import FixtureRequest
+from kubernetes.client.rest import ApiException
 from kubernetes.dynamic import DynamicClient
 from ocp_resources.inference_service import InferenceService
 from ocp_resources.namespace import Namespace
@@ -13,10 +14,13 @@
 from ocp_resources.secret import Secret
 from ocp_resources.serving_runtime import ServingRuntime
 from pytest_testconfig import config as py_config
+from simple_logger.logger import get_logger
 from timeout_sampler import TimeoutSampler
 
+from tests.model_serving.model_server.kserve.multi_node.constants import WORKER_POD_ROLE
 from tests.model_serving.model_server.kserve.multi_node.utils import (
     delete_multi_node_pod_by_role,
+    get_pods_by_isvc_generation,
 )
 from utilities.constants import KServeDeploymentType, Labels, ModelCarImage, Protocols, Timeout
 from utilities.general import download_model_data
@@ -28,6 +32,8 @@
 )
 from utilities.serving_runtime import ServingRuntimeFromTemplate
 
+LOGGER = get_logger(name=__name__)
+
 
 @pytest.fixture(scope="session")
 def nvidia_gpu_nodes(nodes: list[Node]) -> list[Node]:
@@ -98,7 +104,7 @@ def multi_node_inference_service(
         storage_uri=f"pvc://{model_pvc.name}/{models_bucket_downloaded_model_data}",
         model_format=multi_node_serving_runtime.instance.spec.supportedModelFormats[0].name,
         deployment_mode=KServeDeploymentType.RAW_DEPLOYMENT,
-        autoscaler_mode="external",
+        autoscaler_mode="none",
         multi_node_worker_spec={},
         wait_for_predictor_pods=False,
     ) as isvc:
@@ -137,7 +143,6 @@ def multi_node_oci_inference_service(
         ]
     }
 
-    # NOTE: In KServe v0.15, the autoscaler_mode needs to be updated to "none".
     with create_isvc(
         client=unprivileged_client,
         name=request.param["name"],
@@ -146,7 +151,7 @@
         storage_uri=ModelCarImage.GRANITE_8B_CODE_INSTRUCT,
         model_format=multi_node_serving_runtime.instance.spec.supportedModelFormats[0].name,
         deployment_mode=KServeDeploymentType.RAW_DEPLOYMENT,
-        autoscaler_mode="external",
+        autoscaler_mode="none",
         resources=resources,
         multi_node_worker_spec=worker_resources,
         wait_for_predictor_pods=False,
@@ -199,6 +204,7 @@ def patched_multi_node_isvc_external_route(
 @pytest.fixture(scope="function")
 def patched_multi_node_spec(
     request: FixtureRequest,
+    unprivileged_client: DynamicClient,
     multi_node_inference_service: InferenceService,
 ) -> Generator[InferenceService, Any, Any]:
     with ResourceEditor(
@@ -210,6 +216,15 @@
             }
         }
     ):
+        for sample in TimeoutSampler(
+            wait_timeout=Timeout.TIMEOUT_10MIN,
+            sleep=10,
+            func=get_pods_by_isvc_generation,
+            client=unprivileged_client,
+            isvc=multi_node_inference_service,
+        ):
+            if sample:
+                break
         yield multi_node_inference_service
 
 
@@ -259,3 +274,87 @@ def deleted_multi_node_pod(
         isvc=multi_node_inference_service,
         timeout=Timeout.TIMEOUT_10MIN,
     )
+
+    wait_for_inference_deployment_replicas(
+        client=unprivileged_client,
+        isvc=multi_node_inference_service,
+        expected_num_deployments=2,
+    )
+
+    _warmup_inference_and_wait_for_recovery(
+        client=unprivileged_client,
+        isvc=multi_node_inference_service,
+    )
+
+
+def _warmup_inference_and_wait_for_recovery(
+    client: DynamicClient,
+    isvc: InferenceService,
+) -> None:
+    """Probe the head pod with a real inference request until it succeeds.
+
+    After a pod deletion and recovery cycle, the vLLM EngineCore may be left in
+    a corrupted state that only crashes on the first real inference request.
+    The /v1/models endpoint returns 200 even when the engine is broken, so we
+    must use /v1/completions to exercise the distributed engine path. The
+    liveness probe then restarts the head pod, which takes several minutes.
+    This function retries until the completions endpoint responds with 200.
+    """
+    probe_cmd = [
+        "curl",
+        "-s",
+        "--max-time",
+        "15",
+        "-o",
+        "/dev/null",
+        "-w",
+        "%{http_code}",
+        "-H",
+        "Content-type:application/json",
+        "-d",
+        f'{{"model":"{isvc.name}","prompt":"test","max_tokens":1}}',
+        "http://localhost:8080/v1/completions",
+    ]
+
+    for sample in TimeoutSampler(
+        wait_timeout=Timeout.TIMEOUT_10MIN,
+        sleep=30,
+        func=_probe_inference_health,
+        client=client,
+        isvc=isvc,
+        cmd=probe_cmd,
+    ):
+        if sample:
+            return
+
+
+def _probe_inference_health(
+    client: DynamicClient,
+    isvc: InferenceService,
+    cmd: list[str],
+) -> bool:
+    """Return True if the head pod's completions endpoint returns 200.
+
+    Re-fetches pods on each call to handle pod replacement during restarts.
+    """
+    for pod in get_pods_by_isvc_label(client=client, isvc=isvc):
+        if WORKER_POD_ROLE in pod.name:
+            continue
+
+        for condition in pod.instance.status.conditions or []:
+            if condition.type == "Ready" and condition.status != "True":
+                LOGGER.info(f"Head pod {pod.name} not Ready yet")
+                return False
+
+        try:
+            result = pod.execute(command=cmd)
+        except (ApiException, OSError) as exc:
+            LOGGER.warning(f"Inference probe on {pod.name} failed: {exc}")
+            return False
+        else:
+            status = result.strip()
+            LOGGER.info(f"Inference probe on {pod.name}: HTTP {status}")
+            return status == "200"
+
+    LOGGER.warning("No head pod found")
+    return False
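Both the new `patched_multi_node_spec` wait and `_warmup_inference_and_wait_for_recovery` lean on the same `TimeoutSampler` polling idiom. A minimal, self-contained sketch of that idiom, with a hypothetical `pods_ready` stand-in for the real callables (`get_pods_by_isvc_generation`, `_probe_inference_health`):

```python
import time

from timeout_sampler import TimeoutExpiredError, TimeoutSampler

START = time.monotonic()


def pods_ready() -> bool:
    # Hypothetical probe: becomes truthy ~5 seconds after startup.
    return time.monotonic() - START > 5


try:
    # TimeoutSampler calls `func` every `sleep` seconds and yields each result;
    # the caller breaks out on the first truthy sample. If `wait_timeout`
    # elapses first, the iterator raises TimeoutExpiredError instead.
    for sample in TimeoutSampler(wait_timeout=30, sleep=1, func=pods_ready):
        if sample:
            break
except TimeoutExpiredError:
    raise RuntimeError("condition was not met within the timeout")
```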
+ """ + for pod in get_pods_by_isvc_label(client=client, isvc=isvc): + if WORKER_POD_ROLE in pod.name: + continue + + for condition in pod.instance.status.conditions or []: + if condition.type == "Ready" and condition.status != "True": + LOGGER.info(f"Head pod {pod.name} not Ready yet") + return False + + try: + result = pod.execute(command=cmd) + except (ApiException, OSError) as exc: + LOGGER.warning(f"Inference probe on {pod.name} failed: {exc}") + return False + else: + status = result.strip() + LOGGER.info(f"Inference probe on {pod.name}: HTTP {status}") + return status == "200" + + LOGGER.warning("No head pod found") + return False diff --git a/utilities/inference_utils.py b/utilities/inference_utils.py index 61feddd8f..5d86b0f18 100644 --- a/utilities/inference_utils.py +++ b/utilities/inference_utils.py @@ -475,6 +475,12 @@ def run_inference( f"Got {HTTPStatus.SERVICE_UNAVAILABLE} error." ) + if re.search(rf"http/1\.\d\s+{HTTPStatus.INTERNAL_SERVER_ERROR.value}\b", out.lower()): + raise InferenceResponseError( + f"Inference service at {self.get_inference_url()} returned " + f"{HTTPStatus.INTERNAL_SERVER_ERROR} error." + ) + else: sanitized_cmd = re.sub(r"('Authorization: Bearer ).*?(')", r"\1***REDACTED***2", cmd) raise ValueError(f"Inference failed with error: {err}\nOutput: {out}\nCommand: {sanitized_cmd}") diff --git a/utilities/infra.py b/utilities/infra.py index 3afedc144..6d5678fae 100644 --- a/utilities/infra.py +++ b/utilities/infra.py @@ -757,6 +757,16 @@ def verify_no_failed_pods( """ wait_for_isvc_pods(client=client, isvc=isvc, runtime_name=runtime_name) + container_wait_base_errors = ["InvalidImageName"] + container_terminated_base_errors = [Resource.Status.ERROR] + + # For Model Mesh, if image pulling takes longer, pod may be in CrashLoopBackOff state but recover with retries. + if ( + deployment_mode := isvc.instance.metadata.annotations.get("serving.kserve.io/deploymentMode") + ) and deployment_mode != KServeDeploymentType.MODEL_MESH: + container_wait_base_errors.append(Resource.Status.CRASH_LOOPBACK_OFF) + container_terminated_base_errors.append(Resource.Status.CRASH_LOOPBACK_OFF) + LOGGER.info("Verifying no failed pods") for pods in TimeoutSampler( wait_timeout=timeout, @@ -766,54 +776,48 @@ def verify_no_failed_pods( isvc=isvc, runtime_name=runtime_name, ): - ready_pods = 0 + if not pods: + continue + failed_pods: dict[str, Any] = {} - container_wait_base_errors = ["InvalidImageName"] - container_terminated_base_errors = [Resource.Status.ERROR] + for pod in pods: + pod_status = pod.instance.status - # For Model Mesh, if image pulling takes longer, pod may be in CrashLoopBackOff state but recover with retries. 
diff --git a/utilities/infra.py b/utilities/infra.py
index 3afedc144..6d5678fae 100644
--- a/utilities/infra.py
+++ b/utilities/infra.py
@@ -757,6 +757,16 @@
     """
     wait_for_isvc_pods(client=client, isvc=isvc, runtime_name=runtime_name)
 
+    container_wait_base_errors = ["InvalidImageName"]
+    container_terminated_base_errors = [Resource.Status.ERROR]
+
+    # For Model Mesh, if image pulling takes longer, pod may be in CrashLoopBackOff state but recover with retries.
+    if (
+        deployment_mode := isvc.instance.metadata.annotations.get("serving.kserve.io/deploymentMode")
+    ) and deployment_mode != KServeDeploymentType.MODEL_MESH:
+        container_wait_base_errors.append(Resource.Status.CRASH_LOOPBACK_OFF)
+        container_terminated_base_errors.append(Resource.Status.CRASH_LOOPBACK_OFF)
+
     LOGGER.info("Verifying no failed pods")
     for pods in TimeoutSampler(
         wait_timeout=timeout,
@@ -766,54 +776,48 @@
         isvc=isvc,
         runtime_name=runtime_name,
     ):
-        ready_pods = 0
+        if not pods:
+            continue
+
         failed_pods: dict[str, Any] = {}
-        container_wait_base_errors = ["InvalidImageName"]
-        container_terminated_base_errors = [Resource.Status.ERROR]
+        for pod in pods:
+            pod_status = pod.instance.status
 
-        # For Model Mesh, if image pulling takes longer, pod may be in CrashLoopBackOff state but recover with retries.
-        if (
-            deployment_mode := isvc.instance.metadata.annotations.get("serving.kserve.io/deploymentMode")
-        ) and deployment_mode != KServeDeploymentType.MODEL_MESH:
-            container_wait_base_errors.append(Resource.Status.CRASH_LOOPBACK_OFF)
-            container_terminated_base_errors.append(Resource.Status.CRASH_LOOPBACK_OFF)
-
-        if pods:
-            for pod in pods:
-                for condition in pod.instance.status.conditions:
-                    if condition.type == pod.Status.READY and condition.status == pod.Condition.Status.TRUE:
-                        ready_pods += 1
-
-            if ready_pods == len(pods):
-                return
+            all_container_statuses = list(pod_status.get("initContainerStatuses", []) or []) + list(
+                pod_status.get("containerStatuses", []) or []
+            )
 
-            for pod in pods:
-                pod_status = pod.instance.status
+            for container_status in all_container_statuses:
+                is_waiting_error = (
+                    wait_state := container_status.state.waiting
+                ) and wait_state.reason in container_wait_base_errors
 
-                if pod_status.containerStatuses:
-                    for container_status in pod_status.get("containerStatuses", []) + pod_status.get(
-                        "initContainerStatuses", []
-                    ):
-                        is_waiting_pull_back_off = (
-                            wait_state := container_status.state.waiting
-                        ) and wait_state.reason in container_wait_base_errors
+                is_terminated_error = (
+                    terminate_state := container_status.state.terminated
+                ) and terminate_state.reason in container_terminated_base_errors
 
-                        is_terminated_error = (
-                            terminate_state := container_status.state.terminated
-                        ) and terminate_state.reason in container_terminated_base_errors
+                if is_waiting_error or is_terminated_error:
+                    failed_pods[pod.name] = pod_status
+                    break
 
-                        if is_waiting_pull_back_off or is_terminated_error:
-                            failed_pods[pod.name] = pod_status
+            if pod_status.phase in (pod.Status.CRASH_LOOPBACK_OFF, pod.Status.FAILED):
+                failed_pods[pod.name] = pod_status
 
-                elif pod_status.phase in (
-                    pod.Status.CRASH_LOOPBACK_OFF,
-                    pod.Status.FAILED,
-                ):
-                    failed_pods[pod.name] = pod_status
+        if failed_pods:
+            raise FailedPodsError(pods=failed_pods)
+
+        ready_pods = sum(
+            1
+            for pod in pods
+            if any(
+                c.type == pod.Status.READY and c.status == pod.Condition.Status.TRUE
+                for c in (pod.instance.status.conditions or [])
+            )
+        )
 
-        if failed_pods:
-            raise FailedPodsError(pods=failed_pods)
+        if ready_pods == len(pods):
+            return
 
 
 def check_pod_status_in_time(pod: Pod, status: set[str], duration: int = Timeout.TIMEOUT_2MIN, wait: int = 1) -> None:
diff --git a/utilities/manifests/vllm.py b/utilities/manifests/vllm.py
index 87b05322a..91cc5d5e5 100644
--- a/utilities/manifests/vllm.py
+++ b/utilities/manifests/vllm.py
@@ -1,7 +1,7 @@
 VLLM_INFERENCE_CONFIG = {
     "default_query_model": {
         "query_input": '"prompt": "At what temperature does Nitrogen boil?", "max_tokens": 100, "temperature": 0',
-        "query_output": r'{"id":"cmpl-[a-z0-9]+","object":"text_completion","created":\d+,"model":"$model_name","choices":\[{"index":0,"text":".*Theboilingpointofnitrogenis77.4.*","logprobs":null,"finish_reason":"length","stop_reason":null,"prompt_logprobs":null}\],"usage":{"prompt_tokens":10,"total_tokens":110,"completion_tokens":100,"prompt_tokens_details":null}}',
+        "query_output": r'{"id":"cmpl-[a-z0-9]+","object":"text_completion","created":\d+,"model":"$model_name","choices":\[{"index":0,"text":".*Theboilingpointofnitrogenis77.4.*","logprobs":null,"finish_reason":"length","stop_reason":null.*"prompt_logprobs":null.*}\].*"usage":{"prompt_tokens":10,"total_tokens":110,"completion_tokens":100,"prompt_tokens_details":null}.*}',
         "use_regex": True
     },
     "completions": {
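The `query_output` change swaps a few rigid separators for `.*` so the expected-response regex keeps matching when newer vLLM releases insert extra fields into the completion payload. A minimal illustration, where `kv_transfer_params` is a hypothetical stand-in for such a field:

```python
import re

old = r'"stop_reason":null,"prompt_logprobs":null'
new = r'"stop_reason":null.*"prompt_logprobs":null'

# Response fragment with an extra (invented) field between the two keys.
fragment = '"stop_reason":null,"kv_transfer_params":null,"prompt_logprobs":null'

assert re.search(old, fragment) is None  # strict comma-joined pattern breaks
assert re.search(new, fragment)          # loosened pattern still matches
```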