Commit 78380d5
Enable TAS with ElasticJobsViaWorkloadSlices feature (#8580)
This change enables Topology-Aware Scheduling (TAS) to work correctly with the ElasticJobsViaWorkloadSlices feature, which allows jobs to scale dynamically via workload slices. When ElasticJobsViaWorkloadSlices is enabled and a workload slice is being replaced, the previous topology assignment is preserved for the existing pods, ensuring that pods across slices are found correctly.

Signed-off-by: Sohan Kunkerkar <sohank2602@gmail.com>
1 parent e9c6db2 commit 78380d5
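
To make the mechanism concrete, here is a minimal, self-contained Go sketch of the delta-only placement idea (the real logic is the handleElasticWorkload helper added by this commit, shown further down in the diff). The assignment and domain types, the placeDelta callback, and the naive merge are simplified stand-ins for illustration only, not Kueue's API.

// sketch.go: simplified illustration of delta-only placement for elastic
// workload slices. Not Kueue's implementation; types are stand-ins.
package main

import "fmt"

type domain struct {
	Count  int32
	Values []string // one value per topology level, e.g. the hostname
}

type assignment struct {
	Levels  []string
	Domains []domain
}

func countPods(a assignment) int32 {
	var n int32
	for _, d := range a.Domains {
		n += d.Count
	}
	return n
}

// truncate keeps domains from the front of the previous assignment until
// the reduced pod count is covered (scale-down case).
func truncate(prev assignment, want int32) assignment {
	out := assignment{Levels: prev.Levels}
	for _, d := range prev.Domains {
		if want <= 0 {
			break
		}
		keep := min(d.Count, want)
		out.Domains = append(out.Domains, domain{Count: keep, Values: d.Values})
		want -= keep
	}
	return out
}

// scaleAssignment mirrors the three cases of the new handleElasticWorkload
// helper: scale-up places only the delta and merges it with the previous
// assignment, scale-down truncates, and an unchanged count reuses it as-is.
// placeDelta stands in for fresh topology placement of the extra pods.
// (The real merge combines counts for domains present in both assignments;
// appending is enough for this illustration.)
func scaleAssignment(prev assignment, newCount int32, placeDelta func(int32) assignment) assignment {
	prevCount := countPods(prev)
	switch {
	case newCount > prevCount:
		delta := placeDelta(newCount - prevCount)
		merged := assignment{Levels: prev.Levels}
		merged.Domains = append(merged.Domains, prev.Domains...)
		merged.Domains = append(merged.Domains, delta.Domains...)
		return merged
	case newCount < prevCount:
		return truncate(prev, newCount)
	default:
		return prev
	}
}

func main() {
	// Mirrors the first new test case: 2 pods already on x1, scale to 4,
	// the 2 new pods land on x2, and the x1 placement is preserved.
	prev := assignment{
		Levels:  []string{"kubernetes.io/hostname"},
		Domains: []domain{{Count: 2, Values: []string{"x1"}}},
	}
	placeDelta := func(n int32) assignment {
		return assignment{
			Levels:  prev.Levels,
			Domains: []domain{{Count: n, Values: []string{"x2"}}},
		}
	}
	fmt.Printf("%+v\n", scaleAssignment(prev, 4, placeDelta))
	// {Levels:[kubernetes.io/hostname] Domains:[{Count:2 Values:[x1]} {Count:2 Values:[x2]}]}
}

The scale-down and same-count branches fall out of the same switch, which is exactly what the new test cases below assert.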

File tree

26 files changed: +1652, -101 lines

pkg/cache/scheduler/tas_cache_test.go

Lines changed: 208 additions & 11 deletions
@@ -38,15 +38,16 @@ import (
 
 // PodSetTestCase defines a test case for a single podset in the consolidated test.
 type PodSetTestCase struct {
-	podSetName      string
-	topologyRequest *kueue.PodSetTopologyRequest
-	requests        resources.Requests
-	count           int32
-	tolerations     []corev1.Toleration
-	nodeSelector    map[string]string
-	podSetGroupName *string
-	wantAssignment  *tas.TopologyAssignment
-	wantReason      string
+	podSetName         string
+	topologyRequest    *kueue.PodSetTopologyRequest
+	requests           resources.Requests
+	count              int32
+	tolerations        []corev1.Toleration
+	nodeSelector       map[string]string
+	podSetGroupName    *string
+	previousAssignment *kueue.TopologyAssignment
+	wantAssignment     *tas.TopologyAssignment
+	wantReason         string
 }
 
 func TestFindTopologyAssignments(t *testing.T) {

@@ -5983,6 +5984,201 @@ func TestFindTopologyAssignments(t *testing.T) {
 				},
 			},
 		},
+		"elastic workload scale up: delta-only placement preserves previous assignment": {
+			nodes: []corev1.Node{
+				*testingnode.MakeNode("x1").
+					Label(corev1.LabelHostname, "x1").
+					StatusAllocatable(corev1.ResourceList{
+						corev1.ResourceCPU:  resource.MustParse("2"),
+						corev1.ResourcePods: resource.MustParse("10"),
+					}).
+					Ready().
+					Obj(),
+				*testingnode.MakeNode("x2").
+					Label(corev1.LabelHostname, "x2").
+					StatusAllocatable(corev1.ResourceList{
+						corev1.ResourceCPU:  resource.MustParse("2"),
+						corev1.ResourcePods: resource.MustParse("10"),
+					}).
+					Ready().
+					Obj(),
+				*testingnode.MakeNode("x3").
+					Label(corev1.LabelHostname, "x3").
+					StatusAllocatable(corev1.ResourceList{
+						corev1.ResourceCPU:  resource.MustParse("2"),
+						corev1.ResourcePods: resource.MustParse("10"),
+					}).
+					Ready().
+					Obj(),
+			},
+			levels: defaultOneLevel,
+			podSets: []PodSetTestCase{{
+				topologyRequest: &kueue.PodSetTopologyRequest{
+					Unconstrained: ptr.To(true),
+				},
+				requests: resources.Requests{
+					corev1.ResourceCPU: 1000,
+				},
+				count: 4,
+				previousAssignment: tas.V1Beta2From(&tas.TopologyAssignment{
+					Levels: []string{corev1.LabelHostname},
+					Domains: []tas.TopologyDomainAssignment{
+						{Count: 2, Values: []string{"x1"}},
+					},
+				}),
+				wantAssignment: &tas.TopologyAssignment{
+					Levels: []string{corev1.LabelHostname},
+					Domains: []tas.TopologyDomainAssignment{
+						{Count: 2, Values: []string{"x1"}},
+						{Count: 2, Values: []string{"x2"}},
+					},
+				},
+			}},
+			enableFeatureGates: []featuregate.Feature{features.ElasticJobsViaWorkloadSlices, features.ElasticJobsViaWorkloadSlicesWithTAS},
+		},
+		"elastic workload scale up: spread across multiple nodes preserved": {
+			nodes: []corev1.Node{
+				*testingnode.MakeNode("x1").
+					Label(corev1.LabelHostname, "x1").
+					StatusAllocatable(corev1.ResourceList{
+						corev1.ResourceCPU:  resource.MustParse("2"),
+						corev1.ResourcePods: resource.MustParse("10"),
+					}).
+					Ready().
+					Obj(),
+				*testingnode.MakeNode("x2").
+					Label(corev1.LabelHostname, "x2").
+					StatusAllocatable(corev1.ResourceList{
+						corev1.ResourceCPU:  resource.MustParse("2"),
+						corev1.ResourcePods: resource.MustParse("10"),
+					}).
+					Ready().
+					Obj(),
+				*testingnode.MakeNode("x3").
+					Label(corev1.LabelHostname, "x3").
+					StatusAllocatable(corev1.ResourceList{
+						corev1.ResourceCPU:  resource.MustParse("2"),
+						corev1.ResourcePods: resource.MustParse("10"),
+					}).
+					Ready().
+					Obj(),
+			},
+			levels: defaultOneLevel,
+			podSets: []PodSetTestCase{{
+				topologyRequest: &kueue.PodSetTopologyRequest{
+					Unconstrained: ptr.To(true),
+				},
+				requests: resources.Requests{
+					corev1.ResourceCPU: 1000,
+				},
+				count: 4,
+				previousAssignment: tas.V1Beta2From(&tas.TopologyAssignment{
+					Levels: []string{corev1.LabelHostname},
+					Domains: []tas.TopologyDomainAssignment{
+						{Count: 1, Values: []string{"x1"}},
+						{Count: 1, Values: []string{"x2"}},
+					},
+				}),
+				wantAssignment: &tas.TopologyAssignment{
+					Levels: []string{corev1.LabelHostname},
+					Domains: []tas.TopologyDomainAssignment{
+						{Count: 1, Values: []string{"x1"}},
+						{Count: 1, Values: []string{"x2"}},
+						{Count: 2, Values: []string{"x3"}},
+					},
+				},
+			}},
+			enableFeatureGates: []featuregate.Feature{features.ElasticJobsViaWorkloadSlices, features.ElasticJobsViaWorkloadSlicesWithTAS},
+		},
+		"elastic workload scale down: truncates assignment": {
+			nodes: []corev1.Node{
+				*testingnode.MakeNode("x1").
+					Label(corev1.LabelHostname, "x1").
+					StatusAllocatable(corev1.ResourceList{
+						corev1.ResourceCPU:  resource.MustParse("4"),
+						corev1.ResourcePods: resource.MustParse("10"),
+					}).
+					Ready().
+					Obj(),
+				*testingnode.MakeNode("x2").
+					Label(corev1.LabelHostname, "x2").
+					StatusAllocatable(corev1.ResourceList{
+						corev1.ResourceCPU:  resource.MustParse("4"),
+						corev1.ResourcePods: resource.MustParse("10"),
+					}).
+					Ready().
+					Obj(),
+			},
+			levels: defaultOneLevel,
+			podSets: []PodSetTestCase{{
+				topologyRequest: &kueue.PodSetTopologyRequest{
+					Unconstrained: ptr.To(true),
+				},
+				requests: resources.Requests{
+					corev1.ResourceCPU: 1000,
+				},
+				count: 3,
+				previousAssignment: tas.V1Beta2From(&tas.TopologyAssignment{
+					Levels: []string{corev1.LabelHostname},
+					Domains: []tas.TopologyDomainAssignment{
+						{Count: 3, Values: []string{"x1"}},
+						{Count: 2, Values: []string{"x2"}},
+					},
+				}),
+				wantAssignment: &tas.TopologyAssignment{
+					Levels: []string{corev1.LabelHostname},
+					Domains: []tas.TopologyDomainAssignment{
+						{Count: 3, Values: []string{"x1"}},
+					},
+				},
+			}},
+			enableFeatureGates: []featuregate.Feature{features.ElasticJobsViaWorkloadSlices, features.ElasticJobsViaWorkloadSlicesWithTAS},
+		},
+		"elastic workload same count: reuses previous assignment exactly": {
+			nodes: []corev1.Node{
+				*testingnode.MakeNode("x1").
+					Label(corev1.LabelHostname, "x1").
+					StatusAllocatable(corev1.ResourceList{
+						corev1.ResourceCPU:  resource.MustParse("4"),
+						corev1.ResourcePods: resource.MustParse("10"),
+					}).
+					Ready().
+					Obj(),
+				*testingnode.MakeNode("x2").
+					Label(corev1.LabelHostname, "x2").
+					StatusAllocatable(corev1.ResourceList{
+						corev1.ResourceCPU:  resource.MustParse("4"),
+						corev1.ResourcePods: resource.MustParse("10"),
+					}).
+					Ready().
+					Obj(),
+			},
+			levels: defaultOneLevel,
+			podSets: []PodSetTestCase{{
+				topologyRequest: &kueue.PodSetTopologyRequest{
+					Unconstrained: ptr.To(true),
+				},
+				requests: resources.Requests{
+					corev1.ResourceCPU: 1000,
+				},
+				count: 3,
+				previousAssignment: tas.V1Beta2From(&tas.TopologyAssignment{
+					Levels: []string{corev1.LabelHostname},
+					Domains: []tas.TopologyDomainAssignment{
+						{Count: 2, Values: []string{"x1"}},
+						{Count: 1, Values: []string{"x2"}},
+					},
+				}),
+				wantAssignment: &tas.TopologyAssignment{
+					Levels: []string{corev1.LabelHostname},
+					Domains: []tas.TopologyDomainAssignment{
+						{Count: 2, Values: []string{"x1"}},
+						{Count: 1, Values: []string{"x2"}},
+					},
+				},
+			}},
+			enableFeatureGates: []featuregate.Feature{features.ElasticJobsViaWorkloadSlices, features.ElasticJobsViaWorkloadSlicesWithTAS},
+		},
 	}
 	for name, tc := range cases {
 		t.Run(name, func(t *testing.T) {

@@ -6036,8 +6232,9 @@ func TestFindTopologyAssignments(t *testing.T) {
 					},
 				},
 			},
-			SinglePodRequests: ps.requests,
-			Count:             ps.count,
+			SinglePodRequests:  ps.requests,
+			Count:              ps.count,
+			PreviousAssignment: ps.previousAssignment,
 		}
 		if ps.podSetGroupName != nil {
 			tasInput.PodSetGroupName = ps.podSetGroupName

Lines changed: 126 additions & 0 deletions
@@ -0,0 +1,126 @@
+/*
+Copyright The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package scheduler
+
+import (
+	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta2"
+	"sigs.k8s.io/kueue/pkg/resources"
+	utiltas "sigs.k8s.io/kueue/pkg/util/tas"
+)
+
+// elasticPlacementResult holds the result of elastic workload placement.
+type elasticPlacementResult struct {
+	// applied indicates delta placement was used; false means use standard placement.
+	applied     bool
+	assignments map[kueue.PodSetReference]tasPodSetAssignmentResult
+}
+
+// handleElasticWorkload processes delta-only placement for elastic workloads.
+// It keeps previous pods fixed and places only new pods during scale-up,
+// or truncates the assignment during scale-down.
+func (s *TASFlavorSnapshot) handleElasticWorkload(
+	workers TASPodSetRequests,
+	leader *TASPodSetRequests,
+	assumedUsage map[utiltas.TopologyDomainID]resources.Requests,
+	opts *findTopologyAssignmentsOption,
+) elasticPlacementResult {
+	if workers.PreviousAssignment == nil {
+		return elasticPlacementResult{applied: false}
+	}
+
+	prevAssignment := utiltas.InternalFrom(workers.PreviousAssignment)
+
+	if isStale, staleDomain := s.IsTopologyAssignmentStale(prevAssignment); isStale {
+		s.log.V(3).Info("previous TAS assignment is stale, doing fresh placement",
+			"staleDomain", staleDomain)
+		return elasticPlacementResult{applied: false}
+	}
+
+	previousCount := utiltas.CountPodsInAssignment(prevAssignment)
+	result := make(map[kueue.PodSetReference]tasPodSetAssignmentResult)
+
+	switch {
+	case workers.Count > previousCount:
+		return s.handleScaleUp(workers, leader, prevAssignment, previousCount, assumedUsage, opts)
+	case workers.Count < previousCount:
+		return s.handleScaleDown(workers, prevAssignment, assumedUsage)
+	default:
+		// Same count: reuse previous assignment
+		result[workers.PodSet.Name] = tasPodSetAssignmentResult{TopologyAssignment: prevAssignment}
+		addAssumedUsage(assumedUsage, prevAssignment, &workers)
+		return elasticPlacementResult{applied: true, assignments: result}
+	}
+}
+
+// handleScaleUp places only delta pods while keeping previous pods fixed.
+func (s *TASFlavorSnapshot) handleScaleUp(
+	workers TASPodSetRequests,
+	leader *TASPodSetRequests,
+	prevAssignment *utiltas.TopologyAssignment,
+	previousCount int32,
+	assumedUsage map[utiltas.TopologyDomainID]resources.Requests,
+	opts *findTopologyAssignmentsOption,
+) elasticPlacementResult {
+	result := make(map[kueue.PodSetReference]tasPodSetAssignmentResult)
+
+	deltaCount := workers.Count - previousCount
+	deltaRequest := workers
+	deltaRequest.Count = deltaCount
+	deltaRequest.PreviousAssignment = nil
+
+	// Previous pods consume capacity
+	prevAssumedUsage := utiltas.ComputeUsagePerDomain(prevAssignment, workers.SinglePodRequests)
+	for domainID, usage := range prevAssumedUsage {
+		if assumedUsage[domainID] == nil {
+			assumedUsage[domainID] = resources.Requests{}
+		}
+		assumedUsage[domainID].Add(usage)
+	}
+
+	deltaAssignments, reason := s.findTopologyAssignment(deltaRequest, leader, assumedUsage, opts.simulateEmpty, "")
+	if reason != "" {
+		result[workers.PodSet.Name] = tasPodSetAssignmentResult{FailureReason: reason}
+		return elasticPlacementResult{applied: true, assignments: result}
+	}
+
+	deltaAssignment := deltaAssignments[workers.PodSet.Name]
+	finalAssignment := s.mergeTopologyAssignments(deltaAssignment, prevAssignment)
+	result[workers.PodSet.Name] = tasPodSetAssignmentResult{TopologyAssignment: finalAssignment}
+
+	if leader != nil {
+		result[leader.PodSet.Name] = tasPodSetAssignmentResult{TopologyAssignment: deltaAssignments[leader.PodSet.Name]}
+		addAssumedUsage(assumedUsage, deltaAssignments[leader.PodSet.Name], leader)
+	}
+
+	// Add only delta to avoid double-counting previous pods.
+	addAssumedUsage(assumedUsage, deltaAssignment, &workers)
+	return elasticPlacementResult{applied: true, assignments: result}
+}
+
+// handleScaleDown truncates the previous assignment to fit fewer pods.
+func (s *TASFlavorSnapshot) handleScaleDown(
+	workers TASPodSetRequests,
+	prevAssignment *utiltas.TopologyAssignment,
+	assumedUsage map[utiltas.TopologyDomainID]resources.Requests,
+) elasticPlacementResult {
+	result := make(map[kueue.PodSetReference]tasPodSetAssignmentResult)
+
+	truncatedAssignment := utiltas.TruncateAssignment(prevAssignment, workers.Count)
+	result[workers.PodSet.Name] = tasPodSetAssignmentResult{TopologyAssignment: truncatedAssignment}
+	addAssumedUsage(assumedUsage, truncatedAssignment, &workers)
+	return elasticPlacementResult{applied: true, assignments: result}
+}
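
A subtle point in the scale-up path above is the assumed-usage bookkeeping: the previous pods' usage is charged to their domains before the delta is placed, and afterwards only the delta's usage is added, so the existing pods are never double-counted. Below is a simplified, self-contained sketch of that accounting with stand-in types; it is not Kueue's resources or usage machinery, just the shape of the calculation.

// usage_sketch.go: simplified illustration (not Kueue's types) of the
// per-domain capacity accounting done during an elastic scale-up.
package main

import "fmt"

type domainID string
type requests map[string]int64 // resource name -> quantity (e.g. milli-CPU)

// usagePerDomain charges count * perPod to every domain in an assignment,
// analogous to computing usage for the previous assignment before the
// delta pods are placed.
func usagePerDomain(counts map[domainID]int32, perPod requests) map[domainID]requests {
	out := map[domainID]requests{}
	for dom, count := range counts {
		u := requests{}
		for res, q := range perPod {
			u[res] = q * int64(count)
		}
		out[dom] = u
	}
	return out
}

// addUsage accumulates extra usage into the running assumed-usage total.
func addUsage(total, extra map[domainID]requests) {
	for dom, u := range extra {
		if total[dom] == nil {
			total[dom] = requests{}
		}
		for res, q := range u {
			total[dom][res] += q
		}
	}
}

func main() {
	perPod := requests{"cpu": 1000} // 1 CPU per pod, as in the new test cases
	assumed := map[domainID]requests{}

	previous := map[domainID]int32{"x1": 2} // existing slice: 2 pods on x1
	delta := map[domainID]int32{"x2": 2}    // the 2 new pods end up on x2

	// 1. Charge the existing pods first, so delta placement sees x1 as used.
	addUsage(assumed, usagePerDomain(previous, perPod))
	// 2. (Delta placement would run here against the updated assumed usage.)
	// 3. Charge only the delta afterwards, avoiding double-counting x1.
	addUsage(assumed, usagePerDomain(delta, perPod))

	fmt.Println(assumed) // map[x1:map[cpu:2000] x2:map[cpu:2000]]
}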
