Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 36 additions & 30 deletions utilities/infra.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ def get_jira_status(jira):
return jira_connection.issue(id=jira).fields.status.name.lower()


def get_pods(dyn_client, namespace, label=None):
def get_pods(dyn_client: DynamicClient, namespace: Namespace, label: str = "") -> list[Pod]:
return list(
Pod.get(
dyn_client=dyn_client,
Expand All @@ -271,27 +271,27 @@ def wait_for_pods_deletion(pods):
pod.wait_deleted()


def get_pod_container_error_status(pod):
pod_instance_status = pod.instance.status
# Check the containerStatuses and if any containers is in waiting state, return that information:

for container_status in pod_instance_status.get("containerStatuses", []):
waiting_container = container_status.get("state", {}).get("waiting")
if waiting_container:
return waiting_container["reason"] if waiting_container.get("reason") else waiting_container
def get_pod_container_error_status(pod: Pod) -> str | None:
try:
pod_instance_status = pod.instance.status
# Check the containerStatuses and if any container is in waiting state, return that information:
for container_status in pod_instance_status.get("containerStatuses", []):
if waiting_container := container_status.get("state", {}).get("waiting"):
return waiting_container["reason"] if waiting_container.get("reason") else waiting_container
return None
except NotFoundError:
LOGGER.error(f"Pod {pod.name} was not found")
raise


def get_not_running_pods(pods, filter_pods_by_name=None):
def get_not_running_pods(pods: list[Pod], filter_pods_by_name: str = "") -> list[dict[str, str]]:
pods_not_running = []
for pod in pods:
pod_instance = pod.instance
if filter_pods_by_name and filter_pods_by_name in pod.name:
LOGGER.warning(f"Ignoring pod: {pod.name} for pod state validations.")
continue
container_status_error = get_pod_container_error_status(pod=pod)
if container_status_error:
pods_not_running.append({pod.name: container_status_error})
try:
pod_instance = pod.instance
# Waits for all pods in a given namespace to be in final healthy state(running/completed).
# We also need to keep track of pods marked for deletion as not running. This would ensure any
# pod that was spinned up in place of pod marked for deletion, reaches healthy state before end
Expand All @@ -301,18 +301,20 @@ def get_not_running_pods(pods, filter_pods_by_name=None):
pod.Status.SUCCEEDED,
):
pods_not_running.append({pod.name: pod.status})
elif container_status_error := get_pod_container_error_status(pod=pod):
pods_not_running.append({pod.name: container_status_error})
except (ResourceNotFoundError, NotFoundError):
LOGGER.warning(f"Ignoring pod {pod.name} that disappeared during cluster sanity check")
pods_not_running.append({pod.name: "Deleted"})
return pods_not_running


def wait_for_pods_running(
admin_client,
namespace,
number_of_consecutive_checks=1,
filter_pods_by_name=None,
):
admin_client: DynamicClient,
namespace: Namespace,
number_of_consecutive_checks: int = 1,
filter_pods_by_name: str = "",
) -> None:
"""
Waits for all pods in a given namespace to reach Running/Completed state. To avoid catching all pods in running
state too soon, use number_of_consecutive_checks with appropriate values.
Expand All @@ -328,26 +330,30 @@ def wait_for_pods_running(
samples = TimeoutSampler(
wait_timeout=TIMEOUT_2MIN,
sleep=TIMEOUT_5SEC,
func=get_not_running_pods,
pods=list(Pod.get(dyn_client=admin_client, namespace=namespace.name)),
filter_pods_by_name=filter_pods_by_name,
func=get_pods,
dyn_client=admin_client,
namespace=namespace,
exceptions_dict={NotFoundError: []},
)

sample = None
not_running_pods = []
try:
current_check = 0
for sample in samples:
if not sample:
current_check += 1
if current_check >= number_of_consecutive_checks:
return True
else:
current_check = 0
if sample:
if not_running_pods := get_not_running_pods(pods=sample, filter_pods_by_name=filter_pods_by_name):
LOGGER.warning(f"Not running pods: {not_running_pods}")
current_check = 0
else:
current_check += 1
if current_check >= number_of_consecutive_checks:
return
except TimeoutExpiredError:
if sample:
if not_running_pods:
LOGGER.error(
f"timeout waiting for all pods in namespace {namespace.name} to reach "
f"running state, following pods are in not running state: {sample}"
f"running state, following pods are in not running state: {not_running_pods}"
)
raise

Expand Down