110 changes: 71 additions & 39 deletions snakemake_executor_plugin_kubernetes/__init__.py
@@ -4,7 +4,7 @@
import shlex
import subprocess
import time
from typing import List, Generator, Optional, Self
from typing import Any, AsyncGenerator, List, Optional, Self
import uuid

import kubernetes
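
For context on the typing change above: an `async def` function that contains `yield` is an async generator, and `AsyncGenerator[YieldType, SendType]` is the annotation that matches how callers consume it with `async for`; the old `Generator` annotation describes a synchronous generator. A minimal standalone sketch (the `tick` function is hypothetical, standard library only):

import asyncio
from typing import AsyncGenerator

async def tick(n: int) -> AsyncGenerator[int, None]:
    # Declared `async def` and containing `yield`: an async generator.
    for i in range(n):
        yield i

async def main() -> None:
    # Async generators are consumed with `async for`, not plain `for`.
    async for i in tick(3):
        print(i)

asyncio.run(main())
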
@@ -388,15 +388,15 @@ def run_job(self, job: JobExecutorInterface):
self.logger.error(f"Failed to create pod: {e}")
raise WorkflowError(f"Failed to create pod: {e}")

self.logger.info("Get status with:\n" "kubectl describe job {jobid}\n")
self.logger.info(f"Get status with: kubectl describe job {jobid}")

self.report_job_submission(
SubmittedJobInfo(job=job, external_jobid=jobid, aux={"pod": pod})
)

async def check_active_jobs(
self, active_jobs: List[SubmittedJobInfo]
) -> Generator[SubmittedJobInfo, None, None]:
) -> AsyncGenerator[SubmittedJobInfo, None]:
# Check the status of active jobs.

# You have to iterate over the given list active_jobs.
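
The comments above describe the executor contract: iterate the given jobs, report the ones that reached a terminal state, and yield back the ones still running so they are polled again. A minimal hypothetical skeleton of that shape (dict-based jobs and `print` calls stand in for the real `SubmittedJobInfo` objects and `report_job_*` methods):

from typing import AsyncGenerator, List

async def check_jobs(jobs: List[dict]) -> AsyncGenerator[dict, None]:
    for j in jobs:
        if j["state"] == "succeeded":
            print("success:", j["id"])   # stands in for report_job_success
        elif j["state"] == "failed":
            print("error:", j["id"])     # stands in for report_job_error
        else:
            yield j                      # still running: check again later
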
@@ -416,24 +416,23 @@ async def check_active_jobs
async with self.status_rate_limiter:
try:
res = self._kubernetes_retry(
lambda: self.batchapi.read_namespaced_job_status(
lambda j=j: self.batchapi.read_namespaced_job_status(
j.external_jobid, self.namespace
)
)
except kubernetes.client.rest.ApiException as e:
if e.status == 404:
# Jobid not found
# The job is likely already done and was deleted on
# the server.
j.callback(j.job)
continue
else:
self.logger.error(f"ApiException when checking pod status: {e}")
self.report_job_error(j, msg=str(e))
continue
self.logger.error(f"ApiException when checking pod status: {e}")
continue
except WorkflowError as e:
self.logger.error(f"WorkflowError when checking pod status: {e}")
self.report_job_error(j, msg=str(e))
continue

if res is None:
msg = (
"Unknown job {jobid}. Has the job been deleted manually?"
).format(jobid=j.external_jobid)
self.logger.error(msg)
self.report_job_error(j, msg=msg)
continue

# Sometimes, just checking the status of a job is not enough, because
@@ -442,9 +441,11 @@ async def check_active_jobs
# that a pod is already terminated.
# We therefore check the status of the snakemake container in addition
# to the job status.
pods = self.kubeapi.list_namespaced_pod(
namespace=self.namespace,
label_selector=f"job-name={j.external_jobid}",
pods = self._kubernetes_retry(
lambda j=j: self.kubeapi.list_namespaced_pod(
namespace=self.namespace,
label_selector=f"job-name={j.external_jobid}",
)
)
assert len(pods.items) <= 1
if pods.items:
@@ -459,42 +460,59 @@
if snakemake_container.state.terminated is not None
else None
)
pod_name = pod.metadata.name
else:
snakemake_container = None
snakemake_container_exit_code = None
pod_name = None

if res is None:
msg = (
"Unknown job {jobid}. Has the job been deleted manually?"
).format(jobid=j.external_jobid)
self.logger.error(msg)
self.report_job_error(j, msg=msg)
elif res.status.failed == 1 or (
if (res.status.failed and res.status.failed > 0) or (
snakemake_container_exit_code is not None
and snakemake_container_exit_code != 0
):
msg = (
"For details, please issue:\n"
f"kubectl describe job {j.external_jobid}"
)
# failed
kube_log = self.log_path / f"{j.external_jobid}.log"
with open(kube_log, "w") as f:
kube_log_content = self.kubeapi.read_namespaced_pod_log(
name=pod.metadata.name,
namespace=self.namespace,
container=snakemake_container.name,
)
print(kube_log_content, file=f)

if pod_name is not None:
assert snakemake_container is not None
kube_log = self.log_path / f"{j.external_jobid}.log"
with open(kube_log, "w") as f:

def read_log(
pod_name=pod_name,
container_name=snakemake_container.name,
):
return self.kubeapi.read_namespaced_pod_log(
name=pod_name,
namespace=self.namespace,
container=container_name,
)

kube_log_content = self._kubernetes_retry(read_log)
print(kube_log_content, file=f)
aux_logs = [str(kube_log)]
else:
aux_logs = []

self.logger.error(f"Job {j.external_jobid} failed. {msg}")
self.report_job_error(j, msg=msg, aux_logs=[str(kube_log)])
elif res.status.succeeded == 1 or (snakemake_container_exit_code == 0):
self.report_job_error(j, msg=msg, aux_logs=aux_logs)

self._kubernetes_retry(
lambda j=j: self.safe_delete_job(
j.external_jobid, ignore_not_found=True
)
)
elif (res.status.succeeded and res.status.succeeded >= 1) or (
snakemake_container_exit_code == 0
):
# finished
self.logger.info(f"Job {j.external_jobid} succeeded.")
self.report_job_success(j)

self._kubernetes_retry(
lambda: self.safe_delete_job(
lambda j=j: self.safe_delete_job(
j.external_jobid, ignore_not_found=True
)
)
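
A note on the recurring `lambda j=j:` pattern in this hunk: Python closures bind loop variables late, so a plain `lambda: ...` handed to the retry helper would look up `j` only when the wrapper finally calls it, by which point the loop may have moved on to another job. Binding `j` as a default argument captures its current value at definition time. A standalone illustration:

# Late binding: every closure sees the loop variable's final value.
late = [lambda: i for i in range(3)]
print([f() for f in late])     # [2, 2, 2]

# Default-argument binding: each closure captures i as it was
# when the lambda was defined.
early = [lambda i=i: i for i in range(3)]
print([f() for f in early])    # [0, 1, 2]
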
@@ -558,13 +576,27 @@ def safe_delete_job(self, jobid, ignore_not_found=True):
import kubernetes.client

body = kubernetes.client.V1DeleteOptions()
self.logger.debug(f"Deleting job {jobid} in namespace {self.namespace}")
try:
# Usually, kubernetes should delete the pods automatically
# when the job is deleted, but in some cases, this does not
# happen, so we delete the pods manually.
pods = self.kubeapi.list_namespaced_pod(
namespace=self.namespace,
label_selector=f"job-name={jobid}",
)
for pod in pods.items:
self.logger.debug(f"Deleting pod {pod.metadata.name} for job {jobid}")
self.kubeapi.delete_namespaced_pod(
pod.metadata.name, self.namespace, body=body
)

self.batchapi.delete_namespaced_job(
jobid, self.namespace, propagation_policy="Foreground", body=body
)
except kubernetes.client.rest.ApiException as e:
if e.status == 404 and ignore_not_found:
self.logger.warning(
self.logger.debug(
"[WARNING] 404 not found when trying to delete the job: {jobid}\n"
"[WARNING] Ignore this error\n".format(jobid=jobid)
)
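
The 404 branch above is the usual idempotent-delete pattern: when deleting, "not found" already is the desired end state, so it is safe to ignore. A minimal sketch of the same pattern (assuming a configured kubernetes `CoreV1Api` client is passed in; `delete_pod_if_exists` is a hypothetical helper, not part of the plugin):

import kubernetes.client

def delete_pod_if_exists(core_api, name: str, namespace: str) -> None:
    try:
        core_api.delete_namespaced_pod(name, namespace)
    except kubernetes.client.rest.ApiException as e:
        # 404 means the pod is already gone, which is what we wanted.
        if e.status != 404:
            raise
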
@@ -602,7 +634,7 @@ def _reauthenticate_and_retry(self, func=None):
if func:
return func()

def _kubernetes_retry(self, func):
def _kubernetes_retry(self, func) -> Any:
import kubernetes
import urllib3
