Commit 225b71e

Calculate podsAllocateable for jobs requesting topology - topology job filter PRs part 2 (#353)
* Calculate podsAllocateable for jobs requesting topology
* Clean allocation data for next calculations
1 parent db28c0f commit 225b71e
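
In short: for every job that requests a topology constraint, the plugin now walks the topology tree before predicates run and records, per domain, how many of the job's pods that domain could accommodate, then resets the counters so the next job starts from a clean tree. The following is a minimal, self-contained sketch of the tree-aggregation idea only; the `domain` type, `nodeCapacities`, and `calcAllocatable` are toy stand-ins, not the repository's actual node_info/podgroup_info API:

package main

import "fmt"

// domain is a toy stand-in for a topology domain (rack, zone, ...).
type domain struct {
	children        []*domain
	nodeCapacities  []int // how many of this job's pods each node in a leaf domain can still hold
	allocatablePods int
}

// calcAllocatable sums, bottom-up, how many of the job's pods fit under each domain.
func calcAllocatable(d *domain) int {
	if d == nil {
		return 0
	}
	if len(d.children) == 0 {
		for _, c := range d.nodeCapacities {
			d.allocatablePods += c
		}
		return d.allocatablePods
	}
	for _, child := range d.children {
		d.allocatablePods += calcAllocatable(child)
	}
	return d.allocatablePods
}

func main() {
	root := &domain{children: []*domain{
		{nodeCapacities: []int{3, 2}}, // rack A fits 5 pods
		{nodeCapacities: []int{1}},    // rack B fits 1 pod
	}}
	fmt.Println(calcAllocatable(root)) // 6
}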

File tree

3 files changed: +610 -1 lines changed

pkg/scheduler/plugins/topology/topology_plugin.go

Lines changed: 3 additions & 1 deletion
@@ -13,7 +13,6 @@ import (
 
 const (
 	topologyPluginName = "topology"
-	noNodeName         = ""
 )
 
 type topologyPlugin struct {
@@ -41,6 +40,9 @@ func (t *topologyPlugin) OnSessionOpen(ssn *framework.Session) {
 	t.sessionStateGetter = ssn
 	t.nodesInfos = ssn.Nodes
 	t.initializeTopologyTree(topologies, ssn)
+
+	//pre-predicate to generate the whole topology tree and store per workload
+	ssn.AddPrePredicateFn(t.prePredicateFn)
 }
 
 func (t *topologyPlugin) initializeTopologyTree(topologies []*kueuev1alpha1.Topology, ssn *framework.Session) {
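
The only functional change in this file is the registration of the new pre-predicate hook. As a hypothetical illustration (not part of the commit), any per-job preparation step with the same signature as prePredicateFn in the new file below could presumably be wired up the same way from OnSessionOpen:

// Hypothetical sketch: a function with this signature can be passed to
// ssn.AddPrePredicateFn, matching how prePredicateFn is registered above.
func (t *topologyPlugin) examplePrePredicate(_ *pod_info.PodInfo, job *podgroup_info.PodGroupInfo) error {
	// per-job preparation work would go here
	return nil
}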
Lines changed: 154 additions & 0 deletions
@@ -0,0 +1,154 @@
// Copyright 2025 NVIDIA CORPORATION
// SPDX-License-Identifier: Apache-2.0

package topology

import (
	"fmt"

	"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/node_info"
	"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/pod_info"
	"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/podgroup_info"
	"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/resource_info"
)

type jobAllocationMetaData struct {
	maxPodResources    *resource_info.ResourceRequirements
	allocationTestPods []*pod_info.PodInfo
	tasksToAllocate    []*pod_info.PodInfo
}

// prePredicateFn calculates, for a job that requests a topology, how many of its
// pods each topology domain could accommodate, then resets the counters so the
// next job's calculation starts from a clean tree.
func (t *topologyPlugin) prePredicateFn(_ *pod_info.PodInfo, job *podgroup_info.PodGroupInfo) error {
	topologyTree, err := t.getJobTopology(job)
	if err != nil {
		return err
	}
	if topologyTree == nil {
		return nil
	}

	// Calc tree job allocation data
	_, err = t.calcTreeAllocatable(job, topologyTree)
	if err != nil {
		return err
	}

	// Clean allocation data from the tree
	for _, levelDomains := range topologyTree.DomainsByLevel {
		for _, domain := range levelDomains {
			domain.AllocatablePods = 0
		}
	}

	return nil
}

func (t *topologyPlugin) getJobTopology(job *podgroup_info.PodGroupInfo) (*TopologyInfo, error) {
	jobTopologyName := job.PodGroup.Spec.TopologyConstraint.Topology
	if jobTopologyName == "" {
		return nil, nil
	}
	topologyTree := t.TopologyTrees[jobTopologyName]
	if topologyTree == nil {
		return nil, fmt.Errorf("no matching topology tree was found for job %s, workload topology name: %s",
			job.PodGroup.Name, jobTopologyName)
	}
	return topologyTree, nil
}

func (t *topologyPlugin) calcTreeAllocatable(job *podgroup_info.PodGroupInfo, topologyTree *TopologyInfo) (int, error) {
	jobAllocationMetaData, err := initJobAllocationMetadataStruct(job, t)
	if err != nil {
		return 0, err
	}

	return t.calcSubTreeAllocatable(jobAllocationMetaData, topologyTree.Root)
}

func initJobAllocationMetadataStruct(job *podgroup_info.PodGroupInfo, t *topologyPlugin) (*jobAllocationMetaData, error) {
	tasksToAllocate := podgroup_info.GetTasksToAllocate(job, t.taskOrderFunc, true)
	maxPodResources := resource_info.NewResourceRequirements(0, 0, 0)
	for _, podInfo := range tasksToAllocate {
		err := maxPodResources.SetMaxResource(podInfo.ResReq)
		if err != nil {
			return nil, err
		}
	}
	initialAllocationTestPods := []*pod_info.PodInfo{
		{Name: "1-pods-resources", ResReq: maxPodResources},
	}
	jobAllocationData := &jobAllocationMetaData{
		maxPodResources:    maxPodResources,
		allocationTestPods: initialAllocationTestPods,
		tasksToAllocate:    tasksToAllocate,
	}
	return jobAllocationData, nil
}

func (t *topologyPlugin) calcSubTreeAllocatable(jobAllocationData *jobAllocationMetaData, rootDomain *TopologyDomainInfo) (int, error) {
	if rootDomain == nil {
		return 0, nil
	}

	if len(rootDomain.Children) == 0 {
		for _, node := range rootDomain.Nodes {
			rootDomain.AllocatablePods += calcNodeAccomedation(jobAllocationData, node)
		}
		return rootDomain.AllocatablePods, nil
	}

	for _, child := range rootDomain.Children {
		childAllocateable, err := t.calcSubTreeAllocatable(jobAllocationData, child)
		if err != nil {
			return 0, err
		}
		rootDomain.AllocatablePods += childAllocateable
	}
	return rootDomain.AllocatablePods, nil
}

// calcNodeAccomedation returns how many of the job's pods the given node can
// accommodate, by testing cumulative resource requests (1 pod, 2 pods, ...)
// against the node until one no longer fits.
func calcNodeAccomedation(jobAllocationMetaData *jobAllocationMetaData, node *node_info.NodeInfo) int {
	allocateablePodsCount := 0
	for _, resourceRepresentorPod := range jobAllocationMetaData.allocationTestPods {
		if node.IsTaskAllocatable(resourceRepresentorPod) {
			allocateablePodsCount++
		} else {
			break
		}
	}
	// Add more test pods to allocationTestPods until the node cannot accommodate any more pods
	if allocateablePodsCount == len(jobAllocationMetaData.allocationTestPods) {
		for i := allocateablePodsCount; i < len(jobAllocationMetaData.tasksToAllocate); i++ {
			latestTestPod := jobAllocationMetaData.allocationTestPods[len(jobAllocationMetaData.allocationTestPods)-1]

			iAllocationsTestPod := &pod_info.PodInfo{
				Name:   fmt.Sprintf("%d-pods-resources", allocateablePodsCount+1),
				ResReq: calcNextAllocationTestPodResources(latestTestPod.ResReq, jobAllocationMetaData.maxPodResources),
			}
			jobAllocationMetaData.allocationTestPods = append(jobAllocationMetaData.allocationTestPods, iAllocationsTestPod)
			if node.IsTaskAllocatable(iAllocationsTestPod) {
				allocateablePodsCount++
			} else {
				break
			}
		}
	}
	return allocateablePodsCount
}

// calcNextAllocationTestPodResources builds the resource requirements used to test
// whether a node fits one more pod, by adding the job's maximal per-pod request
// to the previous test request.
func calcNextAllocationTestPodResources(previousTestResources, maxPodResources *resource_info.ResourceRequirements) *resource_info.ResourceRequirements {
	nPlus1Resources := previousTestResources.Clone()
	nPlus1Resources.BaseResource.Add(&maxPodResources.BaseResource)
	if len(nPlus1Resources.GpuResourceRequirement.MigResources()) > 0 {
		for migResource, quant := range maxPodResources.GpuResourceRequirement.MigResources() {
			nPlus1Resources.GpuResourceRequirement.MigResources()[migResource] += quant
		}
	} else {
		updatedGpuResource := resource_info.NewGpuResourceRequirementWithMultiFraction(
			nPlus1Resources.GetNumOfGpuDevices(),
			nPlus1Resources.GpuFractionalPortion(),
			nPlus1Resources.GpuMemory())
		nPlus1Resources.GpuResourceRequirement = *updatedGpuResource
	}
	return nPlus1Resources
}
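
The node-level counting above relies on a cumulative-request trick: instead of simulating bin-packing, it builds synthetic test pods whose requests equal 1x, 2x, 3x, ... the job's largest per-pod request and asks the node whether each still fits. Below is a simplified, self-contained sketch of that loop using plain integers in place of the resource_info types; countFit, nodeFree, and perPodRequest are illustrative names only:

package main

import "fmt"

// countFit reports how many pods with the given per-pod request fit on a node
// with nodeFree capacity, by testing cumulative requests (1 pod, 2 pods, ...)
// and stopping at the first request the node can no longer satisfy.
func countFit(nodeFree, perPodRequest, totalPods int) int {
	count := 0
	cumulative := 0
	for i := 0; i < totalPods; i++ {
		cumulative += perPodRequest // the "i+1 pods" test request
		if cumulative > nodeFree {  // stand-in for node.IsTaskAllocatable
			break
		}
		count++
	}
	return count
}

func main() {
	// A node with 10 free GPUs and a job of 6 pods requesting 3 GPUs each:
	// cumulative requests 3, 6, 9 fit, 12 does not, so the node accommodates 3 pods.
	fmt.Println(countFit(10, 3, 6)) // 3
}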
