Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 12 additions & 13 deletions pkg/controller/trainjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,17 +140,16 @@ func (r *TrainJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
err = errors.Join(err, statusErr)
}

if deadlineResult, deadlineErr := r.reconcileDeadline(ctx, &trainJob); deadlineErr != nil || deadlineResult.RequeueAfter > 0 {
if !equality.Semantic.DeepEqual(&trainJob.Status, &prevTrainJob.Status) {
return deadlineResult, errors.Join(err, r.client.Status().Patch(ctx, &trainJob, client.MergeFrom(prevTrainJob)))
}
return deadlineResult, errors.Join(err, deadlineErr)
}
deadlineResult := r.reconcileDeadline(ctx, &trainJob)

if !equality.Semantic.DeepEqual(&trainJob.Status, prevTrainJob.Status) {
if !equality.Semantic.DeepEqual(trainJob.Status, prevTrainJob.Status) {
// TODO(astefanutti): Consider using Server-Side Apply (SSA) once the controller-runtime client supports SSA
// for sub-resources. See: https://github.com/kubernetes-sigs/controller-runtime/issues/3183
return ctrl.Result{}, errors.Join(err, r.client.Status().Patch(ctx, &trainJob, client.MergeFrom(prevTrainJob)))
err = errors.Join(err, r.client.Status().Patch(ctx, &trainJob, client.MergeFrom(prevTrainJob)))
}

if deadlineResult.RequeueAfter > 0 {
return deadlineResult, err
}
return ctrl.Result{}, err
}
Expand All @@ -168,17 +167,17 @@ func (r *TrainJobReconciler) reconcileObjects(ctx context.Context, runtime jobru
return nil
}

func (r *TrainJobReconciler) reconcileDeadline(ctx context.Context, trainJob *trainer.TrainJob) (ctrl.Result, error) {
func (r *TrainJobReconciler) reconcileDeadline(ctx context.Context, trainJob *trainer.TrainJob) ctrl.Result {
if trainJob.Spec.ActiveDeadlineSeconds == 0 || trainjob.IsTrainJobFinished(trainJob) || ptr.Deref(trainJob.Spec.Suspend, false) {
return ctrl.Result{}, nil
return ctrl.Result{}
}
startTime := trainJob.CreationTimestamp.Time
suspendedCond := meta.FindStatusCondition(trainJob.Status.Conditions, trainer.TrainJobSuspended)
if suspendedCond != nil && suspendedCond.Status == metav1.ConditionFalse {
startTime = suspendedCond.LastTransitionTime.Time
}
if startTime.IsZero() {
return ctrl.Result{}, nil
return ctrl.Result{}
}
deadline := startTime.Add(time.Duration(trainJob.Spec.ActiveDeadlineSeconds) * time.Second)
now := time.Now()
Expand All @@ -194,7 +193,7 @@ func (r *TrainJobReconciler) reconcileDeadline(ctx context.Context, trainJob *tr
if err := client.IgnoreNotFound(r.client.Delete(ctx, jobSet)); err != nil {
ctrl.LoggerFrom(ctx).V(2).Info("Failed to delete JobSet after deadline exceeded", "error", err)
}
return ctrl.Result{}, nil
return ctrl.Result{}
}
requeueAfter := time.Until(deadline)
if requeueAfter <= 0 {
Expand All @@ -203,7 +202,7 @@ func (r *TrainJobReconciler) reconcileDeadline(ctx context.Context, trainJob *tr
ctrl.LoggerFrom(ctx).V(2).Info("Scheduling deadline check",
"activeDeadlineSeconds", trainJob.Spec.ActiveDeadlineSeconds,
"requeueAfter", requeueAfter)
return ctrl.Result{RequeueAfter: requeueAfter}, nil
return ctrl.Result{RequeueAfter: requeueAfter}
}

func (r *TrainJobReconciler) Create(e event.TypedCreateEvent[*trainer.TrainJob]) bool {
Expand Down
Loading