43 changes: 37 additions & 6 deletions pkg/controller/admissionchecks/multikueue/workload.go
@@ -349,22 +349,53 @@ func (w *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco
return reconcile.Result{}, workload.Finish(ctx, w.client, group.local, remoteFinishedCond.Reason, remoteFinishedCond.Message, w.clock)
}

// 4. Handle workload evicted on manager cluster
// 4. Handle workload eviction
remoteEvictCond, evictedRemote := group.bestMatchByCondition(kueue.WorkloadEvicted)
if remoteEvictCond != nil && remoteEvictCond.Reason == workload.ReasonWithCause(kueue.WorkloadDeactivated, kueue.WorkloadEvictedOnManagerCluster) {
if remoteEvictCond != nil {
remoteCl := group.remoteClients[evictedRemote].client
remoteWl := group.remotes[evictedRemote]

log = log.WithValues("remote", evictedRemote, "remoteWorkload", klog.KObj(remoteWl))
ctx = ctrl.LoggerInto(ctx, log)

if err := group.jobAdapter.SyncJob(ctx, w.client, remoteCl, group.controllerKey, group.local.Name, w.origin); err != nil {
log.Error(err, "Syncing remote controller object")
// We'll retry this in the next reconciling.
// workload evicted on manager cluster
if remoteEvictCond.Reason == workload.ReasonWithCause(kueue.WorkloadDeactivated, kueue.WorkloadEvictedOnManagerCluster) {
if err := group.jobAdapter.SyncJob(ctx, w.client, remoteCl, group.controllerKey, group.local.Name, w.origin); err != nil {
log.Error(err, "Syncing remote controller object")
// We'll retry this in the next reconciling.
return reconcile.Result{}, err
}
return reconcile.Result{}, nil
}

// workload eviction on worker cluster
log.V(5).Info("Workload gets evicted in the remote cluster", "cluster", evictedRemote)
Review comment (Contributor):
Nit: this present tense feels slightly confusing; IIUC the workload already got evicted.
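A possible rewording, as a sketch (past tense per the nit; the call is otherwise unchanged):

	log.V(5).Info("Workload was evicted in the remote cluster", "cluster", evictedRemote)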

needsACUpdate := acs.State == kueue.CheckStateReady
if err := workload.PatchAdmissionStatus(ctx, w.client, group.local, w.clock, func(wl *kueue.Workload) (bool, error) {
Review comment (Contributor):
Why would we do it when needsACUpdate is false?
(Currently, the update func does nothing in this case, but it still returns true - looks like we'd send an empty patch request to the apiserver?)
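A minimal sketch of the fix the comment hints at, assuming the bool returned by the update func is what gates sending the patch:

	if err := workload.PatchAdmissionStatus(ctx, w.client, group.local, w.clock, func(wl *kueue.Workload) (bool, error) {
		if !needsACUpdate {
			// Nothing changed; returning false should skip the empty patch request.
			return false, nil
		}
		// ... reset the admission check state as in the PR ...
		return true, nil
	}); err != nil {
		return reconcile.Result{}, err
	}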

if needsACUpdate {
acs.State = kueue.CheckStatePending
acs.Message = fmt.Sprintf("Workload evicted on worker cluster: %q, resetting for re-admission", *group.local.Status.ClusterName)
acs.LastTransitionTime = metav1.NewTime(w.clock.Now())
workload.SetAdmissionCheckState(&wl.Status.AdmissionChecks, *acs, w.clock)
wl.Status.ClusterName = nil
wl.Status.NominatedClusterNames = nil
Review comment (Contributor):
Out of curiosity - can this have an effect, given this rule?
(or is this intended to replace an empty list with nil in some edge cases?)

}
return true, nil
}); err != nil {
log.Error(err, "Failed to patch workload status")
return reconcile.Result{}, err
}

// Wait for QuotaReserved=false in the local job.
if needsACUpdate {
w.recorder.Eventf(group.local, corev1.EventTypeNormal, "MultiKueue", acs.Message)
}

for cluster := range group.remotes {
if err := client.IgnoreNotFound(group.RemoveRemoteObjects(ctx, cluster)); err != nil {
Review comment (Contributor):
In most cases, other calls to RemoveRemoteObjects in this file are followed by group.remotes[cluster] = nil.
Should it be done also here?
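A sketch of the loop with that pattern applied (mirroring the other call sites in this file; not verified here):

	for cluster := range group.remotes {
		if err := client.IgnoreNotFound(group.RemoveRemoteObjects(ctx, cluster)); err != nil {
			log.Error(err, "Failed to remove cluster remote objects", "cluster", cluster)
			return reconcile.Result{}, err
		}
		group.remotes[cluster] = nil // as done after other RemoveRemoteObjects calls
	}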

log.Error(err, "Failed to remove cluster remote objects", "cluster", cluster)
return reconcile.Result{}, err
}
}
return reconcile.Result{}, nil
}

73 changes: 71 additions & 2 deletions pkg/controller/admissionchecks/multikueue/workload_test.go
@@ -501,7 +501,7 @@ func TestWlReconcile(t *testing.T) {
},
},
},
"remote wl evicted": {
"remote wl evicted due to eviction on manager cluster": {
features: map[featuregate.Feature]bool{features.MultiKueueBatchJobWithManagedBy: true},
reconcileFor: "wl1",
managersWorkloads: []kueue.Workload{
@@ -651,6 +651,75 @@ func TestWlReconcile(t *testing.T) {
*baseWorkloadBuilder.DeepCopy(),
},
},
"handle workload evicted on worker cluster": {
features: map[featuregate.Feature]bool{features.MultiKueueBatchJobWithManagedBy: true},
reconcileFor: "wl1",
managersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{
Name: "ac1",
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
}).
ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1").
ReserveQuotaAt(utiltestingapi.MakeAdmission("q1").Obj(), now).
ClusterName("worker1").
Obj(),
},
managersJobs: []batchv1.Job{
*baseJobManagedByKueueBuilder.Clone().Active(1).Obj(),
},
worker1Jobs: []batchv1.Job{
*baseJobBuilder.Clone().
Label(constants.PrebuiltWorkloadLabel, "wl1").
Label(kueue.MultiKueueOriginLabel, defaultOrigin).
Active(1).
Obj(),
},
worker1Workloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
Label(kueue.MultiKueueOriginLabel, defaultOrigin).
ReserveQuotaAt(utiltestingapi.MakeAdmission("q1").Obj(), now).
Condition(metav1.Condition{
Type: kueue.WorkloadEvicted,
Status: metav1.ConditionTrue,
Reason: "DeactivatedDueToEvictedOnWorkerCluster",
Message: "Evicted on worker: Evicted by test",
}).
Evicted().
Obj(),
},
useSecondWorker: true,
worker2Workloads: []kueue.Workload{
*baseWorkloadBuilder.DeepCopy(),
},
wantManagersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{
Name: "ac1",
State: kueue.CheckStatePending,
Message: `Workload evicted on worker cluster: "worker1", resetting for re-admission`,
}).
ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1").
ReserveQuotaAt(utiltestingapi.MakeAdmission("q1").Obj(), now).
ClusterName("worker1").
Obj(),
},
wantManagersJobs: []batchv1.Job{
*baseJobManagedByKueueBuilder.Clone().Active(1).Obj(),
},
wantWorker1Workloads: []kueue.Workload{},
wantWorker1Jobs: []batchv1.Job{},
wantWorker2Workloads: []kueue.Workload{},
wantEvents: []utiltesting.EventRecord{
{
Key: client.ObjectKeyFromObject(baseWorkloadBuilder.Clone().Obj()),
EventType: "Normal",
Reason: "MultiKueue",
Message: `Workload evicted on worker cluster: "worker1", resetting for re-admission`,
},
},
},
"remote wl with reservation (withoutJobManagedBy)": {
features: map[featuregate.Feature]bool{features.MultiKueueBatchJobWithManagedBy: false},
reconcileFor: "wl1",
@@ -1699,7 +1768,7 @@ func TestWlReconcile(t *testing.T) {
// However, other important Status fields (e.g. Conditions) still reflect the change,
// so we deliberately ignore the Admission field here.
if features.Enabled(features.WorkloadRequestUseMergePatch) {
objCheckOpts = append(objCheckOpts, cmpopts.IgnoreFields(kueue.WorkloadStatus{}, "Admission"))
objCheckOpts = append(objCheckOpts, cmpopts.IgnoreFields(kueue.WorkloadStatus{}, "Admission", "ClusterName"))
}

gotManagersWorkloads := &kueue.WorkloadList{}
88 changes: 88 additions & 0 deletions test/e2e/multikueue/e2e_test.go
@@ -34,6 +34,7 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
@@ -603,6 +604,93 @@ var _ = ginkgo.Describe("MultiKueue", func() {
})
})

ginkgo.It("Should re-do admission process when workload gets evicted in the worker", func() {
job := testingjob.MakeJob("job", managerNs.Name).
WorkloadPriorityClass(managerLowWPC.Name).
Queue(kueue.LocalQueueName(managerLq.Name)).
RequestAndLimit(corev1.ResourceCPU, "0.9").
RequestAndLimit(corev1.ResourceMemory, "0.5G").
Obj()
util.MustCreate(ctx, k8sManagerClient, job)

wlKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(job.Name, job.UID), Namespace: managerNs.Name}
managerWl := &kueue.Workload{}
workerWorkload := &kueue.Workload{}

ginkgo.By("Checking that the workload is created and admitted in the manager cluster", func() {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sManagerClient.Get(ctx, wlKey, managerWl)).To(gomega.Succeed())
g.Expect(workload.IsAdmitted(managerWl)).To(gomega.BeTrue())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

createdAtWorker := ""

ginkgo.By("Checking that the workload is created in one of the workers", func() {
Review comment (Contributor):
Below, you'll modify the worker's CQ limits to manipulate what it can fit.
If so, how about using that trick once more, to get control over where the wl lands initially?

AFAICS the initial CPU quotas are: 2 at worker1 and 1 at worker2.
So if you set 1.5 for your workload, it'll certainly land on worker1.
Then, you could swap the quotas (say, first set 2 at w2, then set 1 at w1) and verify that the workload moved to w2.

Compared to what you have now, +1 CQ update but -4 if blocks. (If I'm not mistaken.)
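A rough sketch of that setup (quota numbers taken from the comment, not re-verified against the suite):

	// Request 1.5 CPU so the workload can only fit worker1 (quota 2), not worker2 (quota 1).
	job := testingjob.MakeJob("job", managerNs.Name).
		WorkloadPriorityClass(managerLowWPC.Name).
		Queue(kueue.LocalQueueName(managerLq.Name)).
		RequestAndLimit(corev1.ResourceCPU, "1.5").
		Obj()
	// After eviction: raise worker2's quota to 2 CPU, lower worker1's to 1 CPU,
	// then assert the workload landed on worker2 - no per-worker if blocks needed.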

gomega.Eventually(func(g gomega.Gomega) {
if err := k8sWorker1Client.Get(ctx, wlKey, workerWorkload); err == nil {
createdAtWorker = "worker1"
} else {
g.Expect(k8sWorker2Client.Get(ctx, wlKey, workerWorkload)).To(gomega.Succeed())
createdAtWorker = "worker2"
}
g.Expect(workload.IsAdmitted(workerWorkload)).To(gomega.BeTrue())
g.Expect(workerWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec))
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.GinkgoLogr.Info("Workload created at", "cluster", createdAtWorker)

ginkgo.By("Modifying worker cluster queue to not have enough resources", func() {
if createdAtWorker == "worker1" {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sWorker1Client.Get(ctx, client.ObjectKeyFromObject(worker1Cq), worker1Cq)).To(gomega.Succeed())
worker1Cq.Spec.ResourceGroups[0].Flavors[0].Resources[0].NominalQuota = resource.MustParse("0.5")
g.Expect(k8sWorker1Client.Update(ctx, worker1Cq)).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
} else {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sWorker2Client.Get(ctx, client.ObjectKeyFromObject(worker2Cq), worker2Cq)).To(gomega.Succeed())
worker2Cq.Spec.ResourceGroups[0].Flavors[0].Resources[0].NominalQuota = resource.MustParse("0.5")
g.Expect(k8sWorker2Client.Update(ctx, worker2Cq)).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
}
})

ginkgo.By("Triggering eviction in worker", func() {
if createdAtWorker == "worker1" {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sWorker1Client.Get(ctx, wlKey, workerWorkload)).To(gomega.Succeed())
g.Expect(workload.SetConditionAndUpdate(ctx, k8sWorker1Client, workerWorkload, kueue.WorkloadEvicted, metav1.ConditionTrue, kueue.WorkloadEvictedByPreemption, "By test", "evict", util.RealClock)).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
} else {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sWorker2Client.Get(ctx, wlKey, workerWorkload)).To(gomega.Succeed())
g.Expect(workload.SetConditionAndUpdate(ctx, k8sWorker2Client, workerWorkload, kueue.WorkloadEvicted, metav1.ConditionTrue, kueue.WorkloadEvictedByPreemption, "By test", "evict", util.RealClock)).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
}
})

ginkgo.By("Checking that the workload is re-admitted in the other worker cluster", func() {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sManagerClient.Get(ctx, wlKey, managerWl)).To(gomega.Succeed())
g.Expect(managerWl.Status.ClusterName).NotTo(gomega.HaveValue(gomega.Equal(createdAtWorker)))
Review comment (Contributor):
You could also check that it's not empty.
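For instance (a one-line sketch of the extra assertion):

	g.Expect(managerWl.Status.ClusterName).NotTo(gomega.BeNil())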

}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("Checking that the workload is created in the other worker", func() {
gomega.Eventually(func(g gomega.Gomega) {
if createdAtWorker == "worker1" {
g.Expect(k8sWorker2Client.Get(ctx, wlKey, workerWorkload)).To(gomega.Succeed())
} else {
g.Expect(k8sWorker1Client.Get(ctx, wlKey, workerWorkload)).To(gomega.Succeed())
}
g.Expect(workload.IsAdmitted(workerWorkload)).To(gomega.BeTrue())
g.Expect(workerWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec))
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})
})

ginkgo.It("Should preempt a running low-priority workload when a high-priority workload is admitted (other workers)", func() {
lowJob := testingjob.MakeJob("low-job", managerNs.Name).
WorkloadPriorityClass(managerLowWPC.Name).
86 changes: 86 additions & 0 deletions test/integration/multikueue/jobs_test.go
@@ -71,6 +71,7 @@ import (
testingtfjob "sigs.k8s.io/kueue/pkg/util/testingjobs/tfjob"
testingtrainjob "sigs.k8s.io/kueue/pkg/util/testingjobs/trainjob"
testingxgboostjob "sigs.k8s.io/kueue/pkg/util/testingjobs/xgboostjob"
"sigs.k8s.io/kueue/pkg/workload"
"sigs.k8s.io/kueue/pkg/workloadslicing"
"sigs.k8s.io/kueue/test/integration/framework"
"sigs.k8s.io/kueue/test/util"
@@ -1885,6 +1886,91 @@ var _ = ginkgo.Describe("MultiKueue", ginkgo.Label("area:multikueue", "feature:m
}, gomega.Equal(completedJobCondition))))
})
})
ginkgo.It("Should redo the admission process once the workload looses Admission in the worker cluster", func() {
Review comment (Contributor), suggested change ("looses" → "loses"):
-	ginkgo.It("Should redo the admission process once the workload looses Admission in the worker cluster", func() {
+	ginkgo.It("Should redo the admission process once the workload loses Admission in the worker cluster", func() {

job := testingjob.MakeJob("job", managerNs.Name).
ManagedBy(kueue.MultiKueueControllerName).
Queue(kueue.LocalQueueName(managerLq.Name)).
Obj()
util.MustCreate(managerTestCluster.ctx, managerTestCluster.client, job)

createdWorkload := &kueue.Workload{}
wlLookupKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(job.Name, job.UID), Namespace: managerNs.Name}

ginkgo.By("setting workload reservation in the management cluster", func() {
admission := utiltestingapi.MakeAdmission(managerCq.Name).Obj()
util.SetQuotaReservation(managerTestCluster.ctx, managerTestCluster.client, wlLookupKey, admission)
})

ginkgo.By("checking the workload creation in the worker clusters", func() {
managerWl := &kueue.Workload{}
gomega.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, managerWl)).To(gomega.Succeed())
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
g.Expect(createdWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec))
g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
g.Expect(createdWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec))
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("setting workload reservation in worker1, AC state is updated in manager and worker2 wl is removed", func() {
admission := utiltestingapi.MakeAdmission(managerCq.Name).Obj()
util.SetQuotaReservation(worker1TestCluster.ctx, worker1TestCluster.client, wlLookupKey, admission)

gomega.Eventually(func(g gomega.Gomega) {
g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
acs := admissioncheck.FindAdmissionCheck(createdWorkload.Status.AdmissionChecks, kueue.AdmissionCheckReference(multiKueueAC.Name))
g.Expect(acs).NotTo(gomega.BeNil())
g.Expect(acs.State).To(gomega.Equal(kueue.CheckStateReady))
g.Expect(acs.Message).To(gomega.Equal(`The workload got reservation on "worker1"`))
ok, err := utiltesting.HasEventAppeared(managerTestCluster.ctx, managerTestCluster.client, corev1.Event{
Reason: "MultiKueue",
Type: corev1.EventTypeNormal,
Message: `The workload got reservation on "worker1"`,
})
g.Expect(err).NotTo(gomega.HaveOccurred())
g.Expect(ok).To(gomega.BeTrue())
}, util.Timeout, util.Interval).Should(gomega.Succeed())

gomega.Eventually(func(g gomega.Gomega) {
g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, wlLookupKey, createdWorkload)).To(utiltesting.BeNotFoundError())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("preempting workload in worker1", func() {
gomega.Eventually(func(g gomega.Gomega) {
createdWorkload := &kueue.Workload{}
g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
g.Expect(workload.SetConditionAndUpdate(worker1TestCluster.ctx, worker1TestCluster.client, createdWorkload, kueue.WorkloadEvicted, metav1.ConditionTrue, kueue.WorkloadEvictedByPreemption, "By test", "evict", util.RealClock)).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("check manager's workload ClusterName reset", func() {
gomega.Eventually(func(g gomega.Gomega) {
managerWl := &kueue.Workload{}
g.Expect(managerTestCluster.client.Get(worker1TestCluster.ctx, wlLookupKey, managerWl)).To(gomega.Succeed())
Review comment (Contributor), suggested change:
-	g.Expect(managerTestCluster.client.Get(worker1TestCluster.ctx, wlLookupKey, managerWl)).To(gomega.Succeed())
+	g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, managerWl)).To(gomega.Succeed())
(or are there reasons to use the other context here?)

g.Expect(managerWl.Status.NominatedClusterNames).To(gomega.ContainElements(workerCluster1.Name, workerCluster2.Name))
g.Expect(managerWl.Status.ClusterName).To(gomega.BeNil())
g.Expect(managerWl.Status.AdmissionChecks).To(gomega.ContainElement(gomega.BeComparableTo(
kueue.AdmissionCheckState{
Name: kueue.AdmissionCheckReference(multiKueueAC.Name),
State: kueue.CheckStatePending,
},
cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime", "PodSetUpdates", "Message", "RetryCount"))))
}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("checking the workload admission process started again", func() {
managerWl := &kueue.Workload{}
gomega.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, managerWl)).To(gomega.Succeed())
gomega.Eventually(func(g gomega.Gomega) {
createdWorkload := &kueue.Workload{}
g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
g.Expect(createdWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec))
g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
g.Expect(createdWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec))
}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
})
})
})

func admitWorkloadAndCheckWorkerCopies(acName string, wlLookupKey types.NamespacedName, admission *utiltestingapi.AdmissionWrapper) {