Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
52a3e03
bugfix and add unit test
andrewseif Jan 8, 2026
960d7d3
add e2e testing
andrewseif Jan 8, 2026
a2a18f9
update e2e testing
andrewseif Jan 8, 2026
4c6a28a
update function name
andrewseif Jan 9, 2026
dd27de7
fix function name in testing
andrewseif Jan 9, 2026
076aa7a
revert to old logic but removing empty string for old
andrewseif Jan 9, 2026
ca335de
add validation for old wl
andrewseif Jan 10, 2026
79d6a4b
fix linter issue
andrewseif Jan 10, 2026
116e539
Update pkg/controller/core/workload_controller.go
andrewseif Jan 10, 2026
03674a6
remove redundant validation
andrewseif Jan 10, 2026
1e72145
Update pkg/workload/workload.go
andrewseif Jan 10, 2026
efdabfb
clean tests
andrewseif Jan 10, 2026
023b1e7
update reconciler UpdateWorkloadPriority method
andrewseif Jan 10, 2026
ea8d6c7
update methods to be more expressive
andrewseif Jan 10, 2026
399845c
revert comment for simplicity
andrewseif Jan 10, 2026
6abdbb3
update workload to use IsPodPriorityClass, to avoid complexity
andrewseif Jan 10, 2026
57b8205
update IsPodPriorityClass function
andrewseif Jan 10, 2026
1e78b9e
Update pkg/workload/workload.go
andrewseif Jan 10, 2026
9896790
Update test/integration/singlecluster/controller/jobs/job/job_control…
andrewseif Jan 10, 2026
fe2ba6f
Update test/integration/singlecluster/controller/jobs/job/job_control…
andrewseif Jan 10, 2026
d0098d8
add value based tests
andrewseif Jan 10, 2026
c29aa7d
Merge remote-tracking branch 'origin/add-wpc-to-suspended-wl' into ad…
andrewseif Jan 10, 2026
543282c
Update pkg/controller/core/workload_controller_test.go
andrewseif Jan 10, 2026
e49c14d
Update pkg/controller/core/workload_controller.go
andrewseif Jan 10, 2026
d7d305b
add updated tests and fix log
andrewseif Jan 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/controller/core/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -992,11 +992,11 @@ func (r *WorkloadReconciler) Update(e event.TypedUpdateEvent[*kueue.Workload]) b

func workloadPriorityChanged(old, new *kueue.Workload) bool {
// Updates to Pod Priority are not supported.
if !workload.IsWorkloadPriorityClass(old) || !workload.IsWorkloadPriorityClass(new) {
if workload.IsPodPriorityClass(old) || !workload.IsWorkloadPriorityClass(new) {
return false
}
// Check if priority class reference changed.
if workload.PriorityClassName(old) != "" && workload.PriorityClassName(new) != "" &&
if workload.PriorityClassName(new) != "" &&
workload.PriorityClassName(old) != workload.PriorityClassName(new) {
return true
}
Expand Down
103 changes: 103 additions & 0 deletions pkg/controller/core/workload_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2734,3 +2734,106 @@ func TestReconcile(t *testing.T) {
}
}
}

// TestWorkloadPriorityClassChanged tests the workloadPriorityClassChanged function.
func TestWorkloadPriorityClassChanged(t *testing.T) {
testCases := map[string]struct {
oldWorkload *kueue.Workload
newWorkload *kueue.Workload
wantChanged bool
}{
"no priority class on either workload": {
oldWorkload: utiltestingapi.MakeWorkload("wl", "ns").Obj(),
newWorkload: utiltestingapi.MakeWorkload("wl", "ns").Obj(),
wantChanged: false,
},
"same priority class on both workloads": {
oldWorkload: utiltestingapi.MakeWorkload("wl", "ns").
WorkloadPriorityClassRef("priority-1").
Obj(),
newWorkload: utiltestingapi.MakeWorkload("wl", "ns").
WorkloadPriorityClassRef("priority-1").
Obj(),
wantChanged: false,
},
"priority class changed from one to another": {
oldWorkload: utiltestingapi.MakeWorkload("wl", "ns").
WorkloadPriorityClassRef("priority-1").
Obj(),
newWorkload: utiltestingapi.MakeWorkload("wl", "ns").
WorkloadPriorityClassRef("priority-2").
Obj(),
wantChanged: true,
},
"priority class added (none -> some)": {
oldWorkload: utiltestingapi.MakeWorkload("wl", "ns").Obj(),
newWorkload: utiltestingapi.MakeWorkload("wl", "ns").
WorkloadPriorityClassRef("priority-1").
Obj(),
wantChanged: true,
},
"priority class removed (some -> none) - blocked by validation": {
oldWorkload: utiltestingapi.MakeWorkload("wl", "ns").
WorkloadPriorityClassRef("priority-1").
Obj(),
newWorkload: utiltestingapi.MakeWorkload("wl", "ns").Obj(),
wantChanged: false,
},
"PodPriorityClass (not WorkloadPriorityClass) changed - should not trigger": {
oldWorkload: utiltestingapi.MakeWorkload("wl", "ns").
PodPriorityClassRef("pod-priority-1").
Obj(),
newWorkload: utiltestingapi.MakeWorkload("wl", "ns").
PodPriorityClassRef("pod-priority-2").
Obj(),
wantChanged: false,
},
"PodPriorityClass added (none -> some) - should not trigger": {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would also be great to add a test case where we change the priority value. It should work only for WorkloadPriorityClass.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this got me thinking, is there a case where Kind changed from PodPriority to WorkloadPriority but uses same name? do we need to cover that?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it would be great to cover this case as well.

oldWorkload: utiltestingapi.MakeWorkload("wl", "ns").Obj(),
newWorkload: utiltestingapi.MakeWorkload("wl", "ns").
PodPriorityClassRef("pod-priority-1").
Obj(),
wantChanged: false,
},
"priority value decreased": {
oldWorkload: utiltestingapi.MakeWorkload("wl", "ns").
Priority(500).
Obj(),
newWorkload: utiltestingapi.MakeWorkload("wl", "ns").
Priority(600).
Obj(),
wantChanged: false,
},
"priority value decreased with WPC": {
oldWorkload: utiltestingapi.MakeWorkload("wl", "ns").
WorkloadPriorityClassRef("priority-1").
Priority(500).
Obj(),
newWorkload: utiltestingapi.MakeWorkload("wl", "ns").
WorkloadPriorityClassRef("priority-1").
Priority(100).
Obj(),
wantChanged: true,
},
"priority value decreased with PPC": {
oldWorkload: utiltestingapi.MakeWorkload("wl", "ns").
PodPriorityClassRef("pod-priority").
Priority(500).
Obj(),
newWorkload: utiltestingapi.MakeWorkload("wl", "ns").
PodPriorityClassRef("pod-priority").
Priority(100).
Obj(),
wantChanged: false,
},
}

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
gotChanged := workloadPriorityChanged(tc.oldWorkload, tc.newWorkload)
if gotChanged != tc.wantChanged {
t.Errorf("workloadPriorityChanged() = %v, want %v", gotChanged, tc.wantChanged)
}
})
}
}
7 changes: 5 additions & 2 deletions pkg/controller/jobframework/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -965,9 +965,12 @@ func (r *JobReconciler) ensureOneWorkload(ctx context.Context, job GenericJob, o
return match, nil
}

// UpdateWorkloadPriority updates workload priority if object's kueue.x-k8s.io/priority-class label changed.
func UpdateWorkloadPriority(ctx context.Context, c client.Client, r record.EventRecorder, obj client.Object, wl *kueue.Workload, customPriorityClassFunc func() string) error {
if workload.IsWorkloadPriorityClass(wl) && WorkloadPriorityClassName(obj) != workload.PriorityClassName(wl) {
jobPriorityClassName := WorkloadPriorityClassName(obj)
wlPriorityClassName := workload.PriorityClassName(wl)

// This handles both: changing priority (old -> new) AND adding priority (none -> new)
if (workload.HasNoPriority(wl) || workload.IsWorkloadPriorityClass(wl)) && jobPriorityClassName != wlPriorityClassName {
if err := PrepareWorkloadPriority(ctx, c, obj, wl, customPriorityClassFunc); err != nil {
return fmt.Errorf("prepare workload priority: %w", err)
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/jobframework/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,13 @@ func validatedUpdateForEnabledWorkloadSlice(oldJob, newJob GenericJob) field.Err
}

func ValidateUpdateForWorkloadPriorityClassName(isSuspended bool, oldObj, newObj client.Object) field.ErrorList {
// Cannot ADD a priority class to a NON-suspended (running) workload && wpc is empty
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to support adding a priority class, I think we should allow it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as far as I understand the issue mentioned suspended workloads, this piece of code is only affecting running on.

If we change this if condition, it means we allow priority change to running workloads (if I get the code right?)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see, you right, we want to fix only suspended workloads.

if !isSuspended && IsWorkloadPriorityClassNameEmpty(oldObj) {
if !IsWorkloadPriorityClassNameEmpty(newObj) {
return field.ErrorList{field.Invalid(workloadPriorityClassNamePath, WorkloadPriorityClassName(newObj), "WorkloadPriorityClass cannot be added to a non-suspended workload")}
}
}
// Cannot REMOVE a priority class from a workload (regardless of suspended/running)
if IsWorkloadPriorityClassNameEmpty(newObj) {
if !IsWorkloadPriorityClassNameEmpty(oldObj) {
return field.ErrorList{field.Invalid(workloadPriorityClassNamePath, WorkloadPriorityClassName(newObj), "WorkloadPriorityClass cannot be removed from a workload")}
Expand Down
10 changes: 10 additions & 0 deletions pkg/workload/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -1441,6 +1441,16 @@ func IsWorkloadPriorityClass(wl *kueue.Workload) bool {
wl.Spec.PriorityClassRef.Group == kueue.WorkloadPriorityClassGroup
}

func IsPodPriorityClass(wl *kueue.Workload) bool {
return wl.Spec.PriorityClassRef != nil &&
wl.Spec.PriorityClassRef.Kind == kueue.PodPriorityClassKind &&
wl.Spec.PriorityClassRef.Group == kueue.PodPriorityClassGroup
}

func HasNoPriority(wl *kueue.Workload) bool {
return wl.Spec.PriorityClassRef == nil
}

func prepareForEviction(w *kueue.Workload, now time.Time, reason, message string) {
SetEvictedCondition(w, now, reason, message)
resetClusterNomination(w)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1545,6 +1545,64 @@ var _ = ginkgo.Describe("Interacting with scheduler", ginkgo.Ordered, ginkgo.Con
}, util.ConsistentDuration, util.ShortInterval).Should(gomega.Succeed())
})
})

ginkgo.It("should trigger preemption when WorkloadPriorityClass is added to a pending workload that had no priority class", framework.SlowSpec, func() {
ginkgo.By("Creating a low-priority job that gets admitted on spot-untainted flavor")
jobLow := testingjob.MakeJob(jobName+"-low", ns.Name).
WorkloadPriorityClass(lowWorkloadPriorityClass.Name).
Queue(kueue.LocalQueueName(devLocalQ.Name)).
Request(corev1.ResourceCPU, "5").
NodeSelector(instanceKey, "spot-untainted").
Obj()
util.MustCreate(ctx, k8sClient, jobLow)

lowWlKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(jobLow.Name, jobLow.UID), Namespace: ns.Name}

ginkgo.By("Verifying the low-priority workload is admitted")
util.ExpectWorkloadsToHaveQuotaReservationByKey(ctx, k8sClient, devClusterQ.Name, lowWlKey)

ginkgo.By("Creating a job WITHOUT any WorkloadPriorityClass on the SAME flavor (will be pending)")
jobNoPriority := testingjob.MakeJob(jobName+"-no-priority", ns.Name).
Queue(kueue.LocalQueueName(devLocalQ.Name)).
Request(corev1.ResourceCPU, "5").
NodeSelector(instanceKey, "spot-untainted").
Obj()
util.MustCreate(ctx, k8sClient, jobNoPriority)

noPriorityWlKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(jobNoPriority.Name, jobNoPriority.UID), Namespace: ns.Name}

ginkgo.By("Verifying the no-priority workload is pending (not admitted)")
gomega.Eventually(func(g gomega.Gomega) {
wl := &kueue.Workload{}
g.Expect(k8sClient.Get(ctx, noPriorityWlKey, wl)).To(gomega.Succeed())
g.Expect(wl.Status.Conditions).Should(utiltesting.HaveConditionStatusFalseAndReason(kueue.WorkloadQuotaReserved, "Pending"))
g.Expect(wl.Spec.PriorityClassRef).To(gomega.BeNil())
}, util.Timeout, util.Interval).Should(gomega.Succeed())

ginkgo.By("Adding high WorkloadPriorityClass to the pending job via label update")
gomega.Eventually(func(g gomega.Gomega) {
createdJob := &batchv1.Job{}
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(jobNoPriority), createdJob)).To(gomega.Succeed())
if createdJob.Labels == nil {
createdJob.Labels = make(map[string]string)
}
createdJob.Labels[constants.WorkloadPriorityClassLabel] = highWorkloadPriorityClass.Name
g.Expect(k8sClient.Update(ctx, createdJob)).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())

ginkgo.By("Verifying the workload's priority is updated")
util.ExpectWorkloadsWithWorkloadPriority(ctx, k8sClient, highWorkloadPriorityClass.Name, highWorkloadPriorityClass.Value, noPriorityWlKey)

ginkgo.By("Verifying the low-priority workload gets preempted")
lowWl := &kueue.Workload{}
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, lowWlKey, lowWl)).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
util.ExpectWorkloadsToBePreemptedByKey(ctx, k8sClient, lowWlKey)

ginkgo.By("Verifying the high-priority workload gets admitted")
util.ExpectWorkloadsToBeAdmittedByKey(ctx, k8sClient, devClusterQ.Name, noPriorityWlKey)
})
})

ginkgo.When("Patch a WorkloadPriorityClass", func() {
Expand Down
76 changes: 76 additions & 0 deletions test/integration/singlecluster/scheduler/preemption_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,82 @@ var _ = ginkgo.Describe("Preemption", func() {
cQPath := "/" + cq.Name
util.ExpectPreemptedCondition(ctx, k8sClient, kueue.InClusterQueueReason, metav1.ConditionTrue, lowWl, highWl, string(highWl.UID), "job-uid", cQPath, cQPath)
})

ginkgo.It("Should trigger preemption when WorkloadPriorityClass is added to suspended workload", func() {
ginkgo.By("Creating a normal priority workload that gets admitted and uses all quota")

normalPrioClass := utiltestingapi.MakeWorkloadPriorityClass("normal-priority").
PriorityValue(0).
Obj()
util.MustCreate(ctx, k8sClient, normalPrioClass)
defer func() {
util.ExpectObjectToBeDeleted(ctx, k8sClient, normalPrioClass, true)
}()

normalWl := utiltestingapi.MakeWorkload("normal-wl", ns.Name).
Queue(kueue.LocalQueueName(q.Name)).
Priority(0).
Request(corev1.ResourceCPU, "4").
Obj()

normalWl.Spec.PriorityClassRef = &kueue.PriorityClassRef{
Group: kueue.WorkloadPriorityClassGroup,
Kind: "WorkloadPriorityClass",
Name: "normal-priority",
}

util.MustCreate(ctx, k8sClient, normalWl)
util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, cq.Name, normalWl)

ginkgo.By("Creating a suspended workload WITHOUT priority class")
suspendedWl := utiltestingapi.MakeWorkload("suspended-wl", ns.Name).
Queue(kueue.LocalQueueName(q.Name)).
Request(corev1.ResourceCPU, "4").
Active(false).
Obj()
util.MustCreate(ctx, k8sClient, suspendedWl)

ginkgo.By("Adding high WorkloadPriorityClass to the suspended workload")
gomega.Eventually(func(g gomega.Gomega) {
wl := &kueue.Workload{}
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(suspendedWl), wl)).To(gomega.Succeed())
wl.Spec.PriorityClassRef = &kueue.PriorityClassRef{
Group: kueue.WorkloadPriorityClassGroup,
Kind: "WorkloadPriorityClass",
Name: "high-priority",
}
wl.Spec.Priority = ptr.To(highPriority)
g.Expect(k8sClient.Update(ctx, wl)).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())

ginkgo.By("Verifying the priority class was added")
gomega.Eventually(func(g gomega.Gomega) {
wl := &kueue.Workload{}
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(suspendedWl), wl)).To(gomega.Succeed())
g.Expect(wl.Spec.PriorityClassRef).NotTo(gomega.BeNil())
g.Expect(wl.Spec.PriorityClassRef.Name).To(gomega.Equal("high-priority"))
g.Expect(wl.Spec.Priority).NotTo(gomega.BeNil())
g.Expect(*wl.Spec.Priority).To(gomega.Equal(highPriority))
}, util.Timeout, util.Interval).Should(gomega.Succeed())

ginkgo.By("Unsuspending the workload (set Active=true)")
gomega.Eventually(func(g gomega.Gomega) {
wl := &kueue.Workload{}
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(suspendedWl), wl)).To(gomega.Succeed())
wl.Spec.Active = ptr.To(true)
g.Expect(k8sClient.Update(ctx, wl)).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())

ginkgo.By("Verifying the normal priority workload gets preempted")
util.ExpectWorkloadsToBePreempted(ctx, k8sClient, normalWl)
util.FinishEvictionForWorkloads(ctx, k8sClient, normalWl)

ginkgo.By("Verifying the high-priority workload gets admitted")
util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, cq.Name, suspendedWl)

ginkgo.By("Verifying the normal priority workload is now pending")
util.ExpectWorkloadsToBePending(ctx, k8sClient, normalWl)
})
})

ginkgo.Context("In a ClusterQueue that is part of a cohort", func() {
Expand Down