
Commit ad16378

Add scheduling strategies in CA.
More precisely:
- An iterator to control the order in which nodes are considered.
- An option to prefer the smallest number of iterations when there are multiple matches, even if parallelism is enabled.
1 parent 9a7264a commit ad16378
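Note: the SchedulingOptions type itself is declared in one of the 18 changed files not shown in this excerpt. The sketch below is only a guess at its shape, assuming nothing beyond what the call sites in plugin_runner.go further down actually use (opts.Next and opts.PreferSmallestSteps); the real declaration may differ.

// Sketch only — field names are taken from how opts is used in
// plugin_runner.go in this commit; everything else is assumed.
package clustersnapshot

// SchedulingOptions controls how the simulated scheduler iterates over nodes.
type SchedulingOptions struct {
    // Next, if set, overrides the default round-robin order: given the name of
    // the last matched node and the attempt index i, it returns the name of
    // the node to try next. A name not present in the snapshot skips the attempt.
    Next func(lastNode string, i int) string
    // PreferSmallestSteps asks the runner to prefer the earliest node in the
    // iteration order when several nodes pass the filters, even though the
    // per-node checks run in parallel.
    PreferSmallestSteps bool
}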

18 files changed, 236 insertions(+), 33 deletions(-)

cluster-autoscaler/core/podlistprocessor/filter_out_schedulable.go

Lines changed: 1 addition & 1 deletion
@@ -100,7 +100,7 @@ func (p *filterOutSchedulablePodListProcessor) filterOutSchedulableByPacking(uns
         return corev1helpers.PodPriority(unschedulableCandidates[i]) > corev1helpers.PodPriority(unschedulableCandidates[j])
     })
 
-    statuses, overflowingControllerCount, err := p.schedulingSimulator.TrySchedulePods(clusterSnapshot, unschedulableCandidates, p.nodeFilter, false)
+    statuses, overflowingControllerCount, err := p.schedulingSimulator.TrySchedulePods(clusterSnapshot, unschedulableCandidates, p.nodeFilter, false, clustersnapshot.SchedulingOptions{})
     if err != nil {
         return nil, err
     }

cluster-autoscaler/core/scaledown/planner/planner.go

Lines changed: 1 addition & 1 deletion
@@ -261,7 +261,7 @@ func (p *Planner) injectPods(pods []*apiv1.Pod) error {
     pods = pod_util.ClearPodNodeNames(pods)
     // Note: We're using ScheduleAnywhere, but the pods won't schedule back
     // on the drained nodes due to taints.
-    statuses, _, err := p.actuationInjector.TrySchedulePods(p.autoscalingCtx.ClusterSnapshot, pods, scheduling.ScheduleAnywhere, true)
+    statuses, _, err := p.actuationInjector.TrySchedulePods(p.autoscalingCtx.ClusterSnapshot, pods, scheduling.ScheduleAnywhere, true, clustersnapshot.SchedulingOptions{})
     if err != nil {
         return fmt.Errorf("cannot scale down, an unexpected error occurred: %v", err)
     }

cluster-autoscaler/estimator/binpacking_estimator.go

Lines changed: 2 additions & 2 deletions
@@ -149,7 +149,7 @@ func (e *BinpackingNodeEstimator) tryToScheduleOnExistingNodes(
     // Try to schedule the pod on all nodes created during simulation
     nodeName, err := e.clusterSnapshot.SchedulePodOnAnyNodeMatching(pod, func(nodeInfo *framework.NodeInfo) bool {
         return estimationState.newNodeNames[nodeInfo.Node().Name]
-    })
+    }, clustersnapshot.SchedulingOptions{})
     if err != nil && err.Type() == clustersnapshot.SchedulingInternalError {
         // Unexpected error.
         return nil, err

@@ -192,7 +192,7 @@ func (e *BinpackingNodeEstimator) tryToScheduleOnNewNodes(
     // The pod can't be scheduled on any new node either, because it has the same topology constraints.
     nodeName, err := e.clusterSnapshot.SchedulePodOnAnyNodeMatching(pod, func(nodeInfo *framework.NodeInfo) bool {
         return nodeInfo.Node().Name != estimationState.lastNodeName // only skip the last node that failed scheduling
-    })
+    }, clustersnapshot.SchedulingOptions{})
     if err != nil && err.Type() == clustersnapshot.SchedulingInternalError {
         // Unexpected error.
         return false, err

cluster-autoscaler/processors/provreq/processor.go

Lines changed: 2 additions & 2 deletions
@@ -46,7 +46,7 @@ const (
 )
 
 type injector interface {
-    TrySchedulePods(clusterSnapshot clustersnapshot.ClusterSnapshot, pods []*apiv1.Pod, isNodeAcceptable func(*framework.NodeInfo) bool, breakOnFailure bool) ([]scheduling.Status, int, error)
+    TrySchedulePods(clusterSnapshot clustersnapshot.ClusterSnapshot, pods []*apiv1.Pod, isNodeAcceptable func(*framework.NodeInfo) bool, breakOnFailure bool, opts clustersnapshot.SchedulingOptions) ([]scheduling.Status, int, error)
 }
 
 type provReqProcessor struct {

@@ -165,7 +165,7 @@ func (p *provReqProcessor) bookCapacity(autoscalingCtx *ca_context.AutoscalingCo
         return nil
     }
     // Scheduling the pods to reserve capacity for provisioning request.
-    if _, _, err = p.injector.TrySchedulePods(autoscalingCtx.ClusterSnapshot, podsToCreate, scheduling.ScheduleAnywhere, false); err != nil {
+    if _, _, err = p.injector.TrySchedulePods(autoscalingCtx.ClusterSnapshot, podsToCreate, scheduling.ScheduleAnywhere, false, clustersnapshot.SchedulingOptions{}); err != nil {
         return err
     }
     return nil

cluster-autoscaler/processors/provreq/processor_test.go

Lines changed: 1 addition & 1 deletion
@@ -232,7 +232,7 @@ type fakeInjector struct {
     pods []*apiv1.Pod
 }
 
-func (f *fakeInjector) TrySchedulePods(clusterSnapshot clustersnapshot.ClusterSnapshot, pods []*apiv1.Pod, isNodeAcceptable func(*framework.NodeInfo) bool, breakOnFailure bool) ([]scheduling.Status, int, error) {
+func (f *fakeInjector) TrySchedulePods(clusterSnapshot clustersnapshot.ClusterSnapshot, pods []*apiv1.Pod, isNodeAcceptable func(*framework.NodeInfo) bool, breakOnFailure bool, opts clustersnapshot.SchedulingOptions) ([]scheduling.Status, int, error) {
     f.pods = pods
     return nil, 0, nil
 }

cluster-autoscaler/provisioningrequest/besteffortatomic/provisioning_class.go

Lines changed: 2 additions & 1 deletion
@@ -23,6 +23,7 @@ import (
     "k8s.io/apimachinery/pkg/types"
     ca_context "k8s.io/autoscaler/cluster-autoscaler/context"
     "k8s.io/autoscaler/cluster-autoscaler/resourcequotas"
+    "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
     "k8s.io/klog/v2"
 

@@ -137,7 +138,7 @@ func (o *bestEffortAtomicProvClass) Provision(
 }
 
 func (o *bestEffortAtomicProvClass) filterOutSchedulable(pods []*apiv1.Pod) ([]*apiv1.Pod, error) {
-    statuses, _, err := o.injector.TrySchedulePods(o.autoscalingCtx.ClusterSnapshot, pods, scheduling.ScheduleAnywhere, false)
+    statuses, _, err := o.injector.TrySchedulePods(o.autoscalingCtx.ClusterSnapshot, pods, scheduling.ScheduleAnywhere, false, clustersnapshot.SchedulingOptions{})
     if err != nil {
         return nil, err
     }

cluster-autoscaler/provisioningrequest/checkcapacity/provisioningclass.go

Lines changed: 2 additions & 1 deletion
@@ -36,6 +36,7 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
     "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
     "k8s.io/autoscaler/cluster-autoscaler/resourcequotas"
+    "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
     "k8s.io/autoscaler/cluster-autoscaler/utils/errors"

@@ -177,7 +178,7 @@ func (o *checkCapacityProvClass) checkCapacity(unschedulablePods []*apiv1.Pod, p
     o.autoscalingCtx.ClusterSnapshot.Fork()
 
     // Case 1: Capacity fits.
-    scheduled, _, err := o.schedulingSimulator.TrySchedulePods(o.autoscalingCtx.ClusterSnapshot, unschedulablePods, scheduling.ScheduleAnywhere, true)
+    scheduled, _, err := o.schedulingSimulator.TrySchedulePods(o.autoscalingCtx.ClusterSnapshot, unschedulablePods, scheduling.ScheduleAnywhere, true, clustersnapshot.SchedulingOptions{})
     if err == nil && len(scheduled) == len(unschedulablePods) {
         commitError := o.autoscalingCtx.ClusterSnapshot.Commit()
         if commitError != nil {

cluster-autoscaler/simulator/cluster.go

Lines changed: 1 addition & 1 deletion
@@ -205,7 +205,7 @@ func (r *RemovalSimulator) findPlaceFor(removedNode string, pods []*apiv1.Pod, n
         newpods = append(newpods, &newpod)
     }
 
-    statuses, _, err := r.schedulingSimulator.TrySchedulePods(r.clusterSnapshot, newpods, isCandidateNode, true)
+    statuses, _, err := r.schedulingSimulator.TrySchedulePods(r.clusterSnapshot, newpods, isCandidateNode, true, clustersnapshot.SchedulingOptions{})
     if err != nil {
         return err
     }

cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go

Lines changed: 1 addition & 1 deletion
@@ -57,7 +57,7 @@ type ClusterSnapshot interface {
     // Node its scheduled on and nil are returned. If the pod can't be scheduled on any Node, an empty string and a non-nil
     // error explaining why are returned. The error Type() can be checked against SchedulingInternalError to distinguish
     // failing predicates from unexpected errors.
-    SchedulePodOnAnyNodeMatching(pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (matchingNode string, err SchedulingError)
+    SchedulePodOnAnyNodeMatching(pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool, opts SchedulingOptions) (matchingNode string, err SchedulingError)
     // UnschedulePod removes the given Pod from the given Node inside the snapshot, and modifies all relevant DRA objects
     // to reflect the removal. The pod can then be scheduled on another Node in the snapshot using the Schedule methods.
     UnschedulePod(namespace string, podName string, nodeName string) error
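For illustration, a hypothetical caller of the extended SchedulePodOnAnyNodeMatching, pinning the probe order to a caller-chosen list of nodes through the new opts parameter. The helper name, package name, and node-matching logic are made up; the Next field's signature is inferred from the plugin_runner.go diff below and is an assumption.

// Hypothetical usage sketch — not part of this commit.
package example

import (
    apiv1 "k8s.io/api/core/v1"

    "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
    "k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
)

// scheduleInPreferredOrder tries the given nodes in the supplied order instead
// of the runner's default round-robin order.
func scheduleInPreferredOrder(snapshot clustersnapshot.ClusterSnapshot, pod *apiv1.Pod, order []string) (string, clustersnapshot.SchedulingError) {
    opts := clustersnapshot.SchedulingOptions{
        // Return the i-th name from the caller's preferred order; a name the
        // snapshot doesn't know simply skips that attempt.
        Next: func(lastNode string, i int) string {
            if i < len(order) {
                return order[i]
            }
            return ""
        },
    }
    // The matcher accepts any node; opts only changes the order in which
    // nodes are probed.
    return snapshot.SchedulePodOnAnyNodeMatching(pod, func(ni *framework.NodeInfo) bool {
        return true
    }, opts)
}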

cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go

Lines changed: 45 additions & 3 deletions
@@ -21,10 +21,12 @@ import (
     "fmt"
     "strings"
     "sync"
+    "time"
 
     "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
     "k8s.io/client-go/util/workqueue"
+    "k8s.io/klog/v2"
 
     apiv1 "k8s.io/api/core/v1"
     schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"

@@ -47,12 +49,17 @@ func NewSchedulerPluginRunner(fwHandle *framework.Handle, snapshot clustersnapsh
 // function - until a Node where the Filters pass is found. Filters are only run for matching Nodes. If no matching Node with passing Filters is found, an error is returned.
 //
 // The node iteration always starts from the next Node from the last Node that was found by this method. TODO: Extract the iteration strategy out of SchedulerPluginRunner.
-func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (*apiv1.Node, *schedulerframework.CycleState, clustersnapshot.SchedulingError) {
+func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool, opts clustersnapshot.SchedulingOptions) (*apiv1.Node, *schedulerframework.CycleState, clustersnapshot.SchedulingError) {
     nodeInfosList, err := p.snapshot.ListNodeInfos()
     if err != nil {
         return nil, nil, clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error listing NodeInfos: %v", err))
     }
 
+    nodeNameToIndex := make(map[string]int, len(nodeInfosList))
+    for i, ni := range nodeInfosList {
+        nodeNameToIndex[ni.Node().Name] = i
+    }
+
     p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshot)
     defer p.fwHandle.DelegatingLister.ResetDelegate()
 

@@ -74,13 +81,41 @@ func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeM
         mu sync.Mutex
     )
 
+    var seqTracker *sequenceTracker
+    getNodeIndex := func(i int) int {
+        return (p.lastIndex + i + 1) % len(nodeInfosList)
+    }
+
+    if opts.PreferSmallestSteps {
+        seqTracker = newSequenceTracker()
+    }
+
+    if opts.Next != nil {
+        lastNode := nodeInfosList[p.lastIndex%len(nodeInfosList)].Node().Name
+        getNodeIndex = func(i int) int {
+            nodeName := opts.Next(lastNode, i)
+            if nodeIndex, ok := nodeNameToIndex[nodeName]; ok {
+                return nodeIndex
+            }
+            return -1
+        }
+    }
+
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
 
     checkNode := func(i int) {
-        nodeIndex := (p.lastIndex + i) % len(nodeInfosList)
+        nodeIndex := getNodeIndex(i)
+        if nodeIndex < 0 {
+            return
+        }
+
         nodeInfo := nodeInfosList[nodeIndex]
 
+        if seqTracker != nil {
+            defer seqTracker.Insert(i)
+        }
+
         // Plugins can filter some Nodes out during the PreFilter phase, if they're sure the Nodes won't work for the Pod at that stage.
         // Filters are only run for Nodes that haven't been filtered out during the PreFilter phase. Match that behavior here - skip such Nodes.
         if !preFilterResult.AllNodes() && !preFilterResult.NodeNames.Has(nodeInfo.Node().Name) {

@@ -104,6 +139,13 @@ func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeM
         clonedState := state.Clone()
         filterStatus := p.fwHandle.Framework.RunFilterPlugins(context.TODO(), clonedState, pod, nodeInfo.ToScheduler())
         if filterStatus.IsSuccess() {
+            if seqTracker != nil {
+                waitTimeout := 10 * time.Second
+                err := seqTracker.WaitUntilAllUpToInserted(ctx, i-1, waitTimeout)
+                if err != nil {
+                    klog.Warningf("Error while waiting for all previous nodes to be attempted scheduling, proceeding...: %v", err)
+                }
+            }
             // Filter passed for all plugins, so this pod can be scheduled on this Node.
             mu.Lock()
             defer mu.Unlock()

@@ -120,7 +162,7 @@ func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeM
     workqueue.ParallelizeUntil(ctx, p.parallelism, len(nodeInfosList), checkNode)
 
     if foundNode != nil {
-        p.lastIndex = (foundIndex + 1) % len(nodeInfosList)
+        p.lastIndex = foundIndex
         return foundNode, foundCycleState, nil
     }
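The runner above relies on a sequenceTracker helper (newSequenceTracker, Insert, WaitUntilAllUpToInserted) whose definition lives in another file of this commit. Below is a minimal sketch of what such a tracker could look like, assuming Insert marks an index as attempted and WaitUntilAllUpToInserted blocks until indices 0..n are all marked, the context is cancelled, or the timeout expires; the real implementation may differ.

// Sketch only — the actual sequenceTracker ships elsewhere in this commit.
package predicate

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type sequenceTracker struct {
    mu       sync.Mutex
    inserted map[int]bool
    changed  chan struct{} // closed and replaced on every Insert to wake waiters
}

func newSequenceTracker() *sequenceTracker {
    return &sequenceTracker{inserted: map[int]bool{}, changed: make(chan struct{})}
}

// Insert records that index i has been attempted and wakes any waiters.
func (s *sequenceTracker) Insert(i int) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.inserted[i] = true
    close(s.changed)
    s.changed = make(chan struct{})
}

// WaitUntilAllUpToInserted blocks until every index in [0, n] has been
// inserted, the context is cancelled, or the timeout expires.
func (s *sequenceTracker) WaitUntilAllUpToInserted(ctx context.Context, n int, timeout time.Duration) error {
    deadline := time.After(timeout)
    for {
        s.mu.Lock()
        allDone := true
        for i := 0; i <= n; i++ {
            if !s.inserted[i] {
                allDone = false
                break
            }
        }
        wakeup := s.changed
        s.mu.Unlock()
        if allDone {
            return nil
        }
        select {
        case <-wakeup:
            // Another index was inserted; re-check.
        case <-ctx.Done():
            return ctx.Err()
        case <-deadline:
            return fmt.Errorf("timed out waiting for indices up to %d", n)
        }
    }
}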
