Skip to content

Commit 16cb9d9

Browse files
committed
Restart MK workload admission after remote workload eviction
1 parent 60df64b commit 16cb9d9

File tree

4 files changed

+282
-8
lines changed

4 files changed

+282
-8
lines changed

pkg/controller/admissionchecks/multikueue/workload.go

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -349,22 +349,53 @@ func (w *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco
349349
return reconcile.Result{}, workload.Finish(ctx, w.client, group.local, remoteFinishedCond.Reason, remoteFinishedCond.Message, w.clock)
350350
}
351351

352-
// 4. Handle workload evicted on manager cluster
352+
// 4. Handle workload eviction
353353
remoteEvictCond, evictedRemote := group.bestMatchByCondition(kueue.WorkloadEvicted)
354-
if remoteEvictCond != nil && remoteEvictCond.Reason == workload.ReasonWithCause(kueue.WorkloadDeactivated, kueue.WorkloadEvictedOnManagerCluster) {
354+
// Handle a WorkloadEvicted condition reported by one of the remote copies of
// the workload.
if remoteEvictCond != nil {
	remoteCl := group.remoteClients[evictedRemote].client
	remoteWl := group.remotes[evictedRemote]

	log = log.WithValues("remote", evictedRemote, "remoteWorkload", klog.KObj(remoteWl))
	ctx = ctrl.LoggerInto(ctx, log)

	// Workload evicted on the manager cluster: sync the remote controller
	// object and stop; nothing else is done in this pass.
	if remoteEvictCond.Reason == workload.ReasonWithCause(kueue.WorkloadDeactivated, kueue.WorkloadEvictedOnManagerCluster) {
		if err := group.jobAdapter.SyncJob(ctx, w.client, remoteCl, group.controllerKey, group.local.Name, w.origin); err != nil {
			log.Error(err, "Syncing remote controller object")
			// We'll retry this in the next reconciling.
			return reconcile.Result{}, err
		}
		return reconcile.Result{}, nil
	}

	// Workload evicted on a worker cluster: move the admission check back to
	// Pending, clear the cluster assignment and remove the remote objects.
	log.V(5).Info("Workload gets evicted in the remote cluster", "cluster", evictedRemote)
	needsACUpdate := acs.State == kueue.CheckStateReady

	// Status.ClusterName is a pointer and is reset to nil by the patch below,
	// so resolve the name once up front instead of dereferencing it inside the
	// callback. Fall back to the remote that reported the eviction when the
	// local status carries no cluster name.
	evictedOn := evictedRemote
	if clusterName := group.local.Status.ClusterName; clusterName != nil {
		evictedOn = *clusterName
	}

	if err := workload.PatchAdmissionStatus(ctx, w.client, group.local, w.clock, func(wl *kueue.Workload) (bool, error) {
		if needsACUpdate {
			acs.State = kueue.CheckStatePending
			acs.Message = fmt.Sprintf("Workload evicted on worker cluster: %q, resetting for re-admission", evictedOn)
			acs.LastTransitionTime = metav1.NewTime(w.clock.Now())
			workload.SetAdmissionCheckState(&wl.Status.AdmissionChecks, *acs, w.clock)
			wl.Status.ClusterName = nil
			wl.Status.NominatedClusterNames = nil
		}
		return true, nil
	}); err != nil {
		log.Error(err, "Failed to patch workload status")
		return reconcile.Result{}, err
	}

	// Only emit the event when the admission check state was actually reset.
	if needsACUpdate {
		w.recorder.Eventf(group.local, corev1.EventTypeNormal, "MultiKueue", acs.Message)
	}

	// Remove the remote objects from every cluster in the group; a NotFound
	// error is ignored.
	for cluster := range group.remotes {
		if err := client.IgnoreNotFound(group.RemoveRemoteObjects(ctx, cluster)); err != nil {
			log.Error(err, "Failed to remove cluster remote objects", "cluster", cluster)
			return reconcile.Result{}, err
		}
	}
	return reconcile.Result{}, nil
}
370401

pkg/controller/admissionchecks/multikueue/workload_test.go

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,7 @@ func TestWlReconcile(t *testing.T) {
501501
},
502502
},
503503
},
504-
"remote wl evicted": {
504+
"remote wl evicted due to eviction on manager cluster": {
505505
features: map[featuregate.Feature]bool{features.MultiKueueBatchJobWithManagedBy: true},
506506
reconcileFor: "wl1",
507507
managersWorkloads: []kueue.Workload{
@@ -651,6 +651,75 @@ func TestWlReconcile(t *testing.T) {
651651
*baseWorkloadBuilder.DeepCopy(),
652652
},
653653
},
654+
"handle workload evicted on worker cluster": {
655+
features: map[featuregate.Feature]bool{features.MultiKueueBatchJobWithManagedBy: true},
656+
reconcileFor: "wl1",
657+
managersWorkloads: []kueue.Workload{
658+
*baseWorkloadBuilder.Clone().
659+
AdmissionCheck(kueue.AdmissionCheckState{
660+
Name: "ac1",
661+
State: kueue.CheckStateReady,
662+
Message: `The workload got reservation on "worker1"`,
663+
}).
664+
ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1").
665+
ReserveQuotaAt(utiltestingapi.MakeAdmission("q1").Obj(), now).
666+
ClusterName("worker1").
667+
Obj(),
668+
},
669+
managersJobs: []batchv1.Job{
670+
*baseJobManagedByKueueBuilder.Clone().Active(1).Obj(),
671+
},
672+
worker1Jobs: []batchv1.Job{
673+
*baseJobBuilder.Clone().
674+
Label(constants.PrebuiltWorkloadLabel, "wl1").
675+
Label(kueue.MultiKueueOriginLabel, defaultOrigin).
676+
Active(1).
677+
Obj(),
678+
},
679+
worker1Workloads: []kueue.Workload{
680+
*baseWorkloadBuilder.Clone().
681+
Label(kueue.MultiKueueOriginLabel, defaultOrigin).
682+
ReserveQuotaAt(utiltestingapi.MakeAdmission("q1").Obj(), now).
683+
Condition(metav1.Condition{
684+
Type: kueue.WorkloadEvicted,
685+
Status: metav1.ConditionTrue,
686+
Reason: "DeactivatedDueToEvictedOnWorkerCluster",
687+
Message: "Evicted on worker: Evicted by test",
688+
}).
689+
Evicted().
690+
Obj(),
691+
},
692+
useSecondWorker: true,
693+
worker2Workloads: []kueue.Workload{
694+
*baseWorkloadBuilder.DeepCopy(),
695+
},
696+
wantManagersWorkloads: []kueue.Workload{
697+
*baseWorkloadBuilder.Clone().
698+
AdmissionCheck(kueue.AdmissionCheckState{
699+
Name: "ac1",
700+
State: kueue.CheckStatePending,
701+
Message: `Workload evicted on worker cluster: "worker1", resetting for re-admission`,
702+
}).
703+
ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1").
704+
ReserveQuotaAt(utiltestingapi.MakeAdmission("q1").Obj(), now).
705+
ClusterName("worker1").
706+
Obj(),
707+
},
708+
wantManagersJobs: []batchv1.Job{
709+
*baseJobManagedByKueueBuilder.Clone().Active(1).Obj(),
710+
},
711+
wantWorker1Workloads: []kueue.Workload{},
712+
wantWorker1Jobs: []batchv1.Job{},
713+
wantWorker2Workloads: []kueue.Workload{},
714+
wantEvents: []utiltesting.EventRecord{
715+
{
716+
Key: client.ObjectKeyFromObject(baseWorkloadBuilder.Clone().Obj()),
717+
EventType: "Normal",
718+
Reason: "MultiKueue",
719+
Message: `Workload evicted on worker cluster: "worker1", resetting for re-admission`,
720+
},
721+
},
722+
},
654723
"remote wl with reservation (withoutJobManagedBy)": {
655724
features: map[featuregate.Feature]bool{features.MultiKueueBatchJobWithManagedBy: false},
656725
reconcileFor: "wl1",
@@ -1699,7 +1768,7 @@ func TestWlReconcile(t *testing.T) {
16991768
// However, other important Status fields (e.g. Conditions) still reflect the change,
17001769
// so we deliberately ignore the Admission field here.
17011770
if features.Enabled(features.WorkloadRequestUseMergePatch) {
1702-
objCheckOpts = append(objCheckOpts, cmpopts.IgnoreFields(kueue.WorkloadStatus{}, "Admission"))
1771+
objCheckOpts = append(objCheckOpts, cmpopts.IgnoreFields(kueue.WorkloadStatus{}, "Admission", "ClusterName"))
17031772
}
17041773

17051774
gotManagersWorkloads := &kueue.WorkloadList{}

test/e2e/multikueue/e2e_test.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
batchv1 "k8s.io/api/batch/v1"
3535
corev1 "k8s.io/api/core/v1"
3636
apimeta "k8s.io/apimachinery/pkg/api/meta"
37+
"k8s.io/apimachinery/pkg/api/resource"
3738
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3839
"k8s.io/apimachinery/pkg/types"
3940
"k8s.io/utils/ptr"
@@ -603,6 +604,93 @@ var _ = ginkgo.Describe("MultiKueue", func() {
603604
})
604605
})
605606

607+
// The job is admitted through the manager, lands on one worker, is evicted
// there after that worker's quota is shrunk, and must then be admitted on the
// other worker.
ginkgo.It("Should re-do admission process when workload gets evicted in the worker", func() {
	job := testingjob.MakeJob("job", managerNs.Name).
		WorkloadPriorityClass(managerLowWPC.Name).
		Queue(kueue.LocalQueueName(managerLq.Name)).
		RequestAndLimit(corev1.ResourceCPU, "0.9").
		RequestAndLimit(corev1.ResourceMemory, "0.5G").
		Obj()
	util.MustCreate(ctx, k8sManagerClient, job)

	wlKey := types.NamespacedName{
		Name:      workloadjob.GetWorkloadNameForJob(job.Name, job.UID),
		Namespace: managerNs.Name,
	}
	managerWl := &kueue.Workload{}
	workerWorkload := &kueue.Workload{}

	ginkgo.By("Checking that the workload is created and admitted in the manager cluster", func() {
		gomega.Eventually(func(g gomega.Gomega) {
			g.Expect(k8sManagerClient.Get(ctx, wlKey, managerWl)).To(gomega.Succeed())
			g.Expect(workload.IsAdmitted(managerWl)).To(gomega.BeTrue())
		}, util.Timeout, util.Interval).Should(gomega.Succeed())
	})

	createdAtWorker := ""

	ginkgo.By("Checking that the workload is created in one of the workers", func() {
		gomega.Eventually(func(g gomega.Gomega) {
			if err := k8sWorker1Client.Get(ctx, wlKey, workerWorkload); err == nil {
				createdAtWorker = "worker1"
			} else {
				g.Expect(k8sWorker2Client.Get(ctx, wlKey, workerWorkload)).To(gomega.Succeed())
				createdAtWorker = "worker2"
			}
			g.Expect(workload.IsAdmitted(workerWorkload)).To(gomega.BeTrue())
			g.Expect(workerWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec))
		}, util.Timeout, util.Interval).Should(gomega.Succeed())
	})

	ginkgo.GinkgoLogr.Info("Workload created at", "cluster", createdAtWorker)

	// Pick the client and ClusterQueue of the worker that currently runs the
	// workload, and the client of the other worker, once for all later steps.
	runningClient, otherClient, runningCq := k8sWorker1Client, k8sWorker2Client, worker1Cq
	if createdAtWorker != "worker1" {
		runningClient, otherClient, runningCq = k8sWorker2Client, k8sWorker1Client, worker2Cq
	}

	ginkgo.By("Modifying worker cluster queue to not have enough resources", func() {
		gomega.Eventually(func(g gomega.Gomega) {
			g.Expect(runningClient.Get(ctx, client.ObjectKeyFromObject(runningCq), runningCq)).To(gomega.Succeed())
			runningCq.Spec.ResourceGroups[0].Flavors[0].Resources[0].NominalQuota = resource.MustParse("0.5")
			g.Expect(runningClient.Update(ctx, runningCq)).To(gomega.Succeed())
		}, util.Timeout, util.Interval).Should(gomega.Succeed())
	})

	ginkgo.By("Triggering eviction in worker", func() {
		gomega.Eventually(func(g gomega.Gomega) {
			g.Expect(runningClient.Get(ctx, wlKey, workerWorkload)).To(gomega.Succeed())
			g.Expect(workload.SetConditionAndUpdate(ctx, runningClient, workerWorkload, kueue.WorkloadEvicted, metav1.ConditionTrue, kueue.WorkloadEvictedByPreemption, "By test", "evict", util.RealClock)).To(gomega.Succeed())
		}, util.Timeout, util.Interval).Should(gomega.Succeed())
	})

	ginkgo.By("Checking that the workload is re-admitted in the other worker cluster", func() {
		gomega.Eventually(func(g gomega.Gomega) {
			g.Expect(k8sManagerClient.Get(ctx, wlKey, managerWl)).To(gomega.Succeed())
			g.Expect(managerWl.Status.ClusterName).NotTo(gomega.HaveValue(gomega.Equal(createdAtWorker)))
		}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
	})

	ginkgo.By("Checking that the workload is created in the other worker", func() {
		gomega.Eventually(func(g gomega.Gomega) {
			g.Expect(otherClient.Get(ctx, wlKey, workerWorkload)).To(gomega.Succeed())
			g.Expect(workload.IsAdmitted(workerWorkload)).To(gomega.BeTrue())
			g.Expect(workerWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec))
		}, util.Timeout, util.Interval).Should(gomega.Succeed())
	})
})
693+
606694
ginkgo.It("Should preempt a running low-priority workload when a high-priority workload is admitted (other workers)", func() {
607695
lowJob := testingjob.MakeJob("low-job", managerNs.Name).
608696
WorkloadPriorityClass(managerLowWPC.Name).

test/integration/multikueue/jobs_test.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ import (
7171
testingtfjob "sigs.k8s.io/kueue/pkg/util/testingjobs/tfjob"
7272
testingtrainjob "sigs.k8s.io/kueue/pkg/util/testingjobs/trainjob"
7373
testingxgboostjob "sigs.k8s.io/kueue/pkg/util/testingjobs/xgboostjob"
74+
"sigs.k8s.io/kueue/pkg/workload"
7475
"sigs.k8s.io/kueue/pkg/workloadslicing"
7576
"sigs.k8s.io/kueue/test/integration/framework"
7677
"sigs.k8s.io/kueue/test/util"
@@ -1885,6 +1886,91 @@ var _ = ginkgo.Describe("MultiKueue", ginkgo.Label("area:multikueue", "feature:m
18851886
}, gomega.Equal(completedJobCondition))))
18861887
})
18871888
})
1889+
// The workload gets a reservation on worker1, is then evicted there, and the
// manager must reset the MultiKueue admission check and recreate the workload
// copies on both workers.
ginkgo.It("Should redo the admission process once the workload loses Admission in the worker cluster", func() {
	job := testingjob.MakeJob("job", managerNs.Name).
		ManagedBy(kueue.MultiKueueControllerName).
		Queue(kueue.LocalQueueName(managerLq.Name)).
		Obj()
	util.MustCreate(managerTestCluster.ctx, managerTestCluster.client, job)

	createdWorkload := &kueue.Workload{}
	wlLookupKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(job.Name, job.UID), Namespace: managerNs.Name}

	ginkgo.By("setting workload reservation in the management cluster", func() {
		admission := utiltestingapi.MakeAdmission(managerCq.Name).Obj()
		util.SetQuotaReservation(managerTestCluster.ctx, managerTestCluster.client, wlLookupKey, admission)
	})

	ginkgo.By("checking the workload creation in the worker clusters", func() {
		managerWl := &kueue.Workload{}
		gomega.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, managerWl)).To(gomega.Succeed())
		gomega.Eventually(func(g gomega.Gomega) {
			g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
			g.Expect(createdWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec))
			g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
			g.Expect(createdWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec))
		}, util.Timeout, util.Interval).Should(gomega.Succeed())
	})

	ginkgo.By("setting workload reservation in worker1, AC state is updated in manager and worker2 wl is removed", func() {
		admission := utiltestingapi.MakeAdmission(managerCq.Name).Obj()
		util.SetQuotaReservation(worker1TestCluster.ctx, worker1TestCluster.client, wlLookupKey, admission)

		gomega.Eventually(func(g gomega.Gomega) {
			g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
			acs := admissioncheck.FindAdmissionCheck(createdWorkload.Status.AdmissionChecks, kueue.AdmissionCheckReference(multiKueueAC.Name))
			g.Expect(acs).NotTo(gomega.BeNil())
			g.Expect(acs.State).To(gomega.Equal(kueue.CheckStateReady))
			g.Expect(acs.Message).To(gomega.Equal(`The workload got reservation on "worker1"`))
			ok, err := utiltesting.HasEventAppeared(managerTestCluster.ctx, managerTestCluster.client, corev1.Event{
				Reason:  "MultiKueue",
				Type:    corev1.EventTypeNormal,
				Message: `The workload got reservation on "worker1"`,
			})
			g.Expect(err).NotTo(gomega.HaveOccurred())
			g.Expect(ok).To(gomega.BeTrue())
		}, util.Timeout, util.Interval).Should(gomega.Succeed())

		gomega.Eventually(func(g gomega.Gomega) {
			g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, wlLookupKey, createdWorkload)).To(utiltesting.BeNotFoundError())
		}, util.Timeout, util.Interval).Should(gomega.Succeed())
	})

	ginkgo.By("preempting workload in worker1", func() {
		gomega.Eventually(func(g gomega.Gomega) {
			// Fetch a fresh copy on every attempt; a distinct name avoids
			// shadowing the outer createdWorkload.
			workerWl := &kueue.Workload{}
			g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, wlLookupKey, workerWl)).To(gomega.Succeed())
			g.Expect(workload.SetConditionAndUpdate(worker1TestCluster.ctx, worker1TestCluster.client, workerWl, kueue.WorkloadEvicted, metav1.ConditionTrue, kueue.WorkloadEvictedByPreemption, "By test", "evict", util.RealClock)).To(gomega.Succeed())
		}, util.Timeout, util.Interval).Should(gomega.Succeed())
	})

	ginkgo.By("check manager's workload ClusterName reset", func() {
		gomega.Eventually(func(g gomega.Gomega) {
			managerWl := &kueue.Workload{}
			// Read from the manager with the manager's own context, matching
			// every other client/context pairing in this spec.
			g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, managerWl)).To(gomega.Succeed())
			g.Expect(managerWl.Status.NominatedClusterNames).To(gomega.ContainElements(workerCluster1.Name, workerCluster2.Name))
			g.Expect(managerWl.Status.ClusterName).To(gomega.BeNil())
			g.Expect(managerWl.Status.AdmissionChecks).To(gomega.ContainElement(gomega.BeComparableTo(
				kueue.AdmissionCheckState{
					Name:  kueue.AdmissionCheckReference(multiKueueAC.Name),
					State: kueue.CheckStatePending,
				},
				cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime", "PodSetUpdates", "Message", "RetryCount"))))
		}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
	})

	ginkgo.By("checking the workload admission process started again", func() {
		managerWl := &kueue.Workload{}
		gomega.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, managerWl)).To(gomega.Succeed())
		gomega.Eventually(func(g gomega.Gomega) {
			workerWl := &kueue.Workload{}
			g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, wlLookupKey, workerWl)).To(gomega.Succeed())
			g.Expect(workerWl.Spec).To(gomega.BeComparableTo(managerWl.Spec))
			g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, wlLookupKey, workerWl)).To(gomega.Succeed())
			g.Expect(workerWl.Spec).To(gomega.BeComparableTo(managerWl.Spec))
		}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
	})
})
18881974
})
18891975

18901976
func admitWorkloadAndCheckWorkerCopies(acName string, wlLookupKey types.NamespacedName, admission *utiltestingapi.AdmissionWrapper) {

0 commit comments

Comments
 (0)