|
| 1 | +from typing import Any, Generator |
| 2 | + |
| 3 | +import pytest |
| 4 | +from _pytest.fixtures import FixtureRequest |
| 5 | +from kubernetes.dynamic import DynamicClient |
| 6 | +from ocp_resources.inference_service import InferenceService |
| 7 | +from ocp_resources.namespace import Namespace |
| 8 | +from ocp_resources.secret import Secret |
| 9 | +from ocp_resources.service_account import ServiceAccount |
| 10 | +from ocp_resources.serving_runtime import ServingRuntime |
| 11 | +from simple_logger.logger import get_logger |
| 12 | +from tests.model_serving.model_runtime.vllm.utils import validate_supported_quantization_schema |
| 13 | +from tests.model_serving.model_runtime.vllm.constant import ACCELERATOR_IDENTIFIER, PREDICT_RESOURCES, TEMPLATE_MAP |
| 14 | +from utilities.manifests.vllm import VLLM_INFERENCE_CONFIG |
| 15 | +from utilities.manifests.onnx import ONNX_INFERENCE_CONFIG |
| 16 | + |
| 17 | +from utilities.constants import ( |
| 18 | + KServeDeploymentType, |
| 19 | + RuntimeTemplates, |
| 20 | + Labels, |
| 21 | +) |
| 22 | +from tests.model_serving.model_server.utils import ( |
| 23 | + run_concurrent_load_for_keda_scaling, |
| 24 | +) |
| 25 | +from utilities.constants import ( |
| 26 | + ModelAndFormat, |
| 27 | +) |
| 28 | +from utilities.inference_utils import create_isvc |
| 29 | +from utilities.serving_runtime import ServingRuntimeFromTemplate |
| 30 | +from utilities.constants import THANOS_QUERIER_ADDRESS |
| 31 | +from syrupy.extensions.json import JSONSnapshotExtension |
| 32 | + |
| 33 | +LOGGER = get_logger(name=__name__) |
| 34 | + |
| 35 | + |
def create_keda_auto_scaling_config(
    query: str,
    target_value: str,
    model_name: str = "",
    namespace: str = "",
    server_address: str = "",
) -> dict[str, Any]:
    """Create KEDA auto-scaling configuration for inference services.

    Args:
        query: The Prometheus query to use for scaling.
        target_value: Target value for the metric.
        model_name: Name of the model. Accepted for call-site compatibility;
            not currently embedded in the generated config.
        namespace: Kubernetes namespace. Accepted for call-site compatibility;
            not currently embedded in the generated config.
        server_address: Prometheus server address to query; defaults to the
            cluster Thanos querier address when empty.

    Returns:
        dict: Auto-scaling configuration with a single external Prometheus
        metric, suitable for the `auto_scaling` argument of `create_isvc`.
    """
    return {
        "metrics": [
            {
                "type": "External",
                "external": {
                    "metric": {
                        "backend": "prometheus",
                        # Fall back to the in-cluster Thanos querier unless overridden.
                        "serverAddress": server_address or THANOS_QUERIER_ADDRESS,
                        "query": query,
                    },
                    "target": {"type": "Value", "value": target_value},
                    "authenticationRef": {
                        "authModes": "bearer",
                        "authenticationRef": {
                            # TriggerAuthentication resource holding the bearer token.
                            "name": "inference-prometheus-auth",
                        },
                    },
                },
            }
        ]
    }
| 72 | + |
| 73 | + |
@pytest.fixture(scope="class")
def vllm_cuda_serving_runtime(
    request: FixtureRequest,
    admin_client: DynamicClient,
    model_namespace: Namespace,
    supported_accelerator_type: str,
    vllm_runtime_image: str,
) -> Generator[ServingRuntime, None, None]:
    """Create a vLLM ServingRuntime from the template matching the accelerator.

    Falls back to the CUDA template when the accelerator type has no dedicated
    entry in TEMPLATE_MAP. Deployment type comes from the fixture's request.param.
    """
    accelerator_key = supported_accelerator_type.lower()
    selected_template = TEMPLATE_MAP.get(accelerator_key, RuntimeTemplates.VLLM_CUDA)
    runtime_builder = ServingRuntimeFromTemplate(
        client=admin_client,
        name="vllm-runtime",
        namespace=model_namespace.name,
        template_name=selected_template,
        deployment_type=request.param["deployment_type"],
        runtime_image=vllm_runtime_image,
        support_tgis_open_ai_endpoints=True,
    )
    with runtime_builder as serving_runtime:
        yield serving_runtime
| 93 | + |
| 94 | + |
@pytest.fixture(scope="class")
def stressed_keda_vllm_inference_service(
    request: FixtureRequest,
    admin_client: DynamicClient,
    model_namespace: Namespace,
    vllm_cuda_serving_runtime: ServingRuntime,
    supported_accelerator_type: str,
    s3_models_storage_uri: str,
    model_service_account: ServiceAccount,
    response_snapshot: Any,
) -> Generator[InferenceService, Any, Any]:
    """Deploy a KEDA-autoscaled vLLM InferenceService and stress it with load.

    Builds the InferenceService kwargs from request.param (name, deployment
    mode, GPU count, runtime arguments, replica bounds, metric query and
    threshold), deploys the service, waits for READY, then drives concurrent
    load so the test body can observe KEDA scaling behavior.

    Yields:
        InferenceService: the deployed, load-stressed inference service.
    """
    # Local import: only needed here to isolate the shared resources template.
    from copy import deepcopy

    isvc_kwargs = {
        "client": admin_client,
        "name": request.param["name"],
        "namespace": model_namespace.name,
        "runtime": vllm_cuda_serving_runtime.name,
        "storage_uri": s3_models_storage_uri,
        "model_format": vllm_cuda_serving_runtime.instance.spec.supportedModelFormats[0].name,
        "model_service_account": model_service_account.name,
        "deployment_mode": request.param.get("deployment_mode", KServeDeploymentType.RAW_DEPLOYMENT),
        "autoscaler_mode": "keda",
        "external_route": True,
    }
    accelerator_type = supported_accelerator_type.lower()
    gpu_count = request.param.get("gpu_count")
    timeout = request.param.get("timeout")
    identifier = ACCELERATOR_IDENTIFIER.get(accelerator_type, Labels.Nvidia.NVIDIA_COM_GPU)
    # Deep-copy so repeated fixture instantiations don't mutate the shared
    # module-level PREDICT_RESOURCES constant in place.
    resources: Any = deepcopy(PREDICT_RESOURCES["resources"])
    resources["requests"][identifier] = gpu_count
    resources["limits"][identifier] = gpu_count
    isvc_kwargs["resources"] = resources
    if timeout:
        isvc_kwargs["timeout"] = timeout
    # Guard against a missing gpu_count (None) before the numeric comparison.
    if gpu_count and gpu_count > 1:
        isvc_kwargs["volumes"] = PREDICT_RESOURCES["volumes"]
        isvc_kwargs["volumes_mounts"] = PREDICT_RESOURCES["volume_mounts"]
    if arguments := request.param.get("runtime_argument"):
        # Strip caller-supplied values for flags this fixture owns, then re-add
        # them from the resolved gpu_count / quantization parameters.
        arguments = [
            arg
            for arg in arguments
            if not (arg.startswith("--tensor-parallel-size") or arg.startswith("--quantization"))
        ]
        arguments.append(f"--tensor-parallel-size={gpu_count}")
        if quantization := request.param.get("quantization"):
            validate_supported_quantization_schema(q_type=quantization)
            arguments.append(f"--quantization={quantization}")
        isvc_kwargs["argument"] = arguments

    isvc_kwargs["min_replicas"] = request.param.get("initial_pod_count")
    isvc_kwargs["max_replicas"] = request.param.get("final_pod_count")

    isvc_kwargs["auto_scaling"] = create_keda_auto_scaling_config(
        query=request.param.get("metrics_query"),
        target_value=str(request.param.get("metrics_threshold")),
    )

    with create_isvc(**isvc_kwargs) as isvc:
        isvc.wait_for_condition(condition=isvc.Condition.READY, status="True")
        run_concurrent_load_for_keda_scaling(
            isvc=isvc,
            inference_config=VLLM_INFERENCE_CONFIG,
            response_snapshot=response_snapshot,
        )
        yield isvc
| 160 | + |
| 161 | + |
@pytest.fixture(scope="class")
def stressed_ovms_keda_inference_service(
    request: FixtureRequest,
    unprivileged_client: DynamicClient,
    unprivileged_model_namespace: Namespace,
    ovms_kserve_serving_runtime: ServingRuntime,
    models_endpoint_s3_secret: Secret,
) -> Generator[InferenceService, Any, Any]:
    """Deploy a KEDA-autoscaled OVMS InferenceService and stress it with load.

    Creates a raw-deployment OpenVINO IR InferenceService from request.param
    (model dir/version, replica bounds, metric query and threshold), waits for
    READY, then drives concurrent ONNX load so KEDA scaling can be observed.

    Yields:
        InferenceService: the deployed, load-stressed inference service.
    """
    model_name = f"{request.param['name']}-raw"
    with create_isvc(
        client=unprivileged_client,
        name=model_name,
        namespace=unprivileged_model_namespace.name,
        external_route=True,
        runtime=ovms_kserve_serving_runtime.name,
        storage_path=request.param["model-dir"],
        storage_key=models_endpoint_s3_secret.name,
        model_format=ModelAndFormat.OPENVINO_IR,
        deployment_mode=KServeDeploymentType.RAW_DEPLOYMENT,
        model_version=request.param["model-version"],
        min_replicas=request.param["initial_pod_count"],
        max_replicas=request.param["final_pod_count"],
        autoscaler_mode="keda",
        # NOTE: pass only kwargs the config helper's signature accepts.
        auto_scaling=create_keda_auto_scaling_config(
            query=request.param["metrics_query"],
            target_value=str(request.param["metrics_threshold"]),
        ),
    ) as isvc:
        isvc.wait_for_condition(condition=isvc.Condition.READY, status="True")
        run_concurrent_load_for_keda_scaling(
            isvc=isvc,
            inference_config=ONNX_INFERENCE_CONFIG,
        )
        yield isvc
| 198 | + |
| 199 | + |
@pytest.fixture(scope="session")
def skip_if_no_supported_gpu_type(supported_accelerator_type: str) -> None:
    """Skip the requesting test when no supported accelerator type is configured.

    vLLM runtime tests require a GPU accelerator and cannot run on CPU.
    """
    if not supported_accelerator_type:
        # Fixed garbled skip message from the original ("Accelartor ... provide,vLLM").
        pytest.skip("Accelerator type is not provided; vLLM tests cannot run on CPU")
| 204 | + |
| 205 | + |
@pytest.fixture
def response_snapshot(snapshot: Any) -> Any:
    """Return the syrupy snapshot fixture configured to serialize as JSON."""
    json_snapshot = snapshot.use_extension(extension_class=JSONSnapshotExtension)
    return json_snapshot
0 commit comments