Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ markers =
model_server_gpu: Mark tests which are testing model server with GPU resources
gpu: Mark tests which require GPU resources
multinode: Mark tests which require multiple nodes
keda: Mark tests which are testing KEDA scaling

addopts =
-s
Expand Down
208 changes: 208 additions & 0 deletions tests/model_serving/model_server/keda/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
from typing import Any, Generator

import pytest
from _pytest.fixtures import FixtureRequest
from kubernetes.dynamic import DynamicClient
from ocp_resources.inference_service import InferenceService
from ocp_resources.namespace import Namespace
from ocp_resources.secret import Secret
from ocp_resources.service_account import ServiceAccount
from ocp_resources.serving_runtime import ServingRuntime
from simple_logger.logger import get_logger
from tests.model_serving.model_runtime.vllm.utils import validate_supported_quantization_schema
from tests.model_serving.model_runtime.vllm.constant import ACCELERATOR_IDENTIFIER, PREDICT_RESOURCES, TEMPLATE_MAP
from utilities.manifests.vllm import VLLM_INFERENCE_CONFIG
from utilities.manifests.onnx import ONNX_INFERENCE_CONFIG

from utilities.constants import (
KServeDeploymentType,
RuntimeTemplates,
Labels,
)
from tests.model_serving.model_server.utils import (
run_concurrent_load_for_keda_scaling,
)
from utilities.constants import (
ModelAndFormat,
)
Comment thread
mwaykole marked this conversation as resolved.
from utilities.inference_utils import create_isvc
from utilities.serving_runtime import ServingRuntimeFromTemplate
from utilities.constants import THANOS_QUERIER_ADDRESS
from syrupy.extensions.json import JSONSnapshotExtension

LOGGER = get_logger(name=__name__)


def create_keda_auto_scaling_config(
    query: str,
    target_value: str,
    model_name: "str | None" = None,
    namespace: "str | None" = None,
) -> dict[str, Any]:
    """Build the KEDA (Prometheus-backed) auto-scaling config for an InferenceService.

    Args:
        query: Prometheus query KEDA evaluates to drive scaling.
        target_value: Target metric value for the trigger, as a string.
        model_name: Optional model name; accepted for caller convenience but
            not embedded in the config (the query is expected to already scope
            the model).
        namespace: Optional namespace; accepted for caller convenience but not
            embedded in the config.

    Returns:
        dict: ``auto_scaling`` payload with a single External metric entry
        pointing at the Thanos querier and using bearer-token authentication.
    """
    return {
        "metrics": [
            {
                "type": "External",
                "external": {
                    "metric": {
                        "backend": "prometheus",
                        "serverAddress": THANOS_QUERIER_ADDRESS,
                        "query": query,
                    },
                    "target": {"type": "Value", "value": target_value},
                    "authenticationRef": {
                        "authModes": "bearer",
                        "authenticationRef": {
                            "name": "inference-prometheus-auth",
                        },
                    },
                },
            }
        ]
    }


@pytest.fixture(scope="class")
def vllm_cuda_serving_runtime(
    request: FixtureRequest,
    admin_client: DynamicClient,
    model_namespace: Namespace,
    supported_accelerator_type: str,
    vllm_runtime_image: str,
) -> Generator[ServingRuntime, None, None]:
    """Class-scoped vLLM serving runtime built from the accelerator-specific template.

    Falls back to the CUDA template when the accelerator type has no dedicated
    entry in TEMPLATE_MAP.
    """
    selected_template = TEMPLATE_MAP.get(supported_accelerator_type.lower(), RuntimeTemplates.VLLM_CUDA)
    runtime_kwargs = {
        "client": admin_client,
        "name": "vllm-runtime",
        "namespace": model_namespace.name,
        "template_name": selected_template,
        "deployment_type": request.param["deployment_type"],
        "runtime_image": vllm_runtime_image,
        "support_tgis_open_ai_endpoints": True,
    }
    with ServingRuntimeFromTemplate(**runtime_kwargs) as model_runtime:
        yield model_runtime


@pytest.fixture(scope="class")
def stressed_keda_vllm_inference_service(
    request: FixtureRequest,
    admin_client: DynamicClient,
    model_namespace: Namespace,
    vllm_cuda_serving_runtime: ServingRuntime,
    supported_accelerator_type: str,
    s3_models_storage_uri: str,
    model_service_account: ServiceAccount,
    response_snapshot: Any,
) -> Generator[InferenceService, Any, Any]:
    """Deploy a KEDA-autoscaled vLLM InferenceService and drive concurrent load.

    Builds the InferenceService kwargs from ``request.param`` (name, deployment
    mode, GPU count, runtime arguments, scaling bounds and metric query),
    creates the service, waits for readiness, then runs concurrent inference
    load so KEDA has traffic to scale on before yielding to the tests.

    NOTE(review): ``response_snapshot`` is a function-scoped fixture requested
    from a class-scoped one — confirm pytest does not raise ScopeMismatch, or
    widen the snapshot fixture's scope. (Previously ``response_snapshot`` was
    referenced without being requested at all, which raised NameError.)
    """
    from copy import deepcopy

    isvc_kwargs: dict[str, Any] = {
        "client": admin_client,
        "name": request.param["name"],
        "namespace": model_namespace.name,
        "runtime": vllm_cuda_serving_runtime.name,
        "storage_uri": s3_models_storage_uri,
        "model_format": vllm_cuda_serving_runtime.instance.spec.supportedModelFormats[0].name,
        "model_service_account": model_service_account.name,
        "deployment_mode": request.param.get("deployment_mode", KServeDeploymentType.RAW_DEPLOYMENT),
        "autoscaler_mode": "keda",
        "external_route": True,
    }
    accelerator_type = supported_accelerator_type.lower()
    gpu_count = request.param.get("gpu_count")
    timeout = request.param.get("timeout")
    identifier = ACCELERATOR_IDENTIFIER.get(accelerator_type, Labels.Nvidia.NVIDIA_COM_GPU)
    # Deep-copy the shared template so per-test GPU counts do not leak into
    # other tests that reuse the module-level PREDICT_RESOURCES dict.
    resources: Any = deepcopy(PREDICT_RESOURCES["resources"])
    resources["requests"][identifier] = gpu_count
    resources["limits"][identifier] = gpu_count
    isvc_kwargs["resources"] = resources
    if timeout:
        isvc_kwargs["timeout"] = timeout
    # Guard against a missing gpu_count (None) before the numeric comparison.
    if gpu_count and gpu_count > 1:
        isvc_kwargs["volumes"] = PREDICT_RESOURCES["volumes"]
        isvc_kwargs["volumes_mounts"] = PREDICT_RESOURCES["volume_mounts"]
    if arguments := request.param.get("runtime_argument"):
        # Strip caller-supplied parallelism/quantization flags so the values
        # derived from request.param always win.
        arguments = [
            arg
            for arg in arguments
            if not (arg.startswith("--tensor-parallel-size") or arg.startswith("--quantization"))
        ]
        arguments.append(f"--tensor-parallel-size={gpu_count}")
        if quantization := request.param.get("quantization"):
            validate_supported_quantization_schema(q_type=quantization)
            arguments.append(f"--quantization={quantization}")
        isvc_kwargs["argument"] = arguments

    isvc_kwargs["min_replicas"] = request.param.get("initial_pod_count")
    isvc_kwargs["max_replicas"] = request.param.get("final_pod_count")

    # create_keda_auto_scaling_config accepts only the query and target value;
    # the previously passed model_name/namespace kwargs raised TypeError.
    isvc_kwargs["auto_scaling"] = create_keda_auto_scaling_config(
        query=request.param.get("metrics_query"),
        target_value=str(request.param.get("metrics_threshold")),
    )

    with create_isvc(**isvc_kwargs) as isvc:
        isvc.wait_for_condition(condition=isvc.Condition.READY, status="True")
        run_concurrent_load_for_keda_scaling(
            isvc=isvc,
            inference_config=VLLM_INFERENCE_CONFIG,
            response_snapshot=response_snapshot,
        )
        yield isvc


@pytest.fixture(scope="class")
def stressed_ovms_keda_inference_service(
    request: FixtureRequest,
    unprivileged_client: DynamicClient,
    unprivileged_model_namespace: Namespace,
    ovms_kserve_serving_runtime: ServingRuntime,
    models_endpoint_s3_secret: Secret,
) -> Generator[InferenceService, Any, Any]:
    """Deploy a KEDA-autoscaled OVMS (raw-deployment) InferenceService under load.

    Creates the service from ``request.param`` (model dir/version, scaling
    bounds, metric query/threshold), waits for readiness, then runs concurrent
    ONNX inference load so KEDA has traffic to scale on before yielding.
    """
    model_name = f"{request.param['name']}-raw"
    with create_isvc(
        client=unprivileged_client,
        name=model_name,
        namespace=unprivileged_model_namespace.name,
        external_route=True,
        runtime=ovms_kserve_serving_runtime.name,
        storage_path=request.param["model-dir"],
        storage_key=models_endpoint_s3_secret.name,
        model_format=ModelAndFormat.OPENVINO_IR,
        deployment_mode=KServeDeploymentType.RAW_DEPLOYMENT,
        model_version=request.param["model-version"],
        min_replicas=request.param["initial_pod_count"],
        max_replicas=request.param["final_pod_count"],
        autoscaler_mode="keda",
        # create_keda_auto_scaling_config accepts only the query and target
        # value; the previously passed model_name/namespace kwargs raised TypeError.
        auto_scaling=create_keda_auto_scaling_config(
            query=request.param["metrics_query"],
            target_value=str(request.param["metrics_threshold"]),
        ),
    ) as isvc:
        isvc.wait_for_condition(condition=isvc.Condition.READY, status="True")
        run_concurrent_load_for_keda_scaling(
            isvc=isvc,
            inference_config=ONNX_INFERENCE_CONFIG,
        )
        yield isvc


@pytest.fixture(scope="session")
def skip_if_no_supported_gpu_type(supported_accelerator_type: str) -> None:
    """Skip the test when no accelerator type is configured (vLLM requires a GPU)."""
    if not supported_accelerator_type:
        # Fixed typos in the user-facing skip message ("Accelartor"/"provide").
        pytest.skip("Accelerator type is not provided; vLLM tests cannot run on CPU")


@pytest.fixture
def response_snapshot(snapshot: Any) -> Any:
    """Return the syrupy snapshot fixture configured for JSON serialization."""
    json_snapshot = snapshot.use_extension(extension_class=JSONSnapshotExtension)
    return json_snapshot
102 changes: 102 additions & 0 deletions tests/model_serving/model_server/keda/test_isvc_keda_scaling_cpu.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import pytest
from simple_logger.logger import get_logger
from typing import Any, Generator
from kubernetes.dynamic import DynamicClient
from ocp_resources.namespace import Namespace
from ocp_resources.inference_service import InferenceService
from tests.model_serving.model_server.utils import verify_keda_scaledobject, verify_final_pod_count
from tests.model_serving.model_runtime.vllm.constant import BASE_RAW_DEPLOYMENT_CONFIG
from tests.model_serving.model_runtime.vllm.basic_model_deployment.test_granite_7b_starter import SERVING_ARGUMENT
Comment thread
mwaykole marked this conversation as resolved.
from utilities.constants import ModelFormat, ModelVersion, RunTimeConfigs
from utilities.monitoring import validate_metrics_field

LOGGER = get_logger(name=__name__)


# Inject the vLLM serving arguments into the shared raw-deployment config.
# NOTE(review): this mutates an imported module-level dict at import time —
# confirm no other test module depends on an unmodified BASE_RAW_DEPLOYMENT_CONFIG.
BASE_RAW_DEPLOYMENT_CONFIG["runtime_argument"] = SERVING_ARGUMENT
Comment thread
mwaykole marked this conversation as resolved.

# Replica bounds asserted by the scaling tests below.
INITIAL_POD_COUNT = 1
FINAL_POD_COUNT = 5

OVMS_MODEL_NAMESPACE = "ovms-keda"
OVMS_MODEL_NAME = "onnx-raw"
# Average OVMS inference latency over a 5-minute window, per model name:
# rate of summed inference time divided by rate of inference count.
OVMS_METRICS_QUERY = (
    f"sum by (name) (rate(ovms_inference_time_us_sum{{"
    f"namespace='{OVMS_MODEL_NAMESPACE}', name='{OVMS_MODEL_NAME}'"
    f"}}[5m])) / "
    f"sum by (name) (rate(ovms_inference_time_us_count{{"
    f"namespace='{OVMS_MODEL_NAMESPACE}', name='{OVMS_MODEL_NAME}'"
    f"}}[5m]))"
)
# KEDA scales up once the latency metric exceeds this value
# (presumably microseconds, per the _us_ metric names — TODO confirm).
OVMS_METRICS_THRESHOLD = 200

# All tests in this module exercise KEDA scaling and need valid AWS (S3) config.
pytestmark = [pytest.mark.keda, pytest.mark.usefixtures("valid_aws_config")]


@pytest.mark.parametrize(
    "unprivileged_model_namespace, ovms_kserve_serving_runtime, stressed_ovms_keda_inference_service",
    [
        pytest.param(
            {"name": "ovms-keda"},
            RunTimeConfigs.ONNX_OPSET13_RUNTIME_CONFIG,
            {
                "name": ModelFormat.ONNX,
                "model-version": ModelVersion.OPSET13,
                "model-dir": "test-dir",
                "initial_pod_count": INITIAL_POD_COUNT,
                "final_pod_count": FINAL_POD_COUNT,
                "metrics_query": OVMS_METRICS_QUERY,
                "metrics_threshold": OVMS_METRICS_THRESHOLD,
            },
        )
    ],
    indirect=True,
)
class TestOVMSKedaScaling:
    """
    Test KEDA functionality for a CPU-based inference service.

    Verifies the KEDA ScaledObject configuration, availability of the scaling
    metric in Prometheus, and that the deployment reaches the expected final
    pod count under concurrent inference load.
    """

    def test_ovms_keda_scaling_verify_scaledobject(
        self,
        unprivileged_model_namespace: Namespace,
        unprivileged_client: DynamicClient,
        ovms_kserve_serving_runtime,
        # The fixture yields an InferenceService; the Generator annotation used
        # previously described the fixture function, not the injected value.
        stressed_ovms_keda_inference_service: InferenceService,
    ):
        """The generated ScaledObject must carry the configured Prometheus trigger."""
        verify_keda_scaledobject(
            client=unprivileged_client,
            isvc=stressed_ovms_keda_inference_service,
            expected_trigger_type="prometheus",
            expected_query=OVMS_METRICS_QUERY,
            # The ScaledObject spec stores the threshold as a string, so the
            # integer constant must be stringified for the comparison.
            expected_threshold=str(OVMS_METRICS_THRESHOLD),
        )

    def test_ovms_keda_scaling_verify_metrics(
        self,
        unprivileged_model_namespace: Namespace,
        unprivileged_client: DynamicClient,
        ovms_kserve_serving_runtime,
        stressed_ovms_keda_inference_service: InferenceService,
        prometheus,
    ):
        """The scaling metric must be queryable in Prometheus and above the threshold."""
        validate_metrics_field(
            prometheus=prometheus,
            metrics_query=OVMS_METRICS_QUERY,
            expected_value=str(OVMS_METRICS_THRESHOLD),
            greater_than=True,
        )

    def test_ovms_keda_scaling_verify_final_pod_count(
        self,
        unprivileged_model_namespace: Namespace,
        unprivileged_client: DynamicClient,
        ovms_kserve_serving_runtime,
        stressed_ovms_keda_inference_service: InferenceService,
    ):
        """Under load, KEDA must scale the deployment to the configured max replicas."""
        verify_final_pod_count(
            unprivileged_client=unprivileged_client,
            isvc=stressed_ovms_keda_inference_service,
            final_pod_count=FINAL_POD_COUNT,
        )
Loading