Skip to content

Commit e1f2adc

Browse files
committed
fix(scheduler): preserve unschedulable pod condition
Signed-off-by: Erez Freiberger <enoodle@gmail.com>
1 parent 33b627b commit e1f2adc

2 files changed

Lines changed: 80 additions & 24 deletions

File tree

pkg/scheduler/cache/record_job_status_event_test.go

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -542,19 +542,26 @@ func TestRecordJobStatusEventScenarioSearchUnresolved(t *testing.T) {
542542
if err != nil {
543543
return nil, err
544544
}
545-
if len(pod.Status.Conditions) > 0 {
545+
if hasPodCondition(pod, v1.PodScheduled) &&
546+
hasPodCondition(pod, v1.PodConditionType(enginev2alpha2.ScenarioSearchUnresolved)) {
546547
return pod, nil
547548
}
548-
return nil, fmt.Errorf("no conditions found for pod %s", pod.Name)
549+
return nil, fmt.Errorf("missing expected conditions for pod %s", pod.Name)
549550
})
550-
assert.Nil(t, err)
551+
if err != nil {
552+
t.Fatal(err)
553+
}
551554

552555
pod := podObj.(*v1.Pod)
553-
podCondition := pod.Status.Conditions[0]
554-
assert.Equal(t, v1.PodConditionType(enginev2alpha2.ScenarioSearchUnresolved), podCondition.Type)
555-
assert.Equal(t, v1.ConditionTrue, podCondition.Status)
556-
assert.Equal(t, string(enginev2alpha2.ScenarioSearchUnresolved), podCondition.Reason)
557-
assert.Equal(t, exhaustedMessage, podCondition.Message)
556+
unschedulableCondition := podConditionByType(pod, v1.PodScheduled)
557+
assert.Equal(t, v1.ConditionFalse, unschedulableCondition.Status)
558+
assert.Equal(t, v1.PodReasonUnschedulable, unschedulableCondition.Reason)
559+
assert.Equal(t, "Node-Pool 'default': "+common_info.DefaultPodError, unschedulableCondition.Message)
560+
561+
scenarioSearchCondition := podConditionByType(pod, v1.PodConditionType(enginev2alpha2.ScenarioSearchUnresolved))
562+
assert.Equal(t, v1.ConditionTrue, scenarioSearchCondition.Status)
563+
assert.Equal(t, string(enginev2alpha2.ScenarioSearchUnresolved), scenarioSearchCondition.Reason)
564+
assert.Equal(t, exhaustedMessage, scenarioSearchCondition.Message)
558565
}
559566

560567
for podID, event := range getPodEvents(t, kubeClient) {
@@ -683,6 +690,19 @@ func waitForCondition(condition func() (runtime.Object, error)) (runtime.Object,
683690
}
684691
}
685692

693+
func hasPodCondition(pod *v1.Pod, conditionType v1.PodConditionType) bool {
694+
return podConditionByType(pod, conditionType) != nil
695+
}
696+
697+
func podConditionByType(pod *v1.Pod, conditionType v1.PodConditionType) *v1.PodCondition {
698+
for index := range pod.Status.Conditions {
699+
if pod.Status.Conditions[index].Type == conditionType {
700+
return &pod.Status.Conditions[index]
701+
}
702+
}
703+
return nil
704+
}
705+
686706
func validatePodEvents(t *testing.T, eventsPerPod map[common_info.PodID]*v1.Event, expectedPatterns map[common_info.PodID][]string) {
687707
for podID, expectedMessagePatterns := range expectedPatterns {
688708
event, found := eventsPerPod[podID]

pkg/scheduler/cache/status_updater/default_status_updater.go

Lines changed: 52 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
commonconstants "github.com/kai-scheduler/KAI-scheduler/pkg/common/constants"
2424
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api/common_info"
2525
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api/eviction_info"
26+
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api/pod_info"
2627
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api/pod_status"
2728
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api/podgroup_info"
2829
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/k8s_internal"
@@ -216,6 +217,9 @@ func (su *defaultStatusUpdater) RecordJobStatusEvent(job *podgroup_info.PodGroup
216217
return nil
217218
}
218219
if job.ScenarioSearchUnresolved != nil {
220+
if err := su.recordUnschedulablePodsConditions(job); err != nil {
221+
return err
222+
}
219223
if err := su.recordScenarioSearchUnresolvedPodsEvents(job); err != nil {
220224
return err
221225
}
@@ -355,7 +359,7 @@ func (su *defaultStatusUpdater) updatePodCondition(pod *v1.Pod, condition *v1.Po
355359
pod.Namespace, pod.Name, condition.Type, condition.Status)
356360
if k8s_internal.UpdatePodCondition(&pod.Status, condition) {
357361
statusPatchBaseObject := v1.PodStatus{}
358-
statusPatchBaseObject.Conditions = []v1.PodCondition{*condition}
362+
statusPatchBaseObject.Conditions = pod.Status.Conditions
359363
podStatusPatchBytes, err := json.Marshal(statusPatchBaseObject)
360364
if err != nil {
361365
return err
@@ -386,21 +390,7 @@ func (su *defaultStatusUpdater) recordUnschedulablePodsEvents(job *podgroup_info
386390
continue
387391
}
388392

389-
msg := common_info.DefaultPodError
390-
fitError := job.TasksFitErrors[taskInfo.UID]
391-
if fitError != nil {
392-
msg = fitError.Error()
393-
394-
if su.detailedFitErrors {
395-
msg = fitError.DetailedError()
396-
} else {
397-
log.InfraLogger.V(6).Infof("Full fit error: %s", fitError.DetailedError())
398-
}
399-
} else if len(job.JobFitErrors) > 0 {
400-
msg = fmt.Sprintf("%s", common_info.JobFitErrorsToMessage(job.JobFitErrors))
401-
}
402-
403-
msg = su.addNodePoolPrefixIfNeeded(job, msg)
393+
msg := su.unschedulableTaskMessage(job, taskInfo)
404394
log.InfraLogger.V(6).Infof("setting message for task: %v, %v", taskInfo.Name, msg)
405395
updatePodCondition := utils.GetMarkUnschedulableValue(job.PodGroup.Spec.MarkUnschedulable)
406396
if err := su.markTaskUnschedulable(taskInfo.Pod, msg, updatePodCondition); err != nil {
@@ -412,6 +402,52 @@ func (su *defaultStatusUpdater) recordUnschedulablePodsEvents(job *podgroup_info
412402
return errors.Join(errs...)
413403
}
414404

405+
func (su *defaultStatusUpdater) recordUnschedulablePodsConditions(job *podgroup_info.PodGroupInfo) error {
406+
if !utils.GetMarkUnschedulableValue(job.PodGroup.Spec.MarkUnschedulable) {
407+
return nil
408+
}
409+
410+
var errs []error
411+
for _, taskInfo := range job.PodStatusIndex[pod_status.Pending] {
412+
if job.IsInvalidSubGroupTask(taskInfo.UID) {
413+
continue
414+
}
415+
416+
msg := su.unschedulableTaskMessage(job, taskInfo)
417+
if err := su.updatePodCondition(taskInfo.Pod, &v1.PodCondition{
418+
Type: v1.PodScheduled,
419+
Status: v1.ConditionFalse,
420+
Reason: v1.PodReasonUnschedulable,
421+
Message: msg,
422+
}); err != nil {
423+
errs = append(errs, fmt.Errorf("failed to update unschedulable task status <%s/%s>: %v",
424+
taskInfo.Namespace, taskInfo.Name, err))
425+
}
426+
}
427+
428+
return errors.Join(errs...)
429+
}
430+
431+
func (su *defaultStatusUpdater) unschedulableTaskMessage(
432+
job *podgroup_info.PodGroupInfo, taskInfo *pod_info.PodInfo,
433+
) string {
434+
msg := common_info.DefaultPodError
435+
fitError := job.TasksFitErrors[taskInfo.UID]
436+
if fitError != nil {
437+
msg = fitError.Error()
438+
439+
if su.detailedFitErrors {
440+
msg = fitError.DetailedError()
441+
} else {
442+
log.InfraLogger.V(6).Infof("Full fit error: %s", fitError.DetailedError())
443+
}
444+
} else if len(job.JobFitErrors) > 0 {
445+
msg = fmt.Sprintf("%s", common_info.JobFitErrorsToMessage(job.JobFitErrors))
446+
}
447+
448+
return su.addNodePoolPrefixIfNeeded(job, msg)
449+
}
450+
415451
func (su *defaultStatusUpdater) recordScenarioSearchUnresolvedPodsEvents(job *podgroup_info.PodGroupInfo) error {
416452
var errs []error
417453
message := scenarioSearchUnresolvedMessage(job.ScenarioSearchUnresolved)

0 commit comments

Comments
 (0)