diff --git a/pkg/controller/mpi_job_controller.go b/pkg/controller/mpi_job_controller.go
index 4af15e25..a84d7e29 100644
--- a/pkg/controller/mpi_job_controller.go
+++ b/pkg/controller/mpi_job_controller.go
@@ -1160,6 +1160,30 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher
 	if isMPIJobSuspended(mpiJob) {
 		msg := fmt.Sprintf("MPIJob %s/%s is suspended.", mpiJob.Namespace, mpiJob.Name)
 		updateMPIJobConditions(mpiJob, kubeflow.JobRunning, corev1.ConditionFalse, mpiJobSuspendedReason, msg)
+	} else if isFinished(mpiJob.Status) {
+		// The job reached a terminal state. Do not re-emit Running=True, to avoid a LastTransitionTime that is
+		// later than the completion time.
+		// If Running was never set (e.g. the job completed before the operator observed it running), add
+		// Running=False stamped with the completionTime, so that consumers computing duration still find the
+		// condition and can reconstruct an approximate job lifecycle.
+		if getCondition(mpiJob.Status, kubeflow.JobRunning) == nil {
+			msg := fmt.Sprintf("MPIJob %s/%s is finished but Running condition was never set.", mpiJob.Namespace, mpiJob.Name)
+			cond := kubeflow.JobCondition{
+				Type:    kubeflow.JobRunning,
+				Status:  corev1.ConditionFalse,
+				Reason:  mpiJobRunningReason,
+				Message: msg,
+			}
+			if mpiJob.Status.CompletionTime != nil {
+				cond.LastTransitionTime = *mpiJob.Status.CompletionTime
+				cond.LastUpdateTime = *mpiJob.Status.CompletionTime
+			} else {
+				now := metav1.Now()
+				cond.LastTransitionTime = now
+				cond.LastUpdateTime = now
+			}
+			mpiJob.Status.Conditions = append(mpiJob.Status.Conditions, cond)
+		}
 	} else if launcher != nil && launcherPodsCnt >= 1 && running == len(worker) {
 		msg := fmt.Sprintf("MPIJob %s/%s is running.", mpiJob.Namespace, mpiJob.Name)
 		updateMPIJobConditions(mpiJob, kubeflow.JobRunning, corev1.ConditionTrue, mpiJobRunningReason, msg)
diff --git a/pkg/controller/mpi_job_controller_test.go b/pkg/controller/mpi_job_controller_test.go
index 888a7f4a..4e61deb8 100644
--- a/pkg/controller/mpi_job_controller_test.go
+++ b/pkg/controller/mpi_job_controller_test.go
@@ -629,6 +629,71 @@ func TestLauncherSucceeded(t *testing.T) {
 	updateMPIJobConditions(mpiJobCopy, kubeflow.JobCreated, corev1.ConditionTrue, mpiJobCreatedReason, msg)
 	msg = fmt.Sprintf("MPIJob %s/%s successfully completed.", mpiJob.Namespace, mpiJob.Name)
 	updateMPIJobConditions(mpiJobCopy, kubeflow.JobSucceeded, corev1.ConditionTrue, mpiJobSucceededReason, msg)
+	// Running=False is added when the job finishes without Running ever being set.
+	msg = fmt.Sprintf("MPIJob %s/%s is finished but Running condition was never set.", mpiJob.Namespace, mpiJob.Name)
+	updateMPIJobConditions(mpiJobCopy, kubeflow.JobRunning, corev1.ConditionFalse, mpiJobRunningReason, msg)
+	f.expectUpdateMPIJobStatusAction(mpiJobCopy)
+
+	f.run(t.Context(), getKey(mpiJob, t))
+}
+
+// TestLauncherSucceededWithRunningPod tests the case where the launcher Job has succeeded but its pod is still
+// observed as Running (e.g. due to informer lag) and the workers have been cleaned up. The Running condition is
+// set to False rather than being re-emitted as True alongside Succeeded.
+func TestLauncherSucceededWithRunningPod(t *testing.T) {
+	f := newFixture(t, "")
+
+	startTime := metav1.Now()
+	completionTime := metav1.Now()
+
+	mpiJob := newMPIJob("test", ptr.To[int32](64), &startTime, &completionTime)
+	// Pre-set the Created and Running conditions to simulate a job that was observed running before completion.
+ msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, mpiJob.Name) + updateMPIJobConditions(mpiJob, kubeflow.JobCreated, corev1.ConditionTrue, mpiJobCreatedReason, msg) + msg = fmt.Sprintf("MPIJob %s/%s is running.", mpiJob.Namespace, mpiJob.Name) + updateMPIJobConditions(mpiJob, kubeflow.JobRunning, corev1.ConditionTrue, mpiJobRunningReason, msg) + f.setUpMPIJob(mpiJob) + + fmjc := f.newFakeMPIJobController() + mpiJobCopy := mpiJob.DeepCopy() + scheme.Scheme.Default(mpiJobCopy) + launcher := fmjc.newLauncherJob(mpiJobCopy) + // Launcher Job has succeeded. + launcher.Status.Conditions = append(launcher.Status.Conditions, []batchv1.JobCondition{ + { + Type: batchv1.JobSuccessCriteriaMet, + Status: corev1.ConditionTrue, + }, + { + Type: batchv1.JobComplete, + Status: corev1.ConditionTrue, + }, + }...) + launcher.Status.StartTime = &startTime + launcher.Status.CompletionTime = &completionTime + f.setUpLauncher(launcher) + + // Launcher pod is still observed as Running despite the Job having completed. + launcherPod := mockJobPod(launcher) + launcherPod.Status.Phase = corev1.PodRunning + f.setUpPod(launcherPod) + + mpiJobCopy.Status.ReplicaStatuses = map[kubeflow.MPIReplicaType]*kubeflow.ReplicaStatus{ + kubeflow.MPIReplicaTypeLauncher: { + Active: 0, + Succeeded: 1, + Failed: 0, + }, + kubeflow.MPIReplicaTypeWorker: {}, + } + setUpMPIJobTimestamp(mpiJobCopy, &startTime, &completionTime) + + msg = fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, mpiJob.Name) + updateMPIJobConditions(mpiJobCopy, kubeflow.JobCreated, corev1.ConditionTrue, mpiJobCreatedReason, msg) + msg = fmt.Sprintf("MPIJob %s/%s is running.", mpiJob.Namespace, mpiJob.Name) + updateMPIJobConditions(mpiJobCopy, kubeflow.JobRunning, corev1.ConditionFalse, mpiJobRunningReason, msg) + msg = fmt.Sprintf("MPIJob %s/%s successfully completed.", mpiJob.Namespace, mpiJob.Name) + updateMPIJobConditions(mpiJobCopy, kubeflow.JobSucceeded, corev1.ConditionTrue, mpiJobSucceededReason, msg) f.expectUpdateMPIJobStatusAction(mpiJobCopy) f.run(t.Context(), getKey(mpiJob, t)) @@ -692,6 +757,9 @@ func TestLauncherFailed(t *testing.T) { updateMPIJobConditions(mpiJobCopy, kubeflow.JobCreated, corev1.ConditionTrue, mpiJobCreatedReason, msg) msg = "Job has reached the specified backoff limit: second message" updateMPIJobConditions(mpiJobCopy, kubeflow.JobFailed, corev1.ConditionTrue, batchv1.JobReasonBackoffLimitExceeded+"/FailedReason2", msg) + // Running=False is added when the job finishes without Running ever being set. + msg = fmt.Sprintf("MPIJob %s/%s is finished but Running condition was never set.", mpiJob.Namespace, mpiJob.Name) + updateMPIJobConditions(mpiJobCopy, kubeflow.JobRunning, corev1.ConditionFalse, mpiJobRunningReason, msg) f.expectUpdateMPIJobStatusAction(mpiJobCopy)