Skip to content

Commit 58769c5

Browse files
committed
fix(dynamo): use Job conditions for failure/completion detection
Check authoritative Job conditions (JobComplete, JobFailed) before falling back to counter-based detection. This correctly handles activeDeadlineSeconds and podFailurePolicy failures where the Job gets a Failed condition but Status.Failed may not exceed backoffLimit. Also change fallback comparison from > to >= for robustness when conditions haven't been set yet. Signed-off-by: Suraj Deshmukh <suraj.deshmukh@microsoft.com>
1 parent a2e3d25 commit 58769c5

2 files changed

Lines changed: 125 additions & 3 deletions

File tree

providers/dynamo/download.go

Lines changed: 19 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -153,17 +153,33 @@ func EnsureDownloadJob(ctx context.Context, c client.Client, md *kubeairunwayv1a
153153
return false, nil // requeue → next reconcile creates fresh Job
154154
}
155155

156-
// Job exists, check status
156+
// Job exists — check conditions (authoritative) then counters (fallback).
157+
for _, cond := range existing.Status.Conditions {
158+
if cond.Status != corev1.ConditionTrue {
159+
continue
160+
}
161+
switch cond.Type {
162+
case batchv1.JobComplete:
163+
logger.Info("Model download Job completed", "name", jobName)
164+
return true, nil
165+
case batchv1.JobFailed:
166+
return false, fmt.Errorf("model download Job %s failed permanently: %s",
167+
jobName, cond.Message)
168+
}
169+
}
170+
171+
// Fallback: counter-based detection for older clusters or edge cases
172+
// where conditions haven't been set yet.
157173
if existing.Status.Succeeded >= 1 {
158-
logger.Info("Model download Job completed", "name", jobName)
174+
logger.Info("Model download Job completed (counter)", "name", jobName)
159175
return true, nil
160176
}
161177

162178
backoffLimit := defaultBackoffLimit
163179
if existing.Spec.BackoffLimit != nil {
164180
backoffLimit = *existing.Spec.BackoffLimit
165181
}
166-
if existing.Status.Failed > backoffLimit {
182+
if existing.Status.Failed >= backoffLimit {
167183
return false, fmt.Errorf("model download Job %s failed permanently (failed=%d, backoffLimit=%d)",
168184
jobName, existing.Status.Failed, backoffLimit)
169185
}

providers/dynamo/download_test.go

Lines changed: 106 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -321,6 +321,13 @@ func TestEnsureDownloadJobCompleted(t *testing.T) {
321321
},
322322
Status: batchv1.JobStatus{
323323
Succeeded: 1,
324+
Conditions: []batchv1.JobCondition{
325+
{
326+
Type: batchv1.JobComplete,
327+
Status: corev1.ConditionTrue,
328+
Message: "Job completed successfully",
329+
},
330+
},
324331
},
325332
}
326333

@@ -402,6 +409,13 @@ func TestEnsureDownloadJobFailed(t *testing.T) {
402409
},
403410
Status: batchv1.JobStatus{
404411
Failed: 4, // exceeds backoffLimit of 3
412+
Conditions: []batchv1.JobCondition{
413+
{
414+
Type: batchv1.JobFailed,
415+
Status: corev1.ConditionTrue,
416+
Message: "Job has reached the specified backoff limit",
417+
},
418+
},
405419
},
406420
}
407421

@@ -416,6 +430,98 @@ func TestEnsureDownloadJobFailed(t *testing.T) {
416430
}
417431
}
418432

433+
// TestEnsureDownloadJobFailedByConditionOnly checks that EnsureDownloadJob
// returns a permanent-failure error when the existing Job carries a true
// JobFailed condition even though Status.Failed (1) is below the Job's
// BackoffLimit (3). The returned error must include both the
// "failed permanently" text and the condition's Message.
func TestEnsureDownloadJobFailedByConditionOnly(t *testing.T) {
434+
scheme := newScheme()
435+
// Register batch/v1 types on the scheme; the returned error is deliberately ignored.
_ = batchv1.AddToScheme(scheme)
436+
437+
md := newDownloadMD("my-model", "default")
438+
439+
// Job failed via activeDeadlineSeconds: the JobFailed condition is set,
440+
// but the Failed count is still below the backoff limit.
441+
backoffLimit := int32(3)
442+
existingJob := &batchv1.Job{
443+
ObjectMeta: metav1.ObjectMeta{
444+
Name: "my-model-model-download",
445+
Namespace: "default",
446+
OwnerReferences: []metav1.OwnerReference{
447+
{
448+
APIVersion: "kubeairunway.ai/v1alpha1",
449+
Kind: "ModelDeployment",
450+
Name: "my-model",
451+
UID: "test-uid",
452+
},
453+
},
454+
},
455+
Spec: batchv1.JobSpec{
456+
BackoffLimit: &backoffLimit,
457+
},
458+
Status: batchv1.JobStatus{
459+
Failed: 1, // below backoffLimit
460+
Conditions: []batchv1.JobCondition{
461+
{
462+
Type: batchv1.JobFailed,
463+
Status: corev1.ConditionTrue,
464+
Message: "Job was active longer than specified deadline",
465+
},
466+
},
467+
},
468+
}
469+
470+
// Seed the fake client with the pre-existing Job so EnsureDownloadJob sees it.
c := fake.NewClientBuilder().WithScheme(scheme).WithObjects(existingJob).WithStatusSubresource(existingJob).Build()
471+
472+
_, err := EnsureDownloadJob(context.Background(), c, md, DefaultDownloadJobImage)
473+
if err == nil {
474+
t.Fatal("expected error for Job failed by condition (activeDeadlineSeconds)")
475+
}
476+
if !strings.Contains(err.Error(), "failed permanently") {
477+
t.Errorf("expected permanent failure error, got: %v", err)
478+
}
479+
// The condition's Message must be surfaced in the error text.
if !strings.Contains(err.Error(), "active longer than specified deadline") {
480+
t.Errorf("expected condition message in error, got: %v", err)
481+
}
482+
}
483+
484+
// TestEnsureDownloadJobFailedAtBackoffLimit checks the counter-based fallback
// of EnsureDownloadJob: the existing Job has no conditions, and Status.Failed
// (3) equals its BackoffLimit (3). The call is expected to return an error
// containing "failed permanently".
func TestEnsureDownloadJobFailedAtBackoffLimit(t *testing.T) {
485+
scheme := newScheme()
486+
// Register batch/v1 types on the scheme; the returned error is deliberately ignored.
_ = batchv1.AddToScheme(scheme)
487+
488+
md := newDownloadMD("my-model", "default")
489+
490+
// Job with Failed == BackoffLimit but no condition set yet.
491+
// The >= counter fallback in EnsureDownloadJob should catch this.
// NOTE(review): Kubernetes documents backoffLimit as the number of retries
// before a Job is marked failed, so at Failed == BackoffLimit a final retry
// may still be running — confirm that treating this state as permanent
// failure is intended.
492+
backoffLimit := int32(3)
493+
existingJob := &batchv1.Job{
494+
ObjectMeta: metav1.ObjectMeta{
495+
Name: "my-model-model-download",
496+
Namespace: "default",
497+
OwnerReferences: []metav1.OwnerReference{
498+
{
499+
APIVersion: "kubeairunway.ai/v1alpha1",
500+
Kind: "ModelDeployment",
501+
Name: "my-model",
502+
UID: "test-uid",
503+
},
504+
},
505+
},
506+
Spec: batchv1.JobSpec{
507+
BackoffLimit: &backoffLimit,
508+
},
509+
Status: batchv1.JobStatus{
510+
Failed: 3, // exactly at backoffLimit, no conditions
511+
},
512+
}
513+
514+
// Seed the fake client with the pre-existing Job so EnsureDownloadJob sees it.
c := fake.NewClientBuilder().WithScheme(scheme).WithObjects(existingJob).WithStatusSubresource(existingJob).Build()
515+
516+
_, err := EnsureDownloadJob(context.Background(), c, md, DefaultDownloadJobImage)
517+
if err == nil {
518+
t.Fatal("expected error when Failed == BackoffLimit (fallback detection)")
519+
}
520+
if !strings.Contains(err.Error(), "failed permanently") {
521+
t.Errorf("expected permanent failure error, got: %v", err)
522+
}
523+
}
524+
419525
func TestDeleteManagedJobs(t *testing.T) {
420526
scheme := newScheme()
421527
_ = batchv1.AddToScheme(scheme)

0 commit comments

Comments
 (0)