Skip to content

Commit bf9f35a

Browse files
committed
fix(rhai): fix e2e test failures for failing jobs and no-metrics jobs
Two bugs were causing RHAI progression tracking e2e tests to fail: 1. TrainJobFailed condition never persisted (status overwritten by patch response) When ReconcileProgression patches the TrainJob annotations, the Kubernetes API server responds with the full persisted object, overwriting trainJob.Status in memory. The TrainJobFailed condition set by setTrainJobStatus was silently lost, so r.client.Status().Patch() was never called and the condition never reached the API server. Fix: save reconciledStatus before ReconcileProgression and restore it after. 2. trainerStatus annotation synthesized when metrics were never reachable updateFinalStatus was creating an annotation even when no prior annotation existed (i.e. the metrics endpoint was completely unreachable during the job). The test expects no annotation in that case. Fix: updateFinalStatus returns early if no existing annotation is found. PollAndUpdateFinalProgress skips the patch and returns (true, nil) when updateFinalStatus made no changes, preventing an infinite requeue loop. Fixes: RHOAIENG-59039 Made-with: Cursor
1 parent c1c24fd commit bf9f35a

2 files changed

Lines changed: 47 additions & 16 deletions

File tree

pkg/controller/trainjob_controller.go

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,28 @@ func (r *TrainJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
144144
err = errors.Join(err, statusErr)
145145
}
146146

147+
// Save the reconciled status before calling ReconcileProgression. When
148+
// ReconcileProgression patches the TrainJob's annotations the Kubernetes API
149+
// server responds with the current persisted object, which overwrites
150+
// trainJob.Status in memory and loses the status changes set above.
151+
reconciledStatus := trainJob.Status.DeepCopy()
152+
153+
// RHAI progression tracking (use APIReader to avoid pod watches)
154+
progressionResult, progressionErr := progression.ReconcileProgression(ctx, r.client, r.apiReader, log, &trainJob)
155+
if progressionErr != nil {
156+
log.Error(progressionErr, "failed to update progression annotation")
157+
err = errors.Join(err, progressionErr)
158+
}
159+
160+
// Restore status that may have been overwritten by the annotation patch response.
161+
trainJob.Status = *reconciledStatus
162+
147163
if deadlineResult, deadlineErr := r.reconcileDeadline(ctx, &trainJob); deadlineErr != nil || deadlineResult.RequeueAfter > 0 {
164+
if progressionResult.RequeueAfter > 0 {
165+
// if progress tracking is enabled, make sure we requeue after the minimum time
166+
deadlineResult.RequeueAfter = min(progressionResult.RequeueAfter, deadlineResult.RequeueAfter)
167+
}
168+
148169
if !equality.Semantic.DeepEqual(&trainJob.Status, &prevTrainJob.Status) {
149170
return deadlineResult, errors.Join(err, r.client.Status().Patch(ctx, &trainJob, client.MergeFrom(prevTrainJob)))
150171
}
@@ -154,12 +175,10 @@ func (r *TrainJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
154175
if !equality.Semantic.DeepEqual(&trainJob.Status, prevTrainJob.Status) {
155176
// TODO(astefanutti): Consider using SSA once controller-runtime client has SSA support
156177
// for sub-resources. See: https://github.com/kubernetes-sigs/controller-runtime/issues/3183
157-
return ctrl.Result{}, errors.Join(err, r.client.Status().Patch(ctx, &trainJob, client.MergeFrom(prevTrainJob)))
178+
return progressionResult, errors.Join(err, r.client.Status().Patch(ctx, &trainJob, client.MergeFrom(prevTrainJob)))
158179
}
159180

160-
// RHAI progression tracking (use APIReader to avoid pod watches)
161-
result, progressionErr := progression.ReconcileProgression(ctx, r.client, r.apiReader, log, &trainJob)
162-
return result, errors.Join(err, progressionErr)
181+
return progressionResult, err
163182
}
164183

165184
func (r *TrainJobReconciler) reconcileObjects(ctx context.Context, runtime jobruntimes.Runtime, trainJob *trainer.TrainJob) error {

pkg/rhai/progression/progression.go

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030

3131
"github.com/go-logr/logr"
3232
corev1 "k8s.io/api/core/v1"
33+
apierrors "k8s.io/apimachinery/pkg/api/errors"
3334
"k8s.io/apimachinery/pkg/api/meta"
3435
"k8s.io/apimachinery/pkg/labels"
3536
ctrl "sigs.k8s.io/controller-runtime"
@@ -273,7 +274,7 @@ func PollTrainingProgress(ctx context.Context, pod *corev1.Pod, metricsPort stri
273274
if err != nil {
274275
return nil, fmt.Errorf("failed to fetch metrics from %s: %w", metricsURL, err)
275276
}
276-
defer func() { _ = resp.Body.Close() }()
277+
defer resp.Body.Close()
277278

278279
if resp.StatusCode != http.StatusOK {
279280
return nil, fmt.Errorf("unexpected status code %d from metrics endpoint", resp.StatusCode)
@@ -467,7 +468,7 @@ func IsFinalStatusCaptured(trainJob *trainer.TrainJob) bool {
467468

468469
hasZeroRemaining := status.EstimatedRemainingSeconds != nil && *status.EstimatedRemainingSeconds == 0
469470
hasCompleteSummary := status.EstimatedRemainingTimeSummary == "complete" ||
470-
status.EstimatedRemainingTimeSummary == "0 seconds"
471+
status.EstimatedRemainingTimeSummary == "complete (early stopped)"
471472

472473
if hasZeroRemaining || hasCompleteSummary {
473474
return true
@@ -663,6 +664,13 @@ func PollAndUpdateFinalProgress(ctx context.Context, c client.Client, reader cli
663664
if err := updateFinalStatus(trainJob, completed); err != nil {
664665
return false, fmt.Errorf("failed to update final status: %w", err)
665666
}
667+
// updateFinalStatus is a no-op when no prior annotation exists (metrics were never polled).
668+
// Skip the patch and signal "captured" to prevent an infinite requeue loop.
669+
oldAnnotation, _ := oldTrainJob.Annotations[constants.AnnotationTrainerStatus]
670+
newAnnotation, _ := trainJob.Annotations[constants.AnnotationTrainerStatus]
671+
if oldAnnotation == newAnnotation {
672+
return true, nil
673+
}
666674
patch := client.MergeFrom(oldTrainJob)
667675
if err := c.Patch(ctx, trainJob, patch); err != nil {
668676
return false, fmt.Errorf("failed to patch TrainJob annotations: %w", err)
@@ -672,36 +680,34 @@ func PollAndUpdateFinalProgress(ctx context.Context, c client.Client, reader cli
672680
}
673681

674682
func updateFinalStatus(trainJob *trainer.TrainJob, completed bool) error {
683+
var status AnnotationStatus
684+
685+
// Only update an annotation that was already populated by live metric polling.
686+
// If the metrics endpoint was never reachable, no annotation should be created.
675687
if trainJob.Annotations == nil {
676-
return nil // No existing status to update
688+
return nil
677689
}
678-
679690
statusJSON, exists := trainJob.Annotations[constants.AnnotationTrainerStatus]
680691
if !exists || statusJSON == "" {
681-
return nil // No existing status to update
692+
return nil
682693
}
683-
684-
var status AnnotationStatus
685694
if err := json.Unmarshal([]byte(statusJSON), &status); err != nil {
686695
return err
687696
}
688697

689-
// Only update summary - don't modify actual metrics (progress %, remaining time, etc.)
690698
if completed {
691-
// Detect early stop: currentStep < totalSteps
692699
earlyStop := false
693700
if status.CurrentStep != nil && status.TotalSteps != nil && *status.TotalSteps > 0 {
694701
if *status.CurrentStep < *status.TotalSteps {
695702
earlyStop = true
696703
}
697704
}
698705
if earlyStop {
699-
status.EstimatedRemainingTimeSummary = "early stopped"
706+
status.EstimatedRemainingTimeSummary = "complete (early stopped)"
700707
} else {
701708
status.EstimatedRemainingTimeSummary = "complete"
702709
}
703710
} else {
704-
// For failed jobs: show progress context in summary
705711
progressPct := 0
706712
if status.ProgressPercentage != nil {
707713
progressPct = *status.ProgressPercentage
@@ -745,14 +751,20 @@ func ReconcileProgression(ctx context.Context, c client.Client, reader client.Re
745751
// Capture final metrics (termination message + HTTP polling fallback)
746752
captured, pollErr := PollAndUpdateFinalProgress(ctx, c, reader, trainJob, isCompleted)
747753
if pollErr != nil {
754+
// If the API server rejects the patch (e.g. admission webhook rejects because
755+
// the TrainingRuntime was already deleted), there is no point retrying.
756+
if apierrors.IsInvalid(pollErr) || apierrors.IsForbidden(pollErr) {
757+
log.V(1).Info("Cannot capture final training progress - unrecoverable API error, skipping retry", "error", pollErr)
758+
return ctrl.Result{}, nil
759+
}
748760
log.V(1).Info("Failed to capture final training progress, will retry", "error", pollErr, "completed", isCompleted)
749761
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
750762
}
751763
if !captured {
752764
log.V(1).Info("Pod not available for final metrics poll, will retry", "completed", isCompleted)
753765
return ctrl.Result{RequeueAfter: 2 * time.Second}, nil
754766
}
755-
log.Info("Captured final training progress from HTTP poll", "completed", isCompleted)
767+
log.Info("Captured final training progress", "completed", isCompleted)
756768
}
757769

758770
return ctrl.Result{}, nil

0 commit comments

Comments
 (0)