Skip to content

Commit 7492878

Browse files
committed
change: use wait_for_condition
1 parent 7907902 commit 7492878

File tree

3 files changed

+1
-72
lines changed

3 files changed

+1
-72
lines changed

tests/model_registry/async_job/conftest.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ def model_sync_async_job(
230230
],
231231
teardown=teardown_resources,
232232
) as job:
233+
job.wait_for_condition(condition="Complete", status="True")
233234
yield job
234235

235236

tests/model_registry/async_job/test_async_upload_e2e.py

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,12 @@
66
from ocp_resources.job import Job
77
from ocp_resources.route import Route
88
from model_registry.types import ArtifactState, RegisteredModelState
9-
from timeout_sampler import TimeoutSampler, TimeoutExpiredError
109
from tests.model_registry.async_job.constants import (
11-
ASYNC_UPLOAD_IMAGE,
1210
ASYNC_UPLOAD_JOB_NAME,
1311
)
1412
from tests.model_registry.async_job.utils import (
1513
get_latest_job_pod,
1614
pull_manifest_from_oci_registry,
17-
check_job_completion,
1815
)
1916
from tests.model_registry.constants import MODEL_DICT
2017
from utilities.constants import MinIo, OCIRegistry
@@ -65,10 +62,6 @@
6562
class TestAsyncUploadE2E:
6663
"""Test for async upload job with real MinIO, OCI registry, and Model Registry"""
6764

68-
@pytest.mark.skipif(
69-
ASYNC_UPLOAD_IMAGE.startswith("PLACEHOLDER"),
70-
reason="Downstream image not yet available - job will fail to start",
71-
)
7265
def test_async_upload_job(
7366
self: Self,
7467
admin_client: DynamicClient,
@@ -82,31 +75,6 @@ def test_async_upload_job(
8275
job_pod = get_latest_job_pod(admin_client=admin_client, job=model_sync_async_job)
8376
assert job_pod.name.startswith(ASYNC_UPLOAD_JOB_NAME)
8477

85-
# Verify job is created and configured correctly
86-
assert model_sync_async_job.exists
87-
LOGGER.info("Job created successfully")
88-
89-
# Wait for job to complete
90-
try:
91-
for sample in TimeoutSampler(
92-
wait_timeout=300, # 5 minutes
93-
sleep=10, # Check every 10 seconds
94-
func=lambda: check_job_completion(admin_client=admin_client, job=model_sync_async_job),
95-
):
96-
if sample: # Job completed successfully
97-
break
98-
except TimeoutExpiredError:
99-
# Timeout reached - get final logs for debugging
100-
LOGGER.error("Job timed out after 300 seconds")
101-
try:
102-
current_pod = get_latest_job_pod(admin_client=admin_client, job=model_sync_async_job)
103-
pod_logs = current_pod.log()
104-
LOGGER.error(f"Final pod logs: {pod_logs}")
105-
except Exception as e:
106-
LOGGER.error(f"Could not retrieve final pod logs: {e}")
107-
108-
pytest.fail("Async upload job did not complete within 300 seconds")
109-
11078
# Verify OCI registry contains the uploaded artifact
11179
registry_host = oci_registry_route.instance.spec.host
11280
registry_url = f"http://{registry_host}"
@@ -123,18 +91,15 @@ def test_async_upload_job(
12391
# Verify the manifest has the expected structure
12492
assert "manifests" in manifest, "Manifest should contain manifests section"
12593
assert len(manifest["manifests"]) > 0, "Manifest should have at least one manifest"
126-
12794
LOGGER.info(f"Manifest contains {len(manifest['manifests'])} layer(s)")
12895

12996
# Verify model registry metadata was updated
13097
LOGGER.info("Verifying model registry model and artifact")
13198
client = model_registry_client[0]
13299
model = client.get_registered_model(name=MODEL_NAME)
133-
LOGGER.info(f"Model: {model}")
134100
assert model.state == RegisteredModelState.LIVE
135101

136102
model_artifact = client.get_model_artifact(name=MODEL_NAME, version=MODEL_DATA["model_version"])
137-
LOGGER.info(f"Model artifact: {model_artifact}")
138103

139104
# Validate model artifact attributes
140105
assert model_artifact.name == MODEL_NAME

tests/model_registry/async_job/utils.py

Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -6,49 +6,12 @@
66
from ocp_resources.pod import Pod
77
from ocp_resources.service import Service
88
from utilities.constants import MinIo
9-
109
from utilities.general import b64_encoded_string
1110
from simple_logger.logger import get_logger
1211

1312
LOGGER = get_logger(name=__name__)
1413

1514

16-
def check_job_completion(admin_client: DynamicClient, job: Job) -> bool:
17-
"""
18-
Check if job has completed successfully.
19-
20-
Args:
21-
admin_client: Kubernetes dynamic client
22-
job: Job object to check
23-
24-
Returns:
25-
bool: True if job completed successfully, False otherwise
26-
"""
27-
job_status = job.instance.status
28-
29-
if hasattr(job_status, "succeeded") and job_status.succeeded:
30-
LOGGER.info("Job completed successfully")
31-
return True
32-
33-
# Log current pod status for debugging
34-
try:
35-
current_pod = get_latest_job_pod(admin_client=admin_client, job=job)
36-
pod_status = current_pod.instance.status
37-
if hasattr(pod_status, "phase"):
38-
if pod_status.phase == "Succeeded":
39-
LOGGER.info(f"Pod {current_pod.name} succeeded")
40-
elif pod_status.phase == "Failed":
41-
LOGGER.warning(f"Pod {current_pod.name} failed, job may retry with new pod")
42-
elif pod_status.phase == "Running":
43-
LOGGER.info(f"Pod {current_pod.name} is running...")
44-
else:
45-
LOGGER.info(f"Pod {current_pod.name} in phase: {pod_status.phase}")
46-
except Exception as e:
47-
LOGGER.warning(f"Could not get pod status: {e}")
48-
49-
return False
50-
51-
5215
def get_latest_job_pod(admin_client: DynamicClient, job: Job) -> Pod:
5316
"""Get the latest (most recently created) Pod created by a Job"""
5417
pods = list(

0 commit comments

Comments
 (0)