diff --git a/pytest.ini b/pytest.ini
index 126cb22ed..7adaeb83b 100644
--- a/pytest.ini
+++ b/pytest.ini
@@ -30,6 +30,7 @@ markers =
     model_server_gpu: Mark tests which are testing model server with GPU resources
     gpu: Mark tests which require GPU resources
     multinode: Mark tests which require multiple nodes
+    keda: Mark tests which are testing KEDA scaling
 
 addopts = -s
diff --git a/tests/model_serving/model_server/keda/conftest.py b/tests/model_serving/model_server/keda/conftest.py
new file mode 100644
index 000000000..936047374
--- /dev/null
+++ b/tests/model_serving/model_server/keda/conftest.py
@@ -0,0 +1,201 @@
+from typing import Any, Generator
+
+import pytest
+from _pytest.fixtures import FixtureRequest
+from kubernetes.dynamic import DynamicClient
+from ocp_resources.inference_service import InferenceService
+from ocp_resources.namespace import Namespace
+from ocp_resources.secret import Secret
+from ocp_resources.service_account import ServiceAccount
+from ocp_resources.serving_runtime import ServingRuntime
+from simple_logger.logger import get_logger
+from tests.model_serving.model_runtime.vllm.utils import validate_supported_quantization_schema
+from tests.model_serving.model_runtime.vllm.constant import ACCELERATOR_IDENTIFIER, PREDICT_RESOURCES, TEMPLATE_MAP
+from utilities.manifests.vllm import VLLM_INFERENCE_CONFIG
+from utilities.manifests.onnx import ONNX_INFERENCE_CONFIG
+
+from utilities.constants import (
+    KServeDeploymentType,
+    RuntimeTemplates,
+    Labels,
+)
+from tests.model_serving.model_server.utils import (
+    run_concurrent_load_for_keda_scaling,
+)
+from utilities.constants import (
+    ModelAndFormat,
+)
+from utilities.inference_utils import create_isvc
+from utilities.serving_runtime import ServingRuntimeFromTemplate
+from utilities.constants import THANOS_QUERIER_ADDRESS
+from syrupy.extensions.json import JSONSnapshotExtension
+
+LOGGER = get_logger(name=__name__)
+
+
+def create_keda_auto_scaling_config(
+    query: str,
+    target_value: str,
+) -> dict[str, Any]:
+    """Create KEDA auto-scaling configuration for inference services.
+
+    Args:
+        query: The Prometheus query to use for scaling
+        target_value: Target value for the metric
+
+    Returns:
+        dict: Auto-scaling configuration
+    """
+    return {
+        "metrics": [
+            {
+                "type": "External",
+                "external": {
+                    "metric": {
+                        "backend": "prometheus",
+                        "serverAddress": THANOS_QUERIER_ADDRESS,
+                        "query": query,
+                    },
+                    "target": {"type": "Value", "value": target_value},
+                    "authenticationRef": {
+                        "authModes": "bearer",
+                        "authenticationRef": {
+                            "name": "inference-prometheus-auth",
+                        },
+                    },
+                },
+            }
+        ]
+    }
+
+
+@pytest.fixture(scope="class")
+def vllm_cuda_serving_runtime(
+    request: FixtureRequest,
+    admin_client: DynamicClient,
+    model_namespace: Namespace,
+    supported_accelerator_type: str,
+    vllm_runtime_image: str,
+) -> Generator[ServingRuntime, None, None]:
+    template_name = TEMPLATE_MAP.get(supported_accelerator_type.lower(), RuntimeTemplates.VLLM_CUDA)
+    with ServingRuntimeFromTemplate(
+        client=admin_client,
+        name="vllm-runtime",
+        namespace=model_namespace.name,
+        template_name=template_name,
+        deployment_type=request.param["deployment_type"],
+        runtime_image=vllm_runtime_image,
+        support_tgis_open_ai_endpoints=True,
+    ) as model_runtime:
+        yield model_runtime
+
+
+@pytest.fixture(scope="class")
+def stressed_keda_vllm_inference_service(
+    request: FixtureRequest,
+    admin_client: DynamicClient,
+    model_namespace: Namespace,
+    vllm_cuda_serving_runtime: ServingRuntime,
+    supported_accelerator_type: str,
+    s3_models_storage_uri: str,
+    model_service_account: ServiceAccount,
+) -> Generator[InferenceService, Any, Any]:
+    isvc_kwargs = {
+        "client": admin_client,
+        "name": request.param["name"],
+        "namespace": model_namespace.name,
+        "runtime": vllm_cuda_serving_runtime.name,
+        "storage_uri": s3_models_storage_uri,
+        "model_format": vllm_cuda_serving_runtime.instance.spec.supportedModelFormats[0].name,
+        "model_service_account": model_service_account.name,
+        "deployment_mode": request.param.get("deployment_mode", KServeDeploymentType.RAW_DEPLOYMENT),
+        "autoscaler_mode": "keda",
+        "external_route": True,
+    }
+    accelerator_type = supported_accelerator_type.lower()
+    gpu_count = request.param.get("gpu_count")
+    timeout = request.param.get("timeout")
+    identifier = ACCELERATOR_IDENTIFIER.get(accelerator_type, Labels.Nvidia.NVIDIA_COM_GPU)
+    resources: Any = PREDICT_RESOURCES["resources"]
+    resources["requests"][identifier] = gpu_count
+    resources["limits"][identifier] = gpu_count
+    isvc_kwargs["resources"] = resources
+    if timeout:
+        isvc_kwargs["timeout"] = timeout
+    if gpu_count > 1:
+        isvc_kwargs["volumes"] = PREDICT_RESOURCES["volumes"]
+        isvc_kwargs["volumes_mounts"] = PREDICT_RESOURCES["volume_mounts"]
+    if arguments := request.param.get("runtime_argument"):
+        arguments = [
+            arg
+            for arg in arguments
+            if not (arg.startswith("--tensor-parallel-size") or arg.startswith("--quantization"))
+        ]
+        arguments.append(f"--tensor-parallel-size={gpu_count}")
+        if quantization := request.param.get("quantization"):
+            validate_supported_quantization_schema(q_type=quantization)
+            arguments.append(f"--quantization={quantization}")
+        isvc_kwargs["argument"] = arguments
+
+    isvc_kwargs["min_replicas"] = request.param.get("initial_pod_count")
+    isvc_kwargs["max_replicas"] = request.param.get("final_pod_count")
+
+    isvc_kwargs["auto_scaling"] = create_keda_auto_scaling_config(
+        query=request.param.get("metrics_query"),
+        target_value=str(request.param.get("metrics_threshold")),
+    )
+
+    with create_isvc(**isvc_kwargs) as isvc:
+        isvc.wait_for_condition(condition=isvc.Condition.READY, status="True")
+        run_concurrent_load_for_keda_scaling(
+            isvc=isvc,
+            inference_config=VLLM_INFERENCE_CONFIG,
+        )
+        yield isvc
+
+
+@pytest.fixture(scope="class")
+def stressed_ovms_keda_inference_service(
+    request: FixtureRequest,
+    unprivileged_client: DynamicClient,
+    unprivileged_model_namespace: Namespace,
+    ovms_kserve_serving_runtime: ServingRuntime,
+    models_endpoint_s3_secret: Secret,
+) -> Generator[InferenceService, Any, Any]:
+    model_name = f"{request.param['name']}-raw"
+    with create_isvc(
+        client=unprivileged_client,
+        name=model_name,
+        namespace=unprivileged_model_namespace.name,
+        external_route=True,
+        runtime=ovms_kserve_serving_runtime.name,
+        storage_path=request.param["model-dir"],
+        storage_key=models_endpoint_s3_secret.name,
+        model_format=ModelAndFormat.OPENVINO_IR,
+        deployment_mode=KServeDeploymentType.RAW_DEPLOYMENT,
+        model_version=request.param["model-version"],
+        min_replicas=request.param["initial_pod_count"],
+        max_replicas=request.param["final_pod_count"],
+        autoscaler_mode="keda",
+        auto_scaling=create_keda_auto_scaling_config(
+            query=request.param["metrics_query"],
+            target_value=str(request.param["metrics_threshold"]),
+        ),
+    ) as isvc:
+        isvc.wait_for_condition(condition=isvc.Condition.READY, status="True")
+        run_concurrent_load_for_keda_scaling(
+            isvc=isvc,
+            inference_config=ONNX_INFERENCE_CONFIG,
+        )
+        yield isvc
+
+
+@pytest.fixture(scope="session")
+def skip_if_no_supported_gpu_type(supported_accelerator_type: str) -> None:
+    if not supported_accelerator_type:
+        pytest.skip("Accelerator type is not provided, vLLM tests cannot be run on CPU")
+
+
+@pytest.fixture
+def response_snapshot(snapshot: Any) -> Any:
+    return snapshot.use_extension(extension_class=JSONSnapshotExtension)
diff --git a/tests/model_serving/model_server/keda/test_isvc_keda_scaling_cpu.py b/tests/model_serving/model_server/keda/test_isvc_keda_scaling_cpu.py
new file mode 100644
index 000000000..38758935c
--- /dev/null
+++ b/tests/model_serving/model_server/keda/test_isvc_keda_scaling_cpu.py
@@ -0,0 +1,98 @@
+import pytest
+from simple_logger.logger import get_logger
+from typing import Any, Generator
+from kubernetes.dynamic import DynamicClient
+from ocp_resources.namespace import Namespace
+from ocp_resources.inference_service import InferenceService
+from tests.model_serving.model_server.utils import verify_keda_scaledobject, verify_final_pod_count
+from utilities.constants import ModelFormat, ModelVersion, RunTimeConfigs
+from utilities.monitoring import validate_metrics_field
+
+LOGGER = get_logger(name=__name__)
+
+
+INITIAL_POD_COUNT = 1
+FINAL_POD_COUNT = 5
+
+OVMS_MODEL_NAMESPACE = "ovms-keda"
+OVMS_MODEL_NAME = "onnx-raw"
+OVMS_METRICS_QUERY = (
+    f"sum by (name) (rate(ovms_inference_time_us_sum{{"
+    f"namespace='{OVMS_MODEL_NAMESPACE}', name='{OVMS_MODEL_NAME}'"
+    f"}}[5m])) / "
+    f"sum by (name) (rate(ovms_inference_time_us_count{{"
+    f"namespace='{OVMS_MODEL_NAMESPACE}', name='{OVMS_MODEL_NAME}'"
+    f"}}[5m]))"
+)
+OVMS_METRICS_THRESHOLD = 200
+
+pytestmark = [pytest.mark.keda, pytest.mark.usefixtures("valid_aws_config")]
+
+
+@pytest.mark.parametrize(
+    "unprivileged_model_namespace, ovms_kserve_serving_runtime, stressed_ovms_keda_inference_service",
+    [
+        pytest.param(
+            {"name": "ovms-keda"},
+            RunTimeConfigs.ONNX_OPSET13_RUNTIME_CONFIG,
+            {
+                "name": ModelFormat.ONNX,
+                "model-version": ModelVersion.OPSET13,
+                "model-dir": "test-dir",
+                "initial_pod_count": INITIAL_POD_COUNT,
+                "final_pod_count": FINAL_POD_COUNT,
+                "metrics_query": OVMS_METRICS_QUERY,
+                "metrics_threshold": OVMS_METRICS_THRESHOLD,
+            },
+        )
+    ],
+    indirect=True,
+)
+class TestOVMSKedaScaling:
+    """
+    Test Keda functionality for a cpu based inference service.
+    This class verifies pod scaling, metrics availability, and the creation of a keda scaled object.
+    """
+
+    def test_ovms_keda_scaling_verify_scaledobject(
+        self,
+        unprivileged_model_namespace: Namespace,
+        unprivileged_client: DynamicClient,
+        ovms_kserve_serving_runtime,
+        stressed_ovms_keda_inference_service: Generator[InferenceService, Any, Any],
+    ):
+        verify_keda_scaledobject(
+            client=unprivileged_client,
+            isvc=stressed_ovms_keda_inference_service,
+            expected_trigger_type="prometheus",
+            expected_query=OVMS_METRICS_QUERY,
+            expected_threshold=OVMS_METRICS_THRESHOLD,
+        )
+
+    def test_ovms_keda_scaling_verify_metrics(
+        self,
+        unprivileged_model_namespace: Namespace,
+        unprivileged_client: DynamicClient,
+        ovms_kserve_serving_runtime,
+        stressed_ovms_keda_inference_service: Generator[InferenceService, Any, Any],
+        prometheus,
+    ):
+        validate_metrics_field(
+            prometheus=prometheus,
+            metrics_query=OVMS_METRICS_QUERY,
+            expected_value=str(OVMS_METRICS_THRESHOLD),
+            greater_than=True,
+        )
+
+    def test_ovms_keda_scaling_verify_final_pod_count(
+        self,
+        unprivileged_model_namespace: Namespace,
+        unprivileged_client: DynamicClient,
+        ovms_kserve_serving_runtime,
+        stressed_ovms_keda_inference_service: Generator[InferenceService, Any, Any],
+    ):
+        verify_final_pod_count(
+            unprivileged_client=unprivileged_client,
+            isvc=stressed_ovms_keda_inference_service,
+            final_pod_count=FINAL_POD_COUNT,
+        )
diff --git a/tests/model_serving/model_server/keda/test_isvc_keda_scaling_gpu.py b/tests/model_serving/model_server/keda/test_isvc_keda_scaling_gpu.py
new file mode 100644
index 000000000..abf7c34d5
--- /dev/null
+++ b/tests/model_serving/model_server/keda/test_isvc_keda_scaling_gpu.py
@@ -0,0 +1,98 @@
+import pytest
+from simple_logger.logger import get_logger
+from typing import Any, Generator
+from kubernetes.dynamic import DynamicClient
+from ocp_resources.namespace import Namespace
+from ocp_resources.inference_service import InferenceService
+from utilities.constants import KServeDeploymentType
+from tests.model_serving.model_server.utils import verify_keda_scaledobject, verify_final_pod_count
+from tests.model_serving.model_runtime.vllm.constant import BASE_RAW_DEPLOYMENT_CONFIG
+from tests.model_serving.model_runtime.vllm.basic_model_deployment.test_granite_7b_starter import (
+    SERVING_ARGUMENT,
+    MODEL_PATH,
+)
+from utilities.monitoring import validate_metrics_field
+
+LOGGER = get_logger(name=__name__)
+
+
+INITIAL_POD_COUNT = 1
+FINAL_POD_COUNT = 5
+
+VLLM_MODEL_NAME = "granite-vllm-keda"
+VLLM_METRICS_QUERY_REQUESTS = f'vllm:num_requests_running{{namespace="{VLLM_MODEL_NAME}",pod=~"{VLLM_MODEL_NAME}.*"}}'
+VLLM_METRICS_THRESHOLD_REQUESTS = 4
+
+pytestmark = [pytest.mark.keda, pytest.mark.usefixtures("skip_if_no_supported_gpu_type", "valid_aws_config")]
+
+
+@pytest.mark.parametrize(
+    "model_namespace, s3_models_storage_uri, vllm_cuda_serving_runtime, stressed_keda_vllm_inference_service",
+    [
+        pytest.param(
+            {"name": VLLM_MODEL_NAME},
+            {"model-dir": MODEL_PATH},
+            {"deployment_type": KServeDeploymentType.RAW_DEPLOYMENT},
+            {
+                **BASE_RAW_DEPLOYMENT_CONFIG,
+                "runtime_argument": SERVING_ARGUMENT,
+                "gpu_count": 1,
+                "name": VLLM_MODEL_NAME,
+                "initial_pod_count": INITIAL_POD_COUNT,
+                "final_pod_count": FINAL_POD_COUNT,
+                "metrics_query": VLLM_METRICS_QUERY_REQUESTS,
+                "metrics_threshold": VLLM_METRICS_THRESHOLD_REQUESTS,
+            },
+            id=f"{VLLM_MODEL_NAME}-single-gpu",
+        ),
+    ],
+    indirect=True,
+)
+class TestVllmKedaScaling:
+    """
+    Test Keda functionality for a gpu based inference service.
+    This class verifies pod scaling, metrics availability, and the creation of a keda scaled object.
+    """
+
+    def test_vllm_keda_scaling_verify_scaledobject(
+        self,
+        model_namespace: Namespace,
+        vllm_cuda_serving_runtime,
+        unprivileged_client: DynamicClient,
+        stressed_keda_vllm_inference_service: Generator[InferenceService, Any, Any],
+    ):
+        verify_keda_scaledobject(
+            client=unprivileged_client,
+            isvc=stressed_keda_vllm_inference_service,
+            expected_trigger_type="prometheus",
+            expected_query=VLLM_METRICS_QUERY_REQUESTS,
+            expected_threshold=VLLM_METRICS_THRESHOLD_REQUESTS,
+        )
+
+    def test_vllm_keda_scaling_verify_metrics(
+        self,
+        model_namespace: Namespace,
+        unprivileged_client: DynamicClient,
+        vllm_cuda_serving_runtime,
+        stressed_keda_vllm_inference_service: Generator[InferenceService, Any, Any],
+        prometheus,
+    ):
+        validate_metrics_field(
+            prometheus=prometheus,
+            metrics_query=VLLM_METRICS_QUERY_REQUESTS,
+            expected_value=str(VLLM_METRICS_THRESHOLD_REQUESTS),
+            greater_than=True,
+        )
+
+    def test_vllm_keda_scaling_verify_final_pod_count(
+        self,
+        model_namespace: Namespace,
+        unprivileged_client: DynamicClient,
+        vllm_cuda_serving_runtime,
+        stressed_keda_vllm_inference_service: Generator[InferenceService, Any, Any],
+    ):
+        verify_final_pod_count(
+            unprivileged_client=unprivileged_client,
+            isvc=stressed_keda_vllm_inference_service,
+            final_pod_count=FINAL_POD_COUNT,
+        )
diff --git a/tests/model_serving/model_server/serverless/test_concurrency_auto_scale.py b/tests/model_serving/model_server/serverless/test_concurrency_auto_scale.py
index bb2fdf448..d17f841e1 100644
--- a/tests/model_serving/model_server/serverless/test_concurrency_auto_scale.py
+++ b/tests/model_serving/model_server/serverless/test_concurrency_auto_scale.py
@@ -1,6 +1,6 @@
 import pytest
 
-from tests.model_serving.model_server.serverless.utils import (
+from tests.model_serving.model_server.utils import (
     inference_service_pods_sampler,
 )
 from utilities.constants import (
diff --git a/tests/model_serving/model_server/serverless/utils.py b/tests/model_serving/model_server/serverless/utils.py
index 9a85ce88d..b8d4ab544 100644
--- a/tests/model_serving/model_server/serverless/utils.py
+++ b/tests/model_serving/model_server/serverless/utils.py
@@ -146,28 +146,3 @@ def verify_canary_traffic(
             f"Percentage of inference requests {successful_inferences_percentage} "
             f"to the new model does not match the expected percentage {expected_percentage}. "
         )
-
-
-def inference_service_pods_sampler(
-    client: DynamicClient, isvc: InferenceService, timeout: int, sleep: int = 1
-) -> TimeoutSampler:
-    """
-    Returns TimeoutSampler for inference service.
-
-    Args:
-        client (DynamicClient): DynamicClient object
-        isvc (InferenceService): InferenceService object
-        timeout (int): Timeout in seconds
-        sleep (int): Sleep time in seconds
-
-    Returns:
-        TimeoutSampler: TimeoutSampler object
-
-    """
-    return TimeoutSampler(
-        wait_timeout=timeout,
-        sleep=sleep,
-        func=get_pods_by_isvc_label,
-        client=client,
-        isvc=isvc,
-    )
diff --git a/tests/model_serving/model_server/utils.py b/tests/model_serving/model_server/utils.py
index 312f62f5f..cb95feb6b 100644
--- a/tests/model_serving/model_server/utils.py
+++ b/tests/model_serving/model_server/utils.py
@@ -1,8 +1,9 @@
 import json
 import re
-from concurrent.futures import ThreadPoolExecutor, as_completed
+from concurrent.futures import ThreadPoolExecutor, as_completed, wait
 from string import Template
 from typing import Any, Optional
+from kubernetes.dynamic import DynamicClient
 
 from ocp_resources.inference_graph import InferenceGraph
 from ocp_resources.inference_service import InferenceService
@@ -12,7 +13,11 @@ from utilities.exceptions import (
     InferenceResponseError,
 )
 
+from utilities.constants import Timeout
 from utilities.inference_utils import UserInference
+from utilities.infra import get_isvc_keda_scaledobject, get_pods_by_isvc_label
+from utilities.constants import Protocols
+from timeout_sampler import TimeoutWatch, TimeoutSampler
 
 LOGGER = get_logger(name=__name__)
@@ -223,3 +228,106 @@ def run_inference_multiple_times(
 
     if exceptions:
         raise InferenceResponseError(f"Failed to run inference. Error: {exceptions}")
+
+
+def verify_keda_scaledobject(
+    client: DynamicClient,
+    isvc: InferenceService,
+    expected_trigger_type: str | None = None,
+    expected_query: str | None = None,
+    expected_threshold: str | int | None = None,
+) -> None:
+    """
+    Verify the KEDA ScaledObject.
+
+    Args:
+        client: DynamicClient instance
+        isvc: InferenceService instance
+        expected_trigger_type: Expected trigger type
+        expected_query: Expected query string
+        expected_threshold: Expected threshold as a string or number (compared numerically)
+    """
+    scaled_objects = get_isvc_keda_scaledobject(client=client, isvc=isvc)
+    scaled_object = scaled_objects[0]
+    trigger_meta = scaled_object.spec.triggers[0].metadata
+    trigger_type = scaled_object.spec.triggers[0].type
+    query = trigger_meta.get("query")
+    threshold = trigger_meta.get("threshold")
+
+    assert trigger_type == expected_trigger_type, (
+        f"Trigger type {trigger_type} does not match expected {expected_trigger_type}"
+    )
+    assert query == expected_query, f"Query {query} does not match expected {expected_query}"
+    assert float(threshold) == float(expected_threshold), f"Threshold {threshold} does not match expected {expected_threshold}"
+
+
+def run_concurrent_load_for_keda_scaling(
+    isvc: InferenceService,
+    inference_config: dict[str, Any],
+    num_concurrent: int = 5,
+    duration: int = 120,
+) -> None:
+    """
+    Run a concurrent load to test the keda scaling functionality.
+
+    Args:
+        isvc: InferenceService instance
+        inference_config: Inference config
+        num_concurrent: Number of concurrent requests
+        duration: Duration in seconds to run the load test
+    """
+
+    def _make_request() -> None:
+        verify_inference_response(
+            inference_service=isvc,
+            inference_config=inference_config,
+            inference_type="completions",
+            protocol=Protocols.HTTPS,
+            use_default_query=True,
+        )
+
+    timeout_watch = TimeoutWatch(timeout=duration)
+    with ThreadPoolExecutor(max_workers=num_concurrent) as executor:
+        while timeout_watch.remaining_time() > 0:
+            futures = [executor.submit(_make_request) for _ in range(num_concurrent)]
+            wait(fs=futures)
+
+
+def inference_service_pods_sampler(
+    client: DynamicClient, isvc: InferenceService, timeout: int, sleep: int = 1
+) -> TimeoutSampler:
+    """
+    Returns TimeoutSampler for inference service.
+
+    Args:
+        client (DynamicClient): DynamicClient object
+        isvc (InferenceService): InferenceService object
+        timeout (int): Timeout in seconds
+        sleep (int): Sleep time in seconds
+
+    Returns:
+        TimeoutSampler: TimeoutSampler object
+
+    """
+    return TimeoutSampler(
+        wait_timeout=timeout,
+        sleep=sleep,
+        func=get_pods_by_isvc_label,
+        client=client,
+        isvc=isvc,
+    )
+
+
+def verify_final_pod_count(unprivileged_client: DynamicClient, isvc: InferenceService, final_pod_count: int) -> None:
+    """Wait until the InferenceService has scaled to the expected pod count.
+
+    Raises TimeoutExpiredError (from the sampler) if the count is not reached in time.
+    """
+    for pods in inference_service_pods_sampler(
+        client=unprivileged_client,
+        isvc=isvc,
+        timeout=Timeout.TIMEOUT_5MIN,
+        sleep=10,
+    ):
+        if pods and len(pods) == final_pod_count:
+            return
diff --git a/utilities/constants.py b/utilities/constants.py
index dc3d208df..e532dfc49 100644
--- a/utilities/constants.py
+++ b/utilities/constants.py
@@ -322,3 +322,5 @@ class RunTimeConfig:
 RHOAI_OPERATOR_NAMESPACE = "redhat-ods-operator"
 OPENSHIFT_OPERATORS: str = "openshift-operators"
 MARIADB: str = "mariadb"
+
+THANOS_QUERIER_ADDRESS = "https://thanos-querier.openshift-monitoring.svc:9092"
diff --git a/utilities/inference_utils.py b/utilities/inference_utils.py
index d69560c53..d5812851b 100644
--- a/utilities/inference_utils.py
+++ b/utilities/inference_utils.py
@@ -577,6 +577,7 @@ def create_isvc(
     teardown: bool = True,
     protocol_version: str | None = None,
     labels: dict[str, str] | None = None,
+    auto_scaling: dict[str, Any] | None = None,
 ) -> Generator[InferenceService, Any, Any]:
     """
     Create InferenceService object.
@@ -611,6 +612,7 @@ def create_isvc(
         model_env_variables (list[dict[str, str]]): Model environment variables
         teardown (bool): Teardown
         protocol_version (str): Protocol version of the model server
+        auto_scaling (dict[str, Any]): Auto scaling configuration for the model
 
     Yields:
         InferenceService: InferenceService object
@@ -660,6 +662,8 @@ def create_isvc(
             predictor_dict["volumes"] = volumes
         if model_env_variables:
             predictor_dict["model"]["env"] = model_env_variables
+        if auto_scaling:
+            predictor_dict["autoScaling"] = auto_scaling
 
         _annotations: dict[str, str] = {}
diff --git a/utilities/infra.py b/utilities/infra.py
index 5b5369c96..6447ec733 100644
--- a/utilities/infra.py
+++ b/utilities/infra.py
@@ -1003,6 +1003,29 @@ def wait_for_isvc_pods(client: DynamicClient, isvc: InferenceService, runtime_na
     return get_pods_by_isvc_label(client=client, isvc=isvc, runtime_name=runtime_name)
 
 
+def get_isvc_keda_scaledobject(client: DynamicClient, isvc: InferenceService) -> list[Any]:
+    """
+    Get KEDA ScaledObject resources associated with an InferenceService.
+
+    Args:
+        client (DynamicClient): OCP Client to use.
+        isvc (InferenceService): InferenceService object.
+
+    Returns:
+        list[Any]: A list of all matching ScaledObjects
+
+    Raises:
+        ResourceNotFoundError: if no ScaledObjects are found.
+    """
+    namespace = isvc.namespace
+    scaled_object_client = client.resources.get(api_version="keda.sh/v1alpha1", kind="ScaledObject")
+    scaled_object = scaled_object_client.get(namespace=namespace, name=isvc.name + "-predictor")
+
+    if scaled_object:
+        return [scaled_object]
+    raise ResourceNotFoundError(f"{isvc.name} has no KEDA ScaledObjects")
+
+
 def get_rhods_subscription() -> Subscription | None:
     subscriptions = Subscription.get(dyn_client=get_client(), namespace=RHOAI_OPERATOR_NAMESPACE)
     if subscriptions:
diff --git a/utilities/monitoring.py b/utilities/monitoring.py
index db880e323..0eeafc51a 100644
--- a/utilities/monitoring.py
+++ b/utilities/monitoring.py
@@ -56,6 +56,7 @@ def validate_metrics_field(
     expected_value: Any,
     field_getter: Callable[..., Any] = get_metrics_value,
     timeout: int = 60 * 4,
+    greater_than: bool = False,
 ) -> None:
     """
     Validate any metric field or label using a custom getter function.
@@ -67,6 +68,7 @@ def validate_metrics_field(
         expected_value (Any): Expected value
         field_getter (Callable): Function to extract the desired field/label/value
         timeout (int): Timeout in seconds
+        greater_than (bool): Whether to check if the metric is greater than the expected value
 
     Raises:
         TimeoutExpiredError: If expected value isn't met within the timeout
@@ -79,9 +81,14 @@ def validate_metrics_field(
             prometheus=prometheus,
             metrics_query=metrics_query,
         ):
-            if sample == expected_value:
-                LOGGER.info("Metric field matches the expected value!")
-                return
+            if greater_than:
+                if float(sample) >= float(expected_value):
+                    LOGGER.info(f"Metric field {sample} is greater than or equal to expected value {expected_value}!")
+                    return
+            else:
+                if sample == expected_value:
+                    LOGGER.info("Metric field matches the expected value!")
+                    return
             LOGGER.info(f"Current value: {sample}, waiting for: {expected_value}")
     except TimeoutExpiredError:
         LOGGER.error(f"Timed out. Last value: {sample}, expected: {expected_value}")