Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 0 additions & 43 deletions tests/model_registry/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@
from utilities.constants import Protocols, DscComponents
from model_registry import ModelRegistry as ModelRegistryClient
from semver import Version
from utilities.infra import get_product_version
from utilities.operator_utils import get_cluster_service_version, validate_operator_subscription_channel
from utilities.general import wait_for_pods_by_labels

LOGGER = get_logger(name=__name__)
Expand Down Expand Up @@ -354,47 +352,6 @@ def model_registry_instance_pod(admin_client: DynamicClient) -> Generator[Pod, A
)[0]


@pytest.fixture(scope="package", autouse=True)
def validate_authorino_operator_version_channel(admin_client: DynamicClient) -> None:
"""Check if Authorino operator is installed with required version and channel.

This fixture is automatically used for all tests in the model_registry directory.
It verifies that:
1. For OpenShift AI: The product version is >= 2.20
2. The Authorino operator is installed
3. The Authorino operator is using the required channel (stable)
4. The Authorino operator is at least version 1.2.1
"""
distribution = py_config["distribution"]
if distribution == "upstream":
# TODO: figure out minimum version for ODH
LOGGER.info(f"Skipping Authorino operator check for {distribution} distribution")
return
# Only check product version for OpenShift AI
if distribution == "downstream":
product_version = get_product_version(admin_client=admin_client)
if product_version < MIN_MR_VERSION:
LOGGER.info(
"Skipping Authorino operator check - product version "
f"{product_version} is below required {MIN_MR_VERSION}"
)
return
operator_name = "authorino-operator"
# Find the CSV for the operator
authorino_csv = get_cluster_service_version(
client=admin_client, prefix=operator_name, namespace=py_config["applications_namespace"]
)
current_authorino_version = authorino_csv.instance.spec.version
if Version.parse(version="1.2.1") > Version.parse(version=current_authorino_version):
pytest.exit(
f"Authorino operator is not at least version 1.2.1. Current version: {current_authorino_version}"
)

validate_operator_subscription_channel(
client=admin_client, namespace="openshift-operators", operator_name=operator_name, channel_name="stable"
)


@pytest.fixture(scope="class")
def is_model_registry_oauth(request: FixtureRequest) -> bool:
return getattr(request, "param", {}).get("use_oauth_proxy", False)
Expand Down
23 changes: 0 additions & 23 deletions utilities/operator_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
from simple_logger.logger import get_logger

from ocp_resources.cluster_service_version import ClusterServiceVersion
from ocp_resources.subscription import Subscription
from utilities.exceptions import ResourceMismatchError
from utilities.infra import get_product_version
from pytest_testconfig import config as py_config

Expand All @@ -31,27 +29,6 @@ def get_cluster_service_version(client: DynamicClient, prefix: str, namespace: s
return matching_csvs[0]


def validate_operator_subscription_channel(
client: DynamicClient, operator_name: str, namespace: str, channel_name: str
) -> None:
operator_subscriptions = [
subscription
for subscription in Subscription.get(dyn_client=client, namespace=namespace)
if subscription.instance.spec.name == operator_name
]

if not operator_subscriptions:
raise ResourceNotFoundError(f"Subscription not found for operator {operator_name}")
if len(operator_subscriptions) > 1:
raise ResourceNotUniqueError(f"Multiple subscriptions found for operator {operator_name}")
subscription_channel = operator_subscriptions[0].instance.spec.channel
if not subscription_channel or subscription_channel != channel_name:
raise ResourceMismatchError(
f"For Operator {operator_name}, Subscription points to {subscription_channel}, expected: {channel_name}"
)
LOGGER.info(f"Operator {operator_name} subscription channel is {subscription_channel}")


def get_csv_related_images(admin_client: DynamicClient, csv_name: str | None = None) -> List[Dict[str, str]]:
"""Get relatedImages from the CSV.

Expand Down