
Commit 695e72d

Refactor subGroups to contain Default SubGroup (NVIDIA#372)

* Move min available to DefaultSubGroup field
* Append default subgroup as SubGroups list
* Assign task to default subGroup
* Transfer PodInfos map into default SubGroup
1 parent 8bdb385 commit 695e72d


44 files changed: +903 -739 lines
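
The diffs below apply one recurring pattern: pods and min-available move from the top-level PodInfos and MinAvailable fields of PodGroupInfo into a default SubGroupInfo, and call sites switch to accessors. A minimal sketch of that pattern, based only on the call sites visible in this commit (the pod_info import path and the exact accessor signatures are assumptions, not confirmed by this diff):

package example

import (
	"fmt"

	"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/pod_info"
	"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/podgroup_info"
)

// buildJob constructs a PodGroupInfo whose pods live inside the default
// sub-group, mirroring the test fixtures updated in this commit.
func buildJob() *podgroup_info.PodGroupInfo {
	return &podgroup_info.PodGroupInfo{
		SubGroups: map[string]*podgroup_info.SubGroupInfo{
			podgroup_info.DefaultSubGroup: podgroup_info.NewSubGroupInfo(podgroup_info.DefaultSubGroup, 1).
				WithPodInfos(pod_info.PodsMap{
					"pod1": &pod_info.PodInfo{},
				}),
		},
	}
}

// readJob shows the accessor side of the refactor: job.PodInfos becomes
// job.GetAllPodsMap() and job.MinAvailable becomes job.GetDefaultMinAvailable().
func readJob(job *podgroup_info.PodGroupInfo) {
	fmt.Println(len(job.GetAllPodsMap()), job.GetDefaultMinAvailable())
}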

pkg/scheduler/actions/allocate/allocate_subgroups_test.go

Lines changed: 8 additions & 9 deletions

@@ -4,21 +4,20 @@
 package allocate_test
 
 import (
-	"testing"
-
-	. "go.uber.org/mock/gomock"
-	"k8s.io/utils/pointer"
-	"k8s.io/utils/ptr"
-
-	"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/actions/allocate"
-	"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/actions/integration_tests/integration_tests_utils"
 	"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/pod_status"
 	"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/podgroup_info"
 	"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/constants"
-	"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/test_utils"
 	"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/test_utils/jobs_fake"
 	"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/test_utils/nodes_fake"
 	"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/test_utils/tasks_fake"
+	"k8s.io/utils/pointer"
+	"k8s.io/utils/ptr"
+	"testing"
+
+	"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/actions/allocate"
+	"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/actions/integration_tests/integration_tests_utils"
+	"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/test_utils"
+	. "go.uber.org/mock/gomock"
 )
 
 func TestHandleSubGroupsAllocation(t *testing.T) {

pkg/scheduler/actions/common/allocate.go

Lines changed: 2 additions & 2 deletions

@@ -126,12 +126,12 @@ func handleFailedTaskAllocation(job *podgroup_info.PodGroupInfo, unschedulableTa
 	}
 
 	numRunningTasks := job.GetActivelyRunningTasksCount()
-	if job.MinAvailable > 1 && numRunningTasks < job.MinAvailable {
+	if job.GetDefaultMinAvailable() > 1 && numRunningTasks < job.GetDefaultMinAvailable() {
 		job.SetJobFitError(
 			podgroup_info.PodSchedulingErrors,
 			fmt.Sprintf("Resources were found for %d pods while %d are required for gang scheduling. "+
 				"Additional pods cannot be scheduled due to: %s",
-				numSchedulableTasks, job.MinAvailable, allocationError.Error()),
+				numSchedulableTasks, job.GetDefaultMinAvailable(), allocationError.Error()),
 			nil)
 	} else {
 		job.SetJobFitError(

pkg/scheduler/actions/common/feasible_nodes.go

Lines changed: 1 addition & 1 deletion

@@ -9,7 +9,7 @@ import (
 )
 
 func FeasibleNodesForJob(allNodes []*node_info.NodeInfo, job *podgroup_info.PodGroupInfo) []*node_info.NodeInfo {
-	for _, task := range job.PodInfos {
+	for _, task := range job.GetAllPodsMap() {
 		if !task.IsRequireAnyKindOfGPU() {
 			return allNodes
 		}
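
Callers that previously ranged over job.PodInfos now range over job.GetAllPodsMap(). The accessor's body is not part of this diff; the standalone sketch below only illustrates the flattening such an accessor implies (all names and types here are hypothetical stand-ins, not the repository's):

package main

import "fmt"

type PodInfo struct{ Name string }

type PodsMap map[string]*PodInfo

// SubGroup pairs a gang-scheduling minimum with the pods assigned to it.
type SubGroup struct {
	MinAvailable int32
	Pods         PodsMap
}

// allPods merges every sub-group's pods into one map, which is the shape a
// GetAllPodsMap-style accessor would hand back to callers that used to read
// a flat PodInfos field.
func allPods(subGroups map[string]*SubGroup) PodsMap {
	merged := PodsMap{}
	for _, sg := range subGroups {
		for id, pod := range sg.Pods {
			merged[id] = pod
		}
	}
	return merged
}

func main() {
	subGroups := map[string]*SubGroup{
		"default": {MinAvailable: 1, Pods: PodsMap{"pod1": {Name: "pod1"}}},
	}
	fmt.Println(len(allPods(subGroups))) // 1
}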

pkg/scheduler/actions/common/feasible_nodes_test.go

Lines changed: 74 additions & 58 deletions

@@ -87,11 +87,13 @@ func TestFeasibleNodes(t *testing.T) {
 		{
 			name: "no nodes",
 			job: &podgroup_info.PodGroupInfo{
-				PodInfos: pod_info.PodsMap{
-					"pod1": &pod_info.PodInfo{
-						ResourceRequestType: pod_info.RequestTypeRegular,
-						ResReq: resource_info.NewResourceRequirementsWithGpus(1),
-					},
+				SubGroups: map[string]*podgroup_info.SubGroupInfo{
+					podgroup_info.DefaultSubGroup: podgroup_info.NewSubGroupInfo(podgroup_info.DefaultSubGroup, 1).WithPodInfos(pod_info.PodsMap{
+						"pod1": &pod_info.PodInfo{
+							ResourceRequestType: pod_info.RequestTypeRegular,
+							ResReq: resource_info.NewResourceRequirementsWithGpus(1),
+						},
+					}),
 				},
 			},
 			nodes: []*node_info.NodeInfo{},
@@ -100,11 +102,13 @@
 		{
 			name: "CPU only job",
 			job: &podgroup_info.PodGroupInfo{
-				PodInfos: pod_info.PodsMap{
-					"pod1": &pod_info.PodInfo{
-						ResourceRequestType: pod_info.RequestTypeRegular,
-						ResReq: resource_info.NewResourceRequirements(0, 1000, 0),
-					},
+				SubGroups: map[string]*podgroup_info.SubGroupInfo{
+					podgroup_info.DefaultSubGroup: podgroup_info.NewSubGroupInfo(podgroup_info.DefaultSubGroup, 1).WithPodInfos(pod_info.PodsMap{
+						"pod1": &pod_info.PodInfo{
+							ResourceRequestType: pod_info.RequestTypeRegular,
+							ResReq: resource_info.NewResourceRequirements(0, 1000, 0),
+						},
+					}),
 				},
 			},
 			nodes: allNodes,
@@ -113,13 +117,15 @@
 		{
 			name: "whole GPU job",
 			job: &podgroup_info.PodGroupInfo{
-				PodInfos: pod_info.PodsMap{
-					"pod1": &pod_info.PodInfo{
-						ResourceRequestType: pod_info.RequestTypeRegular,
-						ResReq: &resource_info.ResourceRequirements{
-							GpuResourceRequirement: *resource_info.NewGpuResourceRequirementWithGpus(2, 0),
+				SubGroups: map[string]*podgroup_info.SubGroupInfo{
+					podgroup_info.DefaultSubGroup: podgroup_info.NewSubGroupInfo(podgroup_info.DefaultSubGroup, 1).WithPodInfos(pod_info.PodsMap{
+						"pod1": &pod_info.PodInfo{
+							ResourceRequestType: pod_info.RequestTypeRegular,
+							ResReq: &resource_info.ResourceRequirements{
+								GpuResourceRequirement: *resource_info.NewGpuResourceRequirementWithGpus(2, 0),
+							},
 						},
-					},
+					}),
 				},
 			},
 			nodes: allNodes,
@@ -128,18 +134,20 @@
 		{
 			name: "distributed whole GPU job",
 			job: &podgroup_info.PodGroupInfo{
-				PodInfos: pod_info.PodsMap{
-					"pod1": &pod_info.PodInfo{
-						ResourceRequestType: pod_info.RequestTypeRegular,
-						ResReq: &resource_info.ResourceRequirements{
-							GpuResourceRequirement: *resource_info.NewGpuResourceRequirementWithGpus(2, 0),
+				SubGroups: map[string]*podgroup_info.SubGroupInfo{
+					podgroup_info.DefaultSubGroup: podgroup_info.NewSubGroupInfo(podgroup_info.DefaultSubGroup, 2).WithPodInfos(pod_info.PodsMap{
+						"pod1": &pod_info.PodInfo{
+							ResourceRequestType: pod_info.RequestTypeRegular,
+							ResReq: &resource_info.ResourceRequirements{
+								GpuResourceRequirement: *resource_info.NewGpuResourceRequirementWithGpus(2, 0),
+							},
 						},
-					},
-					"pod2": &pod_info.PodInfo{
-						ResReq: &resource_info.ResourceRequirements{
-							GpuResourceRequirement: *resource_info.NewGpuResourceRequirementWithGpus(2, 0),
+						"pod2": &pod_info.PodInfo{
+							ResReq: &resource_info.ResourceRequirements{
+								GpuResourceRequirement: *resource_info.NewGpuResourceRequirementWithGpus(2, 0),
+							},
 						},
-					},
+					}),
 				},
 			},
 			nodes: allNodes,
@@ -148,16 +156,18 @@
 		{
 			name: "Mixed requests (whole GPU and CPU pods)",
 			job: &podgroup_info.PodGroupInfo{
-				PodInfos: pod_info.PodsMap{
-					"pod1": &pod_info.PodInfo{
-						ResourceRequestType: pod_info.RequestTypeRegular,
-						ResReq: &resource_info.ResourceRequirements{
-							GpuResourceRequirement: *resource_info.NewGpuResourceRequirementWithGpus(2, 0),
+				SubGroups: map[string]*podgroup_info.SubGroupInfo{
+					podgroup_info.DefaultSubGroup: podgroup_info.NewSubGroupInfo(podgroup_info.DefaultSubGroup, 2).WithPodInfos(pod_info.PodsMap{
+						"pod1": &pod_info.PodInfo{
+							ResourceRequestType: pod_info.RequestTypeRegular,
+							ResReq: &resource_info.ResourceRequirements{
+								GpuResourceRequirement: *resource_info.NewGpuResourceRequirementWithGpus(2, 0),
+							},
+						},
+						"pod2": &pod_info.PodInfo{
+							ResReq: resource_info.NewResourceRequirements(0, 1000, 2000),
 						},
-					},
-					"pod2": &pod_info.PodInfo{
-						ResReq: resource_info.NewResourceRequirements(0, 1000, 2000),
-					},
+					}),
 				},
 			},
 			nodes: allNodes,
@@ -166,13 +176,15 @@
 		{
 			name: "Fraction GPU job",
 			job: &podgroup_info.PodGroupInfo{
-				PodInfos: pod_info.PodsMap{
-					"pod1": &pod_info.PodInfo{
-						ResourceRequestType: pod_info.RequestTypeFraction,
-						ResReq: &resource_info.ResourceRequirements{
-							GpuResourceRequirement: *resource_info.NewGpuResourceRequirementWithGpus(0.5, 0),
+				SubGroups: map[string]*podgroup_info.SubGroupInfo{
+					podgroup_info.DefaultSubGroup: podgroup_info.NewSubGroupInfo(podgroup_info.DefaultSubGroup, 1).WithPodInfos(pod_info.PodsMap{
+						"pod1": &pod_info.PodInfo{
+							ResourceRequestType: pod_info.RequestTypeFraction,
+							ResReq: &resource_info.ResourceRequirements{
+								GpuResourceRequirement: *resource_info.NewGpuResourceRequirementWithGpus(0.5, 0),
+							},
 						},
-					},
+					}),
 				},
 			},
 			nodes: allNodes,
@@ -181,14 +193,16 @@
 		{
 			name: "GPU Memory job",
 			job: &podgroup_info.PodGroupInfo{
-				PodInfos: pod_info.PodsMap{
-					"pod1": &pod_info.PodInfo{
-						ResourceRequestType: pod_info.RequestTypeGpuMemory,
-						ResReq: &resource_info.ResourceRequirements{
-							GpuResourceRequirement: *resource_info.NewGpuResourceRequirementWithGpus(
-								0, 500),
+				SubGroups: map[string]*podgroup_info.SubGroupInfo{
+					podgroup_info.DefaultSubGroup: podgroup_info.NewSubGroupInfo(podgroup_info.DefaultSubGroup, 1).WithPodInfos(pod_info.PodsMap{
+						"pod1": &pod_info.PodInfo{
+							ResourceRequestType: pod_info.RequestTypeGpuMemory,
+							ResReq: &resource_info.ResourceRequirements{
+								GpuResourceRequirement: *resource_info.NewGpuResourceRequirementWithGpus(
+									0, 500),
+							},
 						},
-					},
+					}),
 				},
 			},
 			nodes: allNodes,
@@ -197,17 +211,19 @@
 		{
 			name: "MIG job",
 			job: &podgroup_info.PodGroupInfo{
-				PodInfos: pod_info.PodsMap{
-					"pod1": &pod_info.PodInfo{
-						ResourceRequestType: pod_info.RequestTypeMigInstance,
-						ResReq: &resource_info.ResourceRequirements{
-							GpuResourceRequirement: *resource_info.NewGpuResourceRequirementWithMig(
-								map[v1.ResourceName]int64{
-									"nvidia.com/mig-1g.10gb": 1,
-								},
-							),
+				SubGroups: map[string]*podgroup_info.SubGroupInfo{
+					podgroup_info.DefaultSubGroup: podgroup_info.NewSubGroupInfo(podgroup_info.DefaultSubGroup, 1).WithPodInfos(pod_info.PodsMap{
+						"pod1": &pod_info.PodInfo{
+							ResourceRequestType: pod_info.RequestTypeMigInstance,
+							ResReq: &resource_info.ResourceRequirements{
+								GpuResourceRequirement: *resource_info.NewGpuResourceRequirementWithMig(
+									map[v1.ResourceName]int64{
+										"nvidia.com/mig-1g.10gb": 1,
+									},
+								),
+							},
 						},
-					},
+					}),
 				},
 			},
 			nodes: allNodes,

pkg/scheduler/actions/common/solvers/by_pod_solver.go

Lines changed: 2 additions & 2 deletions

@@ -207,7 +207,7 @@ func getNodesOfJob(pj *podgroup_info.PodGroupInfo) []string {
 	}
 
 	pjNodeNames := map[string]string{}
-	for _, latestPotentialVictimTask := range pj.PodInfos {
+	for _, latestPotentialVictimTask := range pj.GetAllPodsMap() {
 		pjNodeNames[latestPotentialVictimTask.NodeName] = latestPotentialVictimTask.NodeName
 	}
 	return maps.Keys(pjNodeNames)
@@ -256,7 +256,7 @@ func extractJobsFromTasks(
 	jobAlreadyExists := false
 	if possibleDuplicates, ok := jobs[task.Job]; ok {
 		for _, possibleDuplicate := range possibleDuplicates {
-			for _, podInfo := range possibleDuplicate.PodInfos {
+			for _, podInfo := range possibleDuplicate.GetAllPodsMap() {
 				if podInfo.UID == task.UID {
 					jobAlreadyExists = true
 					break

pkg/scheduler/actions/common/solvers/job_solver.go

Lines changed: 1 addition & 1 deletion

@@ -122,7 +122,7 @@ func (s *JobSolver) solvePartialJob(ssn *framework.Session, state *solvingState,
 func getPartialJobRepresentative(
 	job *podgroup_info.PodGroupInfo, pendingTasks []*pod_info.PodInfo) *podgroup_info.PodGroupInfo {
 	jobRepresentative := job.CloneWithTasks(pendingTasks)
-	jobRepresentative.MinAvailable = int32(len(pendingTasks))
+	jobRepresentative.SetDefaultMinAvailable(int32(len(pendingTasks)))
 	subGroupsMinAvailable := map[string]int{}
 	for _, pendingTask := range pendingTasks {
 		if _, found := jobRepresentative.SubGroups[pendingTask.SubGroupName]; found {

pkg/scheduler/actions/common/solvers/pod_scenario_builder.go

Lines changed: 3 additions & 3 deletions

@@ -38,7 +38,7 @@ func NewPodAccumulatedScenarioBuilder(
 	if len(tasksToAllocate) != 0 {
 		scenario = solverscenario.NewByNodeScenario(session, pendingJob, pendingJob, nil, recordedVictimsJobs)
 		for _, job := range recordedVictimsJobs {
-			for podId, podInfo := range job.PodInfos {
+			for podId, podInfo := range job.GetAllPodsMap() {
 				recordedVictimsTasks[podId] = podInfo
 			}
 		}
@@ -86,7 +86,7 @@ func (asb *PodAccumulatedScenarioBuilder) addNextPotentialVictims() bool {
 		// we still want to evaluate the job again if there are tasks
 		// that are not recorded victims yet, like elastic jobs
 		var remainingTasks []*pod_info.PodInfo
-		for _, task := range nextVictimJob.PodInfos {
+		for _, task := range nextVictimJob.GetAllPodsMap() {
 			if _, ok := asb.recordedVictimsTasks[task.UID]; !ok {
 				remainingTasks = append(remainingTasks, task)
 			}
@@ -101,7 +101,7 @@
 
 	if jobHasMoreTasks {
 		var remainingTasks []*pod_info.PodInfo
-		for _, task := range nextVictimJob.PodInfos {
+		for _, task := range nextVictimJob.GetAllPodsMap() {
 			if !slices.Contains(potentialVictimTasks, task) {
 				remainingTasks = append(remainingTasks, task)
 			}

pkg/scheduler/actions/common/solvers/pod_scenario_builder_test.go

Lines changed: 12 additions & 12 deletions

@@ -96,15 +96,15 @@ var _ = Describe("PodAccumulatedScenarioBuilder", func() {
 			Expect(len(lastScenario.PotentialVictimsTasks())).To(Equal(4))
 			for _, task := range lastScenario.PotentialVictimsTasks() {
 				matchingJob := lastScenario.GetVictimJobRepresentativeById(task)
-				Expect(len(matchingJob.PodInfos)).To(Equal(1))
+				Expect(len(matchingJob.GetAllPodsMap())).To(Equal(1))
 			}
 
 		})
 
 		It("returns scenario with all tasks in single groups when minAvailable is amount of pods", func() {
 			for _, podGroupInfo := range ssn.PodGroupInfos {
-				podGroupInfo.MinAvailable = int32(len(podGroupInfo.PodInfos))
-				podGroupInfo.PodGroup.Spec.MinMember = int32(len(podGroupInfo.PodInfos))
+				podGroupInfo.SetDefaultMinAvailable(int32(len(podGroupInfo.GetAllPodsMap())))
+				podGroupInfo.PodGroup.Spec.MinMember = int32(len(podGroupInfo.GetAllPodsMap()))
 			}
 			scenarioBuilder = NewPodAccumulatedScenarioBuilder(ssn, reclaimerJob, []*podgroup_info.PodGroupInfo{},
 				utils.GetVictimsQueue(ssn, nil))
@@ -120,7 +120,7 @@ var _ = Describe("PodAccumulatedScenarioBuilder", func() {
 			Expect(len(lastScenario.PotentialVictimsTasks())).To(Equal(4))
 			for _, task := range lastScenario.PotentialVictimsTasks() {
 				matchingJob := lastScenario.GetVictimJobRepresentativeById(task)
-				Expect(len(matchingJob.PodInfos)).To(Equal(2))
+				Expect(len(matchingJob.GetAllPodsMap())).To(Equal(2))
 			}
 		})
 	})
@@ -129,8 +129,8 @@ var _ = Describe("PodAccumulatedScenarioBuilder", func() {
 		It("returns scenarios that have the same recorded victims", func() {
 			ssn, _ = initializeSession(3, 2)
 			for _, podGroupInfo := range ssn.PodGroupInfos {
-				podGroupInfo.MinAvailable = int32(len(podGroupInfo.PodInfos))
-				podGroupInfo.PodGroup.Spec.MinMember = int32(len(podGroupInfo.PodInfos))
+				podGroupInfo.SetDefaultMinAvailable(int32(len(podGroupInfo.GetAllPodsMap())))
+				podGroupInfo.PodGroup.Spec.MinMember = int32(len(podGroupInfo.GetAllPodsMap()))
 			}
 			submitQueue := createQueue("team-a")
 			ssn.Queues[submitQueue.UID] = submitQueue
@@ -163,8 +163,8 @@ var _ = Describe("PodAccumulatedScenarioBuilder", func() {
 		It("returns scenarios that have correct number of potential victims", func() {
 			ssn, _ = initializeSession(3, 2)
 			for _, podGroupInfo := range ssn.PodGroupInfos {
-				podGroupInfo.MinAvailable = int32(len(podGroupInfo.PodInfos))
-				podGroupInfo.PodGroup.Spec.MinMember = int32(len(podGroupInfo.PodInfos))
+				podGroupInfo.SetDefaultMinAvailable(int32(len(podGroupInfo.GetAllPodsMap())))
+				podGroupInfo.PodGroup.Spec.MinMember = int32(len(podGroupInfo.GetAllPodsMap()))
 			}
 			submitQueue := createQueue("team-a")
 			ssn.Queues[submitQueue.UID] = submitQueue
@@ -202,7 +202,7 @@ var _ = Describe("PodAccumulatedScenarioBuilder", func() {
 			ssn, _ = initializeSession(1, 3)
 			minAvailable := 1
 			for _, podGroupInfo := range ssn.PodGroupInfos {
-				podGroupInfo.MinAvailable = int32(minAvailable)
+				podGroupInfo.SetDefaultMinAvailable(int32(minAvailable))
 				podGroupInfo.PodGroup.Spec.MinMember = int32(minAvailable)
 			}
 			submitQueue := createQueue("team-a")
@@ -214,7 +214,7 @@ var _ = Describe("PodAccumulatedScenarioBuilder", func() {
 			// Only the first pod group with the last task is recordedVictimJobs
 			for _, podGroupInfo := range ssn.PodGroupInfos {
 				var partialTasks []*pod_info.PodInfo
-				for _, podInfo := range podGroupInfo.PodInfos {
+				for _, podInfo := range podGroupInfo.GetAllPodsMap() {
 					// use last pod as recorded victim as sorting will be reversed
 					if podInfo.Name == "pod-2" {
 						partialTasks = append(partialTasks, podInfo)
@@ -243,7 +243,7 @@ var _ = Describe("PodAccumulatedScenarioBuilder", func() {
 			ssn, _ = initializeSession(1, 4)
 			minAvailable := 2
 			for _, podGroupInfo := range ssn.PodGroupInfos {
-				podGroupInfo.MinAvailable = int32(minAvailable)
+				podGroupInfo.SetDefaultMinAvailable(int32(minAvailable))
 				podGroupInfo.PodGroup.Spec.MinMember = int32(minAvailable)
 			}
 			submitQueue := createQueue("team-a")
@@ -255,7 +255,7 @@ var _ = Describe("PodAccumulatedScenarioBuilder", func() {
 			// Only the first pod group with the last task is recordedVictimJobs
 			for _, podGroupInfo := range ssn.PodGroupInfos {
 				var partialTasks []*pod_info.PodInfo
-				for _, podInfo := range podGroupInfo.PodInfos {
+				for _, podInfo := range podGroupInfo.GetAllPodsMap() {
 					// use last pod as recorded victim as sorting will be reversed
 					if podInfo.Name == "pod-3" {
 						partialTasks = append(partialTasks, podInfo)
