118 changes: 77 additions & 41 deletions pkg/controller/admissionchecks/multikueue/workload.go
@@ -105,39 +105,23 @@ func (g *wlGroup) IsElasticWorkload() bool {
return workloadslicing.IsElasticWorkload(g.local)
}

// FirstReserving returns true if there is a workload reserving quota,
// bestMatchByCondition returns the earliest matching condition of the specified type across the remote workloads,
// the string identifies the remote cluster.
func (g *wlGroup) FirstReserving() (bool, string) {
found := false
bestMatch := ""
var bestTime time.Time
func (g *wlGroup) bestMatchByCondition(conditionType string) (*metav1.Condition, string) {
var (
bestMatchCond *metav1.Condition
bestMatchRemote string
)
for remote, wl := range g.remotes {
if wl == nil {
continue
}
c := apimeta.FindStatusCondition(wl.Status.Conditions, kueue.WorkloadQuotaReserved)
if c != nil && c.Status == metav1.ConditionTrue && (!found || bestTime.IsZero() || c.LastTransitionTime.Time.Before(bestTime)) {
found = true
bestMatch = remote
bestTime = c.LastTransitionTime.Time
}
}
return found, bestMatch
}

func (g *wlGroup) RemoteFinishedCondition() (*metav1.Condition, string) {
var bestMatch *metav1.Condition
bestMatchRemote := ""
for remote, wl := range g.remotes {
if wl == nil {
continue
}
if c := apimeta.FindStatusCondition(wl.Status.Conditions, kueue.WorkloadFinished); c != nil && c.Status == metav1.ConditionTrue && (bestMatch == nil || c.LastTransitionTime.Before(&bestMatch.LastTransitionTime)) {
bestMatch = c
bestMatchRemote = remote
if wl != nil {
cond := apimeta.FindStatusCondition(wl.Status.Conditions, conditionType)
if cond != nil && cond.Status == metav1.ConditionTrue && (bestMatchCond == nil || cond.LastTransitionTime.Before(&bestMatchCond.LastTransitionTime)) {
bestMatchCond = cond
bestMatchRemote = remote
}
}
}
return bestMatch, bestMatchRemote
return bestMatchCond, bestMatchRemote
}

func (g *wlGroup) RemoveRemoteObjects(ctx context.Context, cluster string) error {
@@ -325,7 +309,7 @@ func (w *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco

acs := admissioncheck.FindAdmissionCheck(group.local.Status.AdmissionChecks, group.acName)

// 0. Ignore Elastic workloads Finished when:
// 1. Ignore a finished elastic workload when:
// - Workload is "Finished" as a result of workload slice replacement, OR
// - Workload doesn't have quota reservation as a result of scale-up, i.e., a scale-up is in progress.
if group.IsElasticWorkload() &&
@@ -334,7 +318,7 @@ func (w *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco
return reconcile.Result{}, nil
}

// 1. delete all remote workloads when local workload is finished or has no quota reservation.
// 2. Delete all remote workloads when the local workload is finished or has no quota reservation.
if group.IsFinished() || !workload.HasQuotaReservation(group.local) {
var errs []error
for rem := range group.remotes {
@@ -346,9 +330,10 @@ func (w *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco
return reconcile.Result{}, errors.Join(errs...)
}

if remoteFinishedCond, remote := group.RemoteFinishedCondition(); remoteFinishedCond != nil {
// NOTE: we can have a race condition setting the wl status here and it being updated by the job controller
// it should not be problematic but the "From remote xxxx:" could be lost ....
// 3. Finish the local workload when the remote workload is finished.
if remoteFinishedCond, remote := group.bestMatchByCondition(kueue.WorkloadFinished); remoteFinishedCond != nil {
// NOTE: there can be a race between setting the wl status here and the job controller updating it;
// that should not be problematic, but the "From remote xxxx:" prefix could be lost.

if group.jobAdapter != nil {
if err := group.jobAdapter.SyncJob(ctx, w.client, group.remoteClients[remote].client, group.controllerKey, group.local.Name, w.origin); err != nil {
@@ -364,7 +349,7 @@ func (w *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco
return reconcile.Result{}, workload.Finish(ctx, w.client, group.local, remoteFinishedCond.Reason, remoteFinishedCond.Message, w.clock)
}

// 2. delete all workloads that are out of sync (other than scaled-down elastic workloads)
// 4. Delete all workloads that are out of sync (other than scaled-down elastic workloads)
// or are not in the chosen worker.
for rem, remWl := range group.remotes {
if remWl != nil && !equality.Semantic.DeepEqual(group.local.Spec, remWl.Spec) {
@@ -385,9 +370,38 @@ func (w *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco
}
}

// 3. get the first reserving
hasReserving, reservingRemote := group.FirstReserving()
if hasReserving {
// 5. Handle remote eviction
if remoteEvictCond, evictedRemote := group.bestMatchByCondition(kueue.WorkloadEvicted); remoteEvictCond != nil {
remoteCl := group.remoteClients[evictedRemote].client
remoteWl := group.remotes[evictedRemote]

log = log.WithValues("remote", evictedRemote, "remoteWorkload", klog.KObj(remoteWl))
ctx = ctrl.LoggerInto(ctx, log)

if !workload.IsEvicted(group.local) {
err := workload.Evict(ctx, w.client, w.recorder, group.local, remoteEvictCond.Reason, remoteEvictCond.Message, "", w.clock, workload.WithCustomPrepare(func(wl *kueue.Workload) {
if remotePreemptedCond := apimeta.FindStatusCondition(remoteWl.Status.Conditions, kueue.WorkloadPreempted); remotePreemptedCond != nil {
workload.SetPreemptedCondition(wl, w.clock.Now(), remotePreemptedCond.Reason, remotePreemptedCond.Message)
}
}))
if err != nil {
log.Error(err, "Failed to patch workload status", "workload", klog.KObj(remoteWl))
return reconcile.Result{}, err
}
}

if err := group.jobAdapter.SyncJob(ctx, w.client, remoteCl, group.controllerKey, group.local.Name, w.origin); err != nil {
log.Error(err, "Syncing remote controller object", "remote", klog.KObj(remoteWl))
// We'll retry this in the next reconciliation.
return reconcile.Result{}, err
}

// Wait for QuotaReserved=false in the local job.
return reconcile.Result{}, nil
}

// 6. Get the first reserving remote
if reservingCond, reservingRemote := group.bestMatchByCondition(kueue.WorkloadQuotaReserved); reservingCond != nil {
// remove the non-reserving worker workloads
for rem, remWl := range group.remotes {
if remWl != nil && rem != reservingRemote {
@@ -399,9 +413,31 @@ func (w *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco
}
}

if err := group.jobAdapter.SyncJob(ctx, w.client, group.remoteClients[reservingRemote].client, group.controllerKey, group.local.Name, w.origin); err != nil {
log.V(2).Error(err, "creating remote controller object", "remote", reservingRemote)
// We'll retry this in the next reconcile.
remoteCl := group.remoteClients[reservingRemote].client
remoteWl := group.remotes[reservingRemote]

log = log.WithValues("remote", reservingRemote, "remoteWorkload", klog.KObj(remoteWl))
ctx = ctrl.LoggerInto(ctx, log)

if workload.HasQuotaReservation(group.local) && workload.IsEvicted(group.local) {
err := workload.PatchAdmissionStatus(ctx, remoteCl, remoteWl, w.clock, func(remoteWl *kueue.Workload) (bool, error) {
if cond := apimeta.FindStatusCondition(group.local.Status.Conditions, kueue.WorkloadEvicted); cond != nil {
apimeta.SetStatusCondition(&remoteWl.Status.Conditions, *cond)
}
if cond := apimeta.FindStatusCondition(group.local.Status.Conditions, kueue.WorkloadPreempted); cond != nil {
apimeta.SetStatusCondition(&remoteWl.Status.Conditions, *cond)
}
return true, nil
})
if err != nil {
log.Error(err, "Failed to patch workload status")
}
return reconcile.Result{}, err
}

if err := group.jobAdapter.SyncJob(ctx, w.client, remoteCl, group.controllerKey, group.local.Name, w.origin); err != nil {
log.Error(err, "Syncing remote controller object")
// We'll retry this in the next reconciliation.
return reconcile.Result{}, err
}

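The refactor above folds FirstReserving and RemoteFinishedCondition into a single helper: bestMatchByCondition picks, among remote workloads whose condition of the given type is true, the one with the earliest LastTransitionTime, i.e., the remote that reached that state first. Below is a minimal, self-contained sketch of that selection rule (an illustration under the same semantics, not code from this change; bestMatch and the literal values are hypothetical):

package main

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// bestMatch mirrors wlGroup.bestMatchByCondition on plain data: among true
// conditions of the given type, pick the one with the earliest
// LastTransitionTime and report which remote it came from.
func bestMatch(conds map[string]metav1.Condition, condType string) (*metav1.Condition, string) {
	var (
		best   *metav1.Condition
		remote string
	)
	for r, c := range conds {
		c := c // avoid aliasing the loop variable before Go 1.22
		if c.Type != condType || c.Status != metav1.ConditionTrue {
			continue
		}
		if best == nil || c.LastTransitionTime.Before(&best.LastTransitionTime) {
			best, remote = &c, r
		}
	}
	return best, remote
}

func main() {
	earlier := metav1.NewTime(time.Now().Add(-time.Hour))
	later := metav1.NewTime(time.Now())
	conds := map[string]metav1.Condition{
		"worker1": {Type: "QuotaReserved", Status: metav1.ConditionTrue, LastTransitionTime: later},
		"worker2": {Type: "QuotaReserved", Status: metav1.ConditionTrue, LastTransitionTime: earlier},
	}
	_, remote := bestMatch(conds, "QuotaReserved")
	fmt.Println(remote) // worker2: it reserved quota first, so it wins
}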
7 changes: 6 additions & 1 deletion pkg/controller/core/workload_controller.go
@@ -874,6 +874,7 @@ func (r *WorkloadReconciler) Update(e event.TypedUpdateEvent[*kueue.Workload]) b
log := r.log.WithValues("workload", klog.KObj(e.ObjectNew), "queue", e.ObjectNew.Spec.QueueName, "status", status)
ctx := ctrl.LoggerInto(context.Background(), log)
active := workload.IsActive(e.ObjectNew)
evictedOnMultiKueueWorker := workload.IsEvictedOnMultiKueueWorker(e.ObjectNew)

prevQueue := e.ObjectOld.Spec.QueueName
if prevQueue != e.ObjectNew.Spec.QueueName {
@@ -896,10 +897,14 @@ func (r *WorkloadReconciler) Update(e event.TypedUpdateEvent[*kueue.Workload]) b
workload.AdjustResources(ctrl.LoggerInto(ctx, log), r.client, wlCopy)

switch {
case status == workload.StatusFinished || !active:
case status == workload.StatusFinished || !active || evictedOnMultiKueueWorker:
if !active {
log.V(2).Info("Workload will not be queued because the workload is not active")
}
if evictedOnMultiKueueWorker {
log.V(2).Info("Workload will not be queued because the workload is evicted on multikueue worker")
}

// The workload could have been in the queues if we missed an event.
r.queues.DeleteWorkload(log, e.ObjectNew)

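The net effect of the new evictedOnMultiKueueWorker guard, condensed (a hypothetical paraphrase of the switch arm above, not additional code from this change): a workload evicted on a MultiKueue worker is treated like a finished or inactive one and is removed from the queues rather than requeued, leaving re-admission decisions to the manager side.

// Hypothetical condensation of the updated switch arm.
skipQueueing := status == workload.StatusFinished ||
	!workload.IsActive(e.ObjectNew) ||
	workload.IsEvictedOnMultiKueueWorker(e.ObjectNew)
if skipQueueing {
	// The workload could have been in the queues if we missed an event.
	r.queues.DeleteWorkload(log, e.ObjectNew)
}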
@@ -58,7 +58,7 @@ func (b *multiKueueAdapter) SyncJob(ctx context.Context, localClient client.Clie

// if the remote exists, just copy the status
if err == nil {
if localAppWrapper.Spec.Suspend {
if fromObject(&localAppWrapper).IsSuspended() && !fromObject(&localAppWrapper).IsActive() {
// Ensure the appwrapper is unsuspended before updating its status; otherwise, it will fail when patching the spec.
log.V(2).Info("Skipping the sync since the local appwrapper is still suspended")
return nil
2 changes: 1 addition & 1 deletion pkg/controller/jobs/job/job_multikueue_adapter.go
@@ -64,7 +64,7 @@ func (b *multiKueueAdapter) SyncJob(ctx context.Context, localClient client.Clie
// the remote job exists
if err == nil {
if features.Enabled(features.MultiKueueBatchJobWithManagedBy) {
if fromObject(&localJob).IsSuspended() {
if fromObject(&localJob).IsSuspended() && !fromObject(&localJob).IsActive() {
// Ensure the job is unsuspended before updating its status; otherwise, it will fail when patching the spec.
log.V(2).Info("Skipping the sync since the local job is still suspended")
return nil
2 changes: 1 addition & 1 deletion pkg/controller/jobs/jobset/jobset_multikueue_adapter.go
@@ -58,7 +58,7 @@ func (b *multiKueueAdapter) SyncJob(ctx context.Context, localClient client.Clie

// if the remote exists, just copy the status
if err == nil {
if fromObject(&localJob).IsSuspended() {
if fromObject(&localJob).IsSuspended() && !fromObject(&localJob).IsActive() {
// Ensure the job is unsuspended before updating its status; otherwise, it will fail when patching the spec.
log.V(2).Info("Skipping the sync since the local job is still suspended")
return nil
@@ -115,7 +115,7 @@ func (a adapter[PtrT, T]) SyncJob(
}

if err == nil {
if a.fromObject(localJob).IsSuspended() {
if a.fromObject(localJob).IsSuspended() && !a.fromObject(localJob).IsActive() {
// Ensure the job is unsuspended before updating its status; otherwise, it will fail when patching the spec.
log.V(2).Info("Skipping the sync since the local job is still suspended")
return nil
2 changes: 1 addition & 1 deletion pkg/controller/jobs/mpijob/mpijob_multikueue_adapter.go
@@ -58,7 +58,7 @@ func (b *multiKueueAdapter) SyncJob(ctx context.Context, localClient client.Clie

// if the remote exists, just copy the status
if err == nil {
if fromObject(&localJob).IsSuspended() {
if fromObject(&localJob).IsSuspended() && !fromObject(&localJob).IsActive() {
// Ensure the job is unsuspended before updating its status; otherwise, it will fail when patching the spec.
log.V(2).Info("Skipping the sync since the local job is still suspended")
return nil
@@ -58,7 +58,7 @@ func (b *multiKueueAdapter) SyncJob(ctx context.Context, localClient client.Clie

// if the remote exists, just copy the status
if err == nil {
if fromObject(&localJob).IsSuspended() {
if fromObject(&localJob).IsSuspended() && !fromObject(&localJob).IsActive() {
// Ensure the job is unsuspended before updating its status; otherwise, it will fail when patching the spec.
log.V(2).Info("Skipping the sync since the local job is still suspended")
return nil
2 changes: 1 addition & 1 deletion pkg/controller/jobs/rayjob/rayjob_multikueue_adapter.go
@@ -58,7 +58,7 @@ func (b *multiKueueAdapter) SyncJob(ctx context.Context, localClient client.Clie

// if the remote exists, just copy the status
if err == nil {
if fromObject(&localJob).IsSuspended() {
if fromObject(&localJob).IsSuspended() && !fromObject(&localJob).IsActive() {
// Ensure the job is unsuspended before updating its status; otherwise, it will fail when patching the spec.
log.V(2).Info("Skipping the sync since the local job is still suspended")
return nil
@@ -58,7 +58,7 @@ func (b *multiKueueAdapter) SyncJob(ctx context.Context, localClient client.Clie

// if the remote exists, just copy the status
if err == nil {
if fromObject(&localJob).IsSuspended() {
if fromObject(&localJob).IsSuspended() && !fromObject(&localJob).IsActive() {
// Ensure the job is unsuspended before updating its status; otherwise, it will fail when patching the spec.
log.V(2).Info("Skipping the sync since the local job is still suspended")
return nil
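The same one-line guard change repeats across every MultiKueue job adapter above: the status sync is now skipped only while the local job is both suspended and inactive, so a suspended local job that still has running pods (for example, mid-eviction) keeps receiving status from the remote. A sketch of the shared predicate (a hypothetical helper; each adapter inlines the expression):

// syncableJob captures the two predicates the adapters consult.
type syncableJob interface {
	IsSuspended() bool
	IsActive() bool
}

// shouldSkipSync reports whether copying remote status should be skipped:
// only once the local job is suspended AND has no active pods left.
func shouldSkipSync(j syncableJob) bool {
	return j.IsSuspended() && !j.IsActive()
}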
12 changes: 12 additions & 0 deletions pkg/workload/workload.go
@@ -1225,6 +1225,18 @@ func IsEvicted(w *kueue.Workload) bool {
return apimeta.IsStatusConditionTrue(w.Status.Conditions, kueue.WorkloadEvicted)
}

// IsMultiKueueWorkerWorkload returns true if the workload carries the
// MultiKueue origin label, i.e., it was created on this cluster by a MultiKueue manager.
func IsMultiKueueWorkerWorkload(w *kueue.Workload) bool {
_, ok := w.Labels[kueue.MultiKueueOriginLabel]
return ok
}

// IsEvictedOnMultiKueueWorker returns true if the workload is a MultiKueue
// worker workload with the Evicted condition set to true.
func IsEvictedOnMultiKueueWorker(w *kueue.Workload) bool {
if !features.Enabled(features.MultiKueue) {
return false
}
return IsMultiKueueWorkerWorkload(w) && IsEvicted(w)
}

// HasConditionWithTypeAndReason checks if there is a condition in Workload's status
// with exactly the same Type, Status and Reason
func HasConditionWithTypeAndReason(w *kueue.Workload, cond *metav1.Condition) bool {
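A short usage sketch for the two new predicates (a hypothetical example test, assuming the MultiKueue feature gate is enabled; the label value is illustrative):

package workload

import (
	"fmt"

	apimeta "k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

func ExampleIsEvictedOnMultiKueueWorker() {
	// A workload as seen on a worker cluster: created by a manager
	// (origin label set) and later evicted there.
	wl := &kueue.Workload{}
	wl.Labels = map[string]string{kueue.MultiKueueOriginLabel: "manager-cluster"}
	apimeta.SetStatusCondition(&wl.Status.Conditions, metav1.Condition{
		Type:   kueue.WorkloadEvicted,
		Status: metav1.ConditionTrue,
		Reason: "Preempted",
	})
	fmt.Println(IsMultiKueueWorkerWorkload(wl))  // origin label present
	fmt.Println(IsEvictedOnMultiKueueWorker(wl)) // worker workload with Evicted=true
	// Output:
	// true
	// true
}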