Skip to content

Commit 560f952

Browse files
authored
Wait for pods to be in running state before attempting to create ModelRegistry (#270)
* on rebase clean commented-by- labels * Wait for pods to be in running state before attempting to create ModelRegistry
1 parent 8fdbcf7 commit 560f952

File tree

2 files changed

+81
-3
lines changed

2 files changed

+81
-3
lines changed

tests/model_registry/conftest.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
get_mr_service_by_label,
3838
get_model_registry_deployment_template_dict,
3939
get_model_registry_db_label_dict,
40+
wait_for_pods_running,
4041
)
4142
from utilities.constants import Annotations, Protocols, DscComponents
4243
from model_registry import ModelRegistry as ModelRegistryClient
@@ -221,8 +222,7 @@ def after_call(self, response: Response, case: Case) -> None:
221222

222223
@pytest.fixture(scope="class")
223224
def updated_dsc_component_state_scope_class(
224-
request: FixtureRequest,
225-
dsc_resource: DataScienceCluster,
225+
request: FixtureRequest, dsc_resource: DataScienceCluster, admin_client: DynamicClient
226226
) -> Generator[DataScienceCluster, Any, Any]:
227227
original_components = dsc_resource.instance.spec.components
228228
with ResourceEditor(patches={dsc_resource: {"spec": {"components": request.param["component_patch"]}}}):
@@ -233,6 +233,11 @@ def updated_dsc_component_state_scope_class(
233233
name=dsc_resource.instance.spec.components.modelregistry.registriesNamespace, ensure_exists=True
234234
)
235235
namespace.wait_for_status(status=Namespace.Status.ACTIVE)
236+
wait_for_pods_running(
237+
admin_client=admin_client,
238+
namespace_name=py_config["applications_namespace"],
239+
number_of_consecutive_checks=6,
240+
)
236241
yield dsc_resource
237242

238243
for component_name, value in request.param["component_patch"].items():

tests/model_registry/utils.py

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,21 @@
22

33
from kubernetes.dynamic import DynamicClient
44
from ocp_resources.namespace import Namespace
5+
from ocp_resources.pod import Pod
56
from ocp_resources.service import Service
67
from ocp_resources.model_registry import ModelRegistry
78
from kubernetes.dynamic.exceptions import ResourceNotFoundError
8-
9+
from simple_logger.logger import get_logger
10+
from timeout_sampler import TimeoutExpiredError, TimeoutSampler
11+
from kubernetes.dynamic.exceptions import NotFoundError
912
from tests.model_registry.constants import MR_DB_IMAGE_DIGEST
1013
from utilities.exceptions import ProtocolNotSupportedError, TooManyServicesError
1114
from utilities.constants import Protocols, Annotations
1215

1316
ADDRESS_ANNOTATION_PREFIX: str = "routing.opendatahub.io/external-address-"
1417

18+
LOGGER = get_logger(name=__name__)
19+
1520

1621
def get_mr_service_by_label(client: DynamicClient, ns: Namespace, mr_instance: ModelRegistry) -> Service:
1722
"""
@@ -156,3 +161,71 @@ def get_model_registry_db_label_dict(db_resource_name: str) -> dict[str, str]:
156161
Annotations.KubernetesIo.INSTANCE: db_resource_name,
157162
Annotations.KubernetesIo.PART_OF: db_resource_name,
158163
}
164+
165+
166+
def get_pod_container_error_status(pod: Pod) -> str | None:
167+
"""
168+
Check container error status for a given pod and if any containers is in waiting state, return that information
169+
"""
170+
pod_instance_status = pod.instance.status
171+
for container_status in pod_instance_status.get("containerStatuses", []):
172+
if waiting_container := container_status.get("state", {}).get("waiting"):
173+
return waiting_container["reason"] if waiting_container.get("reason") else waiting_container
174+
return ""
175+
176+
177+
def get_not_running_pods(pods: list[Pod]) -> list[dict[str, Any]]:
178+
# Gets all the non-running pods from a given namespace.
179+
# Note: We need to keep track of pods marked for deletion as not running. This would ensure any
180+
# pod that was spun up in place of pod marked for deletion, are not ignored
181+
pods_not_running = []
182+
try:
183+
for pod in pods:
184+
pod_instance = pod.instance
185+
if container_status_error := get_pod_container_error_status(pod=pod):
186+
pods_not_running.append({pod.name: container_status_error})
187+
188+
if pod_instance.metadata.get("deletionTimestamp") or pod_instance.status.phase not in (
189+
pod.Status.RUNNING,
190+
pod.Status.SUCCEEDED,
191+
):
192+
pods_not_running.append({pod.name: pod.status})
193+
except (ResourceNotFoundError, NotFoundError) as exc:
194+
LOGGER.warning("Ignoring pod that disappeared during cluster sanity check: %s", exc)
195+
return pods_not_running
196+
197+
198+
def wait_for_pods_running(
199+
admin_client: DynamicClient,
200+
namespace_name: str,
201+
number_of_consecutive_checks: int = 1,
202+
) -> bool | None:
203+
"""
204+
Waits for all pods in a given namespace to reach Running/Completed state. To avoid catching all pods in running
205+
state too soon, use number_of_consecutive_checks with appropriate values.
206+
"""
207+
samples = TimeoutSampler(
208+
wait_timeout=180,
209+
sleep=5,
210+
func=get_not_running_pods,
211+
pods=list(Pod.get(dyn_client=admin_client, namespace=namespace_name)),
212+
exceptions_dict={NotFoundError: [], ResourceNotFoundError: []},
213+
)
214+
sample = None
215+
try:
216+
current_check = 0
217+
for sample in samples:
218+
if not sample:
219+
current_check += 1
220+
if current_check >= number_of_consecutive_checks:
221+
return True
222+
else:
223+
current_check = 0
224+
except TimeoutExpiredError:
225+
if sample:
226+
LOGGER.error(
227+
f"timeout waiting for all pods in namespace {namespace_name} to reach "
228+
f"running state, following pods are in not running state: {sample}"
229+
)
230+
raise
231+
return None

0 commit comments

Comments
 (0)