315 changes: 315 additions & 0 deletions pkg/scheduler/scheduler_tas_test.go
@@ -5860,3 +5860,318 @@ func TestScheduleForTASWhenWorkloadModifiedConcurrently(t *testing.T) {
}
}
}

// TestScheduleForTASPreemptionWithFeatureGateDisabled verifies that preemption works correctly
// when the TopologyAwareScheduling feature gate is disabled, even if a ResourceFlavor has topologyName configured.
// This test ensures that the fix in preemption.go (adding a feature gate check before calling
// WorkloadsTopologyRequests) prevents errors when TAS is disabled but topology-related configuration exists.
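//
// A minimal illustrative sketch of the guard described above; the receiver, arguments, and
// variable names are assumptions for illustration, not the literal preemption.go patch:
//
//	if features.Enabled(features.TopologyAwareScheduling) {
//	    tasRequests = snapshot.WorkloadsTopologyRequests(cq, wl)
//	}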
func TestScheduleForTASPreemptionWithFeatureGateDisabled(t *testing.T) {
now := time.Now().Truncate(time.Second)
defaultSingleNode := []corev1.Node{
*testingnode.MakeNode("x1").
Label("tas-node", "true").
Label(corev1.LabelHostname, "x1").
StatusAllocatable(corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("5"),
corev1.ResourceMemory: resource.MustParse("5Gi"),
corev1.ResourcePods: resource.MustParse("10"),
}).
Ready().
Obj(),
}
defaultSingleLevelTopology := *utiltestingapi.MakeDefaultOneLevelTopology("tas-single-level")
defaultTASFlavor := *utiltestingapi.MakeResourceFlavor("tas-default").
NodeLabel("tas-node", "true").
TopologyName("tas-single-level").
Obj()
defaultClusterQueueWithPreemption := *utiltestingapi.MakeClusterQueue("tas-main").
Preemption(kueue.ClusterQueuePreemption{WithinClusterQueue: kueue.PreemptionPolicyLowerPriority}).
ResourceGroup(*utiltestingapi.MakeFlavorQuotas("tas-default").
Resource(corev1.ResourceCPU, "3").
Resource(corev1.ResourceMemory, "3Gi").Obj()).
Obj()
queues := []kueue.LocalQueue{
*utiltestingapi.MakeLocalQueue("tas-main", "default").ClusterQueue("tas-main").Obj(),
}
eventIgnoreMessage := cmpopts.IgnoreFields(utiltesting.EventRecord{}, "Message")
cases := map[string]struct {
nodes []corev1.Node
topologies []kueue.Topology
resourceFlavors []kueue.ResourceFlavor
clusterQueues []kueue.ClusterQueue
workloads []kueue.Workload
wantWorkloads []kueue.Workload

// wantNewAssignments is a summary of all new admissions in the cache after this cycle.
wantNewAssignments map[workload.Reference]kueue.Admission
// wantLeft lists the workload keys that are left in the queues after this cycle.
wantLeft map[kueue.ClusterQueueReference][]workload.Reference
// wantInadmissibleLeft lists the workload keys that are left in the inadmissible state after this cycle.
wantInadmissibleLeft map[kueue.ClusterQueueReference][]workload.Reference
// wantEvents asserts on the recorded events; the comparison options are passed via eventCmpOpts.
wantEvents []utiltesting.EventRecord
// eventCmpOpts are the comparison options for the events
eventCmpOpts cmp.Options

featureGates map[featuregate.Feature]bool
}{
"TAS disabled, preemption succeeds even with ResourceFlavor topologyName": {
// This case verifies that when the TopologyAwareScheduling feature gate is disabled,
// preemption works correctly even if a ResourceFlavor has topologyName configured.
// The fix ensures that WorkloadsTopologyRequests is not called when TAS is disabled,
// preventing "workload requires Topology, but there is no TAS cache information" errors.
nodes: defaultSingleNode,
topologies: []kueue.Topology{defaultSingleLevelTopology},
resourceFlavors: []kueue.ResourceFlavor{defaultTASFlavor},
clusterQueues: []kueue.ClusterQueue{defaultClusterQueueWithPreemption},
workloads: []kueue.Workload{
*utiltestingapi.MakeWorkload("high-priority", "default").
UID("wl-high-priority").
JobUID("job-high-priority").
Queue("tas-main").
Priority(2).
PodSets(*utiltestingapi.MakePodSet("main", 1).
Request(corev1.ResourceCPU, "3").
Obj()).
Obj(),
*utiltestingapi.MakeWorkload("low-priority", "default").
Queue("tas-main").
Priority(1).
ReserveQuotaAt(
utiltestingapi.MakeAdmission("tas-main").
PodSets(utiltestingapi.MakePodSetAssignment("main").
Assignment(corev1.ResourceCPU, "tas-default", "3").
Obj()).
Obj(),
now,
).
AdmittedAt(true, now).
PodSets(*utiltestingapi.MakePodSet("main", 1).
Request(corev1.ResourceCPU, "3").
Obj()).
Obj(),
},
wantWorkloads: []kueue.Workload{
*utiltestingapi.MakeWorkload("high-priority", "default").
UID("wl-high-priority").
JobUID("job-high-priority").
Queue("tas-main").
Priority(2).
PodSets(*utiltestingapi.MakePodSet("main", 1).
Request(corev1.ResourceCPU, "3").
Obj()).
Condition(metav1.Condition{
Type: kueue.WorkloadQuotaReserved,
Status: metav1.ConditionFalse,
Reason: "Pending",
Message: "couldn't assign flavors to pod set main: insufficient unused quota for cpu in flavor tas-default, 3 more needed. Pending the preemption of 1 workload(s)",
LastTransitionTime: metav1.NewTime(now),
}).
ResourceRequests(kueue.PodSetRequest{
Name: "main",
Resources: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("3"),
},
}).
Obj(),
*utiltestingapi.MakeWorkload("low-priority", "default").
Queue("tas-main").
Priority(1).
ReserveQuotaAt(
utiltestingapi.MakeAdmission("tas-main").
PodSets(utiltestingapi.MakePodSetAssignment("main").
Assignment(corev1.ResourceCPU, "tas-default", "3").
Obj()).
Obj(),
now,
).
AdmittedAt(true, now).
PodSets(*utiltestingapi.MakePodSet("main", 1).
Request(corev1.ResourceCPU, "3").
Obj()).
Condition(metav1.Condition{
Type: kueue.WorkloadAdmitted,
Status: metav1.ConditionTrue,
Reason: "ByTest",
Message: "Admitted by ClusterQueue tas-main",
LastTransitionTime: metav1.NewTime(now),
}).
Condition(metav1.Condition{
Type: kueue.WorkloadQuotaReserved,
Status: metav1.ConditionTrue,
Reason: "AdmittedByTest",
Message: "Admitted by ClusterQueue tas-main",
LastTransitionTime: metav1.NewTime(now),
}).
Condition(metav1.Condition{
Type: kueue.WorkloadEvicted,
Status: metav1.ConditionTrue,
Reason: "Preempted",
Message: "Preempted to accommodate a workload (UID: wl-high-priority, JobUID: job-high-priority) due to prioritization in the ClusterQueue; preemptor path: /tas-main; preemptee path: /tas-main",
LastTransitionTime: metav1.NewTime(now),
}).
Condition(metav1.Condition{
Type: kueue.WorkloadPreempted,
Status: metav1.ConditionTrue,
Reason: "InClusterQueue",
Message: "Preempted to accommodate a workload (UID: wl-high-priority, JobUID: job-high-priority) due to prioritization in the ClusterQueue; preemptor path: /tas-main; preemptee path: /tas-main",
LastTransitionTime: metav1.NewTime(now),
}).
SchedulingStatsEviction(kueue.WorkloadSchedulingStatsEviction{Reason: "Preempted", Count: 1}).
Obj(),
},
wantNewAssignments: map[workload.Reference]kueue.Admission{},
wantLeft: map[kueue.ClusterQueueReference][]workload.Reference{
"tas-main": {"default/high-priority"},
},
wantInadmissibleLeft: nil,
eventCmpOpts: cmp.Options{eventIgnoreMessage},
wantEvents: []utiltesting.EventRecord{
utiltesting.MakeEventRecord("default", "low-priority", "EvictedDueToPreempted", "Normal").
Message("Preempted to accommodate a workload (UID: wl-high-priority, JobUID: job-high-priority) due to prioritization in the ClusterQueue; preemptor path: /tas-main; preemptee path: /tas-main").
Obj(),
utiltesting.MakeEventRecord("default", "low-priority", "Preempted", "Normal").
Message("Preempted to accommodate a workload (UID: wl-high-priority, JobUID: job-high-priority) due to prioritization in the ClusterQueue; preemptor path: /tas-main; preemptee path: /tas-main").
Obj(),
utiltesting.MakeEventRecord("default", "high-priority", "Pending", "Warning").
Message("couldn't assign flavors to pod set main: insufficient unused quota for cpu in flavor tas-default, 3 more needed. Pending the preemption of 1 workload(s)").
Obj(),
},
featureGates: map[featuregate.Feature]bool{
features.TopologyAwareScheduling: false,
},
},
}

for name, tc := range cases {
Contributor:

Would it be possible to merge this test case into TestScheduleForTASPreemption? We could introduce a featureGates field there, like in TestScheduleForTAS, which would allow disabling TopologyAwareScheduling.

I know isolated tests are nice, but they also come at the cost of duplicating long setup and assertion code.
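
A minimal sketch of the suggested direction, assuming TestScheduleForTASPreemption keeps its table-driven case struct; the field and the loop below simply mirror what this new test already does and are illustrative, not the final change:

featureGates map[featuregate.Feature]bool // hypothetical field added to the existing case struct

for feature, value := range tc.featureGates {
features.SetFeatureGateDuringTest(t, feature, value)
}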

for _, enabled := range []bool{false, true} {
t.Run(fmt.Sprintf("%s WorkloadRequestUseMergePatch enabled: %t", name, enabled), func(t *testing.T) {
features.SetFeatureGateDuringTest(t, features.WorkloadRequestUseMergePatch, enabled)
for feature, value := range tc.featureGates {
features.SetFeatureGateDuringTest(t, feature, value)
}
ctx, log := utiltesting.ContextWithLog(t)
testWls := make([]kueue.Workload, 0, len(tc.workloads))
for _, wl := range tc.workloads {
testWls = append(testWls, *wl.DeepCopy())
}
clientBuilder := utiltesting.NewClientBuilder().
WithLists(
&kueue.WorkloadList{Items: testWls},
&kueue.TopologyList{Items: tc.topologies},
&corev1.NodeList{Items: tc.nodes},
&kueue.LocalQueueList{Items: queues}).
WithObjects(
utiltesting.MakeNamespace("default"),
).
WithInterceptorFuncs(interceptor.Funcs{SubResourcePatch: utiltesting.TreatSSAAsStrategicMerge}).
WithStatusSubresource(&kueue.Workload{})
_ = tasindexer.SetupIndexes(ctx, utiltesting.AsIndexer(clientBuilder))
cl := clientBuilder.Build()
recorder := &utiltesting.EventRecorder{}
cqCache := schdcache.New(cl)
qManager := qcache.NewManager(cl, cqCache)
topologyByName := slices.ToMap(tc.topologies, func(i int) (kueue.TopologyReference, kueue.Topology) {
return kueue.TopologyReference(tc.topologies[i].Name), tc.topologies[i]
})
// Note: we intentionally don't add the Topology to the cache when TAS is disabled,
// to simulate the scenario where the topology cache information is missing.
for _, flavor := range tc.resourceFlavors {
cqCache.AddOrUpdateResourceFlavor(log, &flavor)
if flavor.Spec.TopologyName != nil && tc.featureGates[features.TopologyAwareScheduling] {
t := topologyByName[*flavor.Spec.TopologyName]
cqCache.AddOrUpdateTopology(log, &t)
}
}
for _, cq := range tc.clusterQueues {
if err := cqCache.AddClusterQueue(ctx, &cq); err != nil {
t.Fatalf("Inserting clusterQueue %s in cache: %v", cq.Name, err)
}
if err := qManager.AddClusterQueue(ctx, &cq); err != nil {
t.Fatalf("Inserting clusterQueue %s in manager: %v", cq.Name, err)
}
if err := cl.Create(ctx, &cq); err != nil {
t.Fatalf("couldn't create the cluster queue: %v", err)
}
}
for _, q := range queues {
if err := qManager.AddLocalQueue(ctx, &q); err != nil {
t.Fatalf("Inserting queue %s/%s in manager: %v", q.Namespace, q.Name, err)
}
}
initiallyAdmittedWorkloads := sets.New[workload.Reference]()
for _, w := range testWls {
if workload.IsAdmitted(&w) {
initiallyAdmittedWorkloads.Insert(workload.Key(&w))
}
}
scheduler := New(qManager, cqCache, cl, recorder, WithClock(t, testingclock.NewFakeClock(now)))
wg := sync.WaitGroup{}
scheduler.setAdmissionRoutineWrapper(routine.NewWrapper(
func() { wg.Add(1) },
func() { wg.Done() },
))

ctx, cancel := context.WithTimeout(ctx, queueingTimeout)
go qManager.CleanUpOnContext(ctx)
defer cancel()

scheduler.schedule(ctx)
wg.Wait()
snapshot, err := cqCache.Snapshot(ctx)
if err != nil {
t.Fatalf("unexpected error while building snapshot: %v", err)
}

gotWorkloads := &kueue.WorkloadList{}
err = cl.List(ctx, gotWorkloads)
if err != nil {
t.Fatalf("Unexpected list workloads error: %v", err)
}

defaultWorkloadCmpOpts := cmp.Options{
cmpopts.EquateEmpty(),
cmpopts.IgnoreFields(kueue.Workload{}, "ObjectMeta.ResourceVersion"),
cmpopts.SortSlices(func(a, b metav1.Condition) bool {
return a.Type < b.Type
}),
}

if diff := cmp.Diff(tc.wantWorkloads, gotWorkloads.Items, defaultWorkloadCmpOpts); diff != "" {
t.Errorf("Unexpected scheduled workloads (-want,+got):\n%s", diff)
}

gotAssignments := make(map[workload.Reference]kueue.Admission)
for cqName, c := range snapshot.ClusterQueues() {
for name, w := range c.Workloads {
if initiallyAdmittedWorkloads.Has(workload.Key(w.Obj)) {
continue
}
switch {
case !workload.HasQuotaReservation(w.Obj):
t.Fatalf("Workload %s is not admitted by a clusterQueue, but it is found as member of clusterQueue %s in the cache", name, cqName)
case w.Obj.Status.Admission.ClusterQueue != cqName:
t.Fatalf("Workload %s is admitted by clusterQueue %s, but it is found as member of clusterQueue %s in the cache", name, w.Obj.Status.Admission.ClusterQueue, cqName)
default:
gotAssignments[name] = *w.Obj.Status.Admission
}
}
}
if diff := cmp.Diff(tc.wantNewAssignments, gotAssignments, cmpopts.EquateEmpty()); diff != "" {
t.Errorf("Unexpected assigned clusterQueues in cache (-want,+got):\n%s", diff)
}
qDump := qManager.Dump()
if diff := cmp.Diff(tc.wantLeft, qDump, cmpDump...); diff != "" {
t.Errorf("Unexpected elements left in the queue (-want,+got):\n%s", diff)
}
qDumpInadmissible := qManager.DumpInadmissible()
if diff := cmp.Diff(tc.wantInadmissibleLeft, qDumpInadmissible, cmpDump...); diff != "" {
t.Errorf("Unexpected elements left in inadmissible workloads (-want,+got):\n%s", diff)
}
if diff := cmp.Diff(tc.wantEvents, recorder.RecordedEvents, tc.eventCmpOpts...); diff != "" {
t.Errorf("unexpected events (-want/+got):\n%s", diff)
}
})
}
}
}