24 changes: 24 additions & 0 deletions pkg/controller/mpi_job_controller.go
@@ -1160,6 +1160,30 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher
if isMPIJobSuspended(mpiJob) {
msg := fmt.Sprintf("MPIJob %s/%s is suspended.", mpiJob.Namespace, mpiJob.Name)
updateMPIJobConditions(mpiJob, kubeflow.JobRunning, corev1.ConditionFalse, mpiJobSuspendedReason, msg)
} else if isFinished(mpiJob.Status) {
Suggested change
} else if isFinished(mpiJob.Status) {
} else if isFinished(mpiJob.Status) && getCondition(mpiJob.Status, kubeflow.JobRunning) == nil {

It seems that we can simplify the if-else structure by doing this.
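Roughly, the resulting branch structure would look like this (sketch only; bodies elided, names as in the surrounding diff):

```go
// Sketch only: the resulting if/else-if chain in updateMPIJobStatus, bodies elided.
if isMPIJobSuspended(mpiJob) {
	// emit Running=False with mpiJobSuspendedReason
} else if isFinished(mpiJob.Status) && getCondition(mpiJob.Status, kubeflow.JobRunning) == nil {
	// the job is terminal and Running was never recorded: backfill Running=False
	// using the completion time (falling back to "now")
} else if launcher != nil && launcherPodsCnt >= 1 && running == len(worker) {
	// emit Running=True with mpiJobRunningReason
}
```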

// The job reached a terminal state. Do not re-emit Running=True, to avoid a LastTransitionTime that is
// later than the completion time.
// If Running was never set (e.g. the job completed before the operator observed it running), add Running=False
// with the completionTime as its timestamp, so that consumers computing duration still find the condition
// and can reason about the job lifecycle.
if getCondition(mpiJob.Status, kubeflow.JobRunning) == nil {
msg := fmt.Sprintf("MPIJob %s/%s is finished but Running condition was never set.", mpiJob.Namespace, mpiJob.Name)
cond := kubeflow.JobCondition{
Type: kubeflow.JobRunning,
Status: corev1.ConditionFalse,
Reason: mpiJobRunningReason,
Message: msg,
}
if mpiJob.Status.CompletionTime != nil {
cond.LastTransitionTime = *mpiJob.Status.CompletionTime
cond.LastUpdateTime = *mpiJob.Status.CompletionTime
} else {
now := metav1.Now()
cond.LastTransitionTime = now
cond.LastUpdateTime = now
}
mpiJob.Status.Conditions = append(mpiJob.Status.Conditions, cond)
}
Comment on lines +1169 to +1186
Suggested change
msg := fmt.Sprintf("MPIJob %s/%s is finished but Running condition was never set.", mpiJob.Namespace, mpiJob.Name)
cond := kubeflow.JobCondition{
Type: kubeflow.JobRunning,
Status: corev1.ConditionFalse,
Reason: mpiJobRunningReason,
Message: msg,
}
updateTime := ptr.Deref(mpiJob.Status.CompletionTime, metav1.NewTime(c.clock.Now()))
cond.LastTransitionTime = updateTime
cond.LastUpdateTime = updateTime
mpiJob.Status.Conditions = append(mpiJob.Status.Conditions, cond)
  1. Unnest the condition
  2. Use the clock
  3. Simplify the pointer deref operation (see the ptr.Deref sketch below)
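For reference, a minimal sketch of the ptr.Deref fallback pattern from k8s.io/utils/ptr, assuming the controller carries an injectable clock field as the suggestion implies; conditionTime is a hypothetical helper name:

```go
package controller

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/clock"
	"k8s.io/utils/ptr"
)

// conditionTime is a hypothetical helper: ptr.Deref returns *completionTime when it is
// non-nil and otherwise falls back to the clock, so no nested nil check is needed.
func conditionTime(completionTime *metav1.Time, c clock.PassiveClock) metav1.Time {
	return ptr.Deref(completionTime, metav1.NewTime(c.Now()))
}
```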

} else if launcher != nil && launcherPodsCnt >= 1 && running == len(worker) {
msg := fmt.Sprintf("MPIJob %s/%s is running.", mpiJob.Namespace, mpiJob.Name)
updateMPIJobConditions(mpiJob, kubeflow.JobRunning, corev1.ConditionTrue, mpiJobRunningReason, msg)
68 changes: 68 additions & 0 deletions pkg/controller/mpi_job_controller_test.go
@@ -629,6 +629,71 @@ func TestLauncherSucceeded(t *testing.T) {
updateMPIJobConditions(mpiJobCopy, kubeflow.JobCreated, corev1.ConditionTrue, mpiJobCreatedReason, msg)
msg = fmt.Sprintf("MPIJob %s/%s successfully completed.", mpiJob.Namespace, mpiJob.Name)
updateMPIJobConditions(mpiJobCopy, kubeflow.JobSucceeded, corev1.ConditionTrue, mpiJobSucceededReason, msg)
// Running=False is added when the job finishes without Running ever being set.
msg = fmt.Sprintf("MPIJob %s/%s is finished but Running condition was never set.", mpiJob.Namespace, mpiJob.Name)
updateMPIJobConditions(mpiJobCopy, kubeflow.JobRunning, corev1.ConditionFalse, mpiJobRunningReason, msg)
f.expectUpdateMPIJobStatusAction(mpiJobCopy)

f.run(t.Context(), getKey(mpiJob, t))
}

// TestLauncherSucceededWithRunningPod tests the case where the launcher Job has succeeded but its pod is still
// observed as Running (e.g. due to informer lag) and the workers have been cleaned up. The Running condition is
// set to False rather than being re-emitted as True alongside Succeeded.
func TestLauncherSucceededWithRunningPod(t *testing.T) {
Can you explicitly check that the Running condition's timestamps (LastTransitionTime/LastUpdateTime) are later than or equal to completionTime?

The test helpers ignore such time comparisons: https://github.com/GonzaloSaez/mpi-operator/blob/8c953844bc5a5deb7ba98d5e1e85a324b6aa2645/pkg/controller/mpi_job_controller_test.go#L350
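A minimal sketch of such an explicit check, reusing getCondition from the controller package (it lives alongside the other helpers in mpi_job_controller_test.go and uses its imports); expectRunningNotBeforeCompletion is a hypothetical helper, and how the updated status is fetched from the fixture is left open:

```go
// Sketch only: assert the timestamps explicitly, since the fixture's condition
// comparison ignores them.
func expectRunningNotBeforeCompletion(t *testing.T, status kubeflow.JobStatus, completionTime metav1.Time) {
	t.Helper()
	running := getCondition(status, kubeflow.JobRunning)
	if running == nil {
		t.Fatalf("expected a %v condition", kubeflow.JobRunning)
	}
	if running.LastTransitionTime.Time.Before(completionTime.Time) {
		t.Errorf("Running LastTransitionTime %v precedes completionTime %v",
			running.LastTransitionTime.Time, completionTime.Time)
	}
}
```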

f := newFixture(t, "")

startTime := metav1.Now()
completionTime := metav1.Now()
Comment on lines +646 to +647
Could you use clocktesting?
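For instance, a minimal sketch using the fake clock from k8s.io/utils/clock/testing to make both timestamps deterministic (the clocktesting import alias and the exact offset are assumptions):

```go
// Sketch only (inside the test body): a fake clock pins "now", so startTime and
// completionTime are deterministic and a minute apart instead of two back-to-back
// metav1.Now() calls.
// Assumed import: clocktesting "k8s.io/utils/clock/testing".
fakeClock := clocktesting.NewFakeClock(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC))
startTime := metav1.NewTime(fakeClock.Now())
fakeClock.Step(time.Minute)
completionTime := metav1.NewTime(fakeClock.Now())
```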


mpiJob := newMPIJob("test", ptr.To[int32](64), &startTime, &completionTime)
// Pre-set the Running condition to simulate a job that was running before completion.
msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, mpiJob.Name)
updateMPIJobConditions(mpiJob, kubeflow.JobCreated, corev1.ConditionTrue, mpiJobCreatedReason, msg)
msg = fmt.Sprintf("MPIJob %s/%s is running.", mpiJob.Namespace, mpiJob.Name)
updateMPIJobConditions(mpiJob, kubeflow.JobRunning, corev1.ConditionTrue, mpiJobRunningReason, msg)
f.setUpMPIJob(mpiJob)

fmjc := f.newFakeMPIJobController()
mpiJobCopy := mpiJob.DeepCopy()
scheme.Scheme.Default(mpiJobCopy)
launcher := fmjc.newLauncherJob(mpiJobCopy)
// Launcher Job has succeeded.
launcher.Status.Conditions = append(launcher.Status.Conditions, []batchv1.JobCondition{
{
Type: batchv1.JobSuccessCriteriaMet,
Status: corev1.ConditionTrue,
},
{
Type: batchv1.JobComplete,
Status: corev1.ConditionTrue,
},
}...)
launcher.Status.StartTime = &startTime
launcher.Status.CompletionTime = &completionTime
f.setUpLauncher(launcher)

// Launcher pod is still observed as Running despite the Job having completed.
launcherPod := mockJobPod(launcher)
launcherPod.Status.Phase = corev1.PodRunning
f.setUpPod(launcherPod)

mpiJobCopy.Status.ReplicaStatuses = map[kubeflow.MPIReplicaType]*kubeflow.ReplicaStatus{
kubeflow.MPIReplicaTypeLauncher: {
Active: 0,
Succeeded: 1,
Failed: 0,
},
kubeflow.MPIReplicaTypeWorker: {},
}
setUpMPIJobTimestamp(mpiJobCopy, &startTime, &completionTime)

msg = fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, mpiJob.Name)
updateMPIJobConditions(mpiJobCopy, kubeflow.JobCreated, corev1.ConditionTrue, mpiJobCreatedReason, msg)
msg = fmt.Sprintf("MPIJob %s/%s is running.", mpiJob.Namespace, mpiJob.Name)
updateMPIJobConditions(mpiJobCopy, kubeflow.JobRunning, corev1.ConditionFalse, mpiJobRunningReason, msg)
msg = fmt.Sprintf("MPIJob %s/%s successfully completed.", mpiJob.Namespace, mpiJob.Name)
updateMPIJobConditions(mpiJobCopy, kubeflow.JobSucceeded, corev1.ConditionTrue, mpiJobSucceededReason, msg)
f.expectUpdateMPIJobStatusAction(mpiJobCopy)

f.run(t.Context(), getKey(mpiJob, t))
@@ -692,6 +757,9 @@ func TestLauncherFailed(t *testing.T) {
updateMPIJobConditions(mpiJobCopy, kubeflow.JobCreated, corev1.ConditionTrue, mpiJobCreatedReason, msg)
msg = "Job has reached the specified backoff limit: second message"
updateMPIJobConditions(mpiJobCopy, kubeflow.JobFailed, corev1.ConditionTrue, batchv1.JobReasonBackoffLimitExceeded+"/FailedReason2", msg)
// Running=False is added when the job finishes without Running ever being set.
msg = fmt.Sprintf("MPIJob %s/%s is finished but Running condition was never set.", mpiJob.Namespace, mpiJob.Name)
updateMPIJobConditions(mpiJobCopy, kubeflow.JobRunning, corev1.ConditionFalse, mpiJobRunningReason, msg)

f.expectUpdateMPIJobStatusAction(mpiJobCopy)
