Commit 1b5cc09

/138
1- Add caching to GetTasksToAllocate and GetTasksToAllocateInitResource
2- GetActivelyRunningTasksCount returns the count instead of the pods
3- Save the number of ActiveAllocated pods in the podgroup info instead of calculating it on request
1 parent 9395368 commit 1b5cc09

10 files changed: +336 −186 lines
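
Taken together, the changes apply one pattern throughout: derived values on PodGroupInfo (the ordered tasks to allocate, their initial resource sum, and the active-allocated count) are computed once, stored on the struct, and invalidated whenever the task index changes. A minimal standalone sketch of that memoize-and-invalidate shape, using hypothetical names rather than the repository's types:

package main

import "fmt"

// group caches a derived value and drops it on every mutation,
// analogous to how PodGroupInfo caches tasksToAllocate.
type group struct {
	tasks       []string
	activeCount *int // nil means "not computed yet"
}

func (g *group) add(task string) {
	g.tasks = append(g.tasks, task)
	g.activeCount = nil // any mutation invalidates the cache
}

func (g *group) ActiveCount() int {
	if g.activeCount == nil { // recompute lazily on first read
		n := len(g.tasks)
		g.activeCount = &n
	}
	return *g.activeCount
}

func main() {
	g := &group{}
	g.add("task-1")
	fmt.Println(g.ActiveCount()) // computed by scanning: 1
	fmt.Println(g.ActiveCount()) // served from the cached value: 1
}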

pkg/scheduler/actions/common/allocate.go

Lines changed: 1 addition & 1 deletion

@@ -131,7 +131,7 @@ func handleFailedTaskAllocation(job *podgroup_info.PodGroupInfo, unschedulableTa
 		allocationError.SetError(common_info.DefaultPodError)
 	}
 
-	numRunningTasks := int32(len(job.GetActivelyRunningTasks()))
+	numRunningTasks := job.GetActivelyRunningTasksCount()
 	if job.MinAvailable > 1 && numRunningTasks < job.MinAvailable {
 		job.SetJobFitError(
 			podgroup_info.PodSchedulingErrors,

pkg/scheduler/actions/consolidation/consolidation.go

Lines changed: 1 addition & 1 deletion

@@ -147,7 +147,7 @@ func buildPreemptibleFilterFunc(preemptor *podgroup_info.PodGroupInfo, maxPreemp
 			return false
 		}
 
-		if len(job.GetActiveAllocatedTasks()) == 0 {
+		if job.GetActiveAllocatedTasksCount() == 0 {
 			return false
 		}
 

pkg/scheduler/actions/utils/job_order_by_queue_test.go

Lines changed: 2 additions & 0 deletions

@@ -564,7 +564,9 @@ func TestJobsOrderByQueues_PushJob(t *testing.T) {
 			jobsOrder.PushJob(tt.args.job)
 
 			for _, expectedJob := range tt.expected.expectedJobsList {
+				_ = expectedJob.GetActiveAllocatedTasksCount()
 				actualJob := jobsOrder.PopNextJob()
+				_ = actualJob.GetActiveAllocatedTasksCount()
 				assert.Equal(t, expectedJob, actualJob)
 			}
 		})

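Both added calls discard their results, so they read as cache warm-ups: assert.Equal compares the full PodGroupInfo structs, and reading the lazily computed count on both the expected and the actual job presumably keeps the new activeAllocatedCount field from differing between a populated cache and a still-nil one.
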
pkg/scheduler/api/podgroup_info/allocation_info.go

Lines changed: 11 additions & 3 deletions

@@ -27,6 +27,10 @@ func HasTasksToAllocate(podGroupInfo *PodGroupInfo, isRealAllocation bool) bool
 func GetTasksToAllocate(
 	podGroupInfo *PodGroupInfo, taskOrderFn common_info.LessFn, isRealAllocation bool,
 ) []*pod_info.PodInfo {
+	if podGroupInfo.tasksToAllocate != nil {
+		return podGroupInfo.tasksToAllocate
+	}
+
 	taskPriorityQueue := getTasksToAllocateQueue(podGroupInfo, taskOrderFn, isRealAllocation)
 	maxNumOfTasksToAllocate := getNumOfTasksToAllocate(podGroupInfo, taskPriorityQueue.Len())
 
@@ -36,6 +40,7 @@ func GetTasksToAllocate(
 		tasksToAllocate = append(tasksToAllocate, nextPod)
 	}
 
+	podGroupInfo.tasksToAllocate = tasksToAllocate
 	return tasksToAllocate
 }
 
@@ -77,18 +82,21 @@ func GetJobsToAllocateInitResource(
 func GetTasksToAllocateInitResource(
 	podGroupInfo *PodGroupInfo, taskOrderFn common_info.LessFn, isRealAllocation bool,
 ) *resource_info.Resource {
-	tasksTotalRequestedResource := resource_info.EmptyResource()
-
 	if podGroupInfo == nil {
-		return tasksTotalRequestedResource
+		return resource_info.EmptyResource()
+	}
+	if podGroupInfo.tasksToAllocateInitResource != nil {
+		return podGroupInfo.tasksToAllocateInitResource
 	}
 
+	tasksTotalRequestedResource := resource_info.EmptyResource()
 	for _, task := range GetTasksToAllocate(podGroupInfo, taskOrderFn, isRealAllocation) {
 		if task.ShouldAllocate(isRealAllocation) {
 			tasksTotalRequestedResource.AddResourceRequirements(task.ResReq)
 		}
 	}
 
+	podGroupInfo.tasksToAllocateInitResource = tasksTotalRequestedResource
 	return tasksTotalRequestedResource
 }
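One contract worth noting in this diff: on a cache hit, GetTasksToAllocate returns the stored slice without consulting taskOrderFn or isRealAllocation, so the cache is only sound while those arguments stay consistent between invalidations. The invalidation side lives in job_info.go below, where invalidateTasksCache resets both cached fields whenever a task enters or leaves the status index.
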
pkg/scheduler/api/podgroup_info/job_info.go

Lines changed: 40 additions & 14 deletions

@@ -14,6 +14,7 @@ import (
 	policyv1 "k8s.io/api/policy/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/utils/ptr"
 
 	enginev2alpha2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2alpha2"
 	commonconstants "github.com/NVIDIA/KAI-scheduler/pkg/common/constants"
@@ -61,8 +62,7 @@ type PodGroupInfo struct {
 	NodesFitErrors map[common_info.PodID]*common_info.FitErrors
 
 	// All tasks of the Job.
-	PodStatusIndex map[pod_status.PodStatus]pod_info.PodsMap
-	PodInfos       pod_info.PodsMap
+	PodInfos pod_info.PodsMap
 
 	Allocated *resource_info.Resource
 
@@ -76,6 +76,12 @@ type PodGroupInfo struct {
 	StalenessInfo
 
 	schedulingConstraintsSignature common_info.SchedulingConstraintsSignature
+
+	// inner cache
+	tasksToAllocate             []*pod_info.PodInfo
+	tasksToAllocateInitResource *resource_info.Resource
+	PodStatusIndex              map[pod_status.PodStatus]pod_info.PodsMap
+	activeAllocatedCount        *int
 }
 
 func NewPodGroupInfo(uid common_info.PodGroupID, tasks ...*pod_info.PodInfo) *PodGroupInfo {
@@ -94,6 +100,7 @@ func NewPodGroupInfo(uid common_info.PodGroupID, tasks ...*pod_info.PodInfo) *Po
 			TimeStamp: nil,
 			Stale:     false,
 		},
+		activeAllocatedCount: ptr.To(0),
 	}
 
 	for _, task := range tasks {
@@ -155,6 +162,11 @@ func (podGroupInfo *PodGroupInfo) addTaskIndex(ti *pod_info.PodInfo) {
 	}
 
 	podGroupInfo.PodStatusIndex[ti.Status][ti.UID] = ti
+	if pod_status.IsActiveAllocatedStatus(ti.Status) {
+		podGroupInfo.activeAllocatedCount = ptr.To(*podGroupInfo.activeAllocatedCount + 1)
+	}
+
+	podGroupInfo.invalidateTasksCache()
 }
 
 func (podGroupInfo *PodGroupInfo) AddTaskInfo(ti *pod_info.PodInfo) {
@@ -182,31 +194,44 @@ func (podGroupInfo *PodGroupInfo) UpdateTaskStatus(task *pod_info.PodInfo, statu
 func (podGroupInfo *PodGroupInfo) deleteTaskIndex(ti *pod_info.PodInfo) {
 	if tasks, found := podGroupInfo.PodStatusIndex[ti.Status]; found {
 		delete(tasks, ti.UID)
+		if pod_status.IsActiveAllocatedStatus(ti.Status) {
+			podGroupInfo.activeAllocatedCount = ptr.To(*podGroupInfo.activeAllocatedCount - 1)
+		}
 
 		if len(tasks) == 0 {
 			delete(podGroupInfo.PodStatusIndex, ti.Status)
 		}
+
+		podGroupInfo.invalidateTasksCache()
 	}
 }
 
-func (podGroupInfo *PodGroupInfo) GetActiveAllocatedTasks() []*pod_info.PodInfo {
-	var tasksToAllocate []*pod_info.PodInfo
-	for _, task := range podGroupInfo.PodInfos {
-		if pod_status.IsActiveAllocatedStatus(task.Status) {
-			tasksToAllocate = append(tasksToAllocate, task)
+func (podGroupInfo *PodGroupInfo) invalidateTasksCache() {
+	podGroupInfo.tasksToAllocate = nil
+	podGroupInfo.tasksToAllocateInitResource = nil
+}
+
+func (podGroupInfo *PodGroupInfo) GetActiveAllocatedTasksCount() int {
+	if podGroupInfo.activeAllocatedCount == nil {
+		var taskCount int
+		for _, task := range podGroupInfo.PodInfos {
+			if pod_status.IsActiveAllocatedStatus(task.Status) {
+				taskCount++
+			}
 		}
+		podGroupInfo.activeAllocatedCount = ptr.To(taskCount)
 	}
-	return tasksToAllocate
+	return *podGroupInfo.activeAllocatedCount
 }
 
-func (podGroupInfo *PodGroupInfo) GetActivelyRunningTasks() []*pod_info.PodInfo {
-	var tasks []*pod_info.PodInfo
+func (podGroupInfo *PodGroupInfo) GetActivelyRunningTasksCount() int32 {
+	tasksCount := int32(0)
 	for _, task := range podGroupInfo.PodInfos {
 		if pod_status.IsActiveUsedStatus(task.Status) {
-			tasks = append(tasks, task)
+			tasksCount += 1
 		}
 	}
-	return tasks
+	return tasksCount
 }
 
 func (podGroupInfo *PodGroupInfo) DeleteTaskInfo(ti *pod_info.PodInfo) error {
@@ -342,8 +367,9 @@ func (podGroupInfo *PodGroupInfo) CloneWithTasks(tasks []*pod_info.PodInfo) *Pod
 		PodGroup:    podGroupInfo.PodGroup,
 		PodGroupUID: podGroupInfo.PodGroupUID,
 
-		PodStatusIndex: map[pod_status.PodStatus]pod_info.PodsMap{},
-		PodInfos:       pod_info.PodsMap{},
+		PodStatusIndex:       map[pod_status.PodStatus]pod_info.PodsMap{},
+		PodInfos:             pod_info.PodsMap{},
+		activeAllocatedCount: ptr.To(0),
 	}
 
 	podGroupInfo.CreationTimestamp.DeepCopyInto(&info.CreationTimestamp)
pkg/scheduler/api/podgroup_info/job_info_test.go

Lines changed: 10 additions & 6 deletions

@@ -9,6 +9,7 @@ import (
 
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/utils/ptr"
 
 	"github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2alpha2"
 	commonconstants "github.com/NVIDIA/KAI-scheduler/pkg/common/constants"
@@ -75,8 +76,9 @@ func TestAddTaskInfo(t *testing.T) {
 						case01_task4.UID: case01_task4,
 					},
 				},
-				JobFitErrors:   make(v2alpha2.UnschedulableExplanations, 0),
-				NodesFitErrors: map[common_info.PodID]*common_info.FitErrors{},
+				activeAllocatedCount: ptr.To(3),
+				JobFitErrors:         make(v2alpha2.UnschedulableExplanations, 0),
+				NodesFitErrors:       map[common_info.PodID]*common_info.FitErrors{},
 			},
 		},
 	}
@@ -150,8 +152,9 @@ func TestDeleteTaskInfo(t *testing.T) {
 					pod_status.Pending: {case01_task1.UID: case01_task1},
 					pod_status.Running: {case01_task3.UID: case01_task3},
 				},
-				JobFitErrors:   make(v2alpha2.UnschedulableExplanations, 0),
-				NodesFitErrors: map[common_info.PodID]*common_info.FitErrors{},
+				activeAllocatedCount: ptr.To(1),
+				JobFitErrors:         make(v2alpha2.UnschedulableExplanations, 0),
+				NodesFitErrors:       map[common_info.PodID]*common_info.FitErrors{},
 			},
 		},
 		{
@@ -175,8 +178,9 @@
 						case02_task3.UID: case02_task3,
 					},
 				},
-				JobFitErrors:   make(v2alpha2.UnschedulableExplanations, 0),
-				NodesFitErrors: map[common_info.PodID]*common_info.FitErrors{},
+				activeAllocatedCount: ptr.To(1),
+				JobFitErrors:         make(v2alpha2.UnschedulableExplanations, 0),
+				NodesFitErrors:       map[common_info.PodID]*common_info.FitErrors{},
 			},
 		},
 	}

pkg/scheduler/cache/status_updater/default_status_updater.go

Lines changed: 1 addition & 1 deletion

@@ -217,7 +217,7 @@ func (su *defaultStatusUpdater) recordJobNotReadyEvent(job *podgroup_info.PodGro
 func (su *defaultStatusUpdater) markPodGroupUnschedulable(job *podgroup_info.PodGroupInfo, message string) bool {
 	su.recorder.Event(job.PodGroup, v1.EventTypeNormal, enginev2alpha2.PodGroupReasonUnschedulable, message)
 
-	if len(job.GetActiveAllocatedTasks()) > 0 {
+	if job.GetActiveAllocatedTasksCount() > 0 {
 		// Don't update podgroup condition if there are any allocated pods (RUN-20673)
 		return false
 	}

pkg/scheduler/plugins/elastic/elastic.go

Lines changed: 2 additions & 2 deletions

@@ -29,8 +29,8 @@ func JobOrderFn(l, r interface{}) int {
 	lv := l.(*podgroup_info.PodGroupInfo)
 	rv := r.(*podgroup_info.PodGroupInfo)
 
-	lvAllocatedCount := int32(len(lv.GetActiveAllocatedTasks()))
-	rvAllocatedCount := int32(len(rv.GetActiveAllocatedTasks()))
+	lvAllocatedCount := int32(lv.GetActiveAllocatedTasksCount())
+	rvAllocatedCount := int32(rv.GetActiveAllocatedTasksCount())
 
 	if lvAllocatedCount < lv.MinAvailable && rvAllocatedCount >= rv.MinAvailable {
 		return -1
