Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ markers =
model_server_gpu: Mark tests which are testing model server with GPU resources
gpu: Mark tests which require GPU resources
multinode: Mark tests which require multiple nodes
keda: Mark tests which are testing KEDA scaling

addopts =
-s
Expand Down
201 changes: 201 additions & 0 deletions tests/model_serving/model_server/keda/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
from typing import Any, Generator

import pytest
from _pytest.fixtures import FixtureRequest
from kubernetes.dynamic import DynamicClient
from ocp_resources.inference_service import InferenceService
from ocp_resources.namespace import Namespace
from ocp_resources.secret import Secret
from ocp_resources.service_account import ServiceAccount
from ocp_resources.serving_runtime import ServingRuntime
from simple_logger.logger import get_logger
from tests.model_serving.model_runtime.vllm.utils import validate_supported_quantization_schema
from tests.model_serving.model_runtime.vllm.constant import ACCELERATOR_IDENTIFIER, PREDICT_RESOURCES, TEMPLATE_MAP

from utilities.constants import (
KServeDeploymentType,
RuntimeTemplates,
Labels,
)
from tests.model_serving.model_server.utils import (
run_vllm_concurrent_load,
run_ovms_concurrent_load,
)
from utilities.constants import (
ModelAndFormat,
)
Comment thread
mwaykole marked this conversation as resolved.
from utilities.inference_utils import create_isvc
from utilities.serving_runtime import ServingRuntimeFromTemplate
from utilities.constants import THANOS_QUERIER_ADDRESS
from syrupy.extensions.json import JSONSnapshotExtension

LOGGER = get_logger(name=__name__)


def create_keda_auto_scaling_config(
    query: str,
    target_value: str,
    model_name: str | None = None,
    namespace: str | None = None,
) -> dict[str, Any]:
    """Create a KEDA auto-scaling configuration for an inference service.

    Builds the ``auto_scaling`` payload consumed by ``create_isvc``: a single
    external metric backed by a Prometheus query against the Thanos querier,
    authenticated with the ``inference-prometheus-auth`` bearer secret.

    Args:
        query: The Prometheus query to use for scaling.
        target_value: Target value for the metric (KEDA expects a string).
        model_name: Name of the model. Accepted for caller readability; the
            model name is expected to already be embedded in ``query``.
        namespace: Kubernetes namespace. Accepted for caller readability; the
            namespace is expected to already be embedded in ``query``.

    Returns:
        dict: Auto-scaling configuration with one external metric entry.
    """
    # model_name/namespace are intentionally unused: callers pass them for
    # clarity, but the scaling query string already encodes both.
    return {
        "metrics": [
            {
                "type": "External",
                "external": {
                    "metric": {
                        "backend": "prometheus",
                        "serverAddress": THANOS_QUERIER_ADDRESS,
                        "query": query,
                    },
                    "target": {"type": "Value", "value": target_value},
                    "authenticationRef": {
                        "authModes": "bearer",
                        "authenticationRef": {
                            "name": "inference-prometheus-auth",
                        },
                    },
                },
            }
        ]
    }


@pytest.fixture(scope="class")
def vllm_serving_runtime(
    request: FixtureRequest,
    admin_client: DynamicClient,
    model_namespace: Namespace,
    supported_accelerator_type: str,
    vllm_runtime_image: str,
) -> Generator[ServingRuntime, None, None]:
    """Class-scoped vLLM ServingRuntime built from the accelerator-matching template."""
    # Fall back to the CUDA template when the accelerator has no dedicated one.
    selected_template = TEMPLATE_MAP.get(
        supported_accelerator_type.lower(), RuntimeTemplates.VLLM_CUDA
    )
    runtime_manager = ServingRuntimeFromTemplate(
        client=admin_client,
        name="vllm-runtime",
        namespace=model_namespace.name,
        template_name=selected_template,
        deployment_type=request.param["deployment_type"],
        runtime_image=vllm_runtime_image,
        support_tgis_open_ai_endpoints=True,
    )
    with runtime_manager as runtime:
        yield runtime


@pytest.fixture(scope="class")
def stressed_keda_vllm_inference_service(
    request: FixtureRequest,
    admin_client: DynamicClient,
    model_namespace: Namespace,
    vllm_serving_runtime: ServingRuntime,
    supported_accelerator_type: str,
    s3_models_storage_uri: str,
    model_service_account: ServiceAccount,
    response_snapshot: Any,
) -> Generator[InferenceService, Any, Any]:
    """Deploy a KEDA-autoscaled vLLM InferenceService and stress it with concurrent load.

    Reads deployment parameters (name, gpu_count, timeout, runtime_argument,
    quantization, pod counts, metrics query/threshold) from ``request.param``,
    creates the isvc, waits for readiness, then drives concurrent load so KEDA
    has metrics to scale on before the class's tests run.

    NOTE(review): ``response_snapshot`` is now declared as a fixture parameter
    (the previous body referenced it without declaring it, which raises
    NameError at runtime); syrupy's underlying ``snapshot`` fixture is
    function-scoped — confirm scope compatibility with this class-scoped
    fixture.
    """
    from copy import deepcopy  # local import: keeps the fixture self-contained

    isvc_kwargs = {
        "client": admin_client,
        "name": request.param["name"],
        "namespace": model_namespace.name,
        "runtime": vllm_serving_runtime.name,
        "storage_uri": s3_models_storage_uri,
        "model_format": vllm_serving_runtime.instance.spec.supportedModelFormats[0].name,
        "model_service_account": model_service_account.name,
        "deployment_mode": request.param.get("deployment_mode", KServeDeploymentType.RAW_DEPLOYMENT),
        "autoscaler_mode": "keda",
        "external_route": True,
    }
    accelerator_type = supported_accelerator_type.lower()
    gpu_count = request.param.get("gpu_count")
    timeout = request.param.get("timeout")
    identifier = ACCELERATOR_IDENTIFIER.get(accelerator_type, Labels.Nvidia.NVIDIA_COM_GPU)
    # Deep-copy so per-test GPU counts do not mutate the shared module-level
    # PREDICT_RESOURCES dict and leak into other tests.
    resources: Any = deepcopy(PREDICT_RESOURCES["resources"])
    resources["requests"][identifier] = gpu_count
    resources["limits"][identifier] = gpu_count
    isvc_kwargs["resources"] = resources
    if timeout:
        isvc_kwargs["timeout"] = timeout
    # Guard against a missing gpu_count (request.param.get may return None).
    if gpu_count and gpu_count > 1:
        isvc_kwargs["volumes"] = PREDICT_RESOURCES["volumes"]
        isvc_kwargs["volumes_mounts"] = PREDICT_RESOURCES["volume_mounts"]
    if arguments := request.param.get("runtime_argument"):
        # Strip any caller-supplied parallelism/quantization flags so the
        # values below are authoritative.
        arguments = [
            arg
            for arg in arguments
            if not (arg.startswith("--tensor-parallel-size") or arg.startswith("--quantization"))
        ]
        arguments.append(f"--tensor-parallel-size={gpu_count}")
        if quantization := request.param.get("quantization"):
            validate_supported_quantization_schema(q_type=quantization)
            arguments.append(f"--quantization={quantization}")
        isvc_kwargs["argument"] = arguments

    isvc_kwargs["min_replicas"] = request.param.get("initial_pod_count")
    isvc_kwargs["max_replicas"] = request.param.get("final_pod_count")

    isvc_kwargs["auto_scaling"] = create_keda_auto_scaling_config(
        query=request.param.get("metrics_query"),
        model_name=request.param["name"],
        namespace=model_namespace.name,
        target_value=str(request.param.get("metrics_threshold")),
    )

    with create_isvc(**isvc_kwargs) as isvc:
        isvc.wait_for_condition(condition=isvc.Condition.READY, status="True")
        run_vllm_concurrent_load(isvc=isvc, response_snapshot=response_snapshot)
        yield isvc


@pytest.fixture(scope="class")
def stressed_ovms_keda_inference_service(
    request: FixtureRequest,
    unprivileged_client: DynamicClient,
    unprivileged_model_namespace: Namespace,
    ovms_kserve_serving_runtime: ServingRuntime,
    models_endpoint_s3_secret: Secret,
) -> Generator[InferenceService, Any, Any]:
    """OVMS InferenceService with KEDA autoscaling, pre-stressed with concurrent load."""
    params = request.param
    isvc_name = f"{params['name']}-raw"
    scaling_config = create_keda_auto_scaling_config(
        query=params["metrics_query"],
        model_name=isvc_name,
        namespace=unprivileged_model_namespace.name,
        target_value=str(params["metrics_threshold"]),
    )
    with create_isvc(
        client=unprivileged_client,
        name=isvc_name,
        namespace=unprivileged_model_namespace.name,
        external_route=True,
        runtime=ovms_kserve_serving_runtime.name,
        storage_path=params["model-dir"],
        storage_key=models_endpoint_s3_secret.name,
        model_format=ModelAndFormat.OPENVINO_IR,
        deployment_mode=KServeDeploymentType.RAW_DEPLOYMENT,
        model_version=params["model-version"],
        min_replicas=params["initial_pod_count"],
        max_replicas=params["final_pod_count"],
        autoscaler_mode="keda",
        auto_scaling=scaling_config,
    ) as isvc:
        # Wait for readiness, then generate load so KEDA has metrics to act on.
        isvc.wait_for_condition(condition=isvc.Condition.READY, status="True")
        run_ovms_concurrent_load(isvc=isvc)
        yield isvc


@pytest.fixture(scope="session")
def skip_if_no_supported_gpu_type(supported_accelerator_type: str) -> None:
    """Skip GPU-dependent tests when no supported accelerator type is configured."""
    if not supported_accelerator_type:
        # Fixed typos in the user-facing skip reason.
        pytest.skip("Accelerator type is not provided, vLLM test cannot be run on CPU")


@pytest.fixture
def response_snapshot(snapshot: Any) -> Any:
    """Syrupy snapshot configured to serialize model responses as JSON."""
    json_snapshot = snapshot.use_extension(extension_class=JSONSnapshotExtension)
    return json_snapshot
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import pytest
from simple_logger.logger import get_logger
from typing import Any, Generator
from kubernetes.dynamic import DynamicClient
from ocp_resources.namespace import Namespace
from ocp_resources.inference_service import InferenceService
from tests.model_serving.model_server.utils import verify_keda_scaledobject, verify_final_pod_count
from tests.model_serving.model_runtime.vllm.constant import BASE_RAW_DEPLOYMENT_CONFIG
from tests.model_serving.model_runtime.vllm.basic_model_deployment.test_granite_7b_starter import SERVING_ARGUMENT
Comment thread
mwaykole marked this conversation as resolved.
from utilities.constants import ModelFormat, ModelVersion, RunTimeConfigs
from utilities.monitoring import validate_metrics_field

LOGGER = get_logger(name=__name__)


# Rebind a local copy instead of mutating the imported dict in place, so other
# modules importing BASE_RAW_DEPLOYMENT_CONFIG do not observe this change.
BASE_RAW_DEPLOYMENT_CONFIG = {**BASE_RAW_DEPLOYMENT_CONFIG, "runtime_argument": SERVING_ARGUMENT}

# Scaling boundaries exercised by the KEDA tests: start at one replica and
# expect KEDA to scale the deployment out to five under load.
INITIAL_POD_COUNT = 1
FINAL_POD_COUNT = 5

OVMS_MODEL_NAMESPACE = "ovms-keda"
OVMS_MODEL_NAME = "onnx-raw"

# Average OVMS inference latency (microseconds) over a 5-minute window,
# grouped by model name: sum(rate(time_sum)) / sum(rate(time_count)).
_OVMS_LABEL_SELECTOR = f"namespace='{OVMS_MODEL_NAMESPACE}', name='{OVMS_MODEL_NAME}'"
OVMS_METRICS_QUERY = (
    f"sum by (name) (rate(ovms_inference_time_us_sum{{{_OVMS_LABEL_SELECTOR}}}[5m])) / "
    f"sum by (name) (rate(ovms_inference_time_us_count{{{_OVMS_LABEL_SELECTOR}}}[5m]))"
)
OVMS_METRICS_THRESHOLD = 200

pytestmark = [pytest.mark.keda, pytest.mark.usefixtures("valid_aws_config")]


@pytest.mark.parametrize(
    "unprivileged_model_namespace, ovms_kserve_serving_runtime, stressed_ovms_keda_inference_service",
    [
        pytest.param(
            # Use the shared constant so the namespace always matches the one
            # embedded in OVMS_METRICS_QUERY.
            {"name": OVMS_MODEL_NAMESPACE},
            RunTimeConfigs.ONNX_OPSET13_RUNTIME_CONFIG,
            {
                "name": ModelFormat.ONNX,
                "model-version": ModelVersion.OPSET13,
                "model-dir": "test-dir",
                "initial_pod_count": INITIAL_POD_COUNT,
                "final_pod_count": FINAL_POD_COUNT,
                "metrics_query": OVMS_METRICS_QUERY,
                "metrics_threshold": OVMS_METRICS_THRESHOLD,
            },
        )
    ],
    indirect=True,
)
class TestOVMSKedaScaling:
    """Verify KEDA-driven autoscaling of an OVMS inference service under load."""

    def test_ovms_keda_scaling_verify_scaledobject(
        self,
        unprivileged_model_namespace: Namespace,
        unprivileged_client: DynamicClient,
        ovms_kserve_serving_runtime: Any,
        stressed_ovms_keda_inference_service: InferenceService,
    ) -> None:
        """The ScaledObject created for the isvc carries the expected Prometheus trigger."""
        verify_keda_scaledobject(
            client=unprivileged_client,
            isvc=stressed_ovms_keda_inference_service,
            expected_trigger_type="prometheus",
            expected_query=OVMS_METRICS_QUERY,
            # Pass the threshold as a string, matching how it is stored in the
            # ScaledObject spec (and how the metrics test below compares it).
            expected_threshold=str(OVMS_METRICS_THRESHOLD),
        )

    def test_ovms_keda_scaling_verify_metrics(
        self,
        unprivileged_model_namespace: Namespace,
        unprivileged_client: DynamicClient,
        ovms_kserve_serving_runtime: Any,
        stressed_ovms_keda_inference_service: InferenceService,
        prometheus: Any,
    ) -> None:
        """The scaling metric observed in Prometheus exceeds the configured threshold."""
        validate_metrics_field(
            prometheus=prometheus,
            metrics_query=OVMS_METRICS_QUERY,
            expected_value=str(OVMS_METRICS_THRESHOLD),
            greater_than=True,
        )

    def test_ovms_keda_scaling_verify_final_pod_count(
        self,
        unprivileged_model_namespace: Namespace,
        unprivileged_client: DynamicClient,
        ovms_kserve_serving_runtime: Any,
        stressed_ovms_keda_inference_service: InferenceService,
    ) -> None:
        """KEDA scaled the deployment out to the expected final replica count."""
        verify_final_pod_count(
            unprivileged_client=unprivileged_client,
            isvc=stressed_ovms_keda_inference_service,
            final_pod_count=FINAL_POD_COUNT,
        )
Loading