Skip to content

Commit eb11bfa

Browse files
authored
Fix race condition in Kueue tests by waiting for CRDs to be available (opendatahub-io#1017)
* add function with retry logic to make sure CRDs are available * refactor ensure_kueue_unmanaged_in_dsc fixture * add comments and improve docstring * fix pre-commit issues * add false-positive fix for DSC readiness check * remove .get() calls for DSC
1 parent bf9f169 commit eb11bfa

File tree

3 files changed

+139
-21
lines changed

3 files changed

+139
-21
lines changed

tests/model_serving/model_server/conftest.py

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from contextlib import ExitStack
12
from typing import Any, Generator, Dict
23

34
import pytest
@@ -23,6 +24,7 @@
2324
LocalQueue,
2425
ClusterQueue,
2526
ResourceFlavor,
27+
wait_for_kueue_crds_available,
2628
)
2729
from pytest_testconfig import config as py_config
2830
from simple_logger.logger import get_logger
@@ -38,6 +40,10 @@
3840
from utilities.constants import (
3941
ModelAndFormat,
4042
)
43+
from utilities.data_science_cluster_utils import (
44+
get_dsc_ready_condition,
45+
wait_for_dsc_reconciliation,
46+
)
4147
from utilities.inference_utils import create_isvc
4248
from utilities.infra import (
4349
s3_endpoint_secret,
@@ -418,6 +424,7 @@ def gpu_model_car_inference_service(
418424

419425
# Kueue Fixtures
420426
def _is_kueue_operator_installed(admin_client: DynamicClient) -> bool:
427+
"""Check if the Kueue operator is installed and ready."""
421428
try:
422429
csvs = list(
423430
ClusterServiceVersion.get(
@@ -438,35 +445,39 @@ def _is_kueue_operator_installed(admin_client: DynamicClient) -> bool:
438445
def ensure_kueue_unmanaged_in_dsc(
    admin_client: DynamicClient, dsc_resource: DataScienceCluster
) -> Generator[None, Any, None]:
    """Set DSC Kueue to Unmanaged and wait for CRDs to be available.

    Skips the calling tests when the Kueue operator is not installed, or when
    the Kueue component is missing from the DSC spec (AttributeError/KeyError
    while reading it). When Kueue is not already Unmanaged, the component is
    patched via ResourceEditor (reverted automatically when the ExitStack
    unwinds after the yield) and the fixture waits for the DSC to reconcile
    before yielding to the tests.
    """
    try:
        if not _is_kueue_operator_installed(admin_client):
            pytest.skip("Kueue operator is not installed, skipping Kueue tests")

        # Check current Kueue state
        kueue_management_state = dsc_resource.instance.spec.components[DscComponents.KUEUE].managementState

        with ExitStack() as stack:
            # Only patch if Kueue is not already Unmanaged
            if kueue_management_state != DscComponents.ManagementState.UNMANAGED:
                LOGGER.info(f"Patching Kueue from {kueue_management_state} to Unmanaged")
                # Read timestamp BEFORE applying the patch so reconciliation can
                # later be detected as a change of the Ready condition's
                # lastTransitionTime (avoids a false-positive Ready=True check).
                ready_condition = get_dsc_ready_condition(dsc=dsc_resource)
                pre_patch_time = ready_condition.get("lastTransitionTime") if ready_condition else None

                dsc_dict = {
                    "spec": {
                        "components": {
                            DscComponents.KUEUE: {"managementState": DscComponents.ManagementState.UNMANAGED}
                        }
                    }
                }
                # enter_context takes the context manager positionally; passing it
                # as the keyword `cm=` relies on an internal parameter name.
                stack.enter_context(ResourceEditor(patches={dsc_resource: dsc_dict}))

                # Wait for DSC to reconcile the patch
                wait_for_dsc_reconciliation(dsc=dsc_resource, baseline_time=pre_patch_time)
            else:
                LOGGER.info("Kueue already Unmanaged, no patch needed")

            # Always wait for Kueue CRDs and controller pods (regardless of patch)
            wait_for_kueue_crds_available(client=admin_client)
            yield

    except (AttributeError, KeyError) as e:
        pytest.skip(f"Kueue component not found in DSC: {e}")

utilities/data_science_cluster_utils.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from ocp_resources.data_science_cluster import DataScienceCluster
55
from ocp_resources.resource import ResourceEditor
66
from simple_logger.logger import get_logger
7+
from timeout_sampler import retry
78

89
from utilities.constants import DscComponents
910

@@ -52,3 +53,61 @@ def update_components_in_dsc(
5253

5354
else:
5455
yield dsc
56+
57+
58+
def get_dsc_ready_condition(dsc: DataScienceCluster) -> dict[str, Any] | None:
    """Return the DSC Ready condition, or None when it is absent.

    Args:
        dsc: DataScienceCluster resource to inspect

    Returns:
        The Ready condition dict (with 'status', 'lastTransitionTime', etc.), or None if not found
    """
    for candidate in dsc.instance.status.conditions or []:
        if candidate.type == DataScienceCluster.Condition.READY:
            return candidate
    return None
75+
76+
77+
@retry(wait_timeout=300, sleep=5)
def wait_for_dsc_reconciliation(dsc: DataScienceCluster, baseline_time: str | None) -> bool:
    """Wait for DSC to reconcile after a ResourceEditor patch.

    Prevents false positives where DSC reports Ready=True immediately after a
    patch, before actual reconciliation begins, by requiring both:
    1. the Ready condition's lastTransitionTime to differ from *baseline_time*
       (reconciliation started), and
    2. a Ready=True condition (reconciliation completed).

    Args:
        dsc: DataScienceCluster resource
        baseline_time: The Ready condition lastTransitionTime before the patch, or None if not found

    Returns:
        True when DSC has reconciled and is Ready
    """
    condition = get_dsc_ready_condition(dsc=dsc)
    current_time = condition.get("lastTransitionTime") if condition else None

    # Guard: reconciliation has not started while the timestamp is unchanged.
    if current_time == baseline_time:
        LOGGER.info(f"Waiting for DSC reconciliation to start (baseline: {baseline_time or 'None'})...")
        return False

    # Timestamp moved; now require the condition to actually report Ready=True.
    is_ready = bool(condition) and condition.get("status") == DataScienceCluster.Condition.Status.TRUE
    if not is_ready:
        LOGGER.info(f"DSC reconciliation in progress (timestamp: {current_time or 'None'}), waiting for Ready=True...")
        return False

    # DSC Reconciled: timestamp changed AND Ready=True
    LOGGER.info(
        f"DSC reconciliation complete: timestamp changed from {baseline_time or 'None'} "
        f"to {current_time or 'None'} and Ready=True"
    )
    return True

utilities/kueue_utils.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,11 @@
33
from kubernetes.dynamic import DynamicClient
44
from ocp_resources.resource import NamespacedResource, Resource, MissingRequiredArgumentError
55
from ocp_resources.pod import Pod
6+
from simple_logger.logger import get_logger
7+
from timeout_sampler import retry
8+
from utilities.constants import Timeout
9+
10+
LOGGER = get_logger(name=__name__)
611

712

813
class ResourceFlavor(Resource):
@@ -170,3 +175,46 @@ def check_gated_pods_and_running_pods(
170175
):
171176
gated_pods += 1
172177
return running_pods, gated_pods
178+
179+
180+
@retry(
    wait_timeout=Timeout.TIMEOUT_4MIN,
    sleep=5,
)
def wait_for_kueue_crds_available(client: DynamicClient, namespace: str = "openshift-kueue-operator") -> bool:
    """Wait for Kueue CRDs and controller to be fully available.

    This function waits for:
    1. Kueue CRDs to be registered in the API server
    2. kueue-controller-manager pods to be Ready (needed for webhooks/admission control)

    Args:
        client: Dynamic client used to query the cluster.
        namespace: Namespace containing the kueue-controller-manager pods
            (defaults to the operator's standard install namespace).

    Raises:
        TimeoutExpiredError: If CRDs or controller are not available within the timeout period.

    Returns:
        True when CRDs are available and controller is ready.
    """
    # Check if CRDs are registered (raises exception if not, then will @retry)
    list(ResourceFlavor.get(client=client))

    # Check kueue-controller-manager pods exist and are ready
    pods = list(
        Pod.get(
            label_selector="control-plane=controller-manager,app.kubernetes.io/name=kueue",
            namespace=namespace,
            client=client,
        )
    )
    # bool(pods) keeps this a real bool (`pods and all(...)` yields [] when no
    # pods were found) and an empty pod list still counts as not-ready.
    all_pods_ready = bool(pods) and all(
        any(
            condition.type == Pod.Condition.READY and condition.status == Pod.Condition.Status.TRUE
            for condition in pod.instance.status.conditions or []
        )
        for pod in pods
    )
    if not all_pods_ready:
        LOGGER.info("Kueue controller pods not ready yet, retrying...")
        return False

    LOGGER.info(f"Kueue is ready: CRDs available and {len(pods)} controller pod(s) running")
    return True

0 commit comments

Comments
 (0)