diff --git a/pytest.ini b/pytest.ini index 80c149a5e..85ed14ab2 100644 --- a/pytest.ini +++ b/pytest.ini @@ -33,6 +33,7 @@ markers = gpu: Mark tests which require GPU resources multinode: Mark tests which require multiple nodes keda: Mark tests which are testing KEDA scaling + llmd_cpu: Mark tests which are testing LLMD (LLM Deployment) with CPU resources # Model Registry: custom_namespace: mark tests that are to be run with custom namespace diff --git a/tests/model_serving/model_server/llmd/__init__.py b/tests/model_serving/model_server/llmd/__init__.py new file mode 100644 index 000000000..98274df1a --- /dev/null +++ b/tests/model_serving/model_server/llmd/__init__.py @@ -0,0 +1 @@ +"""LLMD (LLM Deployment) test module for OpenDataHub and OpenShift AI.""" diff --git a/tests/model_serving/model_server/llmd/conftest.py b/tests/model_serving/model_server/llmd/conftest.py new file mode 100644 index 000000000..05ac74284 --- /dev/null +++ b/tests/model_serving/model_server/llmd/conftest.py @@ -0,0 +1,185 @@ +from typing import Generator + +import pytest +from _pytest.fixtures import FixtureRequest +from kubernetes.dynamic import DynamicClient +from ocp_resources.gateway import Gateway +from ocp_resources.llm_inference_service import LLMInferenceService +from ocp_resources.namespace import Namespace +from ocp_resources.secret import Secret +from ocp_resources.service_account import ServiceAccount + +from utilities.constants import Timeout +from utilities.infra import s3_endpoint_secret +from utilities.llmd_utils import create_llmd_gateway, create_llmisvc +from utilities.llmd_constants import ( + DEFAULT_GATEWAY_NAMESPACE, + VLLM_STORAGE_OCI, + VLLM_CPU_IMAGE, + DEFAULT_S3_STORAGE_PATH, +) + + +@pytest.fixture(scope="class") +def gateway_namespace(admin_client: DynamicClient) -> str: + return DEFAULT_GATEWAY_NAMESPACE + + +@pytest.fixture(scope="class") +def llmd_s3_secret( + admin_client: DynamicClient, + unprivileged_model_namespace: Namespace, + aws_access_key_id: 
str, + aws_secret_access_key: str, + models_s3_bucket_name: str, + models_s3_bucket_region: str, + models_s3_bucket_endpoint: str, +) -> Generator[Secret, None, None]: + with s3_endpoint_secret( + client=admin_client, + name="llmd-s3-secret", + namespace=unprivileged_model_namespace.name, + aws_access_key=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + aws_s3_region=models_s3_bucket_region, + aws_s3_bucket=models_s3_bucket_name, + aws_s3_endpoint=models_s3_bucket_endpoint, + ) as secret: + yield secret + + +@pytest.fixture(scope="class") +def llmd_s3_service_account( + admin_client: DynamicClient, llmd_s3_secret: Secret +) -> Generator[ServiceAccount, None, None]: + with ServiceAccount( + client=admin_client, + namespace=llmd_s3_secret.namespace, + name="llmd-s3-service-account", + secrets=[{"name": llmd_s3_secret.name}], + ) as sa: + yield sa + + +@pytest.fixture(scope="class") +def llmd_gateway( + request: FixtureRequest, + admin_client: DynamicClient, + gateway_namespace: str, +) -> Generator[Gateway, None, None]: + """ + Pytest fixture for LLMD Gateway management using create_llmd_gateway. 
+
+    Implements persistent LLMD gateway strategy:
+    - Reuses existing gateways if available
+    - Creates new gateway only if needed
+    - Does not delete gateway in teardown
+    - Uses LLMD-specific gateway configuration
+    """
+    if isinstance(request.param, str):
+        gateway_class_name = request.param
+        kwargs = {}
+    else:
+        gateway_class_name = request.param.get("gateway_class_name", "openshift-default")
+        kwargs = {k: v for k, v in request.param.items() if k != "gateway_class_name"}
+
+    with create_llmd_gateway(
+        client=admin_client,
+        namespace=gateway_namespace,
+        gateway_class_name=gateway_class_name,
+        wait_for_condition=True,
+        timeout=Timeout.TIMEOUT_5MIN,
+        teardown=False,  # Don't delete gateway in teardown
+        **kwargs,
+    ) as gateway:
+        yield gateway
+
+
+@pytest.fixture(scope="class")
+def llmd_inference_service(
+    request: FixtureRequest,
+    admin_client: DynamicClient,
+    unprivileged_model_namespace: Namespace,
+) -> Generator[LLMInferenceService, None, None]:
+    if isinstance(request.param, str):
+        name_suffix = request.param
+        kwargs = {}
+    else:
+        name_suffix = request.param.get("name_suffix", "basic")
+        kwargs = {k: v for k, v in request.param.items() if k != "name_suffix"}
+
+    service_name = kwargs.get("name", f"llm-{name_suffix}")
+
+    if "llmd_gateway" in request.fixturenames:
+        request.getfixturevalue(argname="llmd_gateway")  # resolve gateway first so it exists before the service
+    container_resources = kwargs.get(
+        "container_resources",
+        {
+            "limits": {"cpu": "1", "memory": "10Gi"},
+            "requests": {"cpu": "100m", "memory": "8Gi"},
+        },
+    )
+
+    with create_llmisvc(
+        client=admin_client,
+        name=service_name,
+        namespace=unprivileged_model_namespace.name,
+        storage_uri=kwargs.get("storage_uri", VLLM_STORAGE_OCI),
+        container_image=kwargs.get("container_image", VLLM_CPU_IMAGE),
+        container_resources=container_resources,
+        wait=True,
+        timeout=Timeout.TIMEOUT_15MIN,
+        **{k: v for k, v in kwargs.items() if k not in ["name", "storage_uri", "container_image", "container_resources"]},  # exclude kwargs already passed explicitly above, else TypeError: multiple values (matches llmd_inference_service_s3)
+    ) as llm_service:
+        yield llm_service
+
+
+@pytest.fixture(scope="class")
+def 
llmd_inference_service_s3( + request: FixtureRequest, + admin_client: DynamicClient, + unprivileged_model_namespace: Namespace, + llmd_s3_secret: Secret, + llmd_s3_service_account: ServiceAccount, +) -> Generator[LLMInferenceService, None, None]: + if isinstance(request.param, str): + name_suffix = request.param + kwargs = {"storage_path": DEFAULT_S3_STORAGE_PATH} + else: + name_suffix = request.param.get("name_suffix", "s3") + kwargs = {k: v for k, v in request.param.items() if k != "name_suffix"} + + service_name = kwargs.get("name", f"llm-{name_suffix}") + + if "storage_key" not in kwargs: + kwargs["storage_key"] = llmd_s3_secret.name + + if "storage_path" not in kwargs: + kwargs["storage_path"] = DEFAULT_S3_STORAGE_PATH + + container_resources = kwargs.get( + "container_resources", + { + "limits": {"cpu": "1", "memory": "10Gi"}, + "requests": {"cpu": "100m", "memory": "8Gi"}, + }, + ) + + with create_llmisvc( + client=admin_client, + name=service_name, + namespace=unprivileged_model_namespace.name, + storage_key=kwargs.get("storage_key"), + storage_path=kwargs.get("storage_path"), + container_image=kwargs.get("container_image", VLLM_CPU_IMAGE), + container_resources=container_resources, + service_account=llmd_s3_service_account.name, + wait=True, + timeout=Timeout.TIMEOUT_15MIN, + **{ + k: v + for k, v in kwargs.items() + if k not in ["name", "storage_key", "storage_path", "container_image", "container_resources"] + }, + ) as llm_service: + yield llm_service diff --git a/tests/model_serving/model_server/llmd/test_llmd_oci_cpu.py b/tests/model_serving/model_server/llmd/test_llmd_oci_cpu.py new file mode 100644 index 000000000..57481b29f --- /dev/null +++ b/tests/model_serving/model_server/llmd/test_llmd_oci_cpu.py @@ -0,0 +1,40 @@ +import pytest + +from tests.model_serving.model_server.llmd.utils import ( + verify_llm_service_status, + verify_gateway_status, + verify_llmd_pods_not_restarted, +) +from utilities.constants import Protocols +from 
utilities.llmd_utils import verify_inference_response_llmd +from utilities.llmd_constants import BASIC_LLMD_PARAMS +from utilities.manifests.opt125m_cpu import OPT125M_CPU_INFERENCE_CONFIG + +pytestmark = [ + pytest.mark.llmd_cpu, +] + + +@pytest.mark.parametrize( + "unprivileged_model_namespace, llmd_gateway, llmd_inference_service", + BASIC_LLMD_PARAMS, + indirect=True, +) +class TestLLMDOCICPUInference: + """LLMD inference testing with OCI storage and CPU runtime using vLLM.""" + + def test_llmd_oci(self, admin_client, llmd_gateway, llmd_inference_service): + """Test LLMD inference with OCI storage and CPU runtime.""" + assert verify_gateway_status(llmd_gateway), "Gateway should be ready" + assert verify_llm_service_status(llmd_inference_service), "LLMInferenceService should be ready" + + verify_inference_response_llmd( + llm_service=llmd_inference_service, + inference_config=OPT125M_CPU_INFERENCE_CONFIG, + inference_type="chat_completions", + protocol=Protocols.HTTP, + use_default_query=True, + insecure=True, + ) + + verify_llmd_pods_not_restarted(client=admin_client, llm_service=llmd_inference_service) diff --git a/tests/model_serving/model_server/llmd/test_llmd_s3.py b/tests/model_serving/model_server/llmd/test_llmd_s3.py new file mode 100644 index 000000000..ec09e3257 --- /dev/null +++ b/tests/model_serving/model_server/llmd/test_llmd_s3.py @@ -0,0 +1,40 @@ +import pytest + +from tests.model_serving.model_server.llmd.utils import ( + verify_llm_service_status, + verify_gateway_status, + verify_llmd_pods_not_restarted, +) +from utilities.constants import Protocols +from utilities.llmd_utils import verify_inference_response_llmd +from utilities.manifests.opt125m_cpu import OPT125M_CPU_INFERENCE_CONFIG + +pytestmark = [ + pytest.mark.llmd_cpu, +] + + +@pytest.mark.parametrize( + "unprivileged_model_namespace, llmd_gateway, llmd_inference_service_s3", + [({"name": "llmd-s3-test"}, "openshift-default", {"storage_path": "opt-125m/"})], + indirect=True, +) 
+@pytest.mark.usefixtures("valid_aws_config") +class TestLLMDS3Inference: + """LLMD inference testing with S3 storage.""" + + def test_llmd_s3(self, admin_client, llmd_gateway, llmd_inference_service_s3): + """Test LLMD inference with S3 storage.""" + assert verify_gateway_status(llmd_gateway), "Gateway should be ready" + assert verify_llm_service_status(llmd_inference_service_s3), "LLMInferenceService should be ready" + + verify_inference_response_llmd( + llm_service=llmd_inference_service_s3, + inference_config=OPT125M_CPU_INFERENCE_CONFIG, + inference_type="chat_completions", + protocol=Protocols.HTTP, + use_default_query=True, + insecure=True, + ) + + verify_llmd_pods_not_restarted(client=admin_client, llm_service=llmd_inference_service_s3) diff --git a/tests/model_serving/model_server/llmd/utils.py b/tests/model_serving/model_server/llmd/utils.py new file mode 100644 index 000000000..337872645 --- /dev/null +++ b/tests/model_serving/model_server/llmd/utils.py @@ -0,0 +1,109 @@ +""" +Utility functions for LLM Deployment (LLMD) tests. + +This module provides helper functions for LLMD test operations using ocp_resources. +Follows the established model server utils pattern for consistency. +""" + +from kubernetes.dynamic import DynamicClient +from ocp_resources.gateway import Gateway +from ocp_resources.llm_inference_service import LLMInferenceService +from ocp_resources.pod import Pod +from simple_logger.logger import get_logger + +from utilities.exceptions import PodContainersRestartError + + +LOGGER = get_logger(name=__name__) + + +def verify_gateway_status(gateway: Gateway) -> bool: + """ + Verify that a Gateway is properly configured and programmed. 
+ + Args: + gateway (Gateway): The Gateway resource to verify + + Returns: + bool: True if gateway is properly configured, False otherwise + """ + if not gateway.exists: + LOGGER.warning(f"Gateway {gateway.name} does not exist") + return False + + conditions = gateway.instance.status.get("conditions", []) + for condition in conditions: + if condition["type"] == "Programmed" and condition["status"] == "True": + LOGGER.info(f"Gateway {gateway.name} is programmed and ready") + return True + + LOGGER.warning(f"Gateway {gateway.name} is not in Programmed state") + return False + + +def verify_llm_service_status(llm_service: LLMInferenceService) -> bool: + """ + Verify that an LLMInferenceService is properly configured and ready. + + Args: + llm_service (LLMInferenceService): The LLMInferenceService resource to verify + + Returns: + bool: True if service is properly configured, False otherwise + """ + if not llm_service.exists: + LOGGER.warning(f"LLMInferenceService {llm_service.name} does not exist") + return False + + conditions = llm_service.instance.status.get("conditions", []) + for condition in conditions: + if condition["type"] == "Ready" and condition["status"] == "True": + LOGGER.info(f"LLMInferenceService {llm_service.name} is ready") + return True + + LOGGER.warning(f"LLMInferenceService {llm_service.name} is not in Ready state") + return False + + +def verify_llmd_pods_not_restarted(client: DynamicClient, llm_service: LLMInferenceService) -> None: + """ + Verify that LLMD inference pods containers have not restarted. + + This function checks for container restarts in pods related to the specific LLMInferenceService. 
+ + Args: + client (DynamicClient): DynamicClient instance + llm_service (LLMInferenceService): The LLMInferenceService to check pods for + + Raises: + PodContainersRestartError: If any containers in LLMD pods have restarted + """ + LOGGER.info(f"Verifying that pods for LLMInferenceService {llm_service.name} have not restarted") + + restarted_containers = {} + + for pod in Pod.get( + dyn_client=client, + namespace=llm_service.namespace, + label_selector=( + f"{Pod.ApiGroup.APP_KUBERNETES_IO}/part-of=llminferenceservice," + f"{Pod.ApiGroup.APP_KUBERNETES_IO}/name={llm_service.name}" + ), + ): + labels = pod.instance.metadata.get("labels", {}) + if labels.get("kserve.io/component") == "workload": + LOGGER.debug(f"Checking pod {pod.name} for container restarts") + + if pod.instance.status.containerStatuses: + if _restarted_containers := [ + container.name for container in pod.instance.status.containerStatuses if container.restartCount > 0 + ]: + restarted_containers[pod.name] = _restarted_containers + LOGGER.warning(f"Pod {pod.name} has restarted containers: {_restarted_containers}") + + if restarted_containers: + error_msg = f"LLMD inference containers restarted for {llm_service.name}: {restarted_containers}" + LOGGER.error(error_msg) + raise PodContainersRestartError(error_msg) + + LOGGER.info(f"All pods for LLMInferenceService {llm_service.name} have no container restarts") diff --git a/utilities/llmd_constants.py b/utilities/llmd_constants.py new file mode 100644 index 000000000..90751f30b --- /dev/null +++ b/utilities/llmd_constants.py @@ -0,0 +1,30 @@ +"""Centralized constants for LLMD (LLM Deployment) utilities and tests.""" + +from utilities.constants import Timeout + +DEFAULT_GATEWAY_NAME = "openshift-ai-inference" +DEFAULT_GATEWAY_NAMESPACE = "openshift-ingress" +OPENSHIFT_DEFAULT_GATEWAY_CLASS = "openshift-default" + +KSERVE_GATEWAY_LABEL = "serving.kserve.io/gateway" +KSERVE_INGRESS_GATEWAY = "kserve-ingress-gateway" + +DEFAULT_LLM_ENDPOINT = 
"/v1/chat/completions" +DEFAULT_MAX_TOKENS = 50 +DEFAULT_TEMPERATURE = 0.0 +DEFAULT_TIMEOUT = Timeout.TIMEOUT_30SEC + +VLLM_STORAGE_OCI = "oci://quay.io/mwaykole/test:opt-125m" +VLLM_CPU_IMAGE = "quay.io/pierdipi/vllm-cpu:latest" +DEFAULT_LLMD_REPLICAS = 1 +DEFAULT_S3_STORAGE_PATH = "opt-125m" + +DEFAULT_STORAGE_URI = VLLM_STORAGE_OCI +DEFAULT_CONTAINER_IMAGE = VLLM_CPU_IMAGE + +DEFAULT_CPU_LIMIT = "1" +DEFAULT_MEMORY_LIMIT = "10Gi" +DEFAULT_CPU_REQUEST = "100m" +DEFAULT_MEMORY_REQUEST = "8Gi" + +BASIC_LLMD_PARAMS = [({"name": "llmd-comprehensive-test"}, "openshift-default", "basic")] diff --git a/utilities/llmd_utils.py b/utilities/llmd_utils.py new file mode 100644 index 000000000..62db07bd7 --- /dev/null +++ b/utilities/llmd_utils.py @@ -0,0 +1,655 @@ +"""Utilities for LLM Deployment (LLMD) resources.""" + +import json +import re +import shlex +from contextlib import contextmanager +from string import Template +from typing import Any, Dict, Generator, Optional + +from kubernetes.dynamic import DynamicClient +from ocp_resources.gateway import Gateway +from ocp_resources.llm_inference_service import LLMInferenceService +from pyhelper_utils.shell import run_command +from simple_logger.logger import get_logger +from timeout_sampler import retry, TimeoutWatch + +from utilities.certificates_utils import get_ca_bundle +from utilities.constants import HTTPRequest, Timeout +from utilities.exceptions import InferenceResponseError +from utilities.infra import get_services_by_isvc_label +from utilities.llmd_constants import ( + DEFAULT_GATEWAY_NAME, + DEFAULT_GATEWAY_NAMESPACE, + OPENSHIFT_DEFAULT_GATEWAY_CLASS, + DEFAULT_LLM_ENDPOINT, + DEFAULT_TIMEOUT, +) + +LOGGER = get_logger(name=__name__) + + +@contextmanager +def create_llmd_gateway( + client: DynamicClient, + name: str = DEFAULT_GATEWAY_NAME, + namespace: str = DEFAULT_GATEWAY_NAMESPACE, + gateway_class_name: str = OPENSHIFT_DEFAULT_GATEWAY_CLASS, + listeners: Optional[list[Dict[str, Any]]] = None, + infrastructure: 
Optional[Dict[str, Any]] = None, + wait_for_condition: bool = True, + timeout: int = 300, + teardown: bool = False, +) -> Generator[Gateway, None, None]: + """ + Context manager to create and manage LLMD Gateway resources using ocp_resources. + + This function implements smart LLMD gateway management: + - Only creates gateway if it doesn't already exist + - Reuses existing gateways to avoid conflicts + - Does not delete gateway in teardown (persistent gateway strategy) + - Specifically designed for LLMD (LLM Deployment) infrastructure + + Args: + client: Kubernetes dynamic client + name: Gateway name (defaults to openshift-ai-inference) + namespace: Gateway namespace (defaults to openshift-ingress) + gateway_class_name: The name of the GatewayClass resource + listeners: List of listener configurations + infrastructure: Infrastructure configuration + wait_for_condition: Whether to wait for the gateway to be programmed + timeout: Timeout in seconds for waiting + teardown: Whether to clean up the resource (default: False for persistent strategy) + + Yields: + Gateway: The Gateway resource (existing or newly created) + """ + if listeners is None: + listeners = [ + { + "name": "http", + "port": 80, + "protocol": "HTTP", + "allowedRoutes": {"namespaces": {"from": "All"}}, + } + ] + + if infrastructure is None: + infrastructure = {"labels": {"serving.kserve.io/gateway": "kserve-ingress-gateway"}} + + # Check if gateway already exists + try: + existing_gateway = Gateway( + client=client, + name=name, + namespace=namespace, + api_group="gateway.networking.k8s.io", + ) + if existing_gateway.exists: + LOGGER.info(f"Using existing Gateway {name} in namespace {namespace}") + + if wait_for_condition: + LOGGER.info(f"Waiting for existing Gateway {name} to be programmed...") + existing_gateway.wait_for_condition( + condition="Programmed", + status="True", + timeout=timeout, + ) + LOGGER.info(f"Existing Gateway {name} is programmed and ready") + + # Yield the existing gateway 
without teardown + yield existing_gateway + return + except Exception as e: + LOGGER.debug(f"No existing Gateway found, will create new one: {e}") + + # Create new gateway only if it doesn't exist + gateway_body = { + "apiVersion": "gateway.networking.k8s.io/v1", + "kind": "Gateway", + "metadata": { + "name": name, + "namespace": namespace, + }, + "spec": { + "gatewayClassName": gateway_class_name, + "listeners": listeners, + "infrastructure": infrastructure, + }, + } + + LOGGER.info(f"Creating new Gateway {name} in namespace {namespace}") + with Gateway( + client=client, + teardown=teardown, + kind_dict=gateway_body, + api_group="gateway.networking.k8s.io", + ) as gateway: + if wait_for_condition: + LOGGER.info(f"Waiting for Gateway {name} to be programmed...") + gateway.wait_for_condition( + condition="Programmed", + status="True", + timeout=timeout, + ) + LOGGER.info(f"Gateway {name} is programmed and ready") + + yield gateway + + +@contextmanager +def create_llmisvc( + client: DynamicClient, + name: str, + namespace: str, + storage_uri: Optional[str] = None, + storage_key: Optional[str] = None, + storage_path: Optional[str] = None, + replicas: int = 1, + wait: bool = True, + enable_auth: bool = False, + router_config: Optional[Dict[str, Any]] = None, + container_image: Optional[str] = None, + container_resources: Optional[Dict[str, Any]] = None, + container_env: Optional[list[Dict[str, str]]] = None, + liveness_probe: Optional[Dict[str, Any]] = None, + readiness_probe: Optional[Dict[str, Any]] = None, + image_pull_secrets: Optional[list[str]] = None, + service_account: Optional[str] = None, + volumes: Optional[list[Dict[str, Any]]] = None, + volume_mounts: Optional[list[Dict[str, Any]]] = None, + annotations: Optional[Dict[str, str]] = None, + labels: Optional[Dict[str, str]] = None, + timeout: int = Timeout.TIMEOUT_15MIN, + teardown: bool = True, +) -> Generator[LLMInferenceService, Any, None]: + """ + Create LLMInferenceService object following the pattern of 
create_isvc.
+
+    Args:
+        client: DynamicClient object
+        name: LLMInferenceService name
+        namespace: Namespace name
+        storage_uri: Storage URI (e.g., 'oci://quay.io/user/model:tag') - used if storage_key/storage_path not provided
+        storage_key: S3 secret name for authentication (alternative to storage_uri)
+        storage_path: S3 path to model (alternative to storage_uri)
+        replicas: Number of replicas
+        wait: Wait for LLMInferenceService to be ready
+        enable_auth: Enable authentication
+        router_config: Router configuration (scheduler, route, gateway)
+        container_image: Container image
+        container_resources: Container resource requirements
+        container_env: Container environment variables
+        liveness_probe: Liveness probe configuration
+        readiness_probe: Readiness probe configuration
+        image_pull_secrets: Image pull secrets
+        service_account: Service account name
+        volumes: Volume configurations
+        volume_mounts: Volume mount configurations
+        annotations: Additional annotations
+        labels: Additional labels
+        timeout: Timeout for waiting
+        teardown: Whether to clean up on exit
+
+    Yields:
+        LLMInferenceService: LLMInferenceService object
+    """
+    if labels is None:
+        labels = {}
+
+    if annotations is None:
+        annotations = {}
+
+    if storage_key and storage_path:  # NOTE(review): storage_key only gates this branch; it is never placed in the spec — verify auth actually flows via the service account
+        model_config = {
+            "uri": f"s3://ods-ci-wisdom/{storage_path}",  # FIXME(review): S3 bucket name is hard-coded here, ignoring the caller's models_s3_bucket_name — confirm intended
+        }
+    elif storage_uri:
+        model_config = {
+            "uri": storage_uri,
+        }
+    else:
+        raise ValueError("Provide either storage_uri or (storage_key and storage_path) for the model")
+
+    if router_config is None:
+        router_config = {"scheduler": {}, "route": {}, "gateway": {}}  # empty sections request default scheduler/route/gateway handling
+
+    if container_resources is None:
+        raise ValueError("container_resources must be provided for LLMInferenceService")
+
+    if container_env is None:
+        container_env = [{"name": "VLLM_LOGGING_LEVEL", "value": "DEBUG"}]  # default to verbose vLLM logs for test debuggability
+    template_config: Dict[str, Any] = {"containers": []}
+
+    main_container: Dict[str, Any] = {"name": "main"}
+
+    if container_image:
+        main_container["image"] = container_image
+ else: + raise ValueError("container_image must be provided for LLMInferenceService") + + if container_resources: + main_container["resources"] = container_resources + + if container_env: + main_container["env"] = container_env + + if liveness_probe: + main_container["livenessProbe"] = liveness_probe + + if readiness_probe: + main_container["readinessProbe"] = readiness_probe + + if volume_mounts: + main_container["volumeMounts"] = volume_mounts + + template_config["containers"].append(main_container) + + if volumes: + template_config["volumes"] = volumes + + if service_account: + template_config["serviceAccountName"] = service_account + + if image_pull_secrets: + template_config["imagePullSecrets"] = [{"name": secret} for secret in image_pull_secrets] + + if enable_auth: + annotations["serving.kserve.io/auth"] = "true" + + LOGGER.info(f"Creating LLMInferenceService {name} in namespace {namespace}") + + with LLMInferenceService( + client=client, + name=name, + namespace=namespace, + model=model_config, + replicas=replicas, + router=router_config, + template=template_config, + annotations=annotations, + label=labels, + teardown=teardown, + ) as llm_service: + timeout_watch = TimeoutWatch(timeout=timeout) + + if wait: + LOGGER.info(f"Waiting for LLMInferenceService {name} to be ready...") + llm_service.wait_for_condition( + condition="Ready", + status="True", + timeout=timeout_watch.remaining_time(), + ) + LOGGER.info(f"LLMInferenceService {name} is ready") + + yield llm_service + + +def get_llm_inference_url(llm_service: LLMInferenceService) -> str: + """ + Get the inference URL for an LLMInferenceService. + + This function attempts to resolve the URL in the following order: + 1. External URL from service status + 2. Service discovery via labels + 3. 
Fallback to service name pattern + + Args: + llm_service: The LLMInferenceService resource + + Returns: + str: The inference URL (full URL including protocol and path) + + Raises: + ValueError: If the inference URL cannot be determined + """ + if llm_service.instance.status and llm_service.instance.status.get("url"): + url = llm_service.instance.status["url"] + LOGGER.debug(f"Using external URL for {llm_service.name}: {url}") + return url + + try: + services = get_services_by_isvc_label( + client=llm_service.client, + isvc=llm_service, + runtime_name=None, + ) + if services: + internal_url = f"http://{services[0].name}.{llm_service.namespace}.svc.cluster.local" + LOGGER.debug(f"Using service discovery URL for {llm_service.name}: {internal_url}") + return internal_url + except Exception as e: + LOGGER.warning(f"Could not get service for LLMInferenceService {llm_service.name}: {e}") + fallback_url = f"http://{llm_service.name}.{llm_service.namespace}.svc.cluster.local" + LOGGER.debug(f"Using fallback URL for {llm_service.name}: {fallback_url}") + return fallback_url + + +def verify_inference_response_llmd( + llm_service: LLMInferenceService, + inference_config: Dict[str, Any], + inference_type: str, + protocol: str, + model_name: Optional[str] = None, + inference_input: Optional[Any] = None, + use_default_query: bool = False, + expected_response_text: Optional[str] = None, + insecure: bool = False, + token: Optional[str] = None, + authorized_user: Optional[bool] = None, +) -> None: + """ + Verify the LLM inference response following the pattern of verify_inference_response. + + Args: + llm_service: LLMInferenceService resource to test + inference_config: Inference configuration dictionary + inference_type: Type of inference ('infer', 'streaming', etc.) 
+ protocol: Protocol to use ('http', 'grpc') + model_name: Name of the model (defaults to service name) + inference_input: Input for inference (optional) + use_default_query: Whether to use default query from config + expected_response_text: Expected response text for validation + insecure: Whether to use insecure connections + token: Authentication token (optional) + authorized_user: Whether user should be authorized (optional) + + Raises: + InferenceResponseError: If inference response is invalid + ValueError: If inference response validation fails + """ + + model_name = model_name or llm_service.name + inference = LLMUserInference( + llm_service=llm_service, + inference_config=inference_config, + inference_type=inference_type, + protocol=protocol, + ) + + res = inference.run_inference_flow( + model_name=model_name, + inference_input=inference_input, + use_default_query=use_default_query, + token=token, + insecure=insecure, + ) + + if authorized_user is False: + _validate_unauthorized_response(res=res, token=token, inference=inference) + else: + _validate_authorized_response( + res=res, + inference=inference, + inference_config=inference_config, + inference_type=inference_type, + expected_response_text=expected_response_text, + use_default_query=use_default_query, + model_name=model_name, + ) + + +class LLMUserInference: + """ + LLM-specific inference handler following the pattern of UserInference. 
+ """ + + STREAMING = "streaming" + INFER = "infer" + + def __init__( + self, + llm_service: LLMInferenceService, + inference_config: Dict[str, Any], + inference_type: str, + protocol: str, + ) -> None: + self.llm_service = llm_service + self.inference_config = inference_config + self.inference_type = inference_type + self.protocol = protocol + self.runtime_config = self.get_runtime_config() + + def get_runtime_config(self) -> Dict[str, Any]: + """Get runtime config from inference config based on inference type and protocol.""" + if inference_type_config := self.inference_config.get(self.inference_type): + protocol = "http" if self.protocol.lower() in ["http", "https"] else self.protocol + if data := inference_type_config.get(protocol): + return data + else: + raise ValueError(f"Protocol {protocol} not supported for inference type {self.inference_type}") + else: + raise ValueError(f"Inference type {self.inference_type} not supported in config") + + @property + def inference_response_text_key_name(self) -> Optional[str]: + """Get inference response text key name from runtime config.""" + return self.runtime_config.get("response_fields_map", {}).get("response_output") + + @property + def inference_response_key_name(self) -> str: + """Get inference response key name from runtime config.""" + return self.runtime_config.get("response_fields_map", {}).get("response", "output") + + def get_inference_body( + self, + model_name: str, + inference_input: Optional[Any] = None, + use_default_query: bool = False, + ) -> str: + """Get inference body for LLM request.""" + if not use_default_query and inference_input is None: + raise ValueError("Either pass `inference_input` or set `use_default_query` to True") + + if use_default_query: + default_query_config = self.inference_config.get("default_query_model") + if not default_query_config: + raise ValueError(f"Missing default query config for {model_name}") + + if self.inference_config.get("support_multi_default_queries"): + 
query_config = default_query_config.get(self.inference_type) + if not query_config: + raise ValueError(f"Missing default query for inference type {self.inference_type}") + template_str = query_config.get("query_input", "") + else: + template_str = default_query_config.get("query_input", "") + + # Use template substitution for model name + template = Template(template=template_str) + body = template.safe_substitute(model_name=model_name) + else: + # For custom input, create OpenAI-compatible format + if isinstance(inference_input, str): + body = json.dumps({ + "model": model_name, + "messages": [{"role": "user", "content": inference_input}], + "max_tokens": 100, + "temperature": 0.0, + }) + else: + body = json.dumps(inference_input) + + return body + + def generate_command( + self, + model_name: str, + inference_input: Optional[str] = None, + use_default_query: bool = False, + insecure: bool = False, + token: Optional[str] = None, + ) -> str: + """Generate curl command string for LLM inference.""" + base_url = get_llm_inference_url(llm_service=self.llm_service) + endpoint_url = f"{base_url}{DEFAULT_LLM_ENDPOINT}" + + body = self.get_inference_body( + model_name=model_name, + inference_input=inference_input, + use_default_query=use_default_query, + ) + + header = HTTPRequest.CONTENT_JSON.replace("-H ", "").replace("'", "") + cmd_exec = "curl -i -s" + cmd = f"{cmd_exec} -X POST -d '{body}' -H {header} -H 'Accept: application/json'" + + if token: + cmd += f" {HTTPRequest.AUTH_HEADER.format(token=token)}" + + if insecure: + cmd += " --insecure" + else: + try: + from ocp_resources.resource import get_client + + client = get_client() + ca_bundle = get_ca_bundle(client=client, deployment_mode="raw") + if ca_bundle: + cmd += f" --cacert {ca_bundle}" + else: + cmd += " --insecure" + except Exception: + cmd += " --insecure" + + cmd += f" --max-time {DEFAULT_TIMEOUT} {endpoint_url}" + return cmd + + @retry(wait_timeout=Timeout.TIMEOUT_30SEC, sleep=5) + def run_inference( + 
self, + model_name: str, + inference_input: Optional[str] = None, + use_default_query: bool = False, + insecure: bool = False, + token: Optional[str] = None, + ) -> str: + """Run inference command and return raw output.""" + cmd = self.generate_command( + model_name=model_name, + inference_input=inference_input, + use_default_query=use_default_query, + insecure=insecure, + token=token, + ) + + res, out, err = run_command(command=shlex.split(cmd), verify_stderr=False, check=False) + if res: + return out + raise ValueError(f"Inference failed with error: {err}\nOutput: {out}\nCommand: {cmd}") + + def run_inference_flow( + self, + model_name: str, + inference_input: Optional[str] = None, + use_default_query: bool = False, + insecure: bool = False, + token: Optional[str] = None, + ) -> Dict[str, Any]: + """Run LLM inference using the same high-level flow as inference_utils.""" + out = self.run_inference( + model_name=model_name, + inference_input=inference_input, + use_default_query=use_default_query, + insecure=insecure, + token=token, + ) + return {"output": out} + + +def _validate_unauthorized_response(res: Dict[str, Any], token: Optional[str], inference: LLMUserInference) -> None: + """Validate response for unauthorized users.""" + auth_header = "x-ext-auth-reason" + + if auth_reason := re.search(rf"{auth_header}: (.*)", res["output"], re.MULTILINE): + reason = auth_reason.group(1).lower() + + if token: + assert re.search(r"not (?:authenticated|authorized)", reason) + else: + assert "credential not found" in reason + else: + forbidden_patterns = ["Forbidden", "401", "403", "Unauthorized"] + output = res["output"] + + if any(pattern in output for pattern in forbidden_patterns): + return + + raise ValueError(f"Auth header {auth_header} not found in response. 
Response: {output}")
+
+
+def _validate_authorized_response(
+    res: Dict[str, Any],
+    inference: LLMUserInference,
+    inference_config: Dict[str, Any],
+    inference_type: str,
+    expected_response_text: Optional[str],
+    use_default_query: bool,
+    model_name: str,
+) -> None:
+    """Validate response for authorized users.
+
+    Resolves the expected text (from inference_config when use_default_query is
+    set, otherwise from the caller-supplied expected_response_text), substitutes
+    $model_name placeholders, then compares against the relevant field of res.
+
+    Raises:
+        ValueError: when required config keys are missing.
+    """
+
+    use_regex = False
+
+    if use_default_query:
+        expected_response_text_config = inference_config.get("default_query_model", {})
+        use_regex = expected_response_text_config.get("use_regex", False)
+
+        if not expected_response_text_config:
+            raise ValueError(f"Missing default_query_model config for inference {inference_config}")
+
+        if inference_config.get("support_multi_default_queries"):
+            # Multi-query configs nest a per-inference-type entry.
+            query_config = expected_response_text_config.get(inference_type)
+            if not query_config:
+                raise ValueError(f"Missing default_query_model config for inference type {inference_type}")
+            expected_response_text = query_config.get("query_output", "")
+            use_regex = query_config.get("use_regex", False)
+        else:
+            expected_response_text = expected_response_text_config.get("query_output")
+
+        if not expected_response_text:
+            raise ValueError(f"Missing response text key for inference {inference_config}")
+
+    # Resolve $model_name; safe_substitute leaves other placeholders untouched.
+    if isinstance(expected_response_text, str):
+        expected_response_text = Template(template=expected_response_text).safe_substitute(model_name=model_name)
+    elif isinstance(expected_response_text, dict):
+        response_output = expected_response_text.get("response_output")
+        if response_output is not None:
+            expected_response_text = Template(template=response_output).safe_substitute(model_name=model_name)
+    if inference.inference_response_text_key_name:
+        if inference_type == inference.STREAMING:
+            # Streamed responses: stitch all text chunks together before comparing.
+            if output := re.findall(
+                rf"{inference.inference_response_text_key_name}\": \"(.*)\"",
+                res[inference.inference_response_key_name],
+                re.MULTILINE,
+            ):
+                assert "".join(output) == expected_response_text, (
+                    f"Expected: {expected_response_text} does not match response: {output}" 
+                )
+        elif inference_type == inference.INFER or use_regex:
+            # Spaces are stripped from the JSON dump so expected patterns need
+            # not account for pretty-printing.
+            formatted_res = json.dumps(res[inference.inference_response_text_key_name]).replace(" ", "")
+            if use_regex and expected_response_text is not None:
+                assert re.search(expected_response_text, formatted_res), (
+                    f"Expected: {expected_response_text} not found in: {formatted_res}"
+                )
+            else:
+                # Exact-match fallback on the full response payload.
+                formatted_res = json.dumps(res[inference.inference_response_key_name]).replace(" ", "")
+                assert formatted_res == expected_response_text, (
+                    f"Expected: {expected_response_text} does not match output: {formatted_res}"
+                )
+        else:
+            response = res[inference.inference_response_key_name]
+            # Some runtimes return a list of choices; compare the first one.
+            if isinstance(response, list):
+                response = response[0]
+
+            if isinstance(response, dict):
+                response_text = response[inference.inference_response_text_key_name]
+                assert response_text == expected_response_text, (
+                    f"Expected: {expected_response_text} does not match response: {response_text}"
+                )
+            else:
+                raise InferenceResponseError(
+                    "Inference response output does not match expected output format."
+                    f"Expected: {expected_response_text}.\nResponse: {res}"
+                )
+    else:
+        # No text key configured: the response cannot be validated at all.
+        raise InferenceResponseError(f"Inference response output not found in response. 
Response: {res}")
diff --git a/utilities/manifests/opt125m_cpu.py b/utilities/manifests/opt125m_cpu.py
new file mode 100644
index 000000000..2326e991d
--- /dev/null
+++ b/utilities/manifests/opt125m_cpu.py
@@ -0,0 +1,27 @@
+OPT125M_CPU_INFERENCE_CONFIG = {
+    "default_query_model": {
+        "query_input": "What is the boiling point of water?",
+        "query_output": r'.*',  # NOTE(review): with use_regex this accepts ANY body (even error text) -- consider tightening
+        "use_regex": True,
+    },
+    "chat_completions": {
+        "http": {
+            "endpoint": "v1/chat/completions",
+            "header": "Content-Type:application/json",
+            "body": '{"model": "$model_name", "messages": [{"role": "user", "content": "$query_input"}], "max_tokens": 50, "temperature": 0.0, "stream": false}',
+            "response_fields_map": {
+                "response_output": "output",
+            },
+        },
+    },
+    "completions": {
+        "http": {
+            "endpoint": "v1/completions",
+            "header": "Content-Type:application/json",
+            "body": '{"model": "$model_name", "prompt": "$query_input", "max_tokens": 50, "temperature": 0.0}',
+            "response_fields_map": {
+                "response_output": "output",
+            },
+        },
+    },
+}