Skip to content

Commit 4b73a4d

Browse files
committed
fix: respect cleanPodPolicy when job exceeds backoffLimit
Move UpdateJobConditions(JobFailed) before DeletePodsAndServices in the jobExceedsLimit block so that IsFinished() returns true and the cleanPodPolicy: None guard is not bypassed.

Fixes: kubeflow#3419
Signed-off-by: Aviad Hayumi <aviad.hayumi@run.ai>
1 parent d0b6ebc commit 4b73a4d

2 files changed

Lines changed: 101 additions & 6 deletions

File tree

pkg/controller.v1/common/job.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -220,8 +220,10 @@ func (jc *JobController) ReconcileJobs(
220220
jobStatus.CompletionTime = &now
221221
}
222222

223-
// If the Job exceeds backoff limit or is past active deadline
224-
// delete all pods and services, then set the status to failed
223+
// Fix: https://github.com/kubeflow/trainer/issues/3419
224+
jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage)
225+
commonutil.UpdateJobConditions(&jobStatus, apiv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage)
226+
225227
if err := jc.DeletePodsAndServices(runtimeObject, runPolicy, jobStatus, pods); err != nil {
226228
return err
227229
}
@@ -240,10 +242,6 @@ func (jc *JobController) ReconcileJobs(
240242
}
241243
}
242244

243-
jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage)
244-
245-
commonutil.UpdateJobConditions(&jobStatus, apiv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage)
246-
247245
return jc.Controller.UpdateJobStatusInApiServer(job, &jobStatus)
248246
} else {
249247
// General cases which need to reconcile

pkg/controller.v1/common/job_test.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,12 @@ func TestDeletePodsAndServices(T *testing.T) {
9797
wantPods: &corev1.PodList{},
9898
wantService: &corev1.ServiceList{},
9999
},
100+
"Unfinished job with cleanPodPolicy None deletes pods (pre-fix backoffLimit bug)": {
101+
cleanPodPolicy: apiv1.CleanPodPolicyNone,
102+
jobCondition: "",
103+
wantPods: &corev1.PodList{},
104+
wantService: &corev1.ServiceList{},
105+
},
100106
}
101107
for name, tc := range cases {
102108
T.Run(name, func(t *testing.T) {
@@ -261,6 +267,97 @@ func TestManagedByExternalController(T *testing.T) {
261267
}
262268
}
263269

270+
// TestBackoffLimitExceededRespectsCleanPodPolicy verifies that cleanPodPolicy is
271+
// respected when a job exceeds its backoffLimit. See https://github.com/kubeflow/trainer/issues/3419
272+
func TestBackoffLimitExceededRespectsCleanPodPolicy(T *testing.T) {
273+
cases := map[string]struct {
274+
cleanPodPolicy apiv1.CleanPodPolicy
275+
setConditionBefore bool
276+
wantPodsPreserved bool
277+
}{
278+
"cleanPodPolicy None with JobFailed set before delete (fixed)": {
279+
cleanPodPolicy: apiv1.CleanPodPolicyNone,
280+
setConditionBefore: true,
281+
wantPodsPreserved: true,
282+
},
283+
"cleanPodPolicy None without JobFailed set before delete (buggy)": {
284+
cleanPodPolicy: apiv1.CleanPodPolicyNone,
285+
setConditionBefore: false,
286+
wantPodsPreserved: false,
287+
},
288+
"cleanPodPolicy All with JobFailed set before delete": {
289+
cleanPodPolicy: apiv1.CleanPodPolicyAll,
290+
setConditionBefore: true,
291+
wantPodsPreserved: false,
292+
},
293+
"cleanPodPolicy Running with JobFailed set before delete": {
294+
cleanPodPolicy: apiv1.CleanPodPolicyRunning,
295+
setConditionBefore: true,
296+
wantPodsPreserved: false,
297+
},
298+
}
299+
for name, tc := range cases {
300+
T.Run(name, func(t *testing.T) {
301+
masterPod := newPod("master-0", corev1.PodRunning)
302+
masterPod.Status.ContainerStatuses = []corev1.ContainerStatus{
303+
{RestartCount: 3},
304+
}
305+
workerPod := newPod("worker-0", corev1.PodRunning)
306+
pods := []runtime.Object{masterPod, workerPod}
307+
services := []runtime.Object{
308+
newService("master-0"),
309+
newService("worker-0"),
310+
}
311+
312+
fakeClient := fake.NewSimpleClientset(append(pods, services...)...)
313+
jobController := JobController{
314+
PodControl: control.RealPodControl{KubeClient: fakeClient, Recorder: &record.FakeRecorder{}},
315+
ServiceControl: control.RealServiceControl{KubeClient: fakeClient, Recorder: &record.FakeRecorder{}},
316+
}
317+
318+
now := metav1.Now()
319+
jobStatus := apiv1.JobStatus{
320+
CompletionTime: &now,
321+
}
322+
runPolicy := &apiv1.RunPolicy{
323+
CleanPodPolicy: &tc.cleanPodPolicy,
324+
}
325+
326+
if tc.setConditionBefore {
327+
jobStatus.Conditions = append(jobStatus.Conditions, apiv1.JobCondition{
328+
Type: apiv1.JobFailed,
329+
Status: corev1.ConditionTrue,
330+
Reason: "BackoffLimitExceeded",
331+
})
332+
}
333+
334+
var inPods []*corev1.Pod
335+
for i := range pods {
336+
inPods = append(inPods, pods[i].(*corev1.Pod))
337+
}
338+
339+
if err := jobController.DeletePodsAndServices(&testjobv1.TestJob{}, runPolicy, jobStatus, inPods); err != nil {
340+
t.Fatalf("DeletePodsAndServices failed: %v", err)
341+
}
342+
343+
gotPods, err := fakeClient.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{})
344+
if err != nil {
345+
t.Fatalf("Failed to list pods: %v", err)
346+
}
347+
348+
if tc.wantPodsPreserved {
349+
if len(gotPods.Items) != 2 {
350+
t.Errorf("Expected pods to be preserved, but got %d pods", len(gotPods.Items))
351+
}
352+
} else {
353+
if len(gotPods.Items) != 0 {
354+
t.Errorf("Expected pods to be deleted, but got %d pods", len(gotPods.Items))
355+
}
356+
}
357+
})
358+
}
359+
}
360+
264361
func newPod(name string, phase corev1.PodPhase) *corev1.Pod {
265362
pod := &corev1.Pod{
266363
ObjectMeta: metav1.ObjectMeta{

0 commit comments

Comments (0)