diff --git a/src/cloudai/systems/kubernetes/kubernetes_system.py b/src/cloudai/systems/kubernetes/kubernetes_system.py index 2a0c40e63..a74bc1fc6 100644 --- a/src/cloudai/systems/kubernetes/kubernetes_system.py +++ b/src/cloudai/systems/kubernetes/kubernetes_system.py @@ -143,7 +143,7 @@ def _is_job_running(self, job: KubernetesJob) -> bool: logging.debug(f"Checking for job '{job.name}' of kind '{job.kind}' to determine if it is running.") if "mpijob" in job.kind.lower(): - return self._is_mpijob_running(job.name) + return self._is_mpijob_running(job) elif "job" in job.kind.lower(): return self._is_batch_job_running(job.name) elif "dynamographdeployment" in job.kind.lower(): @@ -153,20 +153,22 @@ def _is_job_running(self, job: KubernetesJob) -> bool: logging.error(error_message) raise ValueError(error_message) - def _is_mpijob_running(self, job_name: str) -> bool: + def _is_mpijob_running(self, job: KubernetesJob) -> bool: try: mpijob = self.custom_objects_api.get_namespaced_custom_object( group="kubeflow.org", version="v2beta1", namespace=self.default_namespace, plural="mpijobs", - name=job_name, + name=job.name, ) assert isinstance(mpijob, dict) status: dict = cast(dict, mpijob.get("status", {})) conditions = status.get("conditions", []) - logging.debug(f"MPIJob '{job_name}': {conditions=} {status=}") + logging.debug(f"MPIJob '{job.name}': {conditions=} {status=}") + + self.store_logs_for_job(job.name, job.test_run.output_path) # Consider an empty conditions list as running if not conditions: @@ -183,11 +185,11 @@ def _is_mpijob_running(self, job_name: str) -> bool: except lazy.k8s.client.ApiException as e: if e.status == 404: - logging.debug(f"MPIJob '{job_name}' not found. It may have completed and been removed from the system.") + logging.debug(f"MPIJob '{job.name}' not found. It may have completed and been removed from the system.") return False else: error_message = ( - f"Error occurred while retrieving status for MPIJob '{job_name}' " + f"Error occurred while retrieving status for MPIJob '{job.name}' " f"Error code: {e.status}. Message: {e.reason}. Please check the job name, namespace, and " "Kubernetes API server." ) @@ -392,6 +394,7 @@ def kill(self, job: BaseJob) -> None: job (BaseJob): The job to be terminated. """ k_job: KubernetesJob = cast(KubernetesJob, job) + self.store_logs_for_job(k_job.name, k_job.test_run.output_path) self.delete_job(k_job.name, k_job.kind) def delete_job(self, job_name: str, job_kind: str) -> None: