
Commit ca8aaaa

ArmedGuy and davidLif authored
scheduler: bugfix: Make pod_scenario_builder build scenarios for rest of elastic job (#132)
* add ginkgo test to pod_scenario_builder, add elastic scenario
* add fix for pod_scenario_builder.go for elastic jobs
* add new test cases for ensuring correct number of potential victims is still returned
* Fails - e2e test to reclaim elastic job for a distributed job
* Another elastic reclaim e2e test
* update changelog

Co-authored-by: davidLif <[email protected]>
1 parent 6cf5744 commit ca8aaaa

File tree

4 files changed (+225 lines, -2 lines)

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
@@ -12,3 +12,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
 
 ### Changed
 - Queue order function now takes into account potential victims, resulting in better reclaim scenarios.
+
+### Fixes
+- Fixed preempt/reclaim of elastic workloads only taking one pod.

pkg/scheduler/actions/common/solvers/pod_scenario_builder.go

Lines changed: 13 additions & 0 deletions
@@ -77,6 +77,19 @@ func (asb *PodAccumulatedScenarioBuilder) GetNextScenario() *solverscenario.ByNo
 	// Jump over recorded victims in potential victims generation
 	for _, potentialVictimTask := range potentialVictimTasks {
 		if _, ok := asb.recordedVictimsTasks[potentialVictimTask.UID]; ok {
+			// If any of the tasks of the victim job are recorded victims
+			// we still want to evaluate the job again if there are tasks
+			// that are not recorded victims yet, like elastic jobs
+			var remainingTasks []*pod_info.PodInfo
+			for _, task := range nextVictimJob.PodInfos {
+				if _, ok := asb.recordedVictimsTasks[task.UID]; !ok {
+					remainingTasks = append(remainingTasks, task)
+				}
+			}
+			if len(remainingTasks) != 0 {
+				jobToPush := nextVictimJob.CloneWithTasks(remainingTasks)
+				asb.victimsJobsQueue.PushJob(jobToPush)
+			}
 			return asb.GetNextScenario()
 		}
 	}
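Note on the fix: previously, once any task of a victim job was found among the recorded victims, the builder simply moved on to the next scenario without re-queuing the rest of that job, so only the already-recorded pod of an elastic job could ever be preempted or reclaimed. The added block clones the job with only its not-yet-recorded tasks and pushes that clone back onto the victims queue, so later scenarios can still offer the remainder of the elastic job as potential victims. Below is a minimal, standalone sketch of that filtering step; the Task type and remainingTasks helper are illustrative stand-ins, not the scheduler's actual types.

package main

import "fmt"

// Task is a stand-in for pod_info.PodInfo; only the UID matters for filtering.
type Task struct {
	UID  string
	Name string
}

// remainingTasks keeps the tasks of a victim job that are not already recorded
// victims, mirroring the loop added to GetNextScenario above.
func remainingTasks(jobTasks []*Task, recorded map[string]struct{}) []*Task {
	var rest []*Task
	for _, t := range jobTasks {
		if _, ok := recorded[t.UID]; !ok {
			rest = append(rest, t)
		}
	}
	return rest
}

func main() {
	// An elastic job with three pods, one of which is already a recorded victim.
	job := []*Task{{UID: "a", Name: "pod-0"}, {UID: "b", Name: "pod-1"}, {UID: "c", Name: "pod-2"}}
	recorded := map[string]struct{}{"c": {}}

	rest := remainingTasks(job, recorded)
	fmt.Printf("%d tasks can still be offered as potential victims\n", len(rest)) // prints 2
}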

pkg/scheduler/actions/common/solvers/pod_scenario_builder_test.go

Lines changed: 127 additions & 2 deletions
@@ -6,6 +6,7 @@ package solvers
 import (
 	"fmt"
 	"strconv"
+	"testing"
 
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
@@ -108,7 +109,7 @@ var _ = Describe("PodAccumulatedScenarioBuilder", func() {
 	})
 
 	Context("with recorded victims", func() {
-		It("All scenarios have the same recorded victims", func() {
+		It("returns scenarios that have the same recorded victims", func() {
 			ssn, _ = initializeSession(3, 2)
 			for _, podGroupInfo := range ssn.PodGroupInfos {
 				podGroupInfo.MinAvailable = int32(len(podGroupInfo.PodInfos))
@@ -141,6 +142,126 @@ var _ = Describe("PodAccumulatedScenarioBuilder", func() {
 
 			Expect(numberOfGeneratedScenarios).To(Equal(2))
 		})
+
+		It("returns scenarios that have correct number of potential victims", func() {
+			ssn, _ = initializeSession(3, 2)
+			for _, podGroupInfo := range ssn.PodGroupInfos {
+				podGroupInfo.MinAvailable = int32(len(podGroupInfo.PodInfos))
+				podGroupInfo.PodGroup.Spec.MinMember = int32(len(podGroupInfo.PodInfos))
+			}
+			submitQueue := createQueue("team-a")
+			ssn.Queues[submitQueue.UID] = submitQueue
+			reclaimerJob, _ = createJobWithTasks(1, 1, "team-a", v1.PodPending)
+
+			var recordedVictimsJobs []*podgroup_info.PodGroupInfo
+			recordedVictimIndexes := []int{0, 2}
+			podGroupIndex := 0
+
+			for _, podGroupInfo := range ssn.PodGroupInfos {
+				if slices.Contains(recordedVictimIndexes, podGroupIndex) {
+					recordedVictimsJobs = append(recordedVictimsJobs, podGroupInfo)
+				}
+				podGroupIndex += 1
+			}
+
+			victimsQueue := utils.GetVictimsQueue(ssn, nil)
+
+			scenarioBuilder = NewPodAccumulatedScenarioBuilder(ssn, reclaimerJob, recordedVictimsJobs, victimsQueue)
+
+			numberOfGeneratedScenarios := 0
+			potentialVictimsPerScenario := []int{0, 2}
+			for sn := scenarioBuilder.GetCurrentScenario(); sn != nil; sn = scenarioBuilder.GetNextScenario() {
+				Expect(numberOfGeneratedScenarios < len(potentialVictimsPerScenario)).To(BeTrue())
+				Expect(len(sn.PotentialVictimsTasks())).To(Equal(potentialVictimsPerScenario[numberOfGeneratedScenarios]))
+				numberOfGeneratedScenarios += 1
+			}
+			Expect(numberOfGeneratedScenarios).To(Equal(len(potentialVictimsPerScenario)))
+		})
+	})
+
+	Context("with recorded victims that are elastic", func() {
+		It("returns scenarios that have the same recorded victims", func() {
+			// run 1 job with 3 tasks, set minAvailable to 1 for elastic
+			ssn, _ = initializeSession(1, 3)
+			minAvailable := 1
+			for _, podGroupInfo := range ssn.PodGroupInfos {
+				podGroupInfo.MinAvailable = int32(minAvailable)
+				podGroupInfo.PodGroup.Spec.MinMember = int32(minAvailable)
+			}
+			submitQueue := createQueue("team-a")
+			ssn.Queues[submitQueue.UID] = submitQueue
+			reclaimerJob, _ = createJobWithTasks(1, 2, "team-a", v1.PodPending)
+
+			var recordedVictimsJobs []*podgroup_info.PodGroupInfo
+
+			// Only the first pod group with the last task is recordedVictimJobs
+			for _, podGroupInfo := range ssn.PodGroupInfos {
+				var partialTasks []*pod_info.PodInfo
+				for _, podInfo := range podGroupInfo.PodInfos {
+					// use last pod as recorded victim as sorting will be reversed
+					if podInfo.Name == "pod-2" {
+						partialTasks = append(partialTasks, podInfo)
+					}
+				}
+				recordedVictimsJobs = append(recordedVictimsJobs, podGroupInfo.CloneWithTasks(partialTasks))
+				// we only want to change the first pod group, break after this
+				break
+			}
+
+			victimsQueue := utils.GetVictimsQueue(ssn, nil)
+
+			scenarioBuilder = NewPodAccumulatedScenarioBuilder(ssn, reclaimerJob, recordedVictimsJobs, victimsQueue)
+
+			numberOfGeneratedScenarios := 0
+			for sn := scenarioBuilder.GetCurrentScenario(); sn != nil; sn = scenarioBuilder.GetNextScenario() {
+				Expect(len(sn.RecordedVictimsJobs())).To(Equal(len(recordedVictimsJobs)))
+				numberOfGeneratedScenarios += 1
+			}
+
+			Expect(numberOfGeneratedScenarios).To(Equal(3))
+		})
+
+		It("returns scenarios that have correct number of potential victims", func() {
+			// run 1 job with 4 tasks, set minAvailable to 2 for elastic
+			ssn, _ = initializeSession(1, 4)
+			minAvailable := 2
+			for _, podGroupInfo := range ssn.PodGroupInfos {
+				podGroupInfo.MinAvailable = int32(minAvailable)
+				podGroupInfo.PodGroup.Spec.MinMember = int32(minAvailable)
+			}
+			submitQueue := createQueue("team-a")
+			ssn.Queues[submitQueue.UID] = submitQueue
+			reclaimerJob, _ = createJobWithTasks(1, 2, "team-a", v1.PodPending)
+
+			var recordedVictimsJobs []*podgroup_info.PodGroupInfo
+
+			// Only the first pod group with the last task is recordedVictimJobs
+			for _, podGroupInfo := range ssn.PodGroupInfos {
+				var partialTasks []*pod_info.PodInfo
+				for _, podInfo := range podGroupInfo.PodInfos {
+					// use last pod as recorded victim as sorting will be reversed
+					if podInfo.Name == "pod-3" {
+						partialTasks = append(partialTasks, podInfo)
+					}
+				}
+				recordedVictimsJobs = append(recordedVictimsJobs, podGroupInfo.CloneWithTasks(partialTasks))
+				// we only want to change the first pod group, break after this
+				break
+			}
+
+			victimsQueue := utils.GetVictimsQueue(ssn, nil)
+
+			scenarioBuilder = NewPodAccumulatedScenarioBuilder(ssn, reclaimerJob, recordedVictimsJobs, victimsQueue)
+
+			numberOfGeneratedScenarios := 0
+			potentialVictimsPerScenario := []int{0, 1, 3}
+			for sn := scenarioBuilder.GetCurrentScenario(); sn != nil; sn = scenarioBuilder.GetNextScenario() {
+				Expect(numberOfGeneratedScenarios < len(potentialVictimsPerScenario)).To(BeTrue())
+				Expect(len(sn.PotentialVictimsTasks())).To(Equal(potentialVictimsPerScenario[numberOfGeneratedScenarios]))
+				numberOfGeneratedScenarios += 1
+			}
+			Expect(numberOfGeneratedScenarios).To(Equal(len(potentialVictimsPerScenario)))
+		})
 	})
 })
 
@@ -153,7 +274,6 @@ func initializeSession(jobsCount, tasksPerJob int) (*framework.Session, []*pod_i
 	queues := []*queue_info.QueueInfo{defaultQueue}
 
 	for jobID := 0; jobID < jobsCount; jobID++ {
-		jobTasks := []*pod_info.PodInfo{}
 		queueName := fmt.Sprintf("team-%d", jobID)
 		newJob, jobTasks := createJobWithTasks(tasksPerJob, jobID, queueName, v1.PodRunning)
 		jobs = append(jobs, newJob)
@@ -278,3 +398,8 @@ func requireOneGPU() v1.ResourceRequirements {
 		},
 	}
 }
+
+func TestScenarioSolvers(t *testing.T) {
+	RegisterFailHandler(Fail)
+	RunSpecs(t, "Scenario Solvers Suite")
+}

test/e2e/suites/reclaim/reclaim_elastic_test.go

Lines changed: 82 additions & 0 deletions
@@ -145,4 +145,86 @@ var _ = Describe("Reclaim with Elastic Jobs", Ordered, func() {
 			return len(pods.Items) == 0
 		})
 	})
+
+	It("Reclaim elastic job for a distributed job", func(ctx context.Context) {
+		testCtx = testcontext.GetConnectivity(ctx, Default)
+		parentQueue, reclaimeeQueue, reclaimerQueue = createQueues(2, 0, 2)
+		reclaimeeQueue.Spec.Resources.GPU.OverQuotaWeight = 0
+		testCtx.InitQueues([]*v2.Queue{parentQueue, reclaimeeQueue, reclaimerQueue})
+		reclaimeeNamespace = queue.GetConnectedNamespaceToQueue(reclaimeeQueue)
+
+		// reclaimee job
+		reclaimeePodRequirements := v1.ResourceRequirements{
+			Limits: map[v1.ResourceName]resource.Quantity{
+				constants.GpuResource: resource.MustParse("1"),
+			},
+		}
+		reclaimeePodGroup, reclaimeePods := pod_group.CreateWithPods(ctx, testCtx.KubeClientset, testCtx.KubeAiSchedClientset,
+			"elastic-reclaimee-job", reclaimeeQueue, 2, nil,
+			reclaimeePodRequirements)
+		wait.ForPodsScheduled(ctx, testCtx.ControllerClient, reclaimeeNamespace, reclaimeePods)
+
+		// reclaimer job
+		reclaimerPodRequirements := v1.ResourceRequirements{
+			Limits: map[v1.ResourceName]resource.Quantity{
+				constants.GpuResource: resource.MustParse("1"),
+			},
+		}
+		_, reclaimerPods := pod_group.CreateDistributedJob(
+			ctx, testCtx.KubeClientset, testCtx.ControllerClient,
+			reclaimerQueue, 2, reclaimerPodRequirements, "",
+		)
+		reclaimerNamespace := queue.GetConnectedNamespaceToQueue(reclaimerQueue)
+		wait.ForPodsScheduled(ctx, testCtx.ControllerClient, reclaimerNamespace, reclaimerPods)
+
+		// verify results
+		wait.ForPodsWithCondition(ctx, testCtx.ControllerClient, func(watch.Event) bool {
+			pods, err := testCtx.KubeClientset.CoreV1().Pods(reclaimeeNamespace).List(ctx, metav1.ListOptions{
+				LabelSelector: fmt.Sprintf("%s=%s", podGroupLabelName, reclaimeePodGroup.Name),
+			})
+			Expect(err).To(Succeed())
+			return len(pods.Items) == 0
+		})
+	})
+
+	It("Reclaim elastic job partially for a distributed job", func(ctx context.Context) {
+		testCtx = testcontext.GetConnectivity(ctx, Default)
+		parentQueue, reclaimeeQueue, reclaimerQueue = createQueues(3, 1, 2)
+		reclaimeeQueue.Spec.Resources.GPU.OverQuotaWeight = 0
+		testCtx.InitQueues([]*v2.Queue{parentQueue, reclaimeeQueue, reclaimerQueue})
+		reclaimeeNamespace = queue.GetConnectedNamespaceToQueue(reclaimeeQueue)
+
+		// reclaimee job
+		reclaimeePodRequirements := v1.ResourceRequirements{
+			Limits: map[v1.ResourceName]resource.Quantity{
+				constants.GpuResource: resource.MustParse("1"),
+			},
+		}
+		reclaimeePodGroup, reclaimeePods := pod_group.CreateWithPods(ctx, testCtx.KubeClientset, testCtx.KubeAiSchedClientset,
+			"elastic-reclaimee-job", reclaimeeQueue, 3, nil,
+			reclaimeePodRequirements)
+		wait.ForPodsScheduled(ctx, testCtx.ControllerClient, reclaimeeNamespace, reclaimeePods)
+
+		// reclaimer job
+		reclaimerPodRequirements := v1.ResourceRequirements{
+			Limits: map[v1.ResourceName]resource.Quantity{
+				constants.GpuResource: resource.MustParse("1"),
+			},
+		}
+		_, reclaimerPods := pod_group.CreateDistributedJob(
+			ctx, testCtx.KubeClientset, testCtx.ControllerClient,
+			reclaimerQueue, 2, reclaimerPodRequirements, "",
+		)
+		reclaimerNamespace := queue.GetConnectedNamespaceToQueue(reclaimerQueue)
+		wait.ForPodsScheduled(ctx, testCtx.ControllerClient, reclaimerNamespace, reclaimerPods)
+
+		// verify results
+		wait.ForPodsWithCondition(ctx, testCtx.ControllerClient, func(watch.Event) bool {
+			pods, err := testCtx.KubeClientset.CoreV1().Pods(reclaimeeNamespace).List(ctx, metav1.ListOptions{
+				LabelSelector: fmt.Sprintf("%s=%s", podGroupLabelName, reclaimeePodGroup.Name),
+			})
+			Expect(err).To(Succeed())
+			return len(pods.Items) == 1
+		})
+	})
 })
