|
4 | 4 | import yaml |
5 | 5 | from _pytest.fixtures import FixtureRequest |
6 | 6 | from kubernetes.dynamic import DynamicClient |
| 7 | +from ocp_resources.authorino import Authorino |
| 8 | +from ocp_resources.cluster_service_version import ClusterServiceVersion |
7 | 9 | from ocp_resources.config_map import ConfigMap |
8 | 10 | from ocp_resources.inference_service import InferenceService |
9 | 11 | from ocp_resources.namespace import Namespace |
10 | 12 | from ocp_resources.persistent_volume_claim import PersistentVolumeClaim |
11 | 13 | from ocp_resources.secret import Secret |
12 | 14 | from ocp_resources.service_account import ServiceAccount |
| 15 | +from ocp_resources.service_mesh_control_plane import ServiceMeshControlPlane |
13 | 16 | from ocp_resources.serving_runtime import ServingRuntime |
14 | 17 | from ocp_resources.storage_class import StorageClass |
15 | 18 | from ocp_utilities.monitoring import Prometheus |
| 19 | +from pytest_testconfig import config as py_config |
16 | 20 |
|
17 | 21 | from utilities.constants import ( |
18 | 22 | KServeDeploymentType, |
|
36 | 40 | from utilities.serving_runtime import ServingRuntimeFromTemplate |
37 | 41 |
|
38 | 42 |
|
| 43 | +@pytest.fixture(scope="package") |
| 44 | +def skip_if_no_deployed_openshift_serverless(admin_client: DynamicClient) -> None: |
| 45 | + name = "openshift-serverless" |
| 46 | + csvs = list( |
| 47 | + ClusterServiceVersion.get( |
| 48 | + client=admin_client, |
| 49 | + namespace=name, |
| 50 | + label_selector=f"operators.coreos.com/serverless-operator.{name}", |
| 51 | + ) |
| 52 | + ) |
| 53 | + if not csvs: |
| 54 | + pytest.skip("OpenShift Serverless is not deployed") |
| 55 | + |
| 56 | + csv = csvs[0] |
| 57 | + |
| 58 | + if not (csv.exists and csv.status == csv.Status.SUCCEEDED): |
| 59 | + pytest.skip("OpenShift Serverless is not deployed") |
| 60 | + |
| 61 | + |
39 | 62 | @pytest.fixture(scope="class") |
40 | 63 | def models_endpoint_s3_secret( |
41 | 64 | admin_client: DynamicClient, |
@@ -166,6 +189,28 @@ def skip_if_no_nfs_storage_class(admin_client: DynamicClient) -> None: |
166 | 189 | pytest.skip(f"StorageClass {StorageClassName.NFS} is missing from the cluster") |
167 | 190 |
|
168 | 191 |
|
| 192 | +@pytest.fixture(scope="package") |
| 193 | +def skip_if_no_deployed_redhat_authorino_operator(admin_client: DynamicClient) -> None: |
| 194 | + name = "authorino" |
| 195 | + namespace = f"{py_config['applications_namespace']}-auth-provider" |
| 196 | + |
| 197 | + if not Authorino( |
| 198 | + client=admin_client, |
| 199 | + name=name, |
| 200 | + namespace=namespace, |
| 201 | + ).exists: |
| 202 | + pytest.skip(f"Authorino {name} CR is missing from {namespace} namespace") |
| 203 | + |
| 204 | + |
| 205 | +@pytest.fixture(scope="package") |
| 206 | +def skip_if_no_deployed_openshift_service_mesh(admin_client: DynamicClient) -> None: |
| 207 | + smcp = ServiceMeshControlPlane(client=admin_client, name="data-science-smcp", namespace="istio-system") |
| 208 | + if not smcp or not smcp.exists: |
| 209 | + pytest.skip("OpenShift service mesh operator is not deployed") |
| 210 | + |
| 211 | + smcp.wait_for_condition(condition=smcp.Condition.READY, status="True") |
| 212 | + |
| 213 | + |
169 | 214 | @pytest.fixture(scope="class") |
170 | 215 | def http_s3_openvino_model_mesh_inference_service( |
171 | 216 | request: FixtureRequest, |
|
0 commit comments