From 6f49749ec567f924160d20440b8fef46e7a07050 Mon Sep 17 00:00:00 2001 From: Debarati Basu-Nag Date: Thu, 26 Mar 2026 18:42:44 -0400 Subject: [PATCH 1/4] test: Add async job signing test automation Signed-off-by: Debarati Basu-Nag Co-Authored-By: Claude --- .../model_registry/async_job/conftest.py | 37 -- .../model_registry/model_registry/conftest.py | 36 ++ .../python_client/signing/conftest.py | 415 +++++++++++++++++- .../python_client/signing/constants.py | 24 + .../signing/test_async_signing.py | 92 ++++ .../signing/test_native_job_signing.py | 97 ++++ .../python_client/signing/utils.py | 228 ++++++++++ 7 files changed, 891 insertions(+), 38 deletions(-) create mode 100644 tests/model_registry/model_registry/python_client/signing/test_async_signing.py create mode 100644 tests/model_registry/model_registry/python_client/signing/test_native_job_signing.py diff --git a/tests/model_registry/model_registry/async_job/conftest.py b/tests/model_registry/model_registry/async_job/conftest.py index 8caaf70dc..28c036c45 100644 --- a/tests/model_registry/model_registry/async_job/conftest.py +++ b/tests/model_registry/model_registry/async_job/conftest.py @@ -5,17 +5,14 @@ import pytest import shortuuid from kubernetes.dynamic import DynamicClient -from kubernetes.dynamic.exceptions import ResourceNotFoundError from model_registry import ModelRegistry as ModelRegistryClient from model_registry.types import RegisteredModel -from ocp_resources.config_map import ConfigMap from ocp_resources.job import Job from ocp_resources.role_binding import RoleBinding from ocp_resources.secret import Secret from ocp_resources.service import Service from ocp_resources.service_account import ServiceAccount from pytest import FixtureRequest -from pytest_testconfig import py_config from tests.model_registry.model_registry.async_job.constants import ( ASYNC_JOB_ANNOTATIONS, @@ -109,40 +106,6 @@ def oci_secret_for_async_job( yield secret -@pytest.fixture(scope="class") -def async_upload_image(admin_client: DynamicClient) -> str: - """ - Get the async upload image dynamically from the model-registry-operator-parameters ConfigMap. - - This fetches the image from the cluster at runtime instead of using a hardcoded value. - - Args: - admin_client: Kubernetes client for resource access - - Returns: - str: The async upload image URL from the ConfigMap - - Raises: - KeyError: If the ConfigMap or the required key doesn't exist - """ - config_map = ConfigMap( - client=admin_client, - name="model-registry-operator-parameters", - namespace=py_config["applications_namespace"], - ) - - if not config_map.exists: - raise ResourceNotFoundError( - f"ConfigMap 'model-registry-operator-parameters' not found in" - f" namespace '{py_config['applications_namespace']}'" - ) - - try: - return config_map.instance.data["IMAGES_JOBS_ASYNC_UPLOAD"] - except KeyError as e: - raise KeyError(f"Key 'IMAGES_JOBS_ASYNC_UPLOAD' not found in ConfigMap data: {e}") from e - - @pytest.fixture(scope="class") def model_sync_async_job( admin_client: DynamicClient, diff --git a/tests/model_registry/model_registry/conftest.py b/tests/model_registry/model_registry/conftest.py index 6b9f12667..f59806b4b 100644 --- a/tests/model_registry/model_registry/conftest.py +++ b/tests/model_registry/model_registry/conftest.py @@ -9,6 +9,7 @@ from kubernetes.dynamic.exceptions import ResourceNotFoundError from model_registry import ModelRegistry as ModelRegistryClient from model_registry.types import RegisteredModel +from ocp_resources.config_map import ConfigMap from ocp_resources.deployment import Deployment from ocp_resources.namespace import Namespace from ocp_resources.pod import Pod @@ -17,6 +18,7 @@ from ocp_resources.service_account import ServiceAccount from pyhelper_utils.shell import run_command from pytest import FixtureRequest +from pytest_testconfig import config as py_config from tests.model_registry.constants import ( MODEL_REGISTRY_POD_FILTER, @@ -236,3 +238,37 @@ def mr_access_role_binding( LOGGER.info(f"RoleBinding {binding.name} created successfully.") yield binding LOGGER.info(f"RoleBinding {binding.name} deletion initiated by context manager.") + + +@pytest.fixture(scope="class") +def async_upload_image(admin_client: DynamicClient) -> str: + """ + Get the async upload image dynamically from the model-registry-operator-parameters ConfigMap. + + This fetches the image from the cluster at runtime instead of using a hardcoded value. + + Args: + admin_client: Kubernetes client for resource access + + Returns: + str: The async upload image URL from the ConfigMap + + Raises: + KeyError: If the ConfigMap or the required key doesn't exist + """ + config_map = ConfigMap( + client=admin_client, + name="model-registry-operator-parameters", + namespace=py_config["applications_namespace"], + ) + + if not config_map.exists: + raise ResourceNotFoundError( + f"ConfigMap 'model-registry-operator-parameters' not found in" + f" namespace '{py_config['applications_namespace']}'" + ) + + try: + return config_map.instance.data["IMAGES_JOBS_ASYNC_UPLOAD"] + except KeyError as e: + raise KeyError(f"Key 'IMAGES_JOBS_ASYNC_UPLOAD' not found in ConfigMap data: {e}") from e diff --git a/tests/model_registry/model_registry/python_client/signing/conftest.py b/tests/model_registry/model_registry/python_client/signing/conftest.py index 23c09f6c7..b708bc0e2 100644 --- a/tests/model_registry/model_registry/python_client/signing/conftest.py +++ b/tests/model_registry/model_registry/python_client/signing/conftest.py @@ -1,5 +1,6 @@ """Fixtures for Model Registry Python Client Signing Tests.""" +import base64 import json import logging import os @@ -9,38 +10,73 @@ import pytest import requests +import shortuuid import structlog from huggingface_hub import snapshot_download from kubernetes.dynamic import DynamicClient +from model_registry import ModelRegistry as ModelRegistryClient from model_registry.signing import Signer +from model_registry.types import RegisteredModel from ocp_resources.config_map import ConfigMap from ocp_resources.deployment import Deployment +from ocp_resources.job import Job from ocp_resources.namespace import Namespace from ocp_resources.pod import Pod +from ocp_resources.role_binding import RoleBinding +from ocp_resources.secret import Secret from ocp_resources.service import Service +from ocp_resources.service_account import ServiceAccount from ocp_resources.subscription import Subscription from ocp_utilities.operators import install_operator, uninstall_operator from pyhelper_utils.shell import run_command from pytest_testconfig import config as py_config from timeout_sampler import TimeoutSampler +from tests.model_registry.model_registry.async_job.constants import ( + ASYNC_UPLOAD_JOB_NAME, + MODEL_SYNC_CONFIG, +) +from tests.model_registry.model_registry.async_job.utils import get_latest_job_pod from tests.model_registry.model_registry.python_client.signing.constants import ( + IDENTITY_TOKEN_MOUNT_PATH, + MODEL_CONTENT, + NATIVE_SIGNING_REPO, + NATIVE_SIGNING_TAG, SECURESIGN_API_VERSION, SECURESIGN_NAME, SECURESIGN_NAMESPACE, + SIGNING_ASYNC_REPO, + SIGNING_ASYNC_TAG, + SIGNING_MODEL_DATA, SIGNING_OCI_REPO_NAME, SIGNING_OCI_TAG, TAS_CONNECTION_TYPE_NAME, ) from tests.model_registry.model_registry.python_client.signing.utils import ( + create_async_upload_job, create_connection_type_field, generate_token, + get_base_async_job_env_vars, + get_model_registry_host, + get_oci_image_with_digest, + get_oci_internal_endpoint, get_organization_config, get_root_checksum, get_tas_service_urls, + run_minio_uploader_pod, +) +from utilities.constants import ( + OPENSHIFT_OPERATORS, + ApiGroups, + Labels, + MinIo, + ModelCarImage, + OCIRegistry, + Timeout, ) -from utilities.constants import OPENSHIFT_OPERATORS, Labels, ModelCarImage, OCIRegistry, Timeout +from utilities.general import b64_encoded_string, get_s3_secret_dict from utilities.infra import get_openshift_token, is_managed_cluster +from utilities.resources.model_registry_modelregistry_opendatahub_io import ModelRegistry from utilities.resources.route import Route from utilities.resources.securesign import Securesign @@ -577,3 +613,380 @@ def signed_model(signer, downloaded_model_dir) -> Path: LOGGER.info("Model signed successfully") return downloaded_model_dir + + +@pytest.fixture(scope="class") +def signing_s3_secret( + admin_client: DynamicClient, + service_account: ServiceAccount, + minio_service: Service, +) -> Generator[Secret, Any, Any]: + """Create S3 data connection for signing async upload jobs.""" + minio_endpoint = ( + f"http://{minio_service.name}.{minio_service.namespace}.svc.cluster.local:{MinIo.Metadata.DEFAULT_PORT}" + ) + + with Secret( + client=admin_client, + name=f"signing-s3-{shortuuid.uuid().lower()}", + namespace=service_account.namespace, + data_dict=get_s3_secret_dict( + aws_access_key=MinIo.Credentials.ACCESS_KEY_VALUE, + aws_secret_access_key=MinIo.Credentials.SECRET_KEY_VALUE, + aws_s3_bucket=MinIo.Buckets.MODELMESH_EXAMPLE_MODELS, + aws_s3_endpoint=minio_endpoint, + aws_default_region="us-east-1", + ), + label={ + Labels.OpenDataHub.DASHBOARD: "true", + Labels.OpenDataHubIo.MANAGED: "true", + }, + annotations={ + f"{ApiGroups.OPENDATAHUB_IO}/connection-type": "s3", + "openshift.io/display-name": "Signing S3 Credentials", + }, + ) as secret: + yield secret + + +@pytest.fixture(scope="class") +def signing_oci_secret( + admin_client: DynamicClient, + service_account: ServiceAccount, + oci_registry_service: Service, +) -> Generator[Secret, Any, Any]: + """Create OCI registry data connection for signing async upload jobs.""" + oci_internal_endpoint = get_oci_internal_endpoint(oci_registry_service=oci_registry_service) + + dockerconfig = { + "auths": { + oci_internal_endpoint: { + "auth": "", + "email": "user@example.com", + } + } + } + + data_dict = { + ".dockerconfigjson": b64_encoded_string(json.dumps(dockerconfig)), + "ACCESS_TYPE": b64_encoded_string(json.dumps('["Push,Pull"]')), + "OCI_HOST": b64_encoded_string(json.dumps(oci_internal_endpoint)), + } + + with Secret( + client=admin_client, + name=f"signing-oci-{shortuuid.uuid().lower()}", + namespace=service_account.namespace, + data_dict=data_dict, + label={ + Labels.OpenDataHub.DASHBOARD: "true", + Labels.OpenDataHubIo.MANAGED: "true", + }, + annotations={ + f"{ApiGroups.OPENDATAHUB_IO}/connection-type-ref": "oci-v1", + "openshift.io/display-name": "Signing OCI Credentials", + }, + type="kubernetes.io/dockerconfigjson", + ) as secret: + yield secret + + +@pytest.fixture(scope="class") +def signing_registered_model( + model_registry_client: list[ModelRegistryClient], +) -> RegisteredModel: + """Register a model in Model Registry for signing async tests.""" + return model_registry_client[0].register_model( + name=SIGNING_MODEL_DATA["model_name"], + uri=SIGNING_MODEL_DATA.get("model_uri"), + version=SIGNING_MODEL_DATA.get("model_version"), + version_description=SIGNING_MODEL_DATA.get("model_description"), + model_format_name=SIGNING_MODEL_DATA.get("model_format"), + model_format_version=SIGNING_MODEL_DATA.get("model_format_version"), + storage_key=SIGNING_MODEL_DATA.get("model_storage_key"), + storage_path=SIGNING_MODEL_DATA.get("model_storage_path"), + ) + + +@pytest.fixture(scope="class") +def class_scoped_signer(set_environment_variables: None) -> Signer: + """Class-scoped signer instance for use in class-scoped fixtures.""" + signer = Signer( + identity_token_path=os.environ["IDENTITY_TOKEN_PATH"], + root_url=os.environ["ROOT_URL"], + root_checksum=os.environ["ROOT_CHECKSUM"], + log_level=logging.DEBUG, + ) + signer.initialize(force=True) + LOGGER.info("Class-scoped signer initialized") + return signer + + +@pytest.fixture(scope="class") +def signed_model_dir(class_scoped_signer: Signer, tmp_path_factory: pytest.TempPathFactory) -> Path: + """Create a model directory, sign it, and return the path.""" + model_dir = tmp_path_factory.mktemp(basename="async-signing-model") + model_file = model_dir / "model.onnx" + model_file.write_bytes(data=MODEL_CONTENT) + + LOGGER.info(f"Signing model in {model_dir}") + class_scoped_signer.sign_model(model_path=str(model_dir)) + + sig_file = model_dir / "model.sig" + assert sig_file.exists(), "model.sig not created after signing" + LOGGER.info(f"Model signed successfully, model.sig size: {sig_file.stat().st_size} bytes") + return model_dir + + +@pytest.fixture(scope="class") +def signed_model_secret( + admin_client: DynamicClient, + model_registry_namespace: str, + signed_model_dir: Path, +) -> Generator[Secret, Any, Any]: + """Create a Secret containing the signed model files for upload to MinIO.""" + model_onnx = (signed_model_dir / "model.onnx").read_bytes() + model_sig = (signed_model_dir / "model.sig").read_bytes() + + with Secret( + client=admin_client, + name=f"signed-model-files-{shortuuid.uuid().lower()}", + namespace=model_registry_namespace, + data_dict={ + "model.onnx": base64.b64encode(model_onnx).decode(), + "model.sig": base64.b64encode(model_sig).decode(), + }, + ) as secret: + LOGGER.info(f"Created signed model secret: {secret.name}") + yield secret + + +@pytest.fixture(scope="class") +def upload_signed_model_to_minio( + admin_client: DynamicClient, + model_registry_namespace: str, + minio_service: Service, + signed_model_secret: Secret, +) -> None: + """Upload signed model files (model.onnx + model.sig) to MinIO.""" + source_key = MODEL_SYNC_CONFIG["SOURCE_AWS_KEY"] + bucket = MinIo.Buckets.MODELMESH_EXAMPLE_MODELS + + run_minio_uploader_pod( + admin_client=admin_client, + namespace=model_registry_namespace, + minio_service=minio_service, + pod_name="signed-model-uploader", + mc_commands=( + f"mc cp /model-files/model.onnx testminio/{bucket}/{source_key}/model.onnx && " + f"mc cp /model-files/model.sig testminio/{bucket}/{source_key}/model.sig && " + f"mc ls testminio/{bucket}/{source_key}/ && " + f"echo 'Signed model upload completed'" + ), + volumes=[{"name": "model-files", "secret": {"secretName": signed_model_secret.name}}], + volume_mounts=[{"name": "model-files", "mountPath": "/model-files", "readOnly": True}], + ) + + +@pytest.fixture(scope="class") +def signing_async_job( + admin_client: DynamicClient, + sa_token: str, + service_account: ServiceAccount, + model_registry_namespace: str, + model_registry_instance: list[ModelRegistry], + signing_s3_secret: Secret, + signing_oci_secret: Secret, + oci_registry_service: Service, + mr_access_role_binding: RoleBinding, + async_upload_image: str, + signing_registered_model: RegisteredModel, + upload_signed_model_to_minio: None, + teardown_resources: bool, +) -> Generator[Job, Any, Any]: + """Create and run the async upload job for the externally signed model.""" + mr_host = get_model_registry_host( + admin_client=admin_client, + model_registry_namespace=model_registry_namespace, + model_registry_instance=model_registry_instance, + ) + oci_internal = get_oci_internal_endpoint(oci_registry_service=oci_registry_service) + + yield from create_async_upload_job( + admin_client=admin_client, + job_name=f"{ASYNC_UPLOAD_JOB_NAME}-signing", + namespace=service_account.namespace, + async_upload_image=async_upload_image, + s3_secret=signing_s3_secret, + oci_secret=signing_oci_secret, + environment_variables=get_base_async_job_env_vars( + mr_host=mr_host, + sa_token=sa_token, + oci_internal=oci_internal, + oci_repo=SIGNING_ASYNC_REPO, + ), + teardown=teardown_resources, + ) + + +@pytest.fixture(scope="class") +def signing_job_pod( + admin_client: DynamicClient, + signing_async_job: Job, +) -> Pod: + """Get the pod created by the async signing job.""" + return get_latest_job_pod(admin_client=admin_client, job=signing_async_job) + + +@pytest.fixture(scope="class") +def oci_image_with_digest( + ai_hub_oci_registry_host: str, + signing_async_job: Job, +) -> Generator[str, Any, Any]: + """Get the OCI image reference with digest after async job completes.""" + yield from get_oci_image_with_digest( + oci_host=ai_hub_oci_registry_host, + repo=SIGNING_ASYNC_REPO, + tag=SIGNING_ASYNC_TAG, + ) + + +@pytest.fixture(scope="class") +def upload_unsigned_model_to_minio( + admin_client: DynamicClient, + model_registry_namespace: str, + minio_service: Service, +) -> None: + """Upload an unsigned model file to MinIO for native job signing test.""" + source_key = MODEL_SYNC_CONFIG["SOURCE_AWS_KEY"] + bucket = MinIo.Buckets.MODELMESH_EXAMPLE_MODELS + + run_minio_uploader_pod( + admin_client=admin_client, + namespace=model_registry_namespace, + minio_service=minio_service, + pod_name="unsigned-model-uploader", + mc_commands=( + f"echo 'test model content for native signing' > /work/model.onnx && " + f"mc cp /work/model.onnx testminio/{bucket}/{source_key}/model.onnx && " + f"mc ls testminio/{bucket}/{source_key}/ && " + f"echo 'Unsigned model upload completed'" + ), + ) + + +@pytest.fixture(scope="class") +def identity_token_secret( + admin_client: DynamicClient, + service_account: ServiceAccount, + securesign_instance: Securesign, +) -> Generator[Secret, Any, Any]: + """Create a Secret containing a service account token for Sigstore keyless signing. + + The async upload job needs this token mounted at a known path so it can + authenticate with Fulcio for keyless signing. + """ + token = generate_token(temp_base_folder=py_config["tmp_base_dir"]) + with open(token) as f: + token_content = f.read() + + with Secret( + client=admin_client, + name=f"signing-identity-token-{shortuuid.uuid().lower()}", + namespace=service_account.namespace, + data_dict={ + "token": b64_encoded_string(token_content), + }, + ) as secret: + LOGGER.info(f"Created identity token secret: {secret.name}") + yield secret + + +@pytest.fixture(scope="class") +def native_signing_async_job( + admin_client: DynamicClient, + sa_token: str, + service_account: ServiceAccount, + model_registry_namespace: str, + model_registry_instance: list[ModelRegistry], + signing_s3_secret: Secret, + signing_oci_secret: Secret, + oci_registry_service: Service, + mr_access_role_binding: RoleBinding, + async_upload_image: str, + signing_registered_model: RegisteredModel, + upload_unsigned_model_to_minio: None, + identity_token_secret: Secret, + securesign_instance: Securesign, + teardown_resources: bool, +) -> Generator[Job, Any, Any]: + """Create and run the async upload job with native signing enabled. + + Configures the job with MODEL_SYNC_SIGN=true and Sigstore environment + variables so the job signs both the model and the OCI image internally. + """ + mr_host = get_model_registry_host( + admin_client=admin_client, + model_registry_namespace=model_registry_namespace, + model_registry_instance=model_registry_instance, + ) + oci_internal = get_oci_internal_endpoint(oci_registry_service=oci_registry_service) + + # Get Sigstore service URLs for the job + securesign_data = securesign_instance.instance.to_dict() + service_urls = get_tas_service_urls(securesign_instance=securesign_data) + + signing_env_vars = [ + {"name": "MODEL_SYNC_SIGN", "value": "true"}, + {"name": "MODEL_SYNC_SIGNING_IDENTITY_TOKEN_PATH", "value": f"{IDENTITY_TOKEN_MOUNT_PATH}/token"}, + {"name": "SIGSTORE_FULCIO_URL", "value": service_urls["fulcio"]}, + {"name": "SIGSTORE_REKOR_URL", "value": service_urls["rekor"]}, + {"name": "SIGSTORE_TSA_URL", "value": service_urls["tsa"]}, + {"name": "SIGSTORE_TUF_URL", "value": service_urls["tuf"]}, + {"name": "COSIGN_ALLOW_INSECURE_REGISTRY", "value": "true"}, + ] + + yield from create_async_upload_job( + admin_client=admin_client, + job_name=f"{ASYNC_UPLOAD_JOB_NAME}-native-signing", + namespace=service_account.namespace, + async_upload_image=async_upload_image, + s3_secret=signing_s3_secret, + oci_secret=signing_oci_secret, + environment_variables=get_base_async_job_env_vars( + mr_host=mr_host, + sa_token=sa_token, + oci_internal=oci_internal, + oci_repo=NATIVE_SIGNING_REPO, + ) + + signing_env_vars, + extra_volume_mounts=[ + {"name": "identity-token", "readOnly": True, "mountPath": IDENTITY_TOKEN_MOUNT_PATH}, + ], + extra_volumes=[ + {"name": "identity-token", "secret": {"secretName": identity_token_secret.name}}, + ], + teardown=teardown_resources, + ) + + +@pytest.fixture(scope="class") +def native_signing_job_pod( + admin_client: DynamicClient, + native_signing_async_job: Job, +) -> Pod: + """Get the pod created by the native signing async job.""" + return get_latest_job_pod(admin_client=admin_client, job=native_signing_async_job) + + +@pytest.fixture(scope="class") +def native_signing_oci_image_with_digest( + ai_hub_oci_registry_host: str, + native_signing_async_job: Job, +) -> Generator[str, Any, Any]: + """Get the OCI image reference with digest after native signing job completes.""" + yield from get_oci_image_with_digest( + oci_host=ai_hub_oci_registry_host, + repo=NATIVE_SIGNING_REPO, + tag=NATIVE_SIGNING_TAG, + ) diff --git a/tests/model_registry/model_registry/python_client/signing/constants.py b/tests/model_registry/model_registry/python_client/signing/constants.py index e0e835d5d..e9873e4d5 100644 --- a/tests/model_registry/model_registry/python_client/signing/constants.py +++ b/tests/model_registry/model_registry/python_client/signing/constants.py @@ -1,5 +1,10 @@ """Constants for Model Registry Python Client Signing Tests.""" +import time + +from tests.model_registry.constants import MODEL_DICT +from tests.model_registry.model_registry.async_job.constants import MODEL_SYNC_CONFIG + # Securesign instance configuration SECURESIGN_NAMESPACE = "trusted-artifact-signer" SECURESIGN_NAME = "securesign-sample" @@ -13,3 +18,22 @@ # OCI Registry configuration for signed model storage SIGNING_OCI_REPO_NAME = "signing-test/signed-model" SIGNING_OCI_TAG = "latest" +SIGNING_ASYNC_REPO = "async-signing-test/signed-model" +SIGNING_ASYNC_TAG = "latest" +NATIVE_SIGNING_REPO = "native-signing-test/signed-model" +NATIVE_SIGNING_TAG = "latest" +MODEL_CONTENT = b"test model content for async signing pipeline validation" +IDENTITY_TOKEN_MOUNT_PATH = "/var/run/secrets/signing" +MINIO_MC_IMAGE = "quay.io/minio/mc@sha256:470f5546b596e16c7816b9c3fa7a78ce4076bb73c2c73f7faeec0c8043923123" +MINIO_UPLOADER_SECURITY_CONTEXT = { + "allowPrivilegeEscalation": False, + "capabilities": {"drop": ["ALL"]}, + "runAsNonRoot": True, + "seccompProfile": {"type": "RuntimeDefault"}, +} +SIGNING_MODEL_DATA = { + **MODEL_DICT, + "model_name": f"signing-model-{int(time.time())}", + "model_storage_key": MODEL_SYNC_CONFIG["SOURCE_AWS_KEY"], + "model_storage_path": "path/to/test/model", +} diff --git a/tests/model_registry/model_registry/python_client/signing/test_async_signing.py b/tests/model_registry/model_registry/python_client/signing/test_async_signing.py new file mode 100644 index 000000000..a1e6317bb --- /dev/null +++ b/tests/model_registry/model_registry/python_client/signing/test_async_signing.py @@ -0,0 +1,92 @@ +"""Tests for model signing integration with async upload pipeline. + +Flow (Option B): +1. Sign model locally (model.sig created) +2. Upload signed model (model + model.sig) to MinIO +3. Async job converts S3 model to OCI image +4. Sign the OCI image after upload +5. Verify OCI image signature +""" + +from pathlib import Path + +import pytest +import structlog +from model_registry.signing import Signer + +from tests.model_registry.model_registry.async_job.constants import ASYNC_UPLOAD_JOB_NAME +from tests.model_registry.model_registry.python_client.signing.utils import check_model_signature_file +from utilities.constants import MinIo + +LOGGER = structlog.get_logger(name=__name__) + +pytestmark = pytest.mark.usefixtures("skip_if_not_managed_cluster", "tas_connection_type") + + +@pytest.mark.parametrize( + "minio_pod", + [pytest.param(MinIo.PodConfig.MODEL_REGISTRY_MINIO_CONFIG)], + indirect=True, +) +@pytest.mark.usefixtures( + "updated_dsc_component_state_scope_session", + "model_registry_namespace", + "model_registry_metadata_db_resources", + "model_registry_instance", + "minio_pod", + "oci_registry_pod", + "oci_registry_service", + "ai_hub_oci_registry_route", + "set_environment_variables", +) +@pytest.mark.custom_namespace +@pytest.mark.downstream_only +@pytest.mark.tier3 +class TestAsyncSigningE2E: + """ + End-to-end test: sign model locally, upload via async job to OCI, then sign and verify OCI image. + """ + + @pytest.mark.dependency(name="test_model_signed_before_upload") + def test_model_signed_before_upload(self, signed_model_dir: Path): + """Verify model was signed locally before upload to MinIO.""" + assert check_model_signature_file(model_dir=str(signed_model_dir)) + LOGGER.info(f"Model signed successfully at {signed_model_dir}") + + @pytest.mark.dependency( + name="test_async_job_uploads_signed_model", + depends=["test_model_signed_before_upload"], + ) + def test_async_job_uploads_signed_model( + self, + signing_job_pod, + ): + """Verify async job completes and uploads signed model to OCI registry.""" + assert signing_job_pod.name.startswith(f"{ASYNC_UPLOAD_JOB_NAME}-signing") + LOGGER.info(f"Async upload job completed, pod: {signing_job_pod.name}") + + @pytest.mark.dependency(depends=["test_async_job_uploads_signed_model"]) + def test_job_log_signing_disabled( + self, + signing_job_pod, + ): + """Verify the job pod log indicates signing is disabled (external signing flow).""" + expected_message = "Signing is disabled" + assert expected_message in signing_job_pod.log(), f"Expected '{expected_message}' not found in job pod log" + + @pytest.mark.dependency( + name="test_sign_oci_image", + depends=["test_async_job_uploads_signed_model"], + ) + def test_sign_oci_image(self, signer: Signer, oci_image_with_digest: str): + """Sign the OCI image that was created by the async upload job.""" + LOGGER.info(f"Signing OCI image: {oci_image_with_digest}") + signer.sign_image(image=oci_image_with_digest) + LOGGER.info("OCI image signed successfully") + + @pytest.mark.dependency(depends=["test_sign_oci_image"]) + def test_verify_oci_image(self, signer: Signer, oci_image_with_digest: str): + """Verify the signed OCI image.""" + LOGGER.info(f"Verifying OCI image: {oci_image_with_digest}") + signer.verify_image(image=oci_image_with_digest) + LOGGER.info("OCI image verified successfully") diff --git a/tests/model_registry/model_registry/python_client/signing/test_native_job_signing.py b/tests/model_registry/model_registry/python_client/signing/test_native_job_signing.py new file mode 100644 index 000000000..35243c6b3 --- /dev/null +++ b/tests/model_registry/model_registry/python_client/signing/test_native_job_signing.py @@ -0,0 +1,97 @@ +"""Tests for native signing in async upload job (PR #2337). + +Flow: +1. Upload unsigned model to MinIO +2. Async job downloads model, signs it, uploads to OCI, and signs the OCI image +3. Verify the OCI image signature externally + +This tests the job's built-in signing capability via MODEL_SYNC_SIGN=true, +where the job itself handles both model signing and OCI image signing internally. +""" + +import pytest +import structlog +from model_registry.signing import Signer + +from tests.model_registry.model_registry.async_job.constants import ASYNC_UPLOAD_JOB_NAME +from utilities.constants import MinIo + +LOGGER = structlog.get_logger(name=__name__) + +pytestmark = pytest.mark.usefixtures("skip_if_not_managed_cluster", "tas_connection_type") + + +@pytest.mark.parametrize( + "minio_pod", + [pytest.param(MinIo.PodConfig.MODEL_REGISTRY_MINIO_CONFIG)], + indirect=True, +) +@pytest.mark.usefixtures( + "updated_dsc_component_state_scope_session", + "model_registry_namespace", + "model_registry_metadata_db_resources", + "model_registry_instance", + "minio_pod", + "oci_registry_pod", + "oci_registry_service", + "ai_hub_oci_registry_route", + "set_environment_variables", +) +@pytest.mark.custom_namespace +@pytest.mark.downstream_only +@pytest.mark.tier3 +class TestNativeJobSigningE2E: + """ + End-to-end test: async job signs model and OCI image internally via MODEL_SYNC_SIGN=true. + + Unlike TestAsyncSigningE2E (Option B) where signing happens externally, + this test verifies that the async upload job can handle signing natively + when configured with Sigstore environment variables. + """ + + @pytest.mark.dependency(name="test_native_signing_job_completes") + def test_native_signing_job_completes( + self, + native_signing_job_pod, + ): + """Verify the async job completes with native signing enabled.""" + assert native_signing_job_pod.name.startswith(f"{ASYNC_UPLOAD_JOB_NAME}-native-signing") + LOGGER.info(f"Native signing async job completed, pod: {native_signing_job_pod.name}") + + @pytest.mark.dependency(depends=["test_native_signing_job_completes"]) + def test_job_log_contains_signed_image( + self, + native_signing_job_pod, + ): + """Verify the job pod log contains the 'Signed image successfully' message.""" + expected_message = "Signed image successfully: " + assert expected_message in native_signing_job_pod.log(), ( + f"Expected '{expected_message}' not found in job pod log" + ) + + @pytest.mark.dependency(depends=["test_native_signing_job_completes"]) + def test_job_log_contains_verified_image( + self, + native_signing_job_pod, + ): + """Verify the job pod log contains the 'Verified image successfully' message.""" + expected_message = "Verified image successfully: " + assert expected_message in native_signing_job_pod.log(), ( + f"Expected '{expected_message}' not found in job pod log" + ) + + @pytest.mark.dependency(depends=["test_native_signing_job_completes"]) + def test_verify_natively_signed_oci_image( + self, + signer: Signer, + native_signing_oci_image_with_digest: str, + ): + """Verify the OCI image that was signed by the async job itself. + + The job should have signed the image internally using cosign with + the identity token and Sigstore service URLs. We verify externally + to confirm the signature is valid. + """ + LOGGER.info(f"Verifying natively signed OCI image: {native_signing_oci_image_with_digest}") + signer.verify_image(image=native_signing_oci_image_with_digest) + LOGGER.info("Natively signed OCI image verified successfully") diff --git a/tests/model_registry/model_registry/python_client/signing/utils.py b/tests/model_registry/model_registry/python_client/signing/utils.py index 2a5f585d5..28d0221a7 100644 --- a/tests/model_registry/model_registry/python_client/signing/utils.py +++ b/tests/model_registry/model_registry/python_client/signing/utils.py @@ -2,16 +2,34 @@ import hashlib import os +from collections.abc import Generator +from typing import Any import requests import structlog +from kubernetes.dynamic import DynamicClient +from ocp_resources.job import Job +from ocp_resources.pod import Pod +from ocp_resources.secret import Secret +from ocp_resources.service import Service from pyhelper_utils.shell import run_command +from timeout_sampler import TimeoutExpiredError, TimeoutSampler +from tests.model_registry.model_registry.async_job.constants import ( + ASYNC_JOB_ANNOTATIONS, + ASYNC_JOB_LABELS, + MODEL_SYNC_CONFIG, + VOLUME_MOUNTS, +) from tests.model_registry.model_registry.python_client.signing.constants import ( SECURESIGN_NAMESPACE, SECURESIGN_ORGANIZATION_EMAIL, SECURESIGN_ORGANIZATION_NAME, ) +from tests.model_registry.utils import get_endpoint_from_mr_service, get_mr_service_by_label +from utilities.constants import MinIo, OCIRegistry, Protocols +from utilities.general import collect_pod_information +from utilities.resources.model_registry_modelregistry_opendatahub_io import ModelRegistry LOGGER = structlog.get_logger(name=__name__) @@ -133,3 +151,213 @@ def check_model_signature_file(model_dir: str) -> bool: else: LOGGER.info(f"Signature file not found: {sig_file_path}") return False + + +def run_minio_uploader_pod( + admin_client: DynamicClient, + namespace: str, + minio_service: Service, + pod_name: str, + mc_commands: str, + volumes: list[dict[str, Any]] | None = None, + volume_mounts: list[dict[str, Any]] | None = None, +) -> None: + """Run a MinIO mc uploader pod with the given shell commands. + + Creates a pod that sets up an mc alias to MinIO and runs the provided commands. + + Args: + admin_client: Kubernetes dynamic client + namespace: Namespace to create the pod in + minio_service: MinIO service for endpoint resolution + pod_name: Name for the uploader pod + mc_commands: Shell commands to run after mc alias setup (e.g. mc cp ...) + volumes: Additional volumes to mount + volume_mounts: Additional volume mounts for the container + """ + from tests.model_registry.model_registry.python_client.signing.constants import ( + MINIO_MC_IMAGE, + MINIO_UPLOADER_SECURITY_CONTEXT, + ) + + mc_url = f"http://{minio_service.name}.{minio_service.namespace}.svc.cluster.local:{MinIo.Metadata.DEFAULT_PORT}" + + all_volumes = [{"name": "work", "emptyDir": {}}] + if volumes: + all_volumes.extend(volumes) + + all_volume_mounts = [{"name": "work", "mountPath": "/work"}] + if volume_mounts: + all_volume_mounts.extend(volume_mounts) + + mc_setup = ( + f"export MC_CONFIG_DIR=/work/.mc && " + f"mc alias set testminio {mc_url} " + f"{MinIo.Credentials.ACCESS_KEY_VALUE} {MinIo.Credentials.SECRET_KEY_VALUE} && " + f"mc mb --ignore-existing testminio/{MinIo.Buckets.MODELMESH_EXAMPLE_MODELS}" + ) + + with Pod( + client=admin_client, + name=pod_name, + namespace=namespace, + restart_policy="Never", + volumes=all_volumes, + containers=[ + { + "name": "minio-uploader", + "image": MINIO_MC_IMAGE, + "command": ["/bin/sh", "-c"], + "args": [f"{mc_setup} && {mc_commands}"], + "volumeMounts": all_volume_mounts, + "securityContext": MINIO_UPLOADER_SECURITY_CONTEXT, + } + ], + wait_for_resource=True, + ) as upload_pod: + LOGGER.info(f"Running minio uploader pod: {pod_name}") + try: + upload_pod.wait_for_status(status="Succeeded", timeout=300) + except TimeoutExpiredError: + collect_pod_information(pod=upload_pod) + raise + LOGGER.info(f"Minio uploader pod '{pod_name}' completed successfully") + + +def get_base_async_job_env_vars( + mr_host: str, + sa_token: str, + oci_internal: str, + oci_repo: str, +) -> list[dict[str, str]]: + """Build the common environment variables for async upload jobs.""" + return [ + {"name": "MODEL_SYNC_SOURCE_TYPE", "value": MODEL_SYNC_CONFIG["SOURCE_TYPE"]}, + {"name": "MODEL_SYNC_SOURCE_AWS_KEY", "value": MODEL_SYNC_CONFIG["SOURCE_AWS_KEY"]}, + {"name": "MODEL_SYNC_SOURCE_S3_CREDENTIALS_PATH", "value": VOLUME_MOUNTS["SOURCE_CREDS_PATH"]}, + {"name": "MODEL_SYNC_MODEL_ID", "value": MODEL_SYNC_CONFIG["MODEL_ID"]}, + {"name": "MODEL_SYNC_MODEL_VERSION_ID", "value": MODEL_SYNC_CONFIG["MODEL_VERSION_ID"]}, + {"name": "MODEL_SYNC_MODEL_ARTIFACT_ID", "value": MODEL_SYNC_CONFIG["MODEL_ARTIFACT_ID"]}, + {"name": "MODEL_SYNC_REGISTRY_SERVER_ADDRESS", "value": f"https://{mr_host}"}, + {"name": "MODEL_SYNC_REGISTRY_USER_TOKEN", "value": sa_token}, + {"name": "MODEL_SYNC_REGISTRY_IS_SECURE", "value": "False"}, + {"name": "MODEL_SYNC_DESTINATION_OCI_REGISTRY", "value": oci_internal}, + {"name": "MODEL_SYNC_DESTINATION_OCI_URI", "value": f"{oci_internal}/{oci_repo}"}, + {"name": "MODEL_SYNC_DESTINATION_OCI_BASE_IMAGE", "value": MODEL_SYNC_CONFIG["DESTINATION_OCI_BASE_IMAGE"]}, + { + "name": "MODEL_SYNC_DESTINATION_OCI_ENABLE_TLS_VERIFY", + "value": MODEL_SYNC_CONFIG["DESTINATION_OCI_ENABLE_TLS_VERIFY"], + }, + ] + + +def get_oci_internal_endpoint(oci_registry_service: Service) -> str: + """Build the internal OCI registry endpoint from a Service.""" + oci_internal_host = f"{oci_registry_service.name}.{oci_registry_service.namespace}.svc.cluster.local" + return f"{oci_internal_host}:{OCIRegistry.Metadata.DEFAULT_PORT}" + + +def get_model_registry_host( + admin_client: DynamicClient, + model_registry_namespace: str, + model_registry_instance: list[ModelRegistry], +) -> str: + """Resolve the Model Registry REST host from the first instance.""" + mr_instance = model_registry_instance[0] + mr_service = get_mr_service_by_label( + client=admin_client, namespace_name=model_registry_namespace, mr_instance=mr_instance + ) + mr_endpoint = get_endpoint_from_mr_service(svc=mr_service, protocol=Protocols.REST) + return mr_endpoint.split(":")[0] + + +def create_async_upload_job( + admin_client: DynamicClient, + job_name: str, + namespace: str, + async_upload_image: str, + s3_secret: Secret, + oci_secret: Secret, + environment_variables: list[dict[str, str]], + extra_volume_mounts: list[dict[str, Any]] | None = None, + extra_volumes: list[dict[str, Any]] | None = None, + teardown: bool = True, +) -> Generator[Job, Any, Any]: + """Create and run an async upload Job with the given configuration.""" + volume_mounts = [ + {"name": "source-credentials", "readOnly": True, "mountPath": VOLUME_MOUNTS["SOURCE_CREDS_PATH"]}, + {"name": "destination-credentials", "readOnly": True, "mountPath": VOLUME_MOUNTS["DEST_CREDS_PATH"]}, + ] + volumes = [ + {"name": "source-credentials", "secret": {"secretName": s3_secret.name}}, + {"name": "destination-credentials", "secret": {"secretName": oci_secret.name}}, + ] + + if extra_volume_mounts: + volume_mounts.extend(extra_volume_mounts) + if extra_volumes: + volumes.extend(extra_volumes) + + with Job( + client=admin_client, + name=job_name, + namespace=namespace, + label=ASYNC_JOB_LABELS, + annotations=ASYNC_JOB_ANNOTATIONS, + restart_policy="Never", + containers=[ + { + "name": "async-upload", + "image": async_upload_image, + "volumeMounts": volume_mounts, + "env": environment_variables, + } + ], + volumes=volumes, + teardown=teardown, + ) as job: + job.wait_for_condition(condition="Complete", status="True") + LOGGER.info(f"Async upload job '{job_name}' completed successfully") + yield job + + +def get_oci_image_with_digest(oci_host: str, repo: str, tag: str) -> Generator[str, Any, Any]: + """Get OCI image reference with digest and set COSIGN_ALLOW_INSECURE_REGISTRY.""" + registry_url = f"https://{oci_host}" + + LOGGER.info(f"Waiting for OCI registry to be reachable at {registry_url}/v2/") + for sample in TimeoutSampler( + wait_timeout=120, + sleep=5, + func=requests.get, + url=f"{registry_url}/v2/", + timeout=5, + verify=False, + ): + if sample.ok: + break + + tags_url = f"{registry_url}/v2/{repo}/tags/list" + response = requests.get(url=tags_url, verify=False, timeout=10) + response.raise_for_status() + tags_data = response.json() + LOGGER.info(f"OCI registry tags: {tags_data}") + assert tag in tags_data.get("tags", []), f"Expected tag '{tag}' not found in registry: {tags_data}" + + manifest_url = f"{registry_url}/v2/{repo}/manifests/{tag}" + manifest_response = requests.get( + url=manifest_url, + headers={"Accept": "application/vnd.oci.image.index.v1+json, application/vnd.oci.image.manifest.v1+json"}, + verify=False, + timeout=10, + ) + manifest_response.raise_for_status() + digest = manifest_response.headers.get("Docker-Content-Digest") + assert digest, "Could not get digest from manifest response" + + image_ref = f"{oci_host}/{repo}@{digest}" + LOGGER.info(f"OCI image reference with digest: {image_ref}") + + os.environ["COSIGN_ALLOW_INSECURE_REGISTRY"] = "true" + yield image_ref + os.environ.pop("COSIGN_ALLOW_INSECURE_REGISTRY", None) From add2c18e1d2be585f3ece227dcbf62b6cef60280 Mon Sep 17 00:00:00 2001 From: Debarati Basu-Nag Date: Thu, 26 Mar 2026 19:26:33 -0400 Subject: [PATCH 2/4] chore: rename signing test file names Signed-off-by: Debarati Basu-Nag Co-Authored-By: Claude --- ...t_signing_infrastructure.py => test_infrastructure_signing.py} | 0 .../signing/{test_async_signing.py => test_signing_async_job.py} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename tests/model_registry/model_registry/python_client/signing/{test_signing_infrastructure.py => test_infrastructure_signing.py} (100%) rename tests/model_registry/model_registry/python_client/signing/{test_async_signing.py => test_signing_async_job.py} (100%) diff --git a/tests/model_registry/model_registry/python_client/signing/test_signing_infrastructure.py b/tests/model_registry/model_registry/python_client/signing/test_infrastructure_signing.py similarity index 100% rename from tests/model_registry/model_registry/python_client/signing/test_signing_infrastructure.py rename to tests/model_registry/model_registry/python_client/signing/test_infrastructure_signing.py diff --git a/tests/model_registry/model_registry/python_client/signing/test_async_signing.py b/tests/model_registry/model_registry/python_client/signing/test_signing_async_job.py similarity index 100% rename from tests/model_registry/model_registry/python_client/signing/test_async_signing.py rename to tests/model_registry/model_registry/python_client/signing/test_signing_async_job.py From 91af61f1bf9c4be50409bfb6feb2db673b15ef1d Mon Sep 17 00:00:00 2001 From: Debarati Basu-Nag Date: Thu, 26 Mar 2026 19:43:34 -0400 Subject: [PATCH 3/4] fix: namespace calls should have client param Signed-off-by: Debarati Basu-Nag Co-Authored-By: Claude --- .../model_registry/python_client/signing/conftest.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/model_registry/model_registry/python_client/signing/conftest.py b/tests/model_registry/model_registry/python_client/signing/conftest.py index b708bc0e2..8cccbe9d4 100644 --- a/tests/model_registry/model_registry/python_client/signing/conftest.py +++ b/tests/model_registry/model_registry/python_client/signing/conftest.py @@ -138,7 +138,7 @@ def installed_tas_operator(admin_client: DynamicClient) -> Generator[None, Any]: None: Operator is ready for use """ distribution = py_config["distribution"] - operator_ns = Namespace(name=OPENSHIFT_OPERATORS, ensure_exists=True) + operator_ns = Namespace(client=admin_client, name=OPENSHIFT_OPERATORS, ensure_exists=True) package_name = "rhtas-operator" # Determine operator source: ODH uses community-operators, RHOAI uses redhat-operators @@ -179,7 +179,7 @@ def installed_tas_operator(admin_client: DynamicClient) -> Generator[None, Any]: clean_up_namespace=False, ) # Ensure namespace exists for Securesign - ns = Namespace(name=SECURESIGN_NAMESPACE) + ns = Namespace(client=admin_client, name=SECURESIGN_NAMESPACE) if ns.exists: ns.delete(wait=True) else: @@ -212,7 +212,7 @@ def securesign_instance( Resource: Securesign resource instance """ # Ensure namespace exists for Securesign - ns = Namespace(name=SECURESIGN_NAMESPACE) + ns = Namespace(client=admin_client, name=SECURESIGN_NAMESPACE) ns.wait_for_status(status=Namespace.Status.ACTIVE) # Build Securesign CR spec From 390ddd62c89adc23be6c00d636ac16bc0c354454 Mon Sep 17 00:00:00 2001 From: Debarati Basu-Nag Date: Fri, 27 Mar 2026 11:57:52 -0400 Subject: [PATCH 4/4] fix: remove unnecessary tests Signed-off-by: Debarati Basu-Nag Co-Authored-By: Claude --- .../python_client/signing/conftest.py | 147 ------------------ .../signing/test_signing_async_job.py | 92 ----------- 2 files changed, 239 deletions(-) delete mode 100644 tests/model_registry/model_registry/python_client/signing/test_signing_async_job.py diff --git a/tests/model_registry/model_registry/python_client/signing/conftest.py b/tests/model_registry/model_registry/python_client/signing/conftest.py index 8cccbe9d4..3d3e6a887 100644 --- a/tests/model_registry/model_registry/python_client/signing/conftest.py +++ b/tests/model_registry/model_registry/python_client/signing/conftest.py @@ -1,6 +1,5 @@ """Fixtures for Model Registry Python Client Signing Tests.""" -import base64 import json import logging import os @@ -39,14 +38,11 @@ from tests.model_registry.model_registry.async_job.utils import get_latest_job_pod from tests.model_registry.model_registry.python_client.signing.constants import ( IDENTITY_TOKEN_MOUNT_PATH, - MODEL_CONTENT, NATIVE_SIGNING_REPO, NATIVE_SIGNING_TAG, SECURESIGN_API_VERSION, SECURESIGN_NAME, SECURESIGN_NAMESPACE, - SIGNING_ASYNC_REPO, - SIGNING_ASYNC_TAG, SIGNING_MODEL_DATA, SIGNING_OCI_REPO_NAME, SIGNING_OCI_TAG, @@ -708,149 +704,6 @@ def signing_registered_model( ) -@pytest.fixture(scope="class") -def class_scoped_signer(set_environment_variables: None) -> Signer: - """Class-scoped signer instance for use in class-scoped fixtures.""" - signer = Signer( - identity_token_path=os.environ["IDENTITY_TOKEN_PATH"], - root_url=os.environ["ROOT_URL"], - root_checksum=os.environ["ROOT_CHECKSUM"], - log_level=logging.DEBUG, - ) - signer.initialize(force=True) - LOGGER.info("Class-scoped signer initialized") - return signer - - -@pytest.fixture(scope="class") -def signed_model_dir(class_scoped_signer: Signer, tmp_path_factory: pytest.TempPathFactory) -> Path: - """Create a model directory, sign it, and return the path.""" - model_dir = tmp_path_factory.mktemp(basename="async-signing-model") - model_file = model_dir / "model.onnx" - model_file.write_bytes(data=MODEL_CONTENT) - - LOGGER.info(f"Signing model in {model_dir}") - class_scoped_signer.sign_model(model_path=str(model_dir)) - - sig_file = model_dir / "model.sig" - assert sig_file.exists(), "model.sig not created after signing" - LOGGER.info(f"Model signed successfully, model.sig size: {sig_file.stat().st_size} bytes") - return model_dir - - -@pytest.fixture(scope="class") -def signed_model_secret( - admin_client: DynamicClient, - model_registry_namespace: str, - signed_model_dir: Path, -) -> Generator[Secret, Any, Any]: - """Create a Secret containing the signed model files for upload to MinIO.""" - model_onnx = (signed_model_dir / "model.onnx").read_bytes() - model_sig = (signed_model_dir / "model.sig").read_bytes() - - with Secret( - client=admin_client, - name=f"signed-model-files-{shortuuid.uuid().lower()}", - namespace=model_registry_namespace, - data_dict={ - "model.onnx": base64.b64encode(model_onnx).decode(), - "model.sig": base64.b64encode(model_sig).decode(), - }, - ) as secret: - LOGGER.info(f"Created signed model secret: {secret.name}") - yield secret - - -@pytest.fixture(scope="class") -def upload_signed_model_to_minio( - admin_client: DynamicClient, - model_registry_namespace: str, - minio_service: Service, - signed_model_secret: Secret, -) -> None: - """Upload signed model files (model.onnx + model.sig) to MinIO.""" - source_key = MODEL_SYNC_CONFIG["SOURCE_AWS_KEY"] - bucket = MinIo.Buckets.MODELMESH_EXAMPLE_MODELS - - run_minio_uploader_pod( - admin_client=admin_client, - namespace=model_registry_namespace, - minio_service=minio_service, - pod_name="signed-model-uploader", - mc_commands=( - f"mc cp /model-files/model.onnx testminio/{bucket}/{source_key}/model.onnx && " - f"mc cp /model-files/model.sig testminio/{bucket}/{source_key}/model.sig && " - f"mc ls testminio/{bucket}/{source_key}/ && " - f"echo 'Signed model upload completed'" - ), - volumes=[{"name": "model-files", "secret": {"secretName": signed_model_secret.name}}], - volume_mounts=[{"name": "model-files", "mountPath": "/model-files", "readOnly": True}], - ) - - -@pytest.fixture(scope="class") -def signing_async_job( - admin_client: DynamicClient, - sa_token: str, - service_account: ServiceAccount, - model_registry_namespace: str, - model_registry_instance: list[ModelRegistry], - signing_s3_secret: Secret, - signing_oci_secret: Secret, - oci_registry_service: Service, - mr_access_role_binding: RoleBinding, - async_upload_image: str, - signing_registered_model: RegisteredModel, - upload_signed_model_to_minio: None, - teardown_resources: bool, -) -> Generator[Job, Any, Any]: - """Create and run the async upload job for the externally signed model.""" - mr_host = get_model_registry_host( - admin_client=admin_client, - model_registry_namespace=model_registry_namespace, - model_registry_instance=model_registry_instance, - ) - oci_internal = get_oci_internal_endpoint(oci_registry_service=oci_registry_service) - - yield from create_async_upload_job( - admin_client=admin_client, - job_name=f"{ASYNC_UPLOAD_JOB_NAME}-signing", - namespace=service_account.namespace, - async_upload_image=async_upload_image, - s3_secret=signing_s3_secret, - oci_secret=signing_oci_secret, - environment_variables=get_base_async_job_env_vars( - mr_host=mr_host, - sa_token=sa_token, - oci_internal=oci_internal, - oci_repo=SIGNING_ASYNC_REPO, - ), - teardown=teardown_resources, - ) - - -@pytest.fixture(scope="class") -def signing_job_pod( - admin_client: DynamicClient, - signing_async_job: Job, -) -> Pod: - """Get the pod created by the async signing job.""" - return get_latest_job_pod(admin_client=admin_client, job=signing_async_job) - - -@pytest.fixture(scope="class") -def oci_image_with_digest( - ai_hub_oci_registry_host: str, - signing_async_job: Job, -) -> Generator[str, Any, Any]: - """Get the OCI image reference with digest after async job completes.""" - yield from get_oci_image_with_digest( - oci_host=ai_hub_oci_registry_host, - repo=SIGNING_ASYNC_REPO, - tag=SIGNING_ASYNC_TAG, - ) - - @pytest.fixture(scope="class") def upload_unsigned_model_to_minio( admin_client: DynamicClient, diff --git a/tests/model_registry/model_registry/python_client/signing/test_signing_async_job.py b/tests/model_registry/model_registry/python_client/signing/test_signing_async_job.py deleted file mode 100644 index a1e6317bb..000000000 --- a/tests/model_registry/model_registry/python_client/signing/test_signing_async_job.py +++ /dev/null @@ -1,92 +0,0 @@ -"""Tests for model signing integration with async upload pipeline. - -Flow (Option B): -1. Sign model locally (model.sig created) -2. Upload signed model (model + model.sig) to MinIO -3. Async job converts S3 model to OCI image -4. Sign the OCI image after upload -5. Verify OCI image signature -""" - -from pathlib import Path - -import pytest -import structlog -from model_registry.signing import Signer - -from tests.model_registry.model_registry.async_job.constants import ASYNC_UPLOAD_JOB_NAME -from tests.model_registry.model_registry.python_client.signing.utils import check_model_signature_file -from utilities.constants import MinIo - -LOGGER = structlog.get_logger(name=__name__) - -pytestmark = pytest.mark.usefixtures("skip_if_not_managed_cluster", "tas_connection_type") - - -@pytest.mark.parametrize( - "minio_pod", - [pytest.param(MinIo.PodConfig.MODEL_REGISTRY_MINIO_CONFIG)], - indirect=True, -) -@pytest.mark.usefixtures( - "updated_dsc_component_state_scope_session", - "model_registry_namespace", - "model_registry_metadata_db_resources", - "model_registry_instance", - "minio_pod", - "oci_registry_pod", - "oci_registry_service", - "ai_hub_oci_registry_route", - "set_environment_variables", -) -@pytest.mark.custom_namespace -@pytest.mark.downstream_only -@pytest.mark.tier3 -class TestAsyncSigningE2E: - """ - End-to-end test: sign model locally, upload via async job to OCI, then sign and verify OCI image. - """ - - @pytest.mark.dependency(name="test_model_signed_before_upload") - def test_model_signed_before_upload(self, signed_model_dir: Path): - """Verify model was signed locally before upload to MinIO.""" - assert check_model_signature_file(model_dir=str(signed_model_dir)) - LOGGER.info(f"Model signed successfully at {signed_model_dir}") - - @pytest.mark.dependency( - name="test_async_job_uploads_signed_model", - depends=["test_model_signed_before_upload"], - ) - def test_async_job_uploads_signed_model( - self, - signing_job_pod, - ): - """Verify async job completes and uploads signed model to OCI registry.""" - assert signing_job_pod.name.startswith(f"{ASYNC_UPLOAD_JOB_NAME}-signing") - LOGGER.info(f"Async upload job completed, pod: {signing_job_pod.name}") - - @pytest.mark.dependency(depends=["test_async_job_uploads_signed_model"]) - def test_job_log_signing_disabled( - self, - signing_job_pod, - ): - """Verify the job pod log indicates signing is disabled (external signing flow).""" - expected_message = "Signing is disabled" - assert expected_message in signing_job_pod.log(), f"Expected '{expected_message}' not found in job pod log" - - @pytest.mark.dependency( - name="test_sign_oci_image", - depends=["test_async_job_uploads_signed_model"], - ) - def test_sign_oci_image(self, signer: Signer, oci_image_with_digest: str): - """Sign the OCI image that was created by the async upload job.""" - LOGGER.info(f"Signing OCI image: {oci_image_with_digest}") - signer.sign_image(image=oci_image_with_digest) - LOGGER.info("OCI image signed successfully") - - @pytest.mark.dependency(depends=["test_sign_oci_image"]) - def test_verify_oci_image(self, signer: Signer, oci_image_with_digest: str): - """Verify the signed OCI image.""" - LOGGER.info(f"Verifying OCI image: {oci_image_with_digest}") - signer.verify_image(image=oci_image_with_digest) - LOGGER.info("OCI image verified successfully")