diff --git a/tests/model_serving/model_server/llmd/__init__.py b/tests/model_serving/model_server/llmd/__init__.py index 98274df1a..b740cc862 100644 --- a/tests/model_serving/model_server/llmd/__init__.py +++ b/tests/model_serving/model_server/llmd/__init__.py @@ -1 +1 @@ -"""LLMD (LLM Deployment) test module for OpenDataHub and OpenShift AI.""" +"""llm-d test module for OpenDataHub and OpenShift AI.""" diff --git a/tests/model_serving/model_server/llmd/conftest.py b/tests/model_serving/model_server/llmd/conftest.py index df26d2fe3..6a8e1afa1 100644 --- a/tests/model_serving/model_server/llmd/conftest.py +++ b/tests/model_serving/model_server/llmd/conftest.py @@ -1,540 +1,299 @@ +import logging +from collections import namedtuple from collections.abc import Generator -from contextlib import ExitStack +from contextlib import ExitStack, contextmanager +from typing import Any import pytest import yaml from _pytest.fixtures import FixtureRequest from kubernetes.dynamic import DynamicClient +from ocp_resources.config_map import ConfigMap from ocp_resources.gateway import Gateway from ocp_resources.llm_inference_service import LLMInferenceService from ocp_resources.namespace import Namespace from ocp_resources.role import Role from ocp_resources.role_binding import RoleBinding -from ocp_resources.secret import Secret from ocp_resources.service_account import ServiceAccount - -from tests.model_serving.model_server.llmd.constants import ( - LLMD_LIVENESS_PROBE, - PREFIX_CACHE_BLOCK_SIZE, - PREFIX_CACHE_HASH_ALGO, - PREFIX_CACHE_HASH_SEED, - ROUTER_SCHEDULER_CONFIG_ESTIMATED_PREFIX_CACHE, -) -from utilities.constants import ResourceLimits, Timeout -from utilities.infra import create_inference_token, s3_endpoint_secret -from utilities.llmd_constants import ( - ContainerImages, - LLMDDefaults, - LLMDGateway, - ModelNames, - ModelStorage, -) -from utilities.llmd_utils import create_llmd_gateway, create_llmisvc +from simple_logger.logger import get_logger + +from tests.model_serving.model_server.llmd.llmd_configs import TinyLlamaOciConfig +from tests.model_serving.model_server.llmd.utils import wait_for_llmisvc +from utilities.constants import Timeout +from utilities.infra import create_inference_token, s3_endpoint_secret, update_configmap_data +from utilities.llmd_constants import LLMDGateway +from utilities.llmd_utils import create_llmd_gateway from utilities.logger import RedactedString +LOGGER = get_logger(name=__name__) +logging.getLogger("timeout_sampler").setLevel(logging.WARNING) -# ********************************* -# ** S3 fixtures ** -# ********************************* -@pytest.fixture(scope="class") -def llmd_s3_secret( - admin_client: DynamicClient, - unprivileged_model_namespace: Namespace, - aws_access_key_id: str, - aws_secret_access_key: str, - models_s3_bucket_name: str, - models_s3_bucket_region: str, - models_s3_bucket_endpoint: str, -) -> Generator[Secret]: - """Create a Kubernetes secret with S3 credentials for LLMD model storage.""" - with s3_endpoint_secret( - client=admin_client, - name="llmd-s3-secret", - namespace=unprivileged_model_namespace.name, - aws_access_key=aws_access_key_id, - aws_secret_access_key=aws_secret_access_key, - aws_s3_region=models_s3_bucket_region, - aws_s3_bucket=models_s3_bucket_name, - aws_s3_endpoint=models_s3_bucket_endpoint, - ) as secret: - yield secret +AuthEntry = namedtuple(typename="AuthEntry", field_names=["service", "token"]) -@pytest.fixture(scope="class") -def llmd_s3_service_account(admin_client: DynamicClient, llmd_s3_secret: Secret) -> 
Generator[ServiceAccount]: - """Create a service account linked to the S3 secret for LLMD pods.""" - with ServiceAccount( - client=admin_client, - namespace=llmd_s3_secret.namespace, - name="llmd-s3-service-account", - secrets=[{"name": llmd_s3_secret.name}], - ) as sa: - yield sa - - -# ********************************* -# ** Gateway fixtures ** -# ********************************* -@pytest.fixture(scope="session") -def gateway_namespace() -> str: - """Return the namespace for LLMD gateway.""" - return LLMDGateway.DEFAULT_NAMESPACE - - -@pytest.fixture(scope="session") -def shared_llmd_gateway( - admin_client: DynamicClient, - gateway_namespace: str, -) -> Generator[Gateway]: +# =========================================== +# Gateway +# =========================================== +@pytest.fixture(scope="session", autouse=True) +def shared_llmd_gateway(admin_client: DynamicClient) -> Generator[Gateway]: """Shared LLMD gateway for all tests.""" with create_llmd_gateway( client=admin_client, - namespace=gateway_namespace, + namespace=LLMDGateway.DEFAULT_NAMESPACE, gateway_class_name=LLMDGateway.DEFAULT_CLASS, wait_for_condition=True, - timeout=Timeout.TIMEOUT_5MIN, + timeout=Timeout.TIMEOUT_1MIN, teardown=True, ) as gateway: yield gateway +# =========================================== +# Storage — S3 secret + service account +# =========================================== @pytest.fixture(scope="class") -def llmd_gateway(shared_llmd_gateway: Gateway) -> Gateway: - """Class-scoped LLMD gateway fixture.""" - return shared_llmd_gateway - - -# ********************************* -# ** Auth fixtures ** -# ********************************* -@pytest.fixture(scope="class") -def llmisvc_auth_service_account( +def s3_service_account( + request: FixtureRequest, admin_client: DynamicClient, unprivileged_model_namespace: Namespace, -) -> Generator: - """Factory fixture to create service accounts for authentication testing.""" +) -> Generator[str]: + """Create S3 secret + service account. 
Resolved automatically for S3 configs.""" with ExitStack() as stack: - - def _create_service_account(name: str) -> ServiceAccount: - """Create a single service account.""" - return stack.enter_context( - cm=ServiceAccount( - client=admin_client, - namespace=unprivileged_model_namespace.name, - name=name, - ) + secret = stack.enter_context( + cm=s3_endpoint_secret( + client=admin_client, + name="llmd-s3-secret", + namespace=unprivileged_model_namespace.name, + aws_access_key=request.getfixturevalue(argname="aws_access_key_id"), + aws_secret_access_key=request.getfixturevalue(argname="aws_secret_access_key"), + aws_s3_region=request.getfixturevalue(argname="models_s3_bucket_region"), + aws_s3_bucket=request.getfixturevalue(argname="models_s3_bucket_name"), + aws_s3_endpoint=request.getfixturevalue(argname="models_s3_bucket_endpoint"), ) - - yield _create_service_account - - -@pytest.fixture(scope="class") -def llmisvc_auth_view_role( - admin_client: DynamicClient, -) -> Generator: - """Factory fixture to create view roles for LLMInferenceServices.""" - with ExitStack() as stack: - - def _create_view_role(llm_service: LLMInferenceService) -> Role: - """Create a single view role for a given LLMInferenceService.""" - return stack.enter_context( - cm=Role( - client=admin_client, - name=f"{llm_service.name}-view", - namespace=llm_service.namespace, - rules=[ - { - "apiGroups": [llm_service.api_group], - "resources": ["llminferenceservices"], - "verbs": ["get"], - "resourceNames": [llm_service.name], - }, - ], - ) - ) - - yield _create_view_role - - -@pytest.fixture(scope="class") -def llmisvc_auth_role_binding( - admin_client: DynamicClient, -) -> Generator: - """Factory fixture to create role bindings.""" - with ExitStack() as stack: - - def _create_role_binding( - service_account: ServiceAccount, - role: Role, - ) -> RoleBinding: - """Create a single role binding.""" - return stack.enter_context( - cm=RoleBinding( - client=admin_client, - namespace=service_account.namespace, - name=f"{service_account.name}-view", - role_ref_name=role.name, - role_ref_kind=role.kind, - subjects_kind="ServiceAccount", - subjects_name=service_account.name, - ) + ) + sa = stack.enter_context( + cm=ServiceAccount( + client=admin_client, + namespace=unprivileged_model_namespace.name, + name="llmd-s3-service-account", + secrets=[{"name": secret.name}], ) + ) + yield sa.name - yield _create_role_binding - - -@pytest.fixture(scope="class") -def llmisvc_auth_token() -> Generator: - """Factory fixture to create inference tokens with all required RBAC resources.""" - - def _create_token( - service_account: ServiceAccount, - llmisvc: LLMInferenceService, - view_role_factory, - role_binding_factory, - ) -> str: - """Create role, role binding, and return an inference token for an existing service account.""" - # Create role and role binding (these factories manage their own cleanup via ExitStack) - role = view_role_factory(llm_service=llmisvc) - role_binding_factory(service_account=service_account, role=role) - return RedactedString(value=create_inference_token(model_service_account=service_account)) - yield _create_token +# =========================================== +# GPU guards +# =========================================== +@pytest.fixture(scope="session") +def skip_if_less_than_2_gpus(gpu_count_on_cluster: int) -> None: + """Skip test if fewer than 2 GPUs are available on the cluster.""" + if gpu_count_on_cluster < 2: + pytest.skip(f"Test requires at least 2 GPUs (found {gpu_count_on_cluster})") +# 
=========================================== +# LLMInferenceService creation +# =========================================== @pytest.fixture(scope="class") -def authenticated_llmisvc_token( - request: FixtureRequest, - llmisvc_auth_token, - llmisvc_auth_view_role, - llmisvc_auth_role_binding, -) -> str: - """Create an authentication token for accessing a protected LLMInferenceService.""" - service_account_fixture_name = request.param["service_account_fixture"] - llmisvc_fixture_name = request.param["llmisvc_fixture"] - - # Get fixtures dynamically - service_account = request.getfixturevalue(argname=service_account_fixture_name) - llmisvc = request.getfixturevalue(argname=llmisvc_fixture_name) - - # Create and return token - return llmisvc_auth_token( - service_account=service_account, - llmisvc=llmisvc, - view_role_factory=llmisvc_auth_view_role, - role_binding_factory=llmisvc_auth_role_binding, - ) - - -# ************************************ -# ** LLM Inference Service fixtures ** -# ************************************ -@pytest.fixture(scope="class") -def llmd_inference_service( +def llmisvc( request: FixtureRequest, admin_client: DynamicClient, unprivileged_model_namespace: Namespace, ) -> Generator[LLMInferenceService]: - """Basic LLMInferenceService fixture for OCI storage with CPU runtime. + """LLMInferenceService fixture driven by a config class. - This is the most commonly used fixture for basic LLMD tests. It uses - OCI container registry for model storage and defaults to CPU resources. - """ - if isinstance(request.param, str): - name_suffix = request.param - kwargs = {} - else: - name_suffix = request.param.get("name_suffix", "basic") - kwargs = {k: v for k, v in request.param.items() if k != "name_suffix"} - - service_name = kwargs.get("name", f"llm-{name_suffix}") - - if "llmd_gateway" in request.fixturenames: - request.getfixturevalue(argname="llmd_gateway") - container_resources = kwargs.get( - "container_resources", - { - "limits": {"cpu": "2", "memory": "16Gi"}, - "requests": {"cpu": "500m", "memory": "12Gi"}, - }, - ) - - create_kwargs = { - "client": admin_client, - "name": service_name, - "namespace": unprivileged_model_namespace.name, - "storage_uri": kwargs.get("storage_uri", ModelStorage.TINYLLAMA_OCI), - "container_image": kwargs.get("container_image", ContainerImages.VLLM_CPU), - "container_resources": container_resources, - "liveness_probe": LLMD_LIVENESS_PROBE, - "wait": True, - "timeout": Timeout.TIMEOUT_15MIN, - **{k: v for k, v in kwargs.items() if k != "name"}, - } - - with create_llmisvc(**create_kwargs) as llm_service: - yield llm_service + Usage: + NAMESPACE = ns_from_file(__file__) + @pytest.mark.parametrize( + "unprivileged_model_namespace, llmisvc", + [({"name": NAMESPACE}, SomeConfig)], + indirect=True, + ) + """ + config_cls = request.param + namespace = unprivileged_model_namespace.name -@pytest.fixture(scope="class") -def llmisvc_auth( - admin_client: DynamicClient, - unprivileged_model_namespace: Namespace, - llmisvc_auth_service_account, -) -> Generator: - """Factory fixture to create LLMInferenceService instances for authentication testing.""" - with ExitStack() as stack: - - def _create_llmd_auth_service( - service_name: str, - service_account_name: str, - storage_uri: str = ModelStorage.TINYLLAMA_OCI, - container_image: str = ContainerImages.VLLM_CPU, - container_resources: dict | None = None, - ) -> tuple[LLMInferenceService, ServiceAccount]: - """Create a single LLMInferenceService instance with its service account.""" - if container_resources 
is None: - container_resources = { - "limits": {"cpu": "1", "memory": "10Gi"}, - "requests": {"cpu": "100m", "memory": "8Gi"}, - } - - # Create the service account first - sa = llmisvc_auth_service_account(name=service_account_name) - - create_kwargs = { - "client": admin_client, - "name": service_name, - "namespace": unprivileged_model_namespace.name, - "storage_uri": storage_uri, - "container_image": container_image, - "container_resources": container_resources, - "liveness_probe": LLMD_LIVENESS_PROBE, - "service_account": service_account_name, - "wait": True, - "timeout": Timeout.TIMEOUT_15MIN, - "enable_auth": True, - } - - llm_service = stack.enter_context(cm=create_llmisvc(**create_kwargs)) - return (llm_service, sa) + service_account = None + if config_cls.storage_uri.startswith("s3://"): + service_account = request.getfixturevalue(argname="s3_service_account") - yield _create_llmd_auth_service + with _create_llmisvc_from_config( + config_cls=config_cls, namespace=namespace, client=admin_client, service_account=service_account + ) as svc: + yield svc @pytest.fixture(scope="class") -def llmd_inference_service_s3( - request: FixtureRequest, +def llmisvc_auth_pair( admin_client: DynamicClient, unprivileged_model_namespace: Namespace, - llmd_s3_service_account: ServiceAccount, -) -> Generator[LLMInferenceService]: - """Create an LLMInferenceService that loads models from S3 storage.""" - if isinstance(request.param, str): - name_suffix = request.param - kwargs = {} - else: - name_suffix = request.param.get("name_suffix", "s3") - kwargs = {k: v for k, v in request.param.items() if k != "name_suffix"} - - service_name = kwargs.get("name", f"llm-{name_suffix}") - - container_resources = kwargs.get( - "container_resources", - { - "limits": {"cpu": "1", "memory": "10Gi"}, - "requests": {"cpu": "100m", "memory": "8Gi"}, - }, - ) - - create_kwargs = { - "client": admin_client, - "name": service_name, - "namespace": unprivileged_model_namespace.name, - "storage_uri": kwargs.get("storage_uri", ModelStorage.TINYLLAMA_S3), - "container_image": kwargs.get("container_image", ContainerImages.VLLM_CPU), - "container_resources": container_resources, - "liveness_probe": LLMD_LIVENESS_PROBE, - "service_account": llmd_s3_service_account.name, - "wait": True, - "timeout": Timeout.TIMEOUT_15MIN, - **{ - k: v - for k, v in kwargs.items() - if k not in ["name", "storage_uri", "container_image", "container_resources"] - }, - } - - with create_llmisvc(**create_kwargs) as llm_service: - yield llm_service +) -> Generator[tuple[AuthEntry, AuthEntry]]: + """Two auth-enabled LLMISVCs with independent tokens for cross-auth testing.""" + namespace = unprivileged_model_namespace.name + with ExitStack() as stack: + entries = [] + for i in range(2): + cfg = TinyLlamaOciConfig.with_overrides( + name=f"llmisvc-auth-{i}", + enable_auth=True, + ) + svc = stack.enter_context( + cm=_create_llmisvc_from_config( + config_cls=cfg, + namespace=namespace, + client=admin_client, + ) + ) + token = stack.enter_context( + cm=_create_auth_resources( + client=admin_client, + namespace=namespace, + svc=svc, + sa_name=f"auth-sa-{i}", + ) + ) + entries.append(AuthEntry(service=svc, token=token)) + yield tuple(entries) +# =========================================== +# Auth — SA + RBAC + token +# =========================================== @pytest.fixture(scope="class") -def llmd_inference_service_gpu( - request: FixtureRequest, +def llmisvc_token( admin_client: DynamicClient, - unprivileged_model_namespace: Namespace, - 
llmd_s3_service_account: ServiceAccount, -) -> Generator[LLMInferenceService]: - """Create an LLMInferenceService with GPU resources for accelerated inference.""" - if isinstance(request.param, str): - name_suffix = request.param - kwargs = {} - else: - name_suffix = request.param.get("name_suffix", "gpu-hf") - kwargs = {k: v for k, v in request.param.items() if k != "name_suffix"} - - service_name = kwargs.get("name", f"llm-{name_suffix}") - - if "llmd_gateway" in request.fixturenames: - request.getfixturevalue(argname="llmd_gateway") - - if kwargs.get("enable_prefill_decode", False): - container_resources = kwargs.get( - "container_resources", - { - "limits": {"cpu": "4", "memory": "32Gi", "nvidia.com/gpu": "1"}, - "requests": {"cpu": "2", "memory": "16Gi", "nvidia.com/gpu": "1"}, - }, - ) - else: - container_resources = kwargs.get( - "container_resources", - { - "limits": { - "cpu": ResourceLimits.GPU.CPU_LIMIT, - "memory": ResourceLimits.GPU.MEMORY_LIMIT, - "nvidia.com/gpu": ResourceLimits.GPU.LIMIT, - }, - "requests": { - "cpu": ResourceLimits.GPU.CPU_REQUEST, - "memory": ResourceLimits.GPU.MEMORY_REQUEST, - "nvidia.com/gpu": ResourceLimits.GPU.REQUEST, - }, - }, - ) - - replicas = kwargs.get("replicas", LLMDDefaults.REPLICAS) - if kwargs.get("enable_prefill_decode", False): - replicas = kwargs.get("replicas", 3) - - prefill_config = None - if kwargs.get("enable_prefill_decode", False): - prefill_config = { - "replicas": kwargs.get("prefill_replicas", 1), - } - - create_kwargs = { - "client": admin_client, - "name": service_name, - "namespace": unprivileged_model_namespace.name, - "storage_uri": kwargs.get("storage_uri", ModelStorage.S3_QWEN), - "model_name": kwargs.get("model_name", ModelNames.QWEN), - "replicas": replicas, - "container_resources": container_resources, - "liveness_probe": LLMD_LIVENESS_PROBE, - "prefill_config": prefill_config, - "disable_scheduler": kwargs.get("disable_scheduler", False), - "enable_prefill_decode": kwargs.get("enable_prefill_decode", False), - "service_account": llmd_s3_service_account.name, - "wait": True, - "timeout": Timeout.TIMEOUT_15MIN, + llmisvc: LLMInferenceService, +) -> Generator[str]: + """Create a dedicated SA with RBAC and return an auth token for the llmisvc.""" + with _create_auth_resources( + client=admin_client, + namespace=llmisvc.namespace, + svc=llmisvc, + sa_name=f"{llmisvc.name}-auth-sa", + ) as token: + yield token + + +# =========================================== +# Monitoring +# =========================================== +@pytest.fixture(scope="session", autouse=True) +def llmd_user_workload_monitoring_config_map( + admin_client: DynamicClient, cluster_monitoring_config: ConfigMap +) -> Generator[ConfigMap]: + """Ephemeral user workload monitoring for LLMD tests.""" + data = { + "config.yaml": yaml.dump({ + "prometheus": { + "logLevel": "debug", + "retention": "15d", + } + }) } - if "container_image" in kwargs: - create_kwargs["container_image"] = kwargs["container_image"] - - with create_llmisvc(**create_kwargs) as llm_service: - yield llm_service - - -@pytest.fixture(scope="class") -def singlenode_estimated_prefix_cache( - admin_client: DynamicClient, - unprivileged_model_namespace: Namespace, - llmd_s3_secret: Secret, - llmd_s3_service_account: ServiceAccount, - llmd_gateway: Gateway, -) -> Generator[LLMInferenceService]: - """LLMInferenceService fixture for single-node estimated prefix cache test.""" - - llmisvc_name = "singlenode-estimated-prefix-cache" - - with create_llmisvc( + with update_configmap_data( 
client=admin_client, - name=llmisvc_name, - namespace=unprivileged_model_namespace.name, - storage_uri=ModelStorage.TINYLLAMA_S3, - model_name=ModelNames.TINYLLAMA, - replicas=2, - annotations={ - "prometheus.io/port": "8000", - "prometheus.io/path": "/metrics", - }, - container_resources={ - "limits": { - "cpu": ResourceLimits.GPU.CPU_LIMIT, - "memory": ResourceLimits.GPU.MEMORY_LIMIT, - "nvidia.com/gpu": ResourceLimits.GPU.LIMIT, - }, - "requests": { - "cpu": ResourceLimits.GPU.CPU_REQUEST, - "memory": ResourceLimits.GPU.MEMORY_REQUEST, - "nvidia.com/gpu": ResourceLimits.GPU.REQUEST, - }, - }, - container_env=[ - {"name": "MODEL_NAME", "value": ModelNames.TINYLLAMA}, - {"name": "VLLM_LOGGING_LEVEL", "value": "DEBUG"}, - {"name": "PYTHONHASHSEED", "value": str(PREFIX_CACHE_HASH_SEED)}, - { - "name": "VLLM_ADDITIONAL_ARGS", - "value": ( - f"--prefix-caching-hash-algo {PREFIX_CACHE_HASH_ALGO} --block-size {PREFIX_CACHE_BLOCK_SIZE}" - ), - }, - ], - liveness_probe=LLMD_LIVENESS_PROBE, - service_account=llmd_s3_service_account.name, - enable_auth=True, - router_config={ - "scheduler": { - "template": { - "containers": [ - { - "name": "main", - "args": [ - "--v=4", - "--pool-name", - "{{ ChildName .ObjectMeta.Name `-inference-pool` }}", - "--pool-namespace", - "{{ .ObjectMeta.Namespace }}", - "--pool-group", - "inference.networking.x-k8s.io", - "--zap-encoder", - "json", - "--grpc-port", - "9002", - "--grpc-health-port", - "9003", - "--secure-serving", - "--model-server-metrics-scheme", - "https", - "--cert-path", - "/var/run/kserve/tls", - "--config-text", - yaml.dump(ROUTER_SCHEDULER_CONFIG_ESTIMATED_PREFIX_CACHE), - ], - } - ], + name="user-workload-monitoring-config", + namespace="openshift-user-workload-monitoring", + data=data, + ) as cm: + yield cm + + +# =========================================== +# Helpers (not fixtures) +# =========================================== +@contextmanager +def _create_auth_resources( + client: DynamicClient, + namespace: str, + svc: LLMInferenceService, + sa_name: str, +) -> Generator[RedactedString, Any]: + """Create SA + Role + RoleBinding and yield an auth token.""" + with ( + ServiceAccount(client=client, namespace=namespace, name=sa_name) as sa, + Role( + client=client, + name=f"{svc.name}-view", + namespace=namespace, + rules=[ + { + "apiGroups": [svc.api_group], + "resources": ["llminferenceservices"], + "verbs": ["get"], + "resourceNames": [svc.name], } - }, - "route": {}, - "gateway": {}, - }, - disable_scheduler=False, - enable_prefill_decode=False, - wait=True, - timeout=Timeout.TIMEOUT_15MIN, - ) as llm_service: + ], + ) as role, + RoleBinding( + client=client, + namespace=namespace, + name=f"{sa_name}-view", + role_ref_name=role.name, + role_ref_kind=role.kind, + subjects_kind="ServiceAccount", + subjects_name=sa_name, + ), + ): + yield RedactedString(value=create_inference_token(model_service_account=sa)) + + +@contextmanager +def _create_llmisvc_from_config( + config_cls: type, + namespace: str, + client: DynamicClient, + service_account: str | None = None, +) -> Generator[LLMInferenceService, Any]: + """Create an LLMInferenceService from a config class.""" + LOGGER.info(f"\n{config_cls.describe(namespace=namespace)}") + + model: dict[str, Any] = {"uri": config_cls.storage_uri} + if config_cls.model_name: + model["name"] = config_cls.model_name + + main_container: dict[str, Any] = {"name": "main"} + main_container.update({ + k: v + for k, v in { + "image": config_cls.container_image, + "resources": config_cls.container_resources(), + "env": 
config_cls.container_env(), + "livenessProbe": config_cls.liveness_probe(), + "readinessProbe": config_cls.readiness_probe(), + }.items() + if v + }) + + template: dict[str, Any] = { + "configRef": config_cls.template_config_ref, + "containers": [main_container], + } + if service_account: + template["serviceAccountName"] = service_account + + prefill = config_cls.prefill_config() + + svc_kwargs: dict[str, Any] = { + "client": client, + "name": config_cls.name, + "namespace": namespace, + "annotations": config_cls.annotations(), + "label": config_cls.labels(), + "teardown": True, + "model": model, + "replicas": config_cls.replicas, + "router": config_cls.router_config(), + "template": template, + } + if prefill is not None: + if service_account and "template" in prefill: + prefill["template"]["serviceAccountName"] = service_account + svc_kwargs["prefill"] = prefill + + with LLMInferenceService(**svc_kwargs) as llm_service: + wait_for_llmisvc(llmisvc=llm_service, timeout=config_cls.wait_timeout) yield llm_service diff --git a/tests/model_serving/model_server/llmd/constants.py b/tests/model_serving/model_server/llmd/constants.py deleted file mode 100644 index 72f116ffc..000000000 --- a/tests/model_serving/model_server/llmd/constants.py +++ /dev/null @@ -1,40 +0,0 @@ -# Liveness probe for single-node configurations -LLMD_LIVENESS_PROBE = { - "httpGet": {"path": "/health", "port": 8000, "scheme": "HTTPS"}, - "initialDelaySeconds": 240, - "periodSeconds": 60, - "timeoutSeconds": 60, - "failureThreshold": 10, -} - -# Common parameters for vLLM and llm-d scheduler -PREFIX_CACHE_BLOCK_SIZE = 64 -PREFIX_CACHE_HASH_ALGO = "sha256" -PREFIX_CACHE_HASH_SEED = "42" - -# Scheduler configuration for single-node with estimated prefix cache -ROUTER_SCHEDULER_CONFIG_ESTIMATED_PREFIX_CACHE = { - "apiVersion": "inference.networking.x-k8s.io/v1alpha1", - "kind": "EndpointPickerConfig", - "plugins": [ - { - "type": "prefix-cache-scorer", - "parameters": { - "blockSize": PREFIX_CACHE_BLOCK_SIZE, - "maxPrefixBlocksToMatch": 256, - "lruCapacityPerServer": 31250, - }, - } - ], - "schedulingProfiles": [ - { - "name": "default", - "plugins": [ - { - "pluginRef": "prefix-cache-scorer", - "weight": 5.0, - } - ], - } - ], -} diff --git a/tests/model_serving/model_server/llmd/kueue/__init__.py b/tests/model_serving/model_server/llmd/kueue/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tests/model_serving/model_server/llmd/llmd_configs/__init__.py b/tests/model_serving/model_server/llmd/llmd_configs/__init__.py new file mode 100644 index 000000000..58f638650 --- /dev/null +++ b/tests/model_serving/model_server/llmd/llmd_configs/__init__.py @@ -0,0 +1,17 @@ +from .config_base import LLMISvcConfig +from .config_estimated_prefix_cache import EstimatedPrefixCacheConfig +from .config_models import QwenHfConfig, QwenS3Config, TinyLlamaHfConfig, TinyLlamaOciConfig, TinyLlamaS3Config +from .config_precise_prefix_cache import PrecisePrefixCacheConfig +from .config_prefill_decode import PrefillDecodeConfig + +__all__ = [ + "EstimatedPrefixCacheConfig", + "LLMISvcConfig", + "PrecisePrefixCacheConfig", + "PrefillDecodeConfig", + "QwenHfConfig", + "QwenS3Config", + "TinyLlamaHfConfig", + "TinyLlamaOciConfig", + "TinyLlamaS3Config", +] diff --git a/tests/model_serving/model_server/llmd/llmd_configs/config_base.py b/tests/model_serving/model_server/llmd/llmd_configs/config_base.py new file mode 100644 index 000000000..1f02732e5 --- /dev/null +++ 
b/tests/model_serving/model_server/llmd/llmd_configs/config_base.py @@ -0,0 +1,149 @@ +"""Base configuration class for LLMInferenceService resources.""" + +from utilities.constants import ResourceLimits +from utilities.llmd_constants import ContainerImages + + +class LLMISvcConfig: + """Base configuration for an LLMInferenceService resource. + + Subclass and override class attributes or classmethods for each test scenario. + Pass the class directly to create_llmisvc_from_config — no instantiation needed. + """ + + name = "" + model_name = None + storage_uri = "" + replicas = 1 + container_image = None + template_config_ref = "kserve-config-llm-template" + enable_auth = False + wait_timeout = 180 + + @classmethod + def container_resources(cls): + return {} + + @classmethod + def container_env(cls): + """Base environment variables for the vLLM container. + + Subclasses may either: + - Call super().container_env() + [...] to extend the base env vars (used by CpuConfig) + - Return a fresh list to fully replace (used by prefix cache configs that need + exclusive control over VLLM_ADDITIONAL_ARGS) + """ + return [ + {"name": "VLLM_LOGGING_LEVEL", "value": "DEBUG"}, + ] + + @classmethod + def liveness_probe(cls): + return { + "httpGet": {"path": "/health", "port": 8000, "scheme": "HTTPS"}, + "initialDelaySeconds": 240, + "periodSeconds": 60, + "timeoutSeconds": 60, + "failureThreshold": 10, + } + + @classmethod + def readiness_probe(cls): + return None + + @classmethod + def router_config(cls): + return { + "scheduler": {"configRef": "kserve-config-llm-scheduler"}, + "route": {}, + "gateway": {}, + } + + @classmethod + def annotations(cls): + return { + "prometheus.io/port": "8000", + "prometheus.io/path": "/metrics", + "security.opendatahub.io/enable-auth": str(cls.enable_auth).lower(), + } + + @classmethod + def prefill_config(cls): + return None + + @classmethod + def labels(cls): + return {} + + @classmethod + def describe(cls, namespace: str = ""): + """Return a formatted config summary for log output.""" + border = "=" * 60 + lines = [ + border, + f" Config: {cls.__name__}", + border, + f" namespace: {namespace}", + f" name: {cls.name}", + f" storage_uri: {cls.storage_uri}", + f" replicas: {cls.replicas}", + f" container_image: {cls.container_image or '(default)'}", + f" auth: {cls.annotations().get('security.opendatahub.io/enable-auth', 'false')}", + border, + ] + return "\n".join(lines) + + @classmethod + def with_overrides(cls, **overrides): + """Create a derived config class with overridden attributes.""" + return type(f"{cls.__name__}_custom", (cls,), overrides) + + +class CpuConfig(LLMISvcConfig): + """CPU inference base. Sets vLLM CPU image, CPU env vars, and CPU resource limits.""" + + enable_auth = False + container_image = ContainerImages.VLLM_CPU + + @classmethod + def container_env(cls): + # vLLM arguments to reduce engine startup time + # --max-num-seqs 20 + # --max-model-len 128 + # --enforce-eager + return super().container_env() + [ + { + "name": "VLLM_ADDITIONAL_ARGS", + "value": "--max-num-seqs 20 --max-model-len 128 --enforce-eager --ssl-ciphers ECDHE+AESGCM:DHE+AESGCM", + }, + {"name": "VLLM_CPU_KVCACHE_SPACE", "value": "4"}, + ] + + @classmethod + def container_resources(cls): + return { + "limits": {"cpu": "1", "memory": "10Gi"}, + "requests": {"cpu": "100m", "memory": "8Gi"}, + } + + +class GpuConfig(LLMISvcConfig): + """GPU inference base. 
Sets GPU resource limits.""" + + enable_auth = False + wait_timeout = 480 + + @classmethod + def container_resources(cls): + return { + "limits": { + "cpu": ResourceLimits.GPU.CPU_LIMIT, + "memory": ResourceLimits.GPU.MEMORY_LIMIT, + "nvidia.com/gpu": ResourceLimits.GPU.LIMIT, + }, + "requests": { + "cpu": ResourceLimits.GPU.CPU_REQUEST, + "memory": ResourceLimits.GPU.MEMORY_REQUEST, + "nvidia.com/gpu": ResourceLimits.GPU.REQUEST, + }, + } diff --git a/tests/model_serving/model_server/llmd/llmd_configs/config_estimated_prefix_cache.py b/tests/model_serving/model_server/llmd/llmd_configs/config_estimated_prefix_cache.py new file mode 100644 index 000000000..ae2b846ca --- /dev/null +++ b/tests/model_serving/model_server/llmd/llmd_configs/config_estimated_prefix_cache.py @@ -0,0 +1,90 @@ +"""Estimated prefix cache configuration for single-node LLMInferenceService.""" + +import yaml + +from .config_models import QwenS3Config + + +class EstimatedPrefixCacheConfig(QwenS3Config): + """Single-node estimated prefix cache — Qwen via S3, 2 GPU replicas.""" + + enable_auth = True + name = "llmisvc-estimated-prefix" + replicas = 2 + block_size = 64 + hash_algo = "sha256" + hash_seed = "42" + + @classmethod + def container_env(cls): + return [ + {"name": "MODEL_NAME", "value": cls.model_name}, + {"name": "VLLM_LOGGING_LEVEL", "value": "DEBUG"}, + {"name": "PYTHONHASHSEED", "value": cls.hash_seed}, + { + "name": "VLLM_ADDITIONAL_ARGS", + "value": f"--prefix-caching-hash-algo {cls.hash_algo} --block-size {cls.block_size}", + }, + ] + + @classmethod + def _scheduler_config(cls): + """EndpointPickerConfig — estimated prefix cache scorer plugin.""" + return { + "apiVersion": "inference.networking.x-k8s.io/v1alpha1", + "kind": "EndpointPickerConfig", + "plugins": [ + { + "type": "prefix-cache-scorer", + "parameters": { + "blockSize": cls.block_size, + "maxPrefixBlocksToMatch": 256, + "lruCapacityPerServer": 31250, + }, + } + ], + "schedulingProfiles": [ + { + "name": "default", + "plugins": [{"pluginRef": "prefix-cache-scorer", "weight": 5.0}], + } + ], + } + + @classmethod + def router_config(cls): + return { + "scheduler": { + "template": { + "containers": [ + { + "name": "main", + "args": [ + "--v=4", + "--pool-name", + "{{ ChildName .ObjectMeta.Name `-inference-pool` }}", + "--pool-namespace", + "{{ .ObjectMeta.Namespace }}", + "--pool-group", + "inference.networking.x-k8s.io", + "--zap-encoder", + "json", + "--grpc-port", + "9002", + "--grpc-health-port", + "9003", + "--secure-serving", + "--model-server-metrics-scheme", + "https", + "--cert-path", + "/var/run/kserve/tls", + "--config-text", + yaml.dump(cls._scheduler_config()), + ], + } + ], + } + }, + "route": {}, + "gateway": {}, + } diff --git a/tests/model_serving/model_server/llmd/llmd_configs/config_models.py b/tests/model_serving/model_server/llmd/llmd_configs/config_models.py new file mode 100644 index 000000000..0e33a9db2 --- /dev/null +++ b/tests/model_serving/model_server/llmd/llmd_configs/config_models.py @@ -0,0 +1,47 @@ +"""Model+storage configurations — bind a model to a storage backend.""" + +from utilities.llmd_constants import ModelNames, ModelStorage + +from .config_base import CpuConfig, GpuConfig + + +class TinyLlamaOciConfig(CpuConfig): + """TinyLlama via OCI container registry, CPU inference.""" + + enable_auth = False + name = "llmisvc-tinyllama-oci-cpu" + storage_uri = ModelStorage.TINYLLAMA_OCI + + +class TinyLlamaS3Config(CpuConfig): + """TinyLlama via S3 bucket, CPU inference.""" + + enable_auth = False + name = 
"llmisvc-tinyllama-s3-cpu" + storage_uri = ModelStorage.TINYLLAMA_S3 + + +class TinyLlamaHfConfig(CpuConfig): + """TinyLlama via HuggingFace, CPU inference.""" + + enable_auth = False + name = "llmisvc-tinyllama-hf-cpu" + storage_uri = ModelStorage.HF_TINYLLAMA + + +class QwenS3Config(GpuConfig): + """Qwen 7B via S3 bucket, GPU inference.""" + + enable_auth = False + name = "llmisvc-qwen-s3-gpu" + storage_uri = ModelStorage.S3_QWEN + model_name = ModelNames.QWEN + + +class QwenHfConfig(GpuConfig): + """Qwen 7B via HuggingFace, GPU inference.""" + + enable_auth = False + name = "llmisvc-qwen-hf-gpu" + storage_uri = ModelStorage.HF_QWEN_7B_INSTRUCT + model_name = ModelNames.QWEN diff --git a/tests/model_serving/model_server/llmd/llmd_configs/config_precise_prefix_cache.py b/tests/model_serving/model_server/llmd/llmd_configs/config_precise_prefix_cache.py new file mode 100644 index 000000000..da78d7170 --- /dev/null +++ b/tests/model_serving/model_server/llmd/llmd_configs/config_precise_prefix_cache.py @@ -0,0 +1,142 @@ +"""Precise prefix cache configuration for single-node LLMInferenceService.""" + +import json + +import yaml + +from .config_models import QwenHfConfig + + +class PrecisePrefixCacheConfig(QwenHfConfig): + """Single-node precise prefix cache — Qwen 7B via HuggingFace, 2 GPU replicas.""" + + name = "llmisvc-precise-prefix" + # The precise-prefix-cache-scorer scheduler plugin downloads the tokenizer from + # HuggingFace using the model name. It must be the full HF repo ID, not an alias. + model_name = "Qwen/Qwen2.5-7B-Instruct" + replicas = 2 + block_size = 64 + hash_algo = "sha256_cbor" + hash_seed = "42" + enable_auth = True + + @classmethod + def container_env(cls): + kv_events_config = { + "enable_kv_cache_events": True, + "publisher": "zmq", + "endpoint": f"tcp://{cls.name}-epp-service:5557", + "topic": "kv@$(POD_IP)@$(MODEL_NAME)", + } + return [ + { + "name": "POD_IP", + "valueFrom": {"fieldRef": {"apiVersion": "v1", "fieldPath": "status.podIP"}}, + }, + {"name": "MODEL_NAME", "value": cls.model_name}, + {"name": "VLLM_LOGGING_LEVEL", "value": "DEBUG"}, + {"name": "CUDA_LAUNCH_BLOCKING", "value": "1"}, + {"name": "PYTHONHASHSEED", "value": cls.hash_seed}, + { + "name": "VLLM_ADDITIONAL_ARGS", + "value": ( + f"--enable-prefix-caching " + f"--prefix-caching-hash-algo {cls.hash_algo} " + f"--block-size {cls.block_size} " + f"--kv-events-config '{json.dumps(kv_events_config)}'" + ), + }, + ] + + @classmethod + def _scheduler_config(cls): + """EndpointPickerConfig — precise prefix cache with KV block index tracking.""" + return { + "apiVersion": "inference.networking.x-k8s.io/v1alpha1", + "kind": "EndpointPickerConfig", + "plugins": [ + {"type": "single-profile-handler"}, + { + "type": "precise-prefix-cache-scorer", + "parameters": { + "kvEventsConfig": {"zmqEndpoint": "tcp://*:5557", "topicFilter": "kv"}, + "indexerConfig": { + "tokenProcessorConfig": { + "blockSize": cls.block_size, + "hashSeed": cls.hash_seed, + }, + "kvBlockIndexConfig": { + "enableMetrics": True, + "metricsLoggingInterval": 60000000000, + }, + "tokenizersPoolConfig": { + "hf": {"tokenizersCacheDir": "/mnt/tokenizers"}, + }, + }, + }, + }, + {"type": "load-aware-scorer"}, + {"type": "max-score-picker"}, + ], + "schedulingProfiles": [ + { + "name": "default", + "plugins": [ + {"pluginRef": "precise-prefix-cache-scorer", "weight": 2.0}, + {"pluginRef": "load-aware-scorer", "weight": 1.0}, + {"pluginRef": "max-score-picker"}, + ], + } + ], + } + + @classmethod + def _scheduler_container(cls): + """Scheduler 
container with ZMQ ports and tokenizer volume mounts.""" + return { + "name": "main", + "ports": [ + {"name": "grpc", "containerPort": 9002, "protocol": "TCP"}, + {"name": "grpc-health", "containerPort": 9003, "protocol": "TCP"}, + {"name": "metrics", "containerPort": 9090, "protocol": "TCP"}, + {"name": "zmq", "containerPort": 5557, "protocol": "TCP"}, + ], + "env": [{"name": "HF_HOME", "value": "/mnt/tokenizers"}], + "volumeMounts": [{"name": "tokenizers", "mountPath": "/mnt/tokenizers", "readOnly": False}], + "args": [ + "--v=4", + "--pool-name", + "{{ ChildName .ObjectMeta.Name `-inference-pool` }}", + "--pool-namespace", + "{{ .ObjectMeta.Namespace }}", + "--pool-group", + "inference.networking.x-k8s.io", + "--zap-encoder", + "json", + "--grpc-port", + "9002", + "--grpc-health-port", + "9003", + "--secure-serving", + "--model-server-metrics-scheme", + "https", + "--model-server-metrics-https-insecure-skip-verify", + "--cert-path", + "/var/run/kserve/tls", + "--config-text", + yaml.dump(cls._scheduler_config()), + ], + } + + @classmethod + def router_config(cls): + return { + "scheduler": { + "template": { + "volumes": [{"name": "tokenizers", "emptyDir": {}}], + "containers": [cls._scheduler_container()], + } + }, + "route": {}, + "gateway": {}, + } diff --git a/tests/model_serving/model_server/llmd/llmd_configs/config_prefill_decode.py b/tests/model_serving/model_server/llmd/llmd_configs/config_prefill_decode.py new file mode 100644 index 000000000..5a07e2a81 --- /dev/null +++ b/tests/model_serving/model_server/llmd/llmd_configs/config_prefill_decode.py @@ -0,0 +1,25 @@ +"""Prefill-decode disaggregation configuration for LLMInferenceService.""" + +from .config_models import QwenS3Config + + +class PrefillDecodeConfig(QwenS3Config): + """S3 GPU with prefill-decode disaggregation — inherits Qwen+S3+GPU from QwenS3Config.""" + + enable_auth = False + name = "llmisvc-prefill-decode-gpu" + + @classmethod + def prefill_config(cls): + return { + "replicas": 1, + "template": { + "containers": [ + { + "name": "main", + "resources": cls.container_resources(), + "env": [{"name": "VLLM_PREFILL_MODE", "value": "true"}], + } + ], + }, + } diff --git a/tests/model_serving/model_server/llmd/test_llmd_auth.py b/tests/model_serving/model_server/llmd/test_llmd_auth.py index 0b8124cae..3bff4a1cc 100644 --- a/tests/model_serving/model_server/llmd/test_llmd_auth.py +++ b/tests/model_serving/model_server/llmd/test_llmd_auth.py @@ -1,125 +1,67 @@ import pytest from tests.model_serving.model_server.llmd.utils import ( - verify_gateway_status, - verify_llm_service_status, + ns_from_file, + parse_completion_text, + send_chat_completions, ) -from utilities.constants import Protocols -from utilities.llmd_utils import verify_inference_response_llmd -from utilities.manifests.tinyllama import TINYLLAMA_INFERENCE_CONFIG -pytestmark = [ - pytest.mark.llmd_cpu, - pytest.mark.smoke, -] +pytestmark = [pytest.mark.tier1] + +NAMESPACE = ns_from_file(file=__file__) @pytest.mark.parametrize( "unprivileged_model_namespace", - [({"name": "llmd-auth-test"})], + [{"name": NAMESPACE}], indirect=True, ) class TestLLMISVCAuth: - """Authentication testing for LLMD.""" + """Deploy TinyLlama on CPU with authentication enabled and verify access control on chat completions.""" - @pytest.fixture(scope="class", autouse=True) - def setup_auth_resources( - self, - llmd_gateway, - llmisvc_auth, - llmisvc_auth_token, - llmisvc_auth_view_role, - llmisvc_auth_role_binding, - ): - """Set up gateway, LLMInferenceServices, and tokens once for all 
tests.""" - llmisvc_auth_prefix = "llmisvc-auth-user-" - sa_prefix = "llmisvc-auth-sa-" + def test_llmisvc_authorized(self, llmisvc_auth_pair): + """Test steps: - # Create LLMInferenceService instances using the factory fixture - llmisvc_user_a, sa_user_a = llmisvc_auth( - service_name=llmisvc_auth_prefix + "a", - service_account_name=sa_prefix + "a", - ) - llmisvc_user_b, sa_user_b = llmisvc_auth( - service_name=llmisvc_auth_prefix + "b", - service_account_name=sa_prefix + "b", - ) - - # Create tokens with all RBAC resources - token_user_a = llmisvc_auth_token( - service_account=sa_user_a, - llmisvc=llmisvc_user_a, - view_role_factory=llmisvc_auth_view_role, - role_binding_factory=llmisvc_auth_role_binding, - ) - token_user_b = llmisvc_auth_token( - service_account=sa_user_b, - llmisvc=llmisvc_user_b, - view_role_factory=llmisvc_auth_view_role, - role_binding_factory=llmisvc_auth_role_binding, - ) + 1. Send a chat completion request to each service using its owner's token. + 2. Assert both responses return status 200. + 3. Assert both completion texts contain the expected answer. + """ + entry_a, entry_b = llmisvc_auth_pair - # Verify all resources are ready - assert verify_gateway_status(llmd_gateway), "Gateway should be ready" - assert verify_llm_service_status(llmisvc_user_a), "LLMInferenceService user A should be ready" - assert verify_llm_service_status(llmisvc_user_b), "LLMInferenceService user B should be ready" + prompt = "What is the capital of Italy?" + expected = "rome" - # Store resources as class attributes for use in tests - TestLLMISVCAuth.llmisvc_user_a = llmisvc_user_a - TestLLMISVCAuth.llmisvc_user_b = llmisvc_user_b - TestLLMISVCAuth.token_user_a = token_user_a - TestLLMISVCAuth.token_user_b = token_user_b + for entry in [entry_a, entry_b]: + status, body = send_chat_completions( + llmisvc=entry.service, prompt=prompt, token=entry.token, insecure=False + ) + assert status == 200, f"Authorized request failed with {status}: {body}" + completion = parse_completion_text(response_body=body) + assert expected in completion.lower(), f"Expected '{expected}' in response, got: {completion}" - def test_llmisvc_authorized(self): - """Test that authorized users can access their own LLMInferenceServices.""" - # Verify inference for user A with user A's token (should succeed) - verify_inference_response_llmd( - llm_service=self.llmisvc_user_a, - inference_config=TINYLLAMA_INFERENCE_CONFIG, - inference_type="chat_completions", - protocol=Protocols.HTTP, - use_default_query=True, - insecure=False, - model_name=self.llmisvc_user_a.name, - token=self.token_user_a, - authorized_user=True, - ) + def test_llmisvc_unauthorized(self, llmisvc_auth_pair): + """Test steps: - # Verify inference for user B with user B's token (should succeed) - verify_inference_response_llmd( - llm_service=self.llmisvc_user_b, - inference_config=TINYLLAMA_INFERENCE_CONFIG, - inference_type="chat_completions", - protocol=Protocols.HTTP, - use_default_query=True, - insecure=False, - model_name=self.llmisvc_user_b.name, - token=self.token_user_b, - authorized_user=True, - ) + 1. Send a chat completion request to user A's service using user B's token. + 2. Assert the response status is 401 or 403. + 3. Send a chat completion request to user A's service with no token. + 4. Assert the response status is 401 or 403. 
+ """ + entry_a, entry_b = llmisvc_auth_pair - def test_llmisvc_unauthorized(self): - """Test that unauthorized access to LLMInferenceServices is properly blocked.""" - # Verify that user B's token cannot access user A's service (should fail) - verify_inference_response_llmd( - llm_service=self.llmisvc_user_a, - inference_config=TINYLLAMA_INFERENCE_CONFIG, - inference_type="chat_completions", - protocol=Protocols.HTTP, - use_default_query=True, + # User B's token cannot access user A's service + status, _ = send_chat_completions( + llmisvc=entry_a.service, + prompt="What is the capital of Italy?", + token=entry_b.token, insecure=False, - model_name=self.llmisvc_user_a.name, - token=self.token_user_b, - authorized_user=False, ) + assert status in (401, 403), f"Cross-user access should be denied, got {status}" - # Verify that accessing user A's service without a token fails - verify_inference_response_llmd( - llm_service=self.llmisvc_user_a, - inference_config=TINYLLAMA_INFERENCE_CONFIG, - inference_type="chat_completions", - protocol=Protocols.HTTP, - use_default_query=True, + # No token at all fails + status, _ = send_chat_completions( + llmisvc=entry_a.service, + prompt="What is the capital of Italy?", insecure=False, - authorized_user=False, ) + assert status in (401, 403), f"No-token access should be denied, got {status}" diff --git a/tests/model_serving/model_server/llmd/test_llmd_connection_cpu.py b/tests/model_serving/model_server/llmd/test_llmd_connection_cpu.py new file mode 100644 index 000000000..3c290e96f --- /dev/null +++ b/tests/model_serving/model_server/llmd/test_llmd_connection_cpu.py @@ -0,0 +1,41 @@ +import pytest +from ocp_resources.llm_inference_service import LLMInferenceService + +from tests.model_serving.model_server.llmd.llmd_configs import TinyLlamaHfConfig, TinyLlamaS3Config +from tests.model_serving.model_server.llmd.utils import ( + ns_from_file, + parse_completion_text, + send_chat_completions, +) + +pytestmark = [pytest.mark.tier1] + +NAMESPACE = ns_from_file(file=__file__) + + +@pytest.mark.parametrize( + "unprivileged_model_namespace, llmisvc", + [ + pytest.param({"name": NAMESPACE}, TinyLlamaS3Config, id="s3"), + pytest.param({"name": NAMESPACE}, TinyLlamaHfConfig, id="hf"), + ], + indirect=True, +) +@pytest.mark.usefixtures("valid_aws_config") +class TestLlmdConnectionCpu: + """Deploy TinyLlama on CPU via S3 and HuggingFace and verify chat completions.""" + + def test_llmd_connection_cpu(self, llmisvc: LLMInferenceService): + """Test steps: + + 1. Send a chat completion request to /v1/chat/completions. + 2. Assert the response status is 200. + 3. Assert the completion text contains the expected answer. + """ + prompt = "What is the capital of Italy?" 
+ expected = "rome" + + status, body = send_chat_completions(llmisvc=llmisvc, prompt=prompt) + assert status == 200, f"Expected 200, got {status}: {body}" + completion = parse_completion_text(response_body=body) + assert expected in completion.lower(), f"Expected '{expected}' in response, got: {completion}" diff --git a/tests/model_serving/model_server/llmd/test_llmd_connection_gpu.py b/tests/model_serving/model_server/llmd/test_llmd_connection_gpu.py new file mode 100644 index 000000000..0f1e001ee --- /dev/null +++ b/tests/model_serving/model_server/llmd/test_llmd_connection_gpu.py @@ -0,0 +1,44 @@ +import pytest +from ocp_resources.llm_inference_service import LLMInferenceService + +from tests.model_serving.model_server.llmd.llmd_configs import QwenHfConfig, QwenS3Config +from tests.model_serving.model_server.llmd.utils import ( + ns_from_file, + parse_completion_text, + send_chat_completions, +) + +pytestmark = [pytest.mark.tier1, pytest.mark.gpu] + +NAMESPACE = ns_from_file(file=__file__) + + +@pytest.mark.parametrize( + "unprivileged_model_namespace, llmisvc", + [ + pytest.param({"name": NAMESPACE}, QwenS3Config, id="s3"), + pytest.param({"name": NAMESPACE}, QwenHfConfig, id="hf"), + ], + indirect=True, +) +@pytest.mark.usefixtures("valid_aws_config", "skip_if_no_gpu_available") +class TestLlmdConnectionGpu: + """Deploy Qwen on GPU via S3 and HuggingFace and verify chat completions.""" + + def test_llmd_connection_gpu( + self, + llmisvc: LLMInferenceService, + ): + """Test steps: + + 1. Send a chat completion request to /v1/chat/completions. + 2. Assert the response status is 200. + 3. Assert the completion text contains the expected answer. + """ + prompt = "What is the capital of Italy?" + expected = "rome" + + status, body = send_chat_completions(llmisvc=llmisvc, prompt=prompt) + assert status == 200, f"Expected 200, got {status}: {body}" + completion = parse_completion_text(response_body=body) + assert expected in completion.lower(), f"Expected '{expected}' in response, got: {completion}" diff --git a/tests/model_serving/model_server/llmd/kueue/test_kueue_llmisvc_raw.py b/tests/model_serving/model_server/llmd/test_llmd_kueue_integration.py similarity index 61% rename from tests/model_serving/model_server/llmd/kueue/test_kueue_llmisvc_raw.py rename to tests/model_serving/model_server/llmd/test_llmd_kueue_integration.py index c86f84ad2..45c46cc28 100644 --- a/tests/model_serving/model_server/llmd/kueue/test_kueue_llmisvc_raw.py +++ b/tests/model_serving/model_server/llmd/test_llmd_kueue_integration.py @@ -1,26 +1,23 @@ import pytest from ocp_resources.deployment import Deployment +from ocp_resources.llm_inference_service import LLMInferenceService from timeout_sampler import TimeoutExpiredError, TimeoutSampler +from tests.model_serving.model_server.llmd.llmd_configs import TinyLlamaOciConfig from tests.model_serving.model_server.llmd.utils import ( - verify_gateway_status, - verify_llm_service_status, + ns_from_file, + parse_completion_text, + send_chat_completions, ) -from utilities.constants import Labels, Protocols +from utilities.constants import Labels from utilities.exceptions import UnexpectedResourceCountError from utilities.kueue_utils import check_gated_pods_and_running_pods -from utilities.llmd_utils import verify_inference_response_llmd -from utilities.manifests.tinyllama import TINYLLAMA_INFERENCE_CONFIG -pytestmark = [ - pytest.mark.rawdeployment, - pytest.mark.llmd_cpu, - pytest.mark.kueue, - pytest.mark.smoke, -] +pytestmark = [pytest.mark.tier2] + +NAMESPACE = 
ns_from_file(file=__file__) # --- Test Configuration --- -NAMESPACE_NAME = "test-kueue-llmd-raw" LOCAL_QUEUE_NAME = "llmd-local-queue-raw" CLUSTER_QUEUE_NAME = "llmd-cluster-queue-raw" RESOURCE_FLAVOR_NAME = "llmd-flavor-raw" @@ -28,10 +25,6 @@ # Set a quota sufficient for only ONE model to run CPU_QUOTA = "3" MEMORY_QUOTA = "20Gi" -LLMISVC_RESOURCES = { - "requests": {"cpu": "2", "memory": "6Gi"}, - "limits": {"cpu": CPU_QUOTA, "memory": MEMORY_QUOTA}, -} # INITIAL_REPLICAS needs to be 1 or you need to change the test to check for the number of # available replicas @@ -44,18 +37,30 @@ EXPECTED_GATED_PODS = 1 +class KueueTestConfig(TinyLlamaOciConfig): + """Kueue admission control test — TinyLlama via OCI, CPU inference.""" + + name = "llmd-kueue-scaleup-test" + + @classmethod + def container_resources(cls): + return { + "requests": {"cpu": "2", "memory": "6Gi"}, + "limits": {"cpu": "3", "memory": "20Gi"}, + } + + @classmethod + def labels(cls): + return {Labels.Kueue.QUEUE_NAME: "llmd-local-queue-raw"} + + @pytest.mark.parametrize( - "unprivileged_model_namespace, llmd_inference_service, " + "unprivileged_model_namespace, llmisvc, " "kueue_cluster_queue_from_template, kueue_resource_flavor_from_template, kueue_local_queue_from_template", [ ( - {"name": NAMESPACE_NAME, "add-kueue-label": True}, - { - "name": "llmd-kueue-scaleup-test", - "replicas": INITIAL_REPLICAS, - "labels": {Labels.Kueue.QUEUE_NAME: LOCAL_QUEUE_NAME}, - "container_resources": LLMISVC_RESOURCES, - }, + {"name": NAMESPACE, "add-kueue-label": True}, + KueueTestConfig, { "name": CLUSTER_QUEUE_NAME, "resource_flavor_name": RESOURCE_FLAVOR_NAME, @@ -69,10 +74,7 @@ indirect=True, ) class TestKueueLLMDScaleUp: - """ - Test Kueue admission control for a single LLMInferenceService that scales up - to exceed the available resource quota. - """ + """Deploy TinyLlama on CPU under a Kueue quota, scale to 2 replicas, and verify Kueue gates the excess replica.""" def _get_deployment_status_replicas(self, deployment: Deployment) -> int: deployment.get() @@ -81,26 +83,26 @@ def _get_deployment_status_replicas(self, deployment: Deployment) -> int: def test_kueue_llmd_scaleup( self, unprivileged_client, + unprivileged_model_namespace, kueue_resource_flavor_from_template, kueue_cluster_queue_from_template, kueue_local_queue_from_template, - llmd_inference_service, - llmd_gateway, + llmisvc: LLMInferenceService, ): + """Test steps: + + 1. Find the workload deployment and assert exactly 1 exists with 1 replica. + 2. Scale the LLMInferenceService to 2 replicas. + 3. Wait for the deployment to reach 2 desired replicas. + 4. Assert Kueue admits 1 pod and gates the other. + 5. Send a chat completion request to /v1/chat/completions. + 6. Assert the response status is 200 and the completion text contains the expected answer. """ - Verify that Kueue admits the first replica of an LLMInferenceService and - gates the second replica when the service is scaled up beyond the queue's quota. - """ - # The llmd_inference_service is created with 1 replica at first to ensure the LLMISVC is ready - # Wait for the service and its single pod to become ready. 
- assert verify_gateway_status(llmd_gateway), "Gateway should be ready" - assert verify_llm_service_status(llmd_inference_service), "LLMInferenceService should be ready" - - selector_labels = [f"app.kubernetes.io/name={llmd_inference_service.name}", "kserve.io/component=workload"] + selector_labels = [f"app.kubernetes.io/name={llmisvc.name}", "kserve.io/component=workload"] deployments = list( Deployment.get( label_selector=",".join(selector_labels), - namespace=llmd_inference_service.namespace, + namespace=llmisvc.namespace, client=unprivileged_client, ) ) @@ -114,9 +116,9 @@ def test_kueue_llmd_scaleup( assert replicas == INITIAL_REPLICAS, f"Deployment should have {INITIAL_REPLICAS} replica, got {replicas}" # Update the LLMInferenceService to request 2 replicas, which exceeds the quota. - isvc_to_update = llmd_inference_service.instance.to_dict() + isvc_to_update = llmisvc.instance.to_dict() isvc_to_update["spec"]["replicas"] = EXPECTED_UPDATED_REPLICAS - llmd_inference_service.update(isvc_to_update) + llmisvc.update(isvc_to_update) # Check the deployment until it has 2 replicas, which means it's been updated try: @@ -138,9 +140,7 @@ def test_kueue_llmd_scaleup( for running_pods, gated_pods in TimeoutSampler( wait_timeout=120, sleep=5, - func=lambda: check_gated_pods_and_running_pods( - selector_labels, llmd_inference_service.namespace, unprivileged_client - ), + func=lambda: check_gated_pods_and_running_pods(selector_labels, llmisvc.namespace, unprivileged_client), ): if running_pods == EXPECTED_RUNNING_PODS and gated_pods == EXPECTED_GATED_PODS: break @@ -152,12 +152,10 @@ def test_kueue_llmd_scaleup( ) from None # Verify that inference still works on the single running pod - verify_inference_response_llmd( - llm_service=llmd_inference_service, - inference_config=TINYLLAMA_INFERENCE_CONFIG, - inference_type="chat_completions", - protocol=Protocols.HTTP, - use_default_query=True, - insecure=True, - model_name=llmd_inference_service.name, - ) + prompt = "What is the capital of Italy?" 
+ expected = "rome" + + status, body = send_chat_completions(llmisvc=llmisvc, prompt=prompt) + assert status == 200, f"Expected 200 after scale-up, got {status}: {body}" + completion = parse_completion_text(response_body=body) + assert expected in completion.lower(), f"Expected '{expected}' in response, got: {completion}" diff --git a/tests/model_serving/model_server/llmd/test_llmd_no_scheduler.py b/tests/model_serving/model_server/llmd/test_llmd_no_scheduler.py new file mode 100644 index 000000000..944e996bd --- /dev/null +++ b/tests/model_serving/model_server/llmd/test_llmd_no_scheduler.py @@ -0,0 +1,49 @@ +import pytest +from ocp_resources.llm_inference_service import LLMInferenceService + +from tests.model_serving.model_server.llmd.llmd_configs import QwenS3Config +from tests.model_serving.model_server.llmd.utils import ( + ns_from_file, + parse_completion_text, + send_chat_completions, +) + +pytestmark = [pytest.mark.tier2, pytest.mark.gpu] + +NAMESPACE = ns_from_file(file=__file__) + + +class S3GpuNoSchedulerConfig(QwenS3Config): + name = "llm-gpu-no-scheduler" + + @classmethod + def router_config(cls): + return {"route": {}} + + +@pytest.mark.parametrize( + "unprivileged_model_namespace, llmisvc", + [({"name": NAMESPACE}, S3GpuNoSchedulerConfig)], + indirect=True, +) +@pytest.mark.usefixtures("valid_aws_config", "skip_if_no_gpu_available") +class TestLlmdNoScheduler: + """Deploy Qwen on GPU with the scheduler disabled and verify chat completions.""" + + def test_llmd_no_scheduler( + self, + llmisvc: LLMInferenceService, + ): + """Test steps: + + 1. Send a chat completion request to /v1/chat/completions. + 2. Assert the response status is 200. + 3. Assert the completion text contains the expected answer. + """ + prompt = "What is the capital of Italy?" + expected = "rome" + + status, body = send_chat_completions(llmisvc=llmisvc, prompt=prompt) + assert status == 200, f"Expected 200, got {status}: {body}" + completion = parse_completion_text(response_body=body) + assert expected in completion.lower(), f"Expected '{expected}' in response, got: {completion}" diff --git a/tests/model_serving/model_server/llmd/test_llmd_oci_cpu.py b/tests/model_serving/model_server/llmd/test_llmd_oci_cpu.py deleted file mode 100644 index 76c986861..000000000 --- a/tests/model_serving/model_server/llmd/test_llmd_oci_cpu.py +++ /dev/null @@ -1,48 +0,0 @@ -import pytest - -from tests.model_serving.model_server.llmd.utils import ( - verify_gateway_status, - verify_llm_service_status, - verify_llmd_no_failed_pods, -) -from utilities.constants import Protocols -from utilities.llmd_utils import verify_inference_response_llmd -from utilities.manifests.tinyllama import TINYLLAMA_INFERENCE_CONFIG - -pytestmark = [ - pytest.mark.llmd_cpu, - pytest.mark.smoke, -] - -BASIC_LLMD_PARAMS = [({"name": "llmd-comprehensive-test"}, "basic")] - - -@pytest.mark.parametrize( - "unprivileged_model_namespace, llmd_inference_service", - BASIC_LLMD_PARAMS, - indirect=True, -) -class TestLLMDOCICPUInference: - """LLMD inference testing with OCI storage and CPU runtime using vLLM. - - Tests CPU-based LLMD inference using OCI container registry for model storage. - This test validates the basic LLMD functionality with CPU resources and - ensures proper integration with the TinyLlama model from OCI storage. 
- """ - - def test_llmd_oci_cpu(self, unprivileged_client, llmd_gateway, llmd_inference_service): - """Test LLMD inference with OCI storage and CPU runtime.""" - assert verify_gateway_status(llmd_gateway), "Gateway should be ready" - assert verify_llm_service_status(llmd_inference_service), "LLMInferenceService should be ready" - - verify_inference_response_llmd( - llm_service=llmd_inference_service, - inference_config=TINYLLAMA_INFERENCE_CONFIG, - inference_type="chat_completions", - protocol=Protocols.HTTP, - use_default_query=True, - insecure=True, - model_name=llmd_inference_service.name, - ) - - verify_llmd_no_failed_pods(client=unprivileged_client, llm_service=llmd_inference_service) diff --git a/tests/model_serving/model_server/llmd/test_llmd_prefill_decode.py b/tests/model_serving/model_server/llmd/test_llmd_prefill_decode.py new file mode 100644 index 000000000..6e3cccd97 --- /dev/null +++ b/tests/model_serving/model_server/llmd/test_llmd_prefill_decode.py @@ -0,0 +1,41 @@ +import pytest +from ocp_resources.llm_inference_service import LLMInferenceService + +from tests.model_serving.model_server.llmd.llmd_configs import PrefillDecodeConfig +from tests.model_serving.model_server.llmd.utils import ( + ns_from_file, + parse_completion_text, + send_chat_completions, +) + +pytestmark = [pytest.mark.tier2, pytest.mark.gpu] + +NAMESPACE = ns_from_file(file=__file__) + + +@pytest.mark.parametrize( + "unprivileged_model_namespace, llmisvc", + [({"name": NAMESPACE}, PrefillDecodeConfig)], + indirect=True, +) +@pytest.mark.usefixtures("valid_aws_config", "skip_if_no_gpu_available") +class TestLlmdPrefillDecode: + """Deploy Qwen on GPU with prefill-decode disaggregation and verify chat completions.""" + + def test_llmd_prefill_decode( + self, + llmisvc: LLMInferenceService, + ): + """Test steps: + + 1. Send a chat completion request to /v1/chat/completions. + 2. Assert the response status is 200. + 3. Assert the completion text contains the expected answer. + """ + prompt = "What is the capital of Italy?" 
+ expected = "rome" + + status, body = send_chat_completions(llmisvc=llmisvc, prompt=prompt) + assert status == 200, f"Expected 200, got {status}: {body}" + completion = parse_completion_text(response_body=body) + assert expected in completion.lower(), f"Expected '{expected}' in response, got: {completion}" diff --git a/tests/model_serving/model_server/llmd/test_llmd_s3.py b/tests/model_serving/model_server/llmd/test_llmd_s3.py deleted file mode 100644 index 2044e40b5..000000000 --- a/tests/model_serving/model_server/llmd/test_llmd_s3.py +++ /dev/null @@ -1,42 +0,0 @@ -import pytest - -from tests.model_serving.model_server.llmd.utils import ( - verify_gateway_status, - verify_llm_service_status, - verify_llmd_no_failed_pods, -) -from utilities.constants import Protocols -from utilities.llmd_utils import verify_inference_response_llmd -from utilities.manifests.tinyllama import TINYLLAMA_INFERENCE_CONFIG - -pytestmark = [ - pytest.mark.llmd_cpu, - pytest.mark.smoke, -] - - -@pytest.mark.parametrize( - "unprivileged_model_namespace, llmd_inference_service_s3", - [pytest.param({"name": "llmd-s3-test"}, {"name_suffix": "s3"}, id="s3-cpu-basic")], - indirect=True, -) -@pytest.mark.usefixtures("valid_aws_config") -class TestLLMDS3Inference: - """LLMD inference testing with S3 storage and CPU runtime using vLLM.""" - - def test_llmd_s3_cpu(self, unprivileged_client, llmd_gateway, llmd_inference_service_s3): - """Test LLMD inference with S3 storage and CPU runtime.""" - assert verify_gateway_status(llmd_gateway), "Gateway should be ready" - assert verify_llm_service_status(llmd_inference_service_s3), "LLMInferenceService should be ready" - - verify_inference_response_llmd( - llm_service=llmd_inference_service_s3, - inference_config=TINYLLAMA_INFERENCE_CONFIG, - inference_type="chat_completions", - protocol=Protocols.HTTP, - use_default_query=True, - insecure=True, - model_name=llmd_inference_service_s3.name, - ) - - verify_llmd_no_failed_pods(client=unprivileged_client, llm_service=llmd_inference_service_s3) diff --git a/tests/model_serving/model_server/llmd/test_llmd_s3_gpu.py b/tests/model_serving/model_server/llmd/test_llmd_s3_gpu.py deleted file mode 100644 index 12e2f3eef..000000000 --- a/tests/model_serving/model_server/llmd/test_llmd_s3_gpu.py +++ /dev/null @@ -1,67 +0,0 @@ -import pytest - -from tests.model_serving.model_server.llmd.utils import ( - verify_gateway_status, - verify_llm_service_status, -) -from utilities.constants import Protocols -from utilities.llmd_constants import ModelNames, ModelStorage -from utilities.llmd_utils import verify_inference_response_llmd -from utilities.manifests.qwen2_7b_instruct_gpu import QWEN2_7B_INSTRUCT_GPU_INFERENCE_CONFIG - -pytestmark = [ - pytest.mark.llmd_gpu, - pytest.mark.gpu, - pytest.mark.model_server_gpu, -] - -GPU_LLMD_PARAMS = [ - pytest.param({"name": "llmd-gpu-standard"}, {"name_suffix": "gpu-standard"}, id="gpu-standard"), - pytest.param( - {"name": "llmd-gpu-no-scheduler"}, - {"name_suffix": "gpu-no-scheduler", "disable_scheduler": True}, - id="gpu-no-scheduler", - ), - pytest.param( - {"name": "llmd-gpu-pd"}, - { - "name_suffix": "gpu-pd", - "enable_prefill_decode": True, - "replicas": 1, - "prefill_replicas": 1, - "storage_uri": ModelStorage.S3_QWEN, - "model_name": ModelNames.QWEN, - }, - id="gpu-prefill-decode", - ), -] - - -@pytest.mark.parametrize( - "unprivileged_model_namespace, llmd_inference_service_gpu", - GPU_LLMD_PARAMS, - indirect=True, -) -@pytest.mark.usefixtures("valid_aws_config") -class TestLLMDS3GPUInference: - 
"""LLMD inference testing with S3 storage and GPU runtime using vLLM.""" - - def test_llmd_s3_gpu( - self, unprivileged_client, llmd_gateway, llmd_inference_service_gpu, request, gpu_count_on_cluster - ): - """Test LLMD inference with various GPU configurations using S3 storage.""" - if gpu_count_on_cluster < 1: - pytest.skip("No GPUs available on cluster, skipping GPU test") - - assert verify_gateway_status(llmd_gateway), "Gateway should be ready" - assert verify_llm_service_status(llmd_inference_service_gpu), "LLMInferenceService should be ready" - - verify_inference_response_llmd( - llm_service=llmd_inference_service_gpu, - inference_config=QWEN2_7B_INSTRUCT_GPU_INFERENCE_CONFIG, - inference_type="chat_completions", - protocol=Protocols.HTTP, - use_default_query=True, - insecure=True, - model_name=llmd_inference_service_gpu.instance.spec.model.name, - ) diff --git a/tests/model_serving/model_server/llmd/test_llmd_singlenode_estimated_prefix_cache.py b/tests/model_serving/model_server/llmd/test_llmd_singlenode_estimated_prefix_cache.py new file mode 100644 index 000000000..c65f5ab49 --- /dev/null +++ b/tests/model_serving/model_server/llmd/test_llmd_singlenode_estimated_prefix_cache.py @@ -0,0 +1,71 @@ +import pytest +from kubernetes.dynamic import DynamicClient +from ocp_resources.llm_inference_service import LLMInferenceService +from ocp_resources.prometheus import Prometheus + +from tests.model_serving.model_server.llmd.llmd_configs import EstimatedPrefixCacheConfig +from tests.model_serving.model_server.llmd.utils import ( + assert_prefix_cache_routing, + get_llmd_router_scheduler_pod, + get_llmd_workload_pods, + ns_from_file, + send_prefix_cache_requests, +) + +NUM_REQUESTS = 20 +PREFIX_CACHE_PROMPT = ( + "Explain in detail the fundamental principles of quantum mechanics including " + "wave-particle duality, superposition, and entanglement in simple terms. " + "Additionally, describe how these quantum phenomena differ from classical physics " + "and why they are important for understanding the nature of reality at the atomic scale." +) + +NAMESPACE = ns_from_file(file=__file__) + +pytestmark = [pytest.mark.tier2, pytest.mark.gpu] + + +@pytest.mark.parametrize( + "unprivileged_model_namespace, llmisvc", + [({"name": NAMESPACE}, EstimatedPrefixCacheConfig)], + indirect=True, +) +@pytest.mark.usefixtures("valid_aws_config", "skip_if_less_than_2_gpus") +class TestSingleNodeEstimatedPrefixCache: + """Deploy Qwen on GPU with 2 replicas and estimated prefix cache routing, + then verify cache hits via Prometheus metrics. + """ + + def test_singlenode_estimated_prefix_cache( + self, + unprivileged_client: DynamicClient, + llmisvc: LLMInferenceService, + llmisvc_token: str, + prometheus: Prometheus, + ): + """Test steps: + + 1. Assert the router-scheduler pod exists and is Running. + 2. Assert exactly 2 workload pods are found. + 3. Send 20 chat completion requests with a shared long prompt. + 4. Query Prometheus and assert all traffic was routed to a single pod with correct prefix cache hit counts. 
+ """ + router_pod = get_llmd_router_scheduler_pod(client=unprivileged_client, llmisvc=llmisvc) + assert router_pod is not None, "Router-scheduler pod should exist" + assert router_pod.instance.status.phase == "Running", "Router-scheduler pod should be running" + + workload_pods = get_llmd_workload_pods(client=unprivileged_client, llmisvc=llmisvc) + assert len(workload_pods) == 2, f"Expected 2 workload pods, found {len(workload_pods)}" + + successful = send_prefix_cache_requests( + llmisvc=llmisvc, prompt=PREFIX_CACHE_PROMPT, token=llmisvc_token, count=NUM_REQUESTS + ) + assert successful == NUM_REQUESTS, f"Expected all {NUM_REQUESTS} requests to succeed, got {successful}" + + assert_prefix_cache_routing( + prometheus=prometheus, + llmisvc=llmisvc, + pods=workload_pods, + expected_requests=successful, + block_size=EstimatedPrefixCacheConfig.block_size, + ) diff --git a/tests/model_serving/model_server/llmd/test_llmd_singlenode_precise_prefix_cache.py b/tests/model_serving/model_server/llmd/test_llmd_singlenode_precise_prefix_cache.py new file mode 100644 index 000000000..9b9cd7abd --- /dev/null +++ b/tests/model_serving/model_server/llmd/test_llmd_singlenode_precise_prefix_cache.py @@ -0,0 +1,74 @@ +import pytest +from kubernetes.dynamic import DynamicClient +from ocp_resources.llm_inference_service import LLMInferenceService +from ocp_resources.prometheus import Prometheus + +from tests.model_serving.model_server.llmd.llmd_configs import PrecisePrefixCacheConfig +from tests.model_serving.model_server.llmd.utils import ( + assert_prefix_cache_routing, + assert_scheduler_routing, + get_llmd_router_scheduler_pod, + get_llmd_workload_pods, + ns_from_file, + send_prefix_cache_requests, +) + +NUM_REQUESTS = 20 +PREFIX_CACHE_PROMPT = ( + "Explain in detail the fundamental principles of quantum mechanics including " + "wave-particle duality, superposition, and entanglement in simple terms. " + "Additionally, describe how these quantum phenomena differ from classical physics " + "and why they are important for understanding the nature of reality at the atomic scale." +) + +NAMESPACE = ns_from_file(file=__file__) + +pytestmark = [pytest.mark.tier2, pytest.mark.gpu] + + +@pytest.mark.parametrize( + "unprivileged_model_namespace, llmisvc", + [({"name": NAMESPACE}, PrecisePrefixCacheConfig)], + indirect=True, +) +@pytest.mark.usefixtures("valid_aws_config", "skip_if_less_than_2_gpus") +class TestSingleNodePrecisePrefixCache: + """Deploy Qwen on GPU with 2 replicas and precise prefix cache routing, + then verify cache hits via Prometheus metrics. + """ + + def test_singlenode_precise_prefix_cache( + self, + unprivileged_client: DynamicClient, + llmisvc: LLMInferenceService, + llmisvc_token: str, + prometheus: Prometheus, + ): + """Test steps: + + 1. Assert the router-scheduler pod exists and is Running. + 2. Assert exactly 2 workload pods are found. + 3. Send 20 chat completion requests with a shared long prompt. + 4. Query Prometheus and assert all traffic was routed to a single pod with correct prefix cache hit counts. + 5. Assert the scheduler made at least the expected number of routing decisions. 
+ """ + router_pod = get_llmd_router_scheduler_pod(client=unprivileged_client, llmisvc=llmisvc) + assert router_pod is not None, "Router-scheduler pod should exist" + assert router_pod.instance.status.phase == "Running", "Router-scheduler pod should be running" + + workload_pods = get_llmd_workload_pods(client=unprivileged_client, llmisvc=llmisvc) + assert len(workload_pods) == 2, f"Expected 2 workload pods, found {len(workload_pods)}" + + successful = send_prefix_cache_requests( + llmisvc=llmisvc, prompt=PREFIX_CACHE_PROMPT, token=llmisvc_token, count=NUM_REQUESTS + ) + assert successful == NUM_REQUESTS, f"Expected all {NUM_REQUESTS} requests to succeed, got {successful}" + + assert_prefix_cache_routing( + prometheus=prometheus, + llmisvc=llmisvc, + pods=workload_pods, + expected_requests=successful, + block_size=PrecisePrefixCacheConfig.block_size, + ) + assert_scheduler_routing(router_pod=router_pod, min_decisions=successful) diff --git a/tests/model_serving/model_server/llmd/test_llmd_smoke.py b/tests/model_serving/model_server/llmd/test_llmd_smoke.py new file mode 100644 index 000000000..348ebfd68 --- /dev/null +++ b/tests/model_serving/model_server/llmd/test_llmd_smoke.py @@ -0,0 +1,40 @@ +import pytest +from ocp_resources.llm_inference_service import LLMInferenceService + +from tests.model_serving.model_server.llmd.llmd_configs import TinyLlamaOciConfig +from tests.model_serving.model_server.llmd.utils import ( + ns_from_file, + parse_completion_text, + send_chat_completions, +) + +pytestmark = [pytest.mark.smoke] + +NAMESPACE = ns_from_file(file=__file__) + + +@pytest.mark.parametrize( + "unprivileged_model_namespace, llmisvc", + [({"name": NAMESPACE}, TinyLlamaOciConfig)], + indirect=True, +) +class TestLLMDSmoke: + """Smoke test: deploy TinyLlama on CPU via OCI and verify chat completions.""" + + def test_llmd_smoke( + self, + llmisvc: LLMInferenceService, + ): + """Test steps: + + 1. Send a chat completion request to /v1/chat/completions. + 2. Assert the response status is 200. + 3. Assert the completion text contains the expected answer. + """ + prompt = "What is the capital of Italy?" + expected = "rome" + + status, body = send_chat_completions(llmisvc=llmisvc, prompt=prompt) + assert status == 200, f"Expected 200, got {status}: {body}" + completion = parse_completion_text(response_body=body) + assert expected in completion.lower(), f"Expected '{expected}' in response, got: {completion}" diff --git a/tests/model_serving/model_server/llmd/test_singlenode_estimated_prefix_cache.py b/tests/model_serving/model_server/llmd/test_singlenode_estimated_prefix_cache.py deleted file mode 100644 index 953df37ba..000000000 --- a/tests/model_serving/model_server/llmd/test_singlenode_estimated_prefix_cache.py +++ /dev/null @@ -1,91 +0,0 @@ -""" -Test Single-Node Estimated Prefix Caching. - -This test verifies that the LLM-D router correctly routes inference requests -based on cache state, maximizing prefix cache hits. 
- -Test configuration: -- LLMInferenceService with 2 replicas and router enabled -- Authentication enabled -- Verify router pod and vLLM pods are running -- Send multiple requests with shared prefixes and size greater than PREFIX_CACHE_BLOCK_SIZE -""" - -import pytest -from kubernetes.dynamic import DynamicClient -from ocp_resources.gateway import Gateway -from ocp_resources.llm_inference_service import LLMInferenceService -from ocp_resources.prometheus import Prometheus - -from tests.model_serving.model_server.llmd.utils import ( - get_llmd_router_scheduler_pod, - get_llmd_workload_pods, - send_prefix_cache_test_requests, - verify_estimated_prefix_cache, - verify_gateway_status, - verify_llm_service_status, -) - -# Number of requests to send for prefix cache testing -NUM_REQUESTS = 20 - -pytestmark = [pytest.mark.llmd_gpu] - - -@pytest.mark.parametrize( - "unprivileged_model_namespace, authenticated_llmisvc_token", - [ - pytest.param( - {"name": "llmd-test-singlenode-estimated-prefix-cache"}, - { - "service_account_fixture": "llmd_s3_service_account", - "llmisvc_fixture": "singlenode_estimated_prefix_cache", - }, - ) - ], - indirect=True, -) -@pytest.mark.usefixtures("valid_aws_config", "user_workload_monitoring_config_map") -class TestSingleNodeEstimatedPrefixCache: - """Test class for singlenode estimated prefix cache routing.""" - - def test_singlenode_estimated_prefix_cache( - self, - unprivileged_client: DynamicClient, - llmd_gateway: Gateway, - singlenode_estimated_prefix_cache: LLMInferenceService, - authenticated_llmisvc_token: str, - gpu_count_on_cluster: int, - prometheus: Prometheus, - ): - """Test single-node estimated prefix cache routing.""" - if gpu_count_on_cluster < 2: - pytest.skip(f"Test requires at least 2 GPUs (found {gpu_count_on_cluster})") - - # Verify infrastructure is ready before testing routing - assert verify_gateway_status(llmd_gateway), "Gateway should be ready" - assert verify_llm_service_status(singlenode_estimated_prefix_cache), "LLMInferenceService should be ready" - - router_scheduler_pod = get_llmd_router_scheduler_pod( - client=unprivileged_client, llmisvc=singlenode_estimated_prefix_cache - ) - assert router_scheduler_pod is not None, "Router-scheduler pod should exist" - assert router_scheduler_pod.instance.status.phase == "Running", "Router-scheduler pod should be running" - - workload_pods = get_llmd_workload_pods(client=unprivileged_client, llmisvc=singlenode_estimated_prefix_cache) - assert len(workload_pods) == 2, f"Expected 2 workload pods, found {len(workload_pods)}" - - # Send N identical requests to test prefix cache - num_successful_requests = send_prefix_cache_test_requests( - llmisvc=singlenode_estimated_prefix_cache, - token=authenticated_llmisvc_token, - num_requests=NUM_REQUESTS, - ) - - # Verify estimated prefix cache routing using Prometheus metrics - verify_estimated_prefix_cache( - prometheus=prometheus, - llmisvc=singlenode_estimated_prefix_cache, - workload_pods=workload_pods, - expected_requests=num_successful_requests, - ) diff --git a/tests/model_serving/model_server/llmd/utils.py b/tests/model_serving/model_server/llmd/utils.py index 1ee8c96b3..9de7cde2c 100644 --- a/tests/model_serving/model_server/llmd/utils.py +++ b/tests/model_serving/model_server/llmd/utils.py @@ -5,194 +5,160 @@ Follows the established model server utils pattern for consistency. 
""" -from typing import Any +import json +from pathlib import Path from kubernetes.dynamic import DynamicClient -from ocp_resources.gateway import Gateway from ocp_resources.llm_inference_service import LLMInferenceService from ocp_resources.pod import Pod from ocp_resources.prometheus import Prometheus +from pyhelper_utils.shell import run_command from simple_logger.logger import get_logger -from timeout_sampler import TimeoutSampler, retry +from timeout_sampler import retry -from tests.model_serving.model_server.llmd.constants import PREFIX_CACHE_BLOCK_SIZE -from utilities.constants import Protocols -from utilities.exceptions import PodContainersRestartError -from utilities.llmd_utils import verify_inference_response_llmd -from utilities.manifests.tinyllama import TINYLLAMA_INFERENCE_CONFIG +from utilities.certificates_utils import get_ca_bundle +from utilities.constants import Timeout from utilities.monitoring import get_metrics_value LOGGER = get_logger(name=__name__) -def verify_gateway_status(gateway: Gateway) -> bool: - """ - Verify that a Gateway is properly configured and programmed. - - Args: - gateway (Gateway): The Gateway resource to verify - - Returns: - bool: True if gateway is properly configured, False otherwise - """ - if not gateway.exists: - LOGGER.warning(f"Gateway {gateway.name} does not exist") - return False - - conditions = gateway.instance.status.get("conditions", []) - for condition in conditions: - if condition["type"] == "Programmed" and condition["status"] == "True": - LOGGER.info(f"Gateway {gateway.name} is programmed and ready") - return True - - LOGGER.warning(f"Gateway {gateway.name} is not in Programmed state") - return False - - -def verify_llm_service_status(llm_service: LLMInferenceService) -> bool: - """ - Verify that an LLMInferenceService is properly configured and ready. - - Args: - llm_service (LLMInferenceService): The LLMInferenceService resource to verify +def ns_from_file(file: str) -> str: + """Derive namespace name from test filename. - Returns: - bool: True if service is properly configured, False otherwise + Example: __file__ of test_llmd_smoke.py → "llmd-smoke" """ - if not llm_service.exists: - LOGGER.warning(f"LLMInferenceService {llm_service.name} does not exist") - return False - - conditions = llm_service.instance.status.get("conditions", []) - for condition in conditions: - if condition["type"] == "Ready" and condition["status"] == "True": - LOGGER.info(f"LLMInferenceService {llm_service.name} is ready") - return True + return Path(file).stem.removeprefix("test_").replace("_", "-")[:63] - LOGGER.warning(f"LLMInferenceService {llm_service.name} is not in Ready state") - return False - - -def verify_llmd_no_failed_pods( - client: DynamicClient, - llm_service: LLMInferenceService, - timeout: int = 300, -) -> None: - """ - Comprehensive verification that LLMD pods are healthy with no failures. - This function combines restart detection with comprehensive failure detection, - similar to verify_no_failed_pods but specifically designed for LLMInferenceService resources. +def wait_for_llmisvc(llmisvc: LLMInferenceService, timeout: int = Timeout.TIMEOUT_5MIN) -> None: + """Wait for LLMISVC to reach Ready condition. 
Raises on timeout.""" + llmisvc.wait_for_condition( + condition="Ready", + status="True", + timeout=timeout, + ) + LOGGER.info(f"LLMInferenceService {llmisvc.name} is Ready in namespace {llmisvc.namespace}") + + +def _get_inference_url(llmisvc: LLMInferenceService) -> str: + """Extract inference URL from LLMISVC status.""" + status = llmisvc.instance.status + if status and status.get("addresses"): + addresses = status["addresses"] + if addresses and addresses[0].get("url"): + return addresses[0]["url"] + if status and status.get("url"): + return status["url"] + return f"http://{llmisvc.name}.{llmisvc.namespace}.svc.cluster.local" + + +def _build_chat_body(model_name: str, prompt: str, max_tokens: int = 50) -> str: + """Build OpenAI chat completion request body.""" + return json.dumps({ + "model": model_name, + "messages": [{"role": "user", "content": prompt}], + "max_tokens": max_tokens, + "temperature": 0.0, + "stream": False, + }) + + +def _resolve_ca_cert(client: DynamicClient) -> str: + """Get CA cert path for TLS verification. Returns path or empty string.""" + try: + return get_ca_bundle(client=client, deployment_mode="raw") + except Exception: # noqa: BLE001 + return "" + + +def _log_curl_command(url: str, body: str, token: bool, ca_cert: str | None) -> None: + """Log a human-readable curl command with token redacted and payload formatted.""" + formatted_body = json.dumps(json.loads(body), indent=2) + auth_header = "\n -H 'Authorization: Bearer ***REDACTED***'" if token else "" + tls_flag = f"\n --cacert {ca_cert}" if ca_cert else "\n --insecure" + LOGGER.info( + f"curl -s -X POST \\\n" + f" -H 'Content-Type: application/json' \\\n" + f" -H 'Accept: application/json' \\{auth_header}\n" + f" -d '{formatted_body}' \\{tls_flag}\n" + f" {url}" + ) - Checks for: - - Container restarts (restartCount > 0) - - Container waiting states with errors (ImagePullBackOff, CrashLoopBackOff, etc.) 
- - Container terminated states with errors - - Pod failures (CrashLoopBackOff, Failed phases) - - Pod readiness within timeout - Args: - client (DynamicClient): DynamicClient instance - llm_service (LLMInferenceService): The LLMInferenceService to check pods for - timeout (int): Timeout in seconds for pod readiness check - - Raises: - PodContainersRestartError: If any containers have restarted - FailedPodsError: If any pods are in failed state - TimeoutError: If pods don't become ready within timeout - """ - from ocp_resources.resource import Resource - - from utilities.exceptions import FailedPodsError - - LOGGER.info(f"Comprehensive health check for LLMInferenceService {llm_service.name}") - - container_wait_base_errors = ["InvalidImageName", "CrashLoopBackOff", "ImagePullBackOff", "ErrImagePull"] - container_terminated_base_errors = [Resource.Status.ERROR, "CrashLoopBackOff"] - - def get_llmd_pods(): - """Get LLMD workload pods for this LLMInferenceService.""" - pods = [] - for pod in Pod.get( - client=client, - namespace=llm_service.namespace, - label_selector=( - f"{Pod.ApiGroup.APP_KUBERNETES_IO}/part-of=llminferenceservice," - f"{Pod.ApiGroup.APP_KUBERNETES_IO}/name={llm_service.name}" - ), - ): - labels = pod.instance.metadata.get("labels", {}) - if labels.get("kserve.io/component") == "workload": - pods.append(pod) - return pods - - for pods in TimeoutSampler( - wait_timeout=timeout, - sleep=10, - func=get_llmd_pods, - ): - if not pods: - LOGGER.debug(f"No LLMD workload pods found for {llm_service.name} yet") - continue - - ready_pods = 0 - failed_pods: dict[str, Any] = {} - restarted_containers: dict[str, list[str]] = {} - for pod in pods: - for condition in pod.instance.status.conditions: - if condition.type == pod.Status.READY and condition.status == pod.Condition.Status.TRUE: - ready_pods += 1 - break - if ready_pods == len(pods): - LOGGER.info(f"All {len(pods)} LLMD pods are ready, performing health checks") - - for pod in pods: - pod_status = pod.instance.status - if pod_status.containerStatuses: - for container_status in pod_status.get("containerStatuses", []) + pod_status.get( - "initContainerStatuses", [] - ): - if hasattr(container_status, "restartCount") and container_status.restartCount > 0: - if pod.name not in restarted_containers: - restarted_containers[pod.name] = [] - restarted_containers[pod.name].append(container_status.name) - LOGGER.warning( - f"Container {container_status.name} in pod {pod.name} has restarted " - f"{container_status.restartCount} times" - ) - is_waiting_error = ( - wait_state := container_status.state.waiting - ) and wait_state.reason in container_wait_base_errors - - is_terminated_error = ( - terminate_state := container_status.state.terminated - ) and terminate_state.reason in container_terminated_base_errors - - if is_waiting_error or is_terminated_error: - failed_pods[pod.name] = pod_status - reason = wait_state.reason if is_waiting_error else terminate_state.reason - LOGGER.error( - f"Container {container_status.name} in pod {pod.name} has error state: {reason}" - ) - elif pod_status.phase in ( - pod.Status.CRASH_LOOPBACK_OFF, - pod.Status.FAILED, - ): - failed_pods[pod.name] = pod_status - LOGGER.error(f"Pod {pod.name} is in failed phase: {pod_status.phase}") - if restarted_containers: - error_msg = f"LLMD containers restarted for {llm_service.name}: {restarted_containers}" - LOGGER.error(error_msg) - raise PodContainersRestartError(error_msg) - - if failed_pods: - LOGGER.error(f"LLMD pods failed for {llm_service.name}: 
{list(failed_pods.keys())}") - raise FailedPodsError(pods=failed_pods) - - LOGGER.info(f"All LLMD pods for {llm_service.name} are healthy - no restarts or failures detected") - return - LOGGER.debug(f"LLMD pods status: {ready_pods}/{len(pods)} ready for {llm_service.name}") - raise TimeoutError(f"LLMD pods for {llm_service.name} did not become ready within {timeout} seconds") +def _curl_post( + url: str, body: str, token: str | None = None, ca_cert: str | None = None, timeout: int = 60 +) -> tuple[int, str]: + """POST to URL via curl. Returns (status_code, response_body).""" + cmd = [ + "curl", + "-s", + "-w", + "\n%{http_code}", + "-X", + "POST", + "-H", + "Content-Type: application/json", + "-H", + "Accept: application/json", + "-d", + body, + "--max-time", + str(timeout), + ] + if token: + cmd.extend(["-H", f"Authorization: Bearer {token}"]) + if ca_cert: + cmd.extend(["--cacert", ca_cert]) + else: + cmd.append("--insecure") + cmd.append(url) + + _log_curl_command(url=url, body=body, token=bool(token), ca_cert=ca_cert) + + _, stdout, stderr = run_command(command=cmd, verify_stderr=False, check=False, hide_log_command=True) + if not stdout.strip(): + raise ConnectionError(f"curl failed with no output: {stderr}") + + parts = stdout.rsplit("\n", 1) + response_body = parts[0] if len(parts) > 1 else "" + try: + status_code = int(parts[-1].strip()) + except ValueError: + status_code = 0 + return status_code, response_body + + +def _get_model_name(llmisvc: LLMInferenceService) -> str: + """Read model name from spec.model.name, falling back to the resource name.""" + return llmisvc.instance.spec.model.get("name", llmisvc.name) + + +def send_chat_completions( + llmisvc: LLMInferenceService, + prompt: str, + token: str | None = None, + insecure: bool = True, +) -> tuple[int, str]: + """Send a chat completion request. Returns (status_code, response_body).""" + url = _get_inference_url(llmisvc) + "/v1/chat/completions" + model_name = _get_model_name(llmisvc=llmisvc) + body = _build_chat_body(model_name=model_name, prompt=prompt) + ca_cert = None if insecure else _resolve_ca_cert(llmisvc.client) + + LOGGER.info(f"Sending inference request to {llmisvc.name} — URL: {url}, Model: {model_name}") + status_code, response_body = _curl_post(url=url, body=body, token=token, ca_cert=ca_cert) + LOGGER.info(f"Inference response — status={status_code}\n{response_body}") + return status_code, response_body + + +def parse_completion_text(response_body: str) -> str: + """Extract completion text from a chat completion response.""" + try: + data = json.loads(response_body) + return data["choices"][0]["message"]["content"] + except (json.JSONDecodeError, KeyError, IndexError, TypeError) as e: + raise ValueError(f"Failed to parse completion response: {e}\nBody: {response_body[:500]}") from e def get_llmd_workload_pods( @@ -252,198 +218,118 @@ def get_llmd_router_scheduler_pod( return None -def send_prefix_cache_test_requests( - llmisvc: LLMInferenceService, - token: str, - num_requests: int = 20, -) -> int: - """ - Send N identical requests to validate prefix cache. - - This function sends the same prompt multiple times to test cache affinity. - All requests after the first should hit the cache and route to the same pod. 
- - Args: - llmisvc: The LLMInferenceService to send requests to - token: Authentication token - num_requests: Number of identical requests to send (default 20) - - Returns: - int: Number of successful requests completed - """ - successful_requests = 0 - failed_requests = 0 - - # Single prompt to be cached - cached_prompt = ( - "Explain in detail the fundamental principles of quantum mechanics including " - "wave-particle duality, superposition, and entanglement in simple terms. " - "Additionally, describe how these quantum phenomena differ from classical physics " - "and why they are important for understanding the nature of reality at the atomic scale." - ) - - LOGGER.info(f"Sending {num_requests} identical requests to test prefix cache") - - for index in range(num_requests): - LOGGER.info(f"Sending request {index + 1}/{num_requests}") - inference_config = { - "default_query_model": { - "query_input": cached_prompt, - "query_output": r".*", - "use_regex": True, - }, - "chat_completions": TINYLLAMA_INFERENCE_CONFIG["chat_completions"], - } - - try: - verify_inference_response_llmd( - llm_service=llmisvc, - inference_config=inference_config, - inference_type="chat_completions", - protocol=Protocols.HTTPS, - use_default_query=True, - insecure=False, - model_name=llmisvc.instance.spec.model.name, - token=token, - authorized_user=True, - ) - successful_requests += 1 - except Exception as e: # noqa: BLE001 - LOGGER.error(f"Request {index + 1} failed: {e}") - failed_requests += 1 - - # Log statistics - LOGGER.info(f"{successful_requests}/{num_requests} requests completed successfully") - - return successful_requests - - -def get_successful_requests_by_pod( +def query_metric_by_pod( prometheus: Prometheus, + metric_name: str, llmisvc: LLMInferenceService, pods: list[Pod], ) -> dict[str, float]: - """ - Retrieves the total number of successful requests per pod. - - This function queries the `kserve_vllm:request_success_total` counter metric - from Prometheus for the specified inference service. - - Args: - prometheus: The Prometheus client instance. - llmisvc: The LLM Inference Service object to filter by. - pods: A list of pod names to include in the result. - - Returns: - dict[str, float]: A dictionary mapping pod names to their respective - total successful request counts. - """ - success_counts: dict[str, float] = {} - + """Query a Prometheus metric for each pod. Returns {pod_name: value}.""" + result: dict[str, float] = {} for pod in pods: - query = f'sum(kserve_vllm:request_success_total{{namespace="{llmisvc.namespace}",pod="{pod.name}"}})' - count = float(get_metrics_value(prometheus=prometheus, metrics_query=query) or 0) - success_counts[pod.name] = count - - return success_counts + query = f'sum({metric_name}{{namespace="{llmisvc.namespace}",pod="{pod.name}"}})' + result[pod.name] = float(get_metrics_value(prometheus=prometheus, metrics_query=query) or 0) + return result -def get_prefix_cache_hits_by_pod( +@retry(wait_timeout=90, sleep=30, exceptions_dict={AssertionError: []}, print_log=False) +def assert_prefix_cache_routing( prometheus: Prometheus, llmisvc: LLMInferenceService, pods: list[Pod], -) -> dict[str, float]: - """ - Retrieves the total number of prefix cache hits per pod. - - This function queries the `kserve_vllm:prefix_cache_hits_total` counter metric - from Prometheus for the specified inference service. + expected_requests: int, + block_size: int = 64, +) -> bool: + """Assert all traffic routed to 1 pod with correct cache hits. 
Retries for metric delay.""" + requests = query_metric_by_pod( + prometheus=prometheus, + metric_name="kserve_vllm:request_success_total", + llmisvc=llmisvc, + pods=pods, + ) + LOGGER.info(f"Request count by pod: {requests}") - Args: - prometheus: The Prometheus client instance. - llmisvc: The LLM Inference Service object to filter by. - pods: A list of pod names to include in the result. + pods_with_traffic = [p for p, count in requests.items() if count > 0] + assert len(pods_with_traffic) == 1, f"Expected traffic on exactly 1 pod, got {len(pods_with_traffic)}: {requests}" - Returns: - dict[str, float]: A dictionary mapping pod names to their respective - total prefix cache hit counts. - """ - cache_hits: dict[str, float] = {} + active_pod = pods_with_traffic[0] + assert requests[active_pod] == expected_requests, ( + f"Expected {expected_requests} requests on '{active_pod}', got {requests[active_pod]}" + ) - for pod in pods: - query = f'sum(kserve_vllm:prefix_cache_hits_total{{namespace="{llmisvc.namespace}",pod="{pod.name}"}})' - count = float(get_metrics_value(prometheus=prometheus, metrics_query=query) or 0) - cache_hits[pod.name] = count + hits = query_metric_by_pod( + prometheus=prometheus, + metric_name="kserve_vllm:prefix_cache_hits_total", + llmisvc=llmisvc, + pods=pods, + ) + LOGGER.info(f"Prefix cache hits by pod: {hits}") - return cache_hits + expected_hits = (expected_requests - 1) * block_size + assert hits[active_pod] == expected_hits, ( + f"Expected {expected_hits} cache hits on '{active_pod}', got {hits[active_pod]}" + ) + return True @retry(wait_timeout=90, sleep=30, exceptions_dict={AssertionError: []}, print_log=False) -def verify_estimated_prefix_cache( - prometheus: Prometheus, - llmisvc: LLMInferenceService, - workload_pods: list[Pod], - expected_requests: int, -) -> bool: - """ - Verify that the Estimated Prefix Cache is working correctly via metric assertions. +def assert_scheduler_routing(router_pod: Pod, min_decisions: int) -> bool: + """Assert scheduler made enough routing decisions. Retries for log propagation.""" + logs = get_scheduler_decision_logs(router_scheduler_pod=router_pod) + assert len(logs) >= min_decisions, f"Expected >= {min_decisions} scheduler decisions, got {len(logs)}" + return True - This function polls Prometheus to assess two key behaviors: - 1. all traffic was routed to a single pod - 2. the number of prefix cache hits matches - Retries for up to 90s to allow for metric scraping latency. +def send_prefix_cache_requests( + llmisvc: LLMInferenceService, + prompt: str, + token: str, + count: int = 20, + min_ratio: float = 0.8, +) -> int: + """Send identical requests for prefix cache testing. Returns success count.""" + LOGGER.info(f"Sending {count} identical requests to test prefix cache") + successful = 0 + for i in range(count): + try: + status, _ = send_chat_completions(llmisvc=llmisvc, prompt=prompt, token=token, insecure=False) + if status == 200: + successful += 1 + except Exception: + LOGGER.exception(f"Request {i + 1}/{count} failed") + LOGGER.info(f"{successful}/{count} requests succeeded") + assert successful >= count * min_ratio, f"Too many failures: {successful}/{count} (need {min_ratio * 100}%)" + return successful + + +def get_scheduler_decision_logs( + router_scheduler_pod: Pod, + lookback_seconds: int = 600, +) -> list[dict]: + """ + Retrieve scheduling decision logs from the router-scheduler pod. Args: - prometheus: Prometheus client. - llmisvc: Target Inference Service. - workload_pods: List of serving pods. 
- expected_requests: Total expected request count. + router_scheduler_pod: The router-scheduler Pod object + lookback_seconds: How far back to look in logs (default: 600s = 10 minutes) Returns: - bool: True if verification succeeds (required by @retry decorator). - - Raises: - AssertionError: If validation fails after the retry timeout. + list[dict]: List of parsed JSON log entries containing scheduler decisions """ - LOGGER.info("Checking Estimated Prefix Cache logic...") + LOGGER.info(f"Retrieving logs from scheduler pod {router_scheduler_pod.name}") - # 1. Verify all traffic is routed to a single pod - request_counts = get_successful_requests_by_pod( - prometheus=prometheus, - llmisvc=llmisvc, - pods=workload_pods, - ) - LOGGER.info(f"Request count by pod: {request_counts}") - - # All requests must be routed to exactly one pod (prefix cache affinity). - # This assertion works regardless of the number of pods in the deployment. - pods_with_traffic = [pod for pod, count in request_counts.items() if count > 0] - assert len(pods_with_traffic) == 1, ( - f"Expected all traffic to be routed to exactly 1 pod, but {len(pods_with_traffic)} pods received traffic. " - f"Distribution: {request_counts}" - ) + # Get all logs from the scheduler pod + # Note: The router-scheduler container is the default/main container + raw_logs = router_scheduler_pod.log() - active_pod = pods_with_traffic[0] - assert request_counts[active_pod] == expected_requests, ( - f"Expected {expected_requests} requests on the active pod '{active_pod}', but got {request_counts[active_pod]}" - ) + # Target decision message + target_decision_msg = "Selecting pods from candidates sorted by max score" - # 2. Verify Prefix Cache Hits on the active pod - # The first request warms the cache, subsequent requests should hit it. - cache_hit_counts = get_prefix_cache_hits_by_pod( - prometheus=prometheus, - llmisvc=llmisvc, - pods=workload_pods, - ) - LOGGER.info(f"Prefix cache hits by pod: {cache_hit_counts}") + # Filtering logs + filtered_logs = "\n".join(line for line in raw_logs.splitlines() if target_decision_msg in line) - # Logic: (N-1) requests * Block Size - expected_hits = (expected_requests - 1) * PREFIX_CACHE_BLOCK_SIZE + # Parsing as json + json_logs = [json.loads(line) for line in filtered_logs.splitlines()] - assert cache_hit_counts[active_pod] == expected_hits, ( - f"Cache hit mismatch on active pod '{active_pod}'. 
" - f"Expected {expected_hits} hits, got {cache_hit_counts[active_pod]}" - ) - - return True + LOGGER.info(f"Retrieved {len(json_logs)} logs from router-scheduler pod") + return json_logs diff --git a/utilities/constants.py b/utilities/constants.py index 579b9629e..3dba03230 100644 --- a/utilities/constants.py +++ b/utilities/constants.py @@ -314,6 +314,7 @@ class S3: class HuggingFace: TINYLLAMA: str = "hf://TinyLlama/TinyLlama-1.1B-Chat-v1.0" OPT125M: str = "hf://facebook/opt-125m" + QWEN_7B_INSTRUCT: str = "hf://Qwen/Qwen2.5-7B-Instruct" class OCIRegistry: diff --git a/utilities/llmd_constants.py b/utilities/llmd_constants.py index 2707e7233..a3a5d7532 100644 --- a/utilities/llmd_constants.py +++ b/utilities/llmd_constants.py @@ -39,6 +39,7 @@ class ModelStorage: S3_QWEN: str = SharedModelStorage.S3.QWEN_7B_INSTRUCT HF_TINYLLAMA: str = SharedModelStorage.HuggingFace.TINYLLAMA HF_OPT125M: str = SharedModelStorage.HuggingFace.OPT125M + HF_QWEN_7B_INSTRUCT: str = SharedModelStorage.HuggingFace.QWEN_7B_INSTRUCT class ContainerImages: diff --git a/utilities/llmd_utils.py b/utilities/llmd_utils.py index 4b78fd17a..108f93029 100644 --- a/utilities/llmd_utils.py +++ b/utilities/llmd_utils.py @@ -1,29 +1,21 @@ """Utilities for LLM Deployment (LLMD) resources.""" -import json -import re -import shlex from collections.abc import Generator from contextlib import contextmanager -from string import Template from typing import Any from kubernetes.dynamic import DynamicClient from ocp_resources.gateway import Gateway from ocp_resources.llm_inference_service import LLMInferenceService -from pyhelper_utils.shell import run_command from simple_logger.logger import get_logger -from timeout_sampler import TimeoutWatch, retry +from timeout_sampler import TimeoutWatch -from utilities.certificates_utils import get_ca_bundle -from utilities.constants import HTTPRequest, Timeout -from utilities.exceptions import InferenceResponseError +from utilities.constants import Timeout from utilities.infra import get_services_by_isvc_label from utilities.llmd_constants import ( ContainerImages, KServeGateway, LLMDGateway, - LLMEndpoint, ) LOGGER = get_logger(name=__name__) @@ -396,340 +388,3 @@ def get_llm_inference_url(llm_service: LLMInferenceService) -> str: fallback_url = f"http://{llm_service.name}.{llm_service.namespace}.svc.cluster.local" LOGGER.debug(f"Using fallback URL for {llm_service.name}: {fallback_url}") return fallback_url - - -def verify_inference_response_llmd( - llm_service: LLMInferenceService, - inference_config: dict[str, Any], - inference_type: str, - protocol: str, - model_name: str | None = None, - inference_input: Any | None = None, - use_default_query: bool = False, - expected_response_text: str | None = None, - insecure: bool = False, - token: str | None = None, - authorized_user: bool | None = None, -) -> None: - """ - Verify the LLM inference response following the pattern of verify_inference_response. - - Args: - llm_service: LLMInferenceService resource to test - inference_config: Inference configuration dictionary - inference_type: Type of inference ('infer', 'streaming', etc.) 
- protocol: Protocol to use ('http', 'grpc') - model_name: Name of the model (defaults to service name) - inference_input: Input for inference (optional) - use_default_query: Whether to use default query from config - expected_response_text: Expected response text for validation - insecure: Whether to use insecure connections - token: Authentication token (optional) - authorized_user: Whether user should be authorized (optional) - - Raises: - InferenceResponseError: If inference response is invalid - ValueError: If inference response validation fails - """ - - model_name = model_name or llm_service.name - inference = LLMUserInference( - llm_service=llm_service, - inference_config=inference_config, - inference_type=inference_type, - protocol=protocol, - ) - - res = inference.run_inference_flow( - model_name=model_name, - inference_input=inference_input, - use_default_query=use_default_query, - token=token, - insecure=insecure, - ) - - if authorized_user is False: - _validate_unauthorized_response(res=res, token=token, inference=inference) - else: - _validate_authorized_response( - res=res, - inference=inference, - inference_config=inference_config, - inference_type=inference_type, - expected_response_text=expected_response_text, - use_default_query=use_default_query, - model_name=model_name, - ) - - -class LLMUserInference: - """ - LLM-specific inference handler following the pattern of UserInference. - """ - - STREAMING = "streaming" - INFER = "infer" - - def __init__( - self, - llm_service: LLMInferenceService, - inference_config: dict[str, Any], - inference_type: str, - protocol: str, - ) -> None: - self.llm_service = llm_service - self.inference_config = inference_config - self.inference_type = inference_type - self.protocol = protocol - self.runtime_config = self.get_runtime_config() - - def get_runtime_config(self) -> dict[str, Any]: - """Get runtime config from inference config based on inference type and protocol.""" - if inference_type_config := self.inference_config.get(self.inference_type): - protocol = "http" if self.protocol.lower() in ["http", "https"] else self.protocol - if data := inference_type_config.get(protocol): - return data - else: - raise ValueError(f"Protocol {protocol} not supported for inference type {self.inference_type}") - else: - raise ValueError(f"Inference type {self.inference_type} not supported in config") - - @property - def inference_response_text_key_name(self) -> str | None: - """Get inference response text key name from runtime config.""" - return self.runtime_config.get("response_fields_map", {}).get("response_output") - - @property - def inference_response_key_name(self) -> str: - """Get inference response key name from runtime config.""" - return self.runtime_config.get("response_fields_map", {}).get("response", "output") - - def get_inference_body( - self, - model_name: str, - inference_input: Any | None = None, - use_default_query: bool = False, - ) -> str: - """Get inference body for LLM request.""" - if not use_default_query and inference_input is None: - raise ValueError("Either pass `inference_input` or set `use_default_query` to True") - - if use_default_query: - default_query_config = self.inference_config.get("default_query_model") - if not default_query_config: - raise ValueError(f"Missing default query config for {model_name}") - - if self.inference_config.get("support_multi_default_queries"): - query_config = default_query_config.get(self.inference_type) - if not query_config: - raise ValueError(f"Missing default query for inference 
type {self.inference_type}") - query_input = query_config.get("query_input", "") - else: - query_input = default_query_config.get("query_input", "") - - # Use the proper JSON body template from runtime config - body_template = self.runtime_config.get("body", "") - if body_template: - # Use template substitution for both model name and query input - template = Template(template=body_template) - body = template.safe_substitute(model_name=model_name, query_input=query_input) - else: - # Fallback to plain text (legacy behavior) - template = Template(template=query_input) - body = template.safe_substitute(model_name=model_name) - else: - # For custom input, create OpenAI-compatible format - if isinstance(inference_input, str): - body = json.dumps({ - "model": model_name, - "messages": [{"role": "user", "content": inference_input}], - "max_tokens": 100, - "temperature": 0.0, - }) - else: - body = json.dumps(inference_input) - - return body - - def generate_command( - self, - model_name: str, - inference_input: str | None = None, - use_default_query: bool = False, - insecure: bool = False, - token: str | None = None, - ) -> str: - """Generate curl command string for LLM inference.""" - base_url = get_llm_inference_url(llm_service=self.llm_service) - endpoint_url = f"{base_url}{LLMEndpoint.CHAT_COMPLETIONS}" - - body = self.get_inference_body( - model_name=model_name, - inference_input=inference_input, - use_default_query=use_default_query, - ) - - header = HTTPRequest.CONTENT_JSON.replace("-H ", "") - cmd_exec = "curl -i -s" - cmd = f"{cmd_exec} -X POST -d '{body}' -H {header} -H 'Accept: application/json'" - - if token: - cmd += f" {HTTPRequest.AUTH_HEADER.format(token=token)}" - - if insecure: - cmd += " --insecure" - else: - try: - from ocp_resources.resource import get_client - - client = get_client() - ca_bundle = get_ca_bundle(client=client) - if ca_bundle: - cmd += f" --cacert {ca_bundle}" - else: - cmd += " --insecure" - except Exception: # noqa: BLE001 - cmd += " --insecure" - - cmd += f" --max-time {LLMEndpoint.DEFAULT_TIMEOUT} {endpoint_url}" - return cmd - - @retry(wait_timeout=Timeout.TIMEOUT_30SEC, sleep=5) - def run_inference( - self, - model_name: str, - inference_input: str | None = None, - use_default_query: bool = False, - insecure: bool = False, - token: str | None = None, - ) -> str: - """Run inference command and return raw output.""" - cmd = self.generate_command( - model_name=model_name, - inference_input=inference_input, - use_default_query=use_default_query, - insecure=insecure, - token=token, - ) - - res, out, err = run_command(command=shlex.split(cmd), verify_stderr=False, check=False) - if res: - return out - raise ValueError(f"Inference failed with error: {err}\nOutput: {out}\nCommand: {cmd}") - - def run_inference_flow( - self, - model_name: str, - inference_input: str | None = None, - use_default_query: bool = False, - insecure: bool = False, - token: str | None = None, - ) -> dict[str, Any]: - """Run LLM inference using the same high-level flow as inference_utils.""" - out = self.run_inference( - model_name=model_name, - inference_input=inference_input, - use_default_query=use_default_query, - insecure=insecure, - token=token, - ) - return {"output": out} - - -def _validate_unauthorized_response(res: dict[str, Any], token: str | None, inference: LLMUserInference) -> None: - """Validate response for unauthorized users.""" - auth_header = "x-ext-auth-reason" - - if auth_reason := re.search(rf"{auth_header}: (.*)", res["output"], re.MULTILINE): - reason = 
auth_reason.group(1).lower() - - if token: - assert re.search(r"not (?:authenticated|authorized)", reason) - else: - assert "credential not found" in reason - else: - forbidden_patterns = ["Forbidden", "401", "403", "Unauthorized"] - output = res["output"] - - if any(pattern in output for pattern in forbidden_patterns): - return - - raise ValueError(f"Auth header {auth_header} not found in response. Response: {output}") - - -def _validate_authorized_response( - res: dict[str, Any], - inference: LLMUserInference, - inference_config: dict[str, Any], - inference_type: str, - expected_response_text: str | None, - use_default_query: bool, - model_name: str, -) -> None: - """Validate response for authorized users.""" - - use_regex = False - - if use_default_query: - expected_response_text_config = inference_config.get("default_query_model", {}) - use_regex = expected_response_text_config.get("use_regex", False) - - if not expected_response_text_config: - raise ValueError(f"Missing default_query_model config for inference {inference_config}") - - if inference_config.get("support_multi_default_queries"): - query_config = expected_response_text_config.get(inference_type) - if not query_config: - raise ValueError(f"Missing default_query_model config for inference type {inference_type}") - expected_response_text = query_config.get("query_output", "") - use_regex = query_config.get("use_regex", False) - else: - expected_response_text = expected_response_text_config.get("query_output") - - if not expected_response_text: - raise ValueError(f"Missing response text key for inference {inference_config}") - - if isinstance(expected_response_text, str): - expected_response_text = Template(template=expected_response_text).safe_substitute(model_name=model_name) - elif isinstance(expected_response_text, dict): - response_output = expected_response_text.get("response_output") - if response_output is not None: - expected_response_text = Template(template=response_output).safe_substitute(model_name=model_name) - if inference.inference_response_text_key_name: - if inference_type == inference.STREAMING: - if output := re.findall( - rf"{inference.inference_response_text_key_name}\": \"(.*)\"", - res[inference.inference_response_key_name], - re.MULTILINE, - ): - assert "".join(output) == expected_response_text, ( - f"Expected: {expected_response_text} does not match response: {output}" - ) - elif inference_type == inference.INFER or use_regex: - formatted_res = json.dumps(res[inference.inference_response_text_key_name]).replace(" ", "") - if use_regex and expected_response_text is not None: - assert re.search(expected_response_text, formatted_res), ( - f"Expected: {expected_response_text} not found in: {formatted_res}" - ) - else: - formatted_res = json.dumps(res[inference.inference_response_key_name]).replace(" ", "") - assert formatted_res == expected_response_text, ( - f"Expected: {expected_response_text} does not match output: {formatted_res}" - ) - else: - response = res[inference.inference_response_key_name] - if isinstance(response, list): - response = response[0] - - if isinstance(response, dict): - response_text = response[inference.inference_response_text_key_name] - assert response_text == expected_response_text, ( - f"Expected: {expected_response_text} does not match response: {response_text}" - ) - else: - raise InferenceResponseError( - "Inference response output does not match expected output format." 
- f"Expected: {expected_response_text}.\nResponse: {res}" - ) - else: - raise InferenceResponseError(f"Inference response output not found in response. Response: {res}")