Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions pkg/controller.v1/common/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,12 @@ func (jc *JobController) ReconcileJobs(
jobStatus.CompletionTime = &now
}

// If the Job exceeds backoff limit or is past active deadline
// delete all pods and services, then set the status to failed
// Set JobFailed condition BEFORE cleanup so that DeletePodsAndServices
// sees IsFinished()==true and correctly respects cleanPodPolicy.
// Fix: https://github.com/kubeflow/trainer/issues/3419
jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage)
commonutil.UpdateJobConditions(&jobStatus, apiv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage)

if err := jc.DeletePodsAndServices(runtimeObject, runPolicy, jobStatus, pods); err != nil {
return err
}
Expand All @@ -240,10 +244,6 @@ func (jc *JobController) ReconcileJobs(
}
}

jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage)

commonutil.UpdateJobConditions(&jobStatus, apiv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage)

return jc.Controller.UpdateJobStatusInApiServer(job, &jobStatus)
} else {
// General cases which need to reconcile
Expand Down
97 changes: 97 additions & 0 deletions pkg/controller.v1/common/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ func TestDeletePodsAndServices(T *testing.T) {
wantPods: &corev1.PodList{},
wantService: &corev1.ServiceList{},
},
"Unfinished job with cleanPodPolicy None deletes pods (pre-fix backoffLimit bug)": {
cleanPodPolicy: apiv1.CleanPodPolicyNone,
jobCondition: "",
wantPods: &corev1.PodList{},
wantService: &corev1.ServiceList{},
},
}
for name, tc := range cases {
T.Run(name, func(t *testing.T) {
Expand Down Expand Up @@ -261,6 +267,97 @@ func TestManagedByExternalController(T *testing.T) {
}
}

// TestBackoffLimitExceededRespectsCleanPodPolicy verifies that cleanPodPolicy is
// respected when a job exceeds its backoffLimit. See https://github.com/kubeflow/trainer/issues/3419
//
// The table covers both orderings around DeletePodsAndServices: when the
// JobFailed condition is already present (the fixed ordering) cleanPodPolicy
// None must preserve pods; when it is absent (the pre-fix ordering) pods are
// deleted regardless of the policy.
func TestBackoffLimitExceededRespectsCleanPodPolicy(T *testing.T) {
	cases := map[string]struct {
		cleanPodPolicy     apiv1.CleanPodPolicy
		setConditionBefore bool // whether JobFailed is set before cleanup runs
		wantPodsPreserved  bool // expected outcome: pods survive cleanup
	}{
		"cleanPodPolicy None with JobFailed set before delete (fixed)": {
			cleanPodPolicy:     apiv1.CleanPodPolicyNone,
			setConditionBefore: true,
			wantPodsPreserved:  true,
		},
		"cleanPodPolicy None without JobFailed set before delete (buggy)": {
			cleanPodPolicy:     apiv1.CleanPodPolicyNone,
			setConditionBefore: false,
			wantPodsPreserved:  false,
		},
		"cleanPodPolicy All with JobFailed set before delete": {
			cleanPodPolicy:     apiv1.CleanPodPolicyAll,
			setConditionBefore: true,
			wantPodsPreserved:  false,
		},
		"cleanPodPolicy Running with JobFailed set before delete": {
			cleanPodPolicy:     apiv1.CleanPodPolicyRunning,
			setConditionBefore: true,
			wantPodsPreserved:  false,
		},
	}
	for name, tc := range cases {
		T.Run(name, func(t *testing.T) {
			// One master pod with restarts (as after exceeding backoffLimit)
			// plus one worker pod, each with a matching headless service.
			master := newPod("master-0", corev1.PodRunning)
			master.Status.ContainerStatuses = []corev1.ContainerStatus{{RestartCount: 3}}
			worker := newPod("worker-0", corev1.PodRunning)

			objects := []runtime.Object{
				master,
				worker,
				newService("master-0"),
				newService("worker-0"),
			}
			client := fake.NewSimpleClientset(objects...)
			jc := JobController{
				PodControl:     control.RealPodControl{KubeClient: client, Recorder: &record.FakeRecorder{}},
				ServiceControl: control.RealServiceControl{KubeClient: client, Recorder: &record.FakeRecorder{}},
			}

			completion := metav1.Now()
			status := apiv1.JobStatus{
				CompletionTime: &completion,
			}
			if tc.setConditionBefore {
				// Simulate the fixed ordering: the JobFailed condition is
				// recorded before cleanup, so IsFinished() reports true.
				status.Conditions = []apiv1.JobCondition{{
					Type:   apiv1.JobFailed,
					Status: corev1.ConditionTrue,
					Reason: "BackoffLimitExceeded",
				}}
			}
			policy := &apiv1.RunPolicy{
				CleanPodPolicy: &tc.cleanPodPolicy,
			}

			if err := jc.DeletePodsAndServices(&testjobv1.TestJob{}, policy, status, []*corev1.Pod{master, worker}); err != nil {
				t.Fatalf("DeletePodsAndServices failed: %v", err)
			}

			remaining, err := client.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{})
			if err != nil {
				t.Fatalf("Failed to list pods: %v", err)
			}

			if tc.wantPodsPreserved {
				if len(remaining.Items) != 2 {
					t.Errorf("Expected pods to be preserved, but got %d pods", len(remaining.Items))
				}
			} else if len(remaining.Items) != 0 {
				t.Errorf("Expected pods to be deleted, but got %d pods", len(remaining.Items))
			}
		})
	}
}

func newPod(name string, phase corev1.PodPhase) *corev1.Pod {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Expand Down
Loading