|
1 | | -import pytest |
2 | 1 | from typing import Any, Generator |
| 2 | +import json |
| 3 | + |
| 4 | +import pytest |
| 5 | +from kubernetes.dynamic import DynamicClient |
| 6 | +from ocp_resources.job import Job |
| 7 | + |
| 8 | +from tests.model_registry.async_job.constants import ( |
| 9 | + ASYNC_JOB_ANNOTATIONS, |
| 10 | + ASYNC_JOB_LABELS, |
| 11 | + ASYNC_UPLOAD_IMAGE, |
| 12 | + ASYNC_UPLOAD_JOB_NAME, |
| 13 | + MODEL_SYNC_CONFIG, |
| 14 | + VOLUME_MOUNTS, |
| 15 | +) |
| 16 | + |
3 | 17 | import shortuuid |
4 | 18 | from pytest import FixtureRequest |
5 | 19 |
|
6 | 20 | from ocp_resources.namespace import Namespace |
7 | 21 | from ocp_resources.pod import Pod |
| 22 | +from ocp_resources.role_binding import RoleBinding |
8 | 23 | from ocp_resources.route import Route |
| 24 | +from ocp_resources.secret import Secret |
9 | 25 | from ocp_resources.service import Service |
| 26 | +from ocp_resources.service_account import ServiceAccount |
| 27 | +from ocp_resources.model_registry_modelregistry_opendatahub_io import ModelRegistry |
| 28 | +from model_registry.types import RegisteredModel |
| 29 | +from model_registry import ModelRegistry as ModelRegistryClient |
10 | 30 |
|
11 | 31 | from utilities.infra import create_ns |
12 | 32 | from utilities.constants import OCIRegistry, MinIo, Protocols, Labels |
| 33 | +from utilities.general import b64_encoded_string |
| 34 | +from tests.model_registry.async_job.utils import get_async_job_s3_secret_dict, upload_test_model_to_minio_from_image |
| 35 | +from tests.model_registry.utils import get_mr_service_by_label, get_endpoint_from_mr_service |
| 36 | +from tests.model_registry.async_job.constants import REPO_NAME |
13 | 37 |
|
14 | | -from kubernetes.dynamic import DynamicClient |
| 38 | + |
| 39 | +# We need to upstream this to the wrapper library |
| 40 | +class JobWithVolumes(Job): |
| 41 | + """Extended Job class that supports volumes""" |
| 42 | + |
| 43 | + def __init__(self, volumes=None, **kwargs): |
| 44 | + super().__init__(**kwargs) |
| 45 | + self.volumes = volumes or [] |
| 46 | + |
| 47 | + def to_dict(self) -> None: |
| 48 | + super().to_dict() |
| 49 | + if not self.kind_dict and not self.yaml_file and self.volumes: |
| 50 | + self.res["spec"].setdefault("template", {}).setdefault("spec", {}) |
| 51 | + self.res["spec"]["template"]["spec"]["volumes"] = self.volumes |
| 52 | + |
| 53 | + |
| 54 | +@pytest.fixture(scope="function") |
| 55 | +def s3_secret_for_async_job( |
| 56 | + admin_client: DynamicClient, |
| 57 | + service_account: ServiceAccount, |
| 58 | + minio_service: Service, |
| 59 | +) -> Generator[Secret, Any, Any]: |
| 60 | + """Create S3 credentials secret for async upload job""" |
| 61 | + # Construct MinIO endpoint from service |
| 62 | + minio_endpoint = ( |
| 63 | + f"http://{minio_service.name}.{minio_service.namespace}.svc.cluster.local:{MinIo.Metadata.DEFAULT_PORT}" |
| 64 | + ) |
| 65 | + |
| 66 | + with Secret( |
| 67 | + client=admin_client, |
| 68 | + name=f"async-job-s3-secret-{shortuuid.uuid().lower()}", |
| 69 | + namespace=service_account.namespace, |
| 70 | + data_dict=get_async_job_s3_secret_dict( |
| 71 | + access_key=MinIo.Credentials.ACCESS_KEY_VALUE, |
| 72 | + secret_access_key=MinIo.Credentials.SECRET_KEY_VALUE, |
| 73 | + s3_bucket=MinIo.Buckets.MODELMESH_EXAMPLE_MODELS, |
| 74 | + s3_endpoint=minio_endpoint, |
| 75 | + s3_region="us-east-1", # Default region for MinIO |
| 76 | + ), |
| 77 | + type="Opaque", |
| 78 | + ) as secret: |
| 79 | + yield secret |
| 80 | + |
| 81 | + |
| 82 | +@pytest.fixture(scope="function") |
| 83 | +def oci_secret_for_async_job( |
| 84 | + admin_client: DynamicClient, |
| 85 | + service_account: ServiceAccount, |
| 86 | + oci_registry_host: str, |
| 87 | +) -> Generator[Secret, Any, Any]: |
| 88 | + """Create OCI registry credentials secret for async upload job""" |
| 89 | + |
| 90 | + # Create anonymous dockerconfig for OCI registry (no authentication) |
| 91 | + # This matches the zot registry setup which allows anonymous access |
| 92 | + dockerconfig = { |
| 93 | + "auths": { |
| 94 | + f"{oci_registry_host}:{OCIRegistry.Metadata.DEFAULT_PORT}": { |
| 95 | + "auth": "", |
| 96 | + "email": "[email protected]", # Anonymous access |
| 97 | + } |
| 98 | + } |
| 99 | + } |
| 100 | + |
| 101 | + with Secret( |
| 102 | + client=admin_client, |
| 103 | + name=f"async-job-oci-secret-{shortuuid.uuid().lower()}", |
| 104 | + namespace=service_account.namespace, |
| 105 | + data_dict={ |
| 106 | + ".dockerconfigjson": b64_encoded_string(json.dumps(dockerconfig)), |
| 107 | + "ACCESS_TYPE": b64_encoded_string(json.dumps('["Push,Pull"]')), |
| 108 | + "OCI_HOST": b64_encoded_string(json.dumps(f"{oci_registry_host}:{OCIRegistry.Metadata.DEFAULT_PORT}")), |
| 109 | + }, |
| 110 | + type="kubernetes.io/dockerconfigjson", |
| 111 | + ) as secret: |
| 112 | + yield secret |
| 113 | + |
| 114 | + |
| 115 | +@pytest.fixture(scope="function") |
| 116 | +def model_sync_async_job( |
| 117 | + admin_client: DynamicClient, |
| 118 | + sa_token: str, |
| 119 | + service_account: ServiceAccount, |
| 120 | + model_registry_namespace: str, |
| 121 | + model_registry_instance: list[ModelRegistry], |
| 122 | + s3_secret_for_async_job: Secret, |
| 123 | + oci_secret_for_async_job: Secret, |
| 124 | + oci_registry_host: str, |
| 125 | + mr_access_role_binding: RoleBinding, |
| 126 | + teardown_resources: bool, |
| 127 | +) -> Generator[Job, Any, Any]: |
| 128 | + """Core Job fixture focused on Job deployment and configuration""" |
| 129 | + # Get dynamic OCI URI from route |
| 130 | + dynamic_oci_uri = f"{oci_registry_host}/{REPO_NAME}" |
| 131 | + |
| 132 | + # Get model registry service and endpoint |
| 133 | + mr_instance = model_registry_instance[0] # Use first instance |
| 134 | + mr_service = get_mr_service_by_label( |
| 135 | + client=admin_client, namespace_name=model_registry_namespace, mr_instance=mr_instance |
| 136 | + ) |
| 137 | + mr_endpoint = get_endpoint_from_mr_service(svc=mr_service, protocol=Protocols.REST) |
| 138 | + mr_host = mr_endpoint.split(":")[0] |
| 139 | + mr_port = mr_endpoint.split(":")[1] |
| 140 | + |
| 141 | + with JobWithVolumes( |
| 142 | + client=admin_client, |
| 143 | + name=ASYNC_UPLOAD_JOB_NAME, |
| 144 | + namespace=service_account.namespace, |
| 145 | + label=ASYNC_JOB_LABELS, |
| 146 | + annotations=ASYNC_JOB_ANNOTATIONS, |
| 147 | + restart_policy="Never", |
| 148 | + containers=[ |
| 149 | + { |
| 150 | + "name": "async-upload", |
| 151 | + "image": ASYNC_UPLOAD_IMAGE, |
| 152 | + "volumeMounts": [ |
| 153 | + { |
| 154 | + "name": "source-credentials", |
| 155 | + "readOnly": True, |
| 156 | + "mountPath": VOLUME_MOUNTS["SOURCE_CREDS_PATH"], |
| 157 | + }, |
| 158 | + { |
| 159 | + "name": "destination-credentials", |
| 160 | + "readOnly": True, |
| 161 | + "mountPath": VOLUME_MOUNTS["DEST_CREDS_PATH"], |
| 162 | + }, |
| 163 | + ], |
| 164 | + "env": [ |
| 165 | + # Proxy settings |
| 166 | + {"name": "HTTP_PROXY", "value": ""}, |
| 167 | + {"name": "HTTPS_PROXY", "value": ""}, |
| 168 | + {"name": "NO_PROXY", "value": "*.svc.cluster.local"}, |
| 169 | + # Source configuration |
| 170 | + {"name": "MODEL_SYNC_SOURCE_TYPE", "value": MODEL_SYNC_CONFIG["SOURCE_TYPE"]}, |
| 171 | + {"name": "MODEL_SYNC_SOURCE_AWS_KEY", "value": MODEL_SYNC_CONFIG["SOURCE_AWS_KEY"]}, |
| 172 | + { |
| 173 | + "name": "MODEL_SYNC_SOURCE_S3_CREDENTIALS_PATH", |
| 174 | + "value": VOLUME_MOUNTS["SOURCE_CREDS_PATH"], |
| 175 | + }, |
| 176 | + # Destination configuration |
| 177 | + {"name": "MODEL_SYNC_DESTINATION_TYPE", "value": MODEL_SYNC_CONFIG["DESTINATION_TYPE"]}, |
| 178 | + { |
| 179 | + "name": "MODEL_SYNC_DESTINATION_OCI_URI", |
| 180 | + "value": f"{dynamic_oci_uri}", |
| 181 | + }, |
| 182 | + { |
| 183 | + "name": "MODEL_SYNC_DESTINATION_OCI_REGISTRY", |
| 184 | + "value": f"{oci_registry_host}:{OCIRegistry.Metadata.DEFAULT_PORT}", |
| 185 | + }, |
| 186 | + { |
| 187 | + "name": "MODEL_SYNC_DESTINATION_OCI_CREDENTIALS_PATH", |
| 188 | + "value": VOLUME_MOUNTS["DEST_DOCKERCONFIG_PATH"], |
| 189 | + }, |
| 190 | + { |
| 191 | + "name": "MODEL_SYNC_DESTINATION_OCI_BASE_IMAGE", |
| 192 | + "value": MODEL_SYNC_CONFIG["DESTINATION_OCI_BASE_IMAGE"], |
| 193 | + }, |
| 194 | + { |
| 195 | + "name": "MODEL_SYNC_DESTINATION_OCI_ENABLE_TLS_VERIFY", |
| 196 | + "value": MODEL_SYNC_CONFIG["DESTINATION_OCI_ENABLE_TLS_VERIFY"], |
| 197 | + }, |
| 198 | + # Model parameters |
| 199 | + {"name": "MODEL_SYNC_MODEL_ID", "value": MODEL_SYNC_CONFIG["MODEL_ID"]}, |
| 200 | + {"name": "MODEL_SYNC_MODEL_VERSION_ID", "value": MODEL_SYNC_CONFIG["MODEL_VERSION_ID"]}, |
| 201 | + { |
| 202 | + "name": "MODEL_SYNC_MODEL_ARTIFACT_ID", |
| 203 | + "value": MODEL_SYNC_CONFIG["MODEL_ARTIFACT_ID"], |
| 204 | + }, |
| 205 | + # Model Registry client params |
| 206 | + { |
| 207 | + "name": "MODEL_SYNC_REGISTRY_SERVER_ADDRESS", |
| 208 | + "value": f"https://{mr_host}", |
| 209 | + }, |
| 210 | + {"name": "MODEL_SYNC_REGISTRY_PORT", "value": mr_port}, |
| 211 | + {"name": "MODEL_SYNC_REGISTRY_AUTHOR", "value": "RHOAI async job test"}, |
| 212 | + {"name": "MODEL_SYNC_REGISTRY_USER_TOKEN", "value": sa_token}, |
| 213 | + {"name": "MODEL_SYNC_REGISTRY_IS_SECURE", "value": "False"}, |
| 214 | + ], |
| 215 | + } |
| 216 | + ], |
| 217 | + volumes=[ |
| 218 | + { |
| 219 | + "name": "source-credentials", |
| 220 | + "secret": { |
| 221 | + "secretName": s3_secret_for_async_job.name, |
| 222 | + }, |
| 223 | + }, |
| 224 | + { |
| 225 | + "name": "destination-credentials", |
| 226 | + "secret": { |
| 227 | + "secretName": oci_secret_for_async_job.name, |
| 228 | + }, |
| 229 | + }, |
| 230 | + ], |
| 231 | + teardown=teardown_resources, |
| 232 | + ) as job: |
| 233 | + job.wait_for_condition(condition="Complete", status="True") |
| 234 | + yield job |
15 | 235 |
|
16 | 236 |
|
17 | 237 | # OCI Registry |
@@ -129,3 +349,41 @@ def oci_registry_route(admin_client: DynamicClient, oci_registry_service: Servic |
129 | 349 | service=oci_registry_service.name, |
130 | 350 | ) as oci_route: |
131 | 351 | yield oci_route |
| 352 | + |
| 353 | + |
| 354 | +@pytest.fixture(scope="class") |
| 355 | +def oci_registry_host(oci_registry_route: Route) -> str: |
| 356 | + """Get the OCI registry host from the route""" |
| 357 | + return oci_registry_route.instance.spec.host |
| 358 | + |
| 359 | + |
| 360 | +@pytest.fixture(scope="function") |
| 361 | +def create_test_data_in_minio_from_image( |
| 362 | + minio_service: Service, |
| 363 | + admin_client: DynamicClient, |
| 364 | + model_registry_namespace: str, |
| 365 | +) -> None: |
| 366 | + """Extract and upload test model from KSERVE_MINIO_IMAGE to MinIO""" |
| 367 | + upload_test_model_to_minio_from_image( |
| 368 | + admin_client=admin_client, |
| 369 | + namespace=model_registry_namespace, |
| 370 | + minio_service=minio_service, |
| 371 | + object_key="my-model/model.onnx", |
| 372 | + ) |
| 373 | + |
| 374 | + |
| 375 | +@pytest.fixture(scope="class") |
| 376 | +def registered_model_from_image( |
| 377 | + request: FixtureRequest, model_registry_client: list[ModelRegistryClient] |
| 378 | +) -> Generator[RegisteredModel, None, None]: |
| 379 | + """Create a registered model for testing with KSERVE_MINIO_IMAGE data""" |
| 380 | + yield model_registry_client[0].register_model( |
| 381 | + name=request.param.get("model_name"), |
| 382 | + uri=request.param.get("model_uri"), |
| 383 | + version=request.param.get("model_version"), |
| 384 | + description=request.param.get("model_description"), |
| 385 | + model_format_name=request.param.get("model_format"), |
| 386 | + model_format_version=request.param.get("model_format_version"), |
| 387 | + storage_key=request.param.get("model_storage_key"), |
| 388 | + storage_path=request.param.get("model_storage_path"), |
| 389 | + ) |
0 commit comments