Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cluster-autoscaler/builder/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,18 +151,18 @@ func (b *AutoscalerBuilder) Build(ctx context.Context) (core.Autoscaler, *loop.L
return nil, nil, fmt.Errorf("informerFactory is missing: ensure WithInformerFactory() is called")
}

fwHandle, err := framework.NewHandle(ctx, b.informerFactory, autoscalingOptions.SchedulerConfig, autoscalingOptions.DynamicResourceAllocationEnabled, autoscalingOptions.CSINodeAwareSchedulingEnabled)
fwHandle, err := framework.NewHandle(ctx, b.informerFactory, autoscalingOptions.SchedulerConfig, autoscalingOptions.DynamicResourceAllocationEnabled, autoscalingOptions.CSINodeAwareSchedulingEnabled, autoscalingOptions.FastPredicatesEnabled)
if err != nil {
return nil, nil, err
}
deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions)
drainabilityRules := rules.Default(deleteOptions)

var snapshotStore clustersnapshot.ClusterSnapshotStore = store.NewDeltaSnapshotStore(autoscalingOptions.ClusterSnapshotParallelism)
var snapshotStore clustersnapshot.ClusterSnapshotStore = store.NewDeltaSnapshotStore(autoscalingOptions.ClusterSnapshotParallelism, autoscalingOptions.FastPredicatesEnabled)
opts := coreoptions.AutoscalerOptions{
AutoscalingOptions: autoscalingOptions,
FrameworkHandle: fwHandle,
ClusterSnapshot: predicate.NewPredicateSnapshot(snapshotStore, fwHandle, autoscalingOptions.DynamicResourceAllocationEnabled, autoscalingOptions.PredicateParallelism, autoscalingOptions.CSINodeAwareSchedulingEnabled),
ClusterSnapshot: predicate.NewPredicateSnapshot(snapshotStore, fwHandle, autoscalingOptions.DynamicResourceAllocationEnabled, autoscalingOptions.PredicateParallelism, autoscalingOptions.CSINodeAwareSchedulingEnabled, autoscalingOptions.FastPredicatesEnabled),
KubeClient: b.kubeClient,
InformerFactory: b.informerFactory,
AutoscalingKubeClients: b.kubeClients,
Expand Down
2 changes: 2 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,8 @@ type AutoscalingOptions struct {
ClusterSnapshotParallelism int
// PredicateParallelism is the number of goroutines to use for running scheduler predicates.
PredicateParallelism int
// FastPredicatesEnabled enables the fast predicates feature.
FastPredicatesEnabled bool
// CheckCapacityProcessorInstance is the name of the processor instance.
// Only ProvisioningRequests that define this name in their parameters with the key "processorInstance" will be processed by this CA instance.
// It only refers to check capacity ProvisioningRequests, but if not empty, best-effort atomic ProvisioningRequests processing is disabled in this instance.
Expand Down
2 changes: 2 additions & 0 deletions cluster-autoscaler/config/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ var (
enableCSINodeAwareScheduling = flag.Bool("enable-csi-node-aware-scheduling", false, "Whether logic for handling CSINode objects is enabled.")
clusterSnapshotParallelism = flag.Int("cluster-snapshot-parallelism", 16, "Maximum parallelism of cluster snapshot creation.")
predicateParallelism = flag.Int("predicate-parallelism", 4, "Maximum parallelism of scheduler predicate checking.")
fastPredicatesEnabled = flag.Bool("fast-predicates-enabled", false, "Whether logic for fast predicate checking is enabled.")
checkCapacityProcessorInstance = flag.String("check-capacity-processor-instance", "", "Name of the processor instance. Only ProvisioningRequests that define this name in their parameters with the key \"processorInstance\" will be processed by this CA instance. It only refers to check capacity ProvisioningRequests, but if not empty, best-effort atomic ProvisioningRequests processing is disabled in this instance. Not recommended: Until CA 1.35, ProvisioningRequests with this name as prefix in their class will be also processed.")
nodeDeletionCandidateTTL = flag.Duration("node-deletion-candidate-ttl", time.Duration(0), "Maximum time a node can be marked as removable before the marking becomes stale. This sets the TTL of Cluster-Autoscaler's state if the Cluste-Autoscaler deployment becomes inactive")
capacitybufferControllerEnabled = flag.Bool("capacity-buffer-controller-enabled", false, "Whether to enable the default controller for capacity buffers or not")
Expand Down Expand Up @@ -425,6 +426,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
CSINodeAwareSchedulingEnabled: *enableCSINodeAwareScheduling,
ClusterSnapshotParallelism: *clusterSnapshotParallelism,
PredicateParallelism: *predicateParallelism,
FastPredicatesEnabled: *fastPredicatesEnabled,
CheckCapacityProcessorInstance: *checkCapacityProcessorInstance,
MaxInactivityTime: *maxInactivityTimeFlag,
MaxFailingTime: *maxFailingTimeFlag,
Expand Down
4 changes: 2 additions & 2 deletions cluster-autoscaler/core/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,14 @@ func initializeDefaultOptions(ctx context.Context, opts *coreoptions.AutoscalerO
opts.AutoscalingKubeClients = ca_context.NewAutoscalingKubeClients(ctx, opts.AutoscalingOptions, opts.KubeClient, opts.InformerFactory)
}
if opts.FrameworkHandle == nil {
fwHandle, err := framework.NewHandle(ctx, opts.InformerFactory, opts.SchedulerConfig, opts.DynamicResourceAllocationEnabled, opts.CSINodeAwareSchedulingEnabled)
fwHandle, err := framework.NewHandle(ctx, opts.InformerFactory, opts.SchedulerConfig, opts.DynamicResourceAllocationEnabled, opts.CSINodeAwareSchedulingEnabled, opts.FastPredicatesEnabled)
if err != nil {
return err
}
opts.FrameworkHandle = fwHandle
}
if opts.ClusterSnapshot == nil {
opts.ClusterSnapshot = predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), opts.FrameworkHandle, opts.DynamicResourceAllocationEnabled, opts.PredicateParallelism, opts.CSINodeAwareSchedulingEnabled)
opts.ClusterSnapshot = predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(opts.FastPredicatesEnabled), opts.FrameworkHandle, opts.DynamicResourceAllocationEnabled, opts.PredicateParallelism, opts.CSINodeAwareSchedulingEnabled, opts.FastPredicatesEnabled)
}
if opts.RemainingPdbTracker == nil {
opts.RemainingPdbTracker = pdb.NewBasicRemainingPdbTracker()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,10 +250,10 @@ func BenchmarkFilterOutSchedulable(b *testing.B) {
}
snapshots := map[string]func() clustersnapshot.ClusterSnapshot{
"basic": func() clustersnapshot.ClusterSnapshot {
return testsnapshot.NewCustomTestSnapshotOrDie(b, store.NewBasicSnapshotStore())
return testsnapshot.NewCustomTestSnapshotOrDie(b, store.NewBasicSnapshotStore(false))
},
"delta": func() clustersnapshot.ClusterSnapshot {
return testsnapshot.NewCustomTestSnapshotOrDie(b, store.NewDeltaSnapshotStore(16))
return testsnapshot.NewCustomTestSnapshotOrDie(b, store.NewDeltaSnapshotStore(16, false))
},
}
for snapshotName, snapshotFactory := range snapshots {
Expand Down
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/scaledown/actuation/actuator.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ func (a *Actuator) taintNode(node *apiv1.Node) error {
}

func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterSnapshot, error) {
snapshot := predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), a.autoscalingCtx.FrameworkHandle, a.autoscalingCtx.DynamicResourceAllocationEnabled, a.autoscalingCtx.PredicateParallelism, a.autoscalingCtx.CSINodeAwareSchedulingEnabled)
snapshot := predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(a.autoscalingCtx.FastPredicatesEnabled), a.autoscalingCtx.FrameworkHandle, a.autoscalingCtx.DynamicResourceAllocationEnabled, a.autoscalingCtx.PredicateParallelism, a.autoscalingCtx.CSINodeAwareSchedulingEnabled, a.autoscalingCtx.FastPredicatesEnabled)
pods, err := a.autoscalingCtx.AllPodLister().List()
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions cluster-autoscaler/core/static_autoscaler_csi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func TestStaticAutoscalerCSI(t *testing.T) {
// Create a framework handle with informer-backed listers for StorageClass/PVC/CSIDriver.
client := clientsetfake.NewSimpleClientset(k8sObjects...)
informerFactory := informers.NewSharedInformerFactory(client, 0)
fwHandle, err := framework.NewHandle(context.Background(), informerFactory, nil, false, true)
fwHandle, err := framework.NewHandle(context.Background(), informerFactory, nil, false, true, false)
require.NoError(t, err)
stopCh := make(chan struct{})
t.Cleanup(func() { close(stopCh) })
Expand Down Expand Up @@ -279,7 +279,7 @@ func TestStaticAutoscalerCSI(t *testing.T) {

// Replace framework handle + snapshot with CSI-aware snapshot using the handle that has PVC/SC/CSIDriver informers.
autoscaler.AutoscalingContext.FrameworkHandle = fwHandle
autoscaler.AutoscalingContext.ClusterSnapshot = predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), fwHandle, true, 1, true)
autoscaler.AutoscalingContext.ClusterSnapshot = predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(false), fwHandle, true, 1, true, false)

// Provide CSI nodes snapshotting for the real nodes.
autoscaler.AutoscalingContext.CsiProvider = csinodeprovider.NewCSINodeProvider(&fakeCSINodeLister{nodes: csiNodes})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func TestFilterOutNodesWithUnreadyCSIResources(t *testing.T) {
csiSnapshot = tc.csiSnapshot
}

clusterSnapshotStore := store.NewBasicSnapshotStore()
clusterSnapshotStore := store.NewBasicSnapshotStore(false)
clusterSnapshot, _, _ := testsnapshot.NewCustomTestSnapshotAndHandle(clusterSnapshotStore)
clusterSnapshot.SetClusterState([]*apiv1.Node{}, []*apiv1.Pod{}, nil, csiSnapshot)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ func TestFilterOutNodesWithUnreadyDRAResources(t *testing.T) {
}
provider.SetMachineTemplates(machineTemplates)
draSnapshot := drasnapshot.NewSnapshot(nil, tc.nodesSlices, nil, nil)
clusterSnapshotStore := store.NewBasicSnapshotStore()
clusterSnapshotStore := store.NewBasicSnapshotStore(false)
clusterSnapshot, _, _ := testsnapshot.NewCustomTestSnapshotAndHandle(clusterSnapshotStore)
clusterSnapshot.SetClusterState([]*apiv1.Node{}, []*apiv1.Pod{}, draSnapshot, nil)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
p := NewPodInjectionPodListProcessor(podinjectionbackoff.NewFakePodControllerRegistry())
clusterSnapshot := testsnapshot.NewCustomTestSnapshotOrDie(t, store.NewDeltaSnapshotStore(16))
clusterSnapshot := testsnapshot.NewCustomTestSnapshotOrDie(t, store.NewDeltaSnapshotStore(16, false))
err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, tc.scheduledPods...))
assert.NoError(t, err)
allPodsLister := fakeAllPodsLister{podsToList: append(append(tc.scheduledPods, tc.unschedulablePods...), tc.otherPods...)}
Expand Down
15 changes: 15 additions & 0 deletions cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"

apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
csisnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/csi/snapshot"
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
Expand Down Expand Up @@ -111,6 +112,10 @@ type ClusterSnapshotStore interface {
schedulerinterface.SharedLister
Forkable

// FastPredicateLister returns an interface that allows querying internal state for fast predicate checking.
// The lister may return nil if the internal state is not available (fast predicates are disabled).
FastPredicateLister() FastPredicateLister

// StorePodInfo adds the given PodInfo to the Node with the given nodeName inside the snapshot.
StorePodInfo(podInfo *framework.PodInfo, nodeName string) error
// RemovePodInfo removes the given Pod from the snapshot.
Expand All @@ -127,6 +132,16 @@ type ClusterSnapshotStore interface {
Clear()
}

// FastPredicateLister allows querying precomputed internal state for fast predicate checking.
// NOTE(review): results presumably reflect the current (possibly forked) snapshot state —
// confirm against implementations.
type FastPredicateLister interface {
	// PodAffinityCount returns the number of pods matching the given selector in the given topology domain.
	PodAffinityCount(topologyKey, topologyValue string, selector labels.Selector) int
	// TopologyValueCount returns the number of unique values for the given topology key in the snapshot.
	TopologyValueCount(topologyKey string) int
	// TopologyDomains returns all unique values for the given topology key in the snapshot.
	TopologyDomains(topologyKey string) []string
}

// ErrNodeNotFound means that a node wasn't found in the snapshot.
var ErrNodeNotFound = errors.New("node not found")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package predicate

import (
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
)

type FastPredicateState struct {

Check failure on line 30 in cluster-autoscaler/simulator/clustersnapshot/predicate/fast_predicates.go

View workflow job for this annotation

GitHub Actions / verify

exported type FastPredicateState should have comment or be unexported
podAffinitySelectors []labels.Selector
podAntiAffinitySelectors []labels.Selector
topologySpreadSelectors []labels.Selector
}

func (p *SchedulerPluginRunner) computeFastPredicateState(pod *apiv1.Pod) (*FastPredicateState, error) {
state := &FastPredicateState{}

affinity := pod.Spec.Affinity
if affinity != nil {
if affinity.PodAffinity != nil {
for _, term := range affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution {
sel, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil {
return nil, err
}
state.podAffinitySelectors = append(state.podAffinitySelectors, sel)
}
}
if affinity.PodAntiAffinity != nil {
for _, term := range affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution {
sel, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil {
return nil, err
}
state.podAntiAffinitySelectors = append(state.podAntiAffinitySelectors, sel)
}
}
}

for _, constraint := range pod.Spec.TopologySpreadConstraints {
if constraint.WhenUnsatisfiable == apiv1.DoNotSchedule {
sel, err := metav1.LabelSelectorAsSelector(constraint.LabelSelector)
if err != nil {
return nil, err
}
state.topologySpreadSelectors = append(state.topologySpreadSelectors, sel)
}
}

return state, nil
}

// fastCheckPredicates runs the lightweight predicate checks (inter-pod affinity
// and topology spread) for pod against nodeInfo, using precomputed selectors
// from state when available. Returns nil when fast predicates are disabled
// (no lister available) or when all checks pass.
func (p *SchedulerPluginRunner) fastCheckPredicates(pod *apiv1.Pod, nodeInfo *framework.NodeInfo, state *FastPredicateState) clustersnapshot.SchedulingError {
	lister := p.snapshot.FastPredicateLister()
	if lister == nil {
		// Fast predicates are disabled; nothing to check here.
		klog.V(4).Infof("FastPredicateLister is nil, skipping fast predicates")
		return nil
	}

	if affinityErr := p.fastCheckPodAffinity(pod, nodeInfo, lister, state); affinityErr != nil {
		klog.V(4).Infof("fastCheckPodAffinity failed: %v", affinityErr)
		return affinityErr
	}
	if spreadErr := p.fastCheckPodTopologySpread(pod, nodeInfo, lister, state); spreadErr != nil {
		klog.V(4).Infof("fastCheckPodTopologySpread failed: %v", spreadErr)
		return spreadErr
	}
	return nil
}

func (p *SchedulerPluginRunner) fastCheckPodAffinity(pod *apiv1.Pod, nodeInfo *framework.NodeInfo, lister clustersnapshot.FastPredicateLister, state *FastPredicateState) clustersnapshot.SchedulingError {
affinity := pod.Spec.Affinity
if affinity == nil || (affinity.PodAffinity == nil && affinity.PodAntiAffinity == nil) {
return nil
}

node := nodeInfo.Node()

if affinity.PodAffinity != nil {
for i, term := range affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution {
topoValue, ok := node.Labels[term.TopologyKey]
if !ok {
continue
}
var selector labels.Selector
var err error
if state != nil && i < len(state.podAffinitySelectors) {
selector = state.podAffinitySelectors[i]
} else {
selector, err = metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil {
return clustersnapshot.NewSchedulingInternalError(pod, err.Error())
}
}
count := lister.PodAffinityCount(term.TopologyKey, topoValue, selector)
if count == 0 {
return clustersnapshot.NewFailingPredicateError(pod, "InterPodAffinity", []string{interpodaffinity.ErrReasonAffinityRulesNotMatch}, "", "")
}
}
}

if affinity.PodAntiAffinity != nil {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At which point do we check if existing pods have anti-affinity against incoming pod? Incoming pod may have no AA in the spec, but we still need to check if it violates constraints of the existing pods.

for i, term := range affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution {
topoValue, ok := node.Labels[term.TopologyKey]
if !ok {
continue
}
var selector labels.Selector
var err error
if state != nil && i < len(state.podAntiAffinitySelectors) {
selector = state.podAntiAffinitySelectors[i]
} else {
selector, err = metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil {
return clustersnapshot.NewSchedulingInternalError(pod, err.Error())
}
}
count := lister.PodAffinityCount(term.TopologyKey, topoValue, selector)
if count > 0 {
return clustersnapshot.NewFailingPredicateError(pod, "InterPodAffinity", []string{interpodaffinity.ErrReasonAntiAffinityRulesNotMatch}, "", "")
}
}
}

return nil
}

// fastCheckPodTopologySpread checks the pod's hard (DoNotSchedule) topology
// spread constraints against the per-domain counts exposed by the lister.
// Soft (ScheduleAnyway) constraints are ignored. Returns a failing-predicate
// error if placing the pod on nodeInfo's node would exceed a constraint's maxSkew.
func (p *SchedulerPluginRunner) fastCheckPodTopologySpread(pod *apiv1.Pod, nodeInfo *framework.NodeInfo, lister clustersnapshot.FastPredicateLister, state *FastPredicateState) clustersnapshot.SchedulingError {
	if len(pod.Spec.TopologySpreadConstraints) == 0 {
		return nil
	}

	node := nodeInfo.Node()

	// spreadIdx indexes state.topologySpreadSelectors, which contains entries
	// only for DoNotSchedule constraints, in declaration order; it must be
	// advanced on every DoNotSchedule constraint (including skipped ones) to
	// stay aligned with the precomputed selectors.
	spreadIdx := 0
	for _, constraint := range pod.Spec.TopologySpreadConstraints {
		if constraint.WhenUnsatisfiable != apiv1.DoNotSchedule {
			// Soft constraint: not a hard predicate, skip without consuming a selector.
			continue
		}

		topoValue, ok := node.Labels[constraint.TopologyKey]
		if !ok {
			// If node doesn't have the topology key, it's usually skipped or fails depending on configuration.
			// NOTE(review): the full predicate's nodeAffinityPolicy/nodeTaintsPolicy handling may
			// treat such nodes differently — confirm this skip matches scheduler behavior.
			spreadIdx++
			continue
		}

		var selector labels.Selector
		var err error
		if state != nil && spreadIdx < len(state.topologySpreadSelectors) {
			// Use the selector precomputed by computeFastPredicateState.
			selector = state.topologySpreadSelectors[spreadIdx]
		} else {
			// No precomputed selector available; parse the constraint's selector on the fly.
			selector, err = metav1.LabelSelectorAsSelector(constraint.LabelSelector)
			if err != nil {
				return clustersnapshot.NewSchedulingInternalError(pod, err.Error())
			}
		}
		spreadIdx++

		// Number of matching pods already in this node's topology domain.
		count := lister.PodAffinityCount(constraint.TopologyKey, topoValue, selector)

		// To implement maxSkew, we need minCount across all domains.
		// NOTE(review): minCount is taken only over domains the lister already knows about;
		// confirm semantics match the scheduler when new or empty domains exist.
		minCount := p.getMinTopologyCount(constraint.TopologyKey, selector, lister)

		// Skew after placing the pod here: (count in this domain + 1) - global minimum.
		if int32(count+1-minCount) > constraint.MaxSkew {
			return clustersnapshot.NewFailingPredicateError(pod, "PodTopologySpread", []string{podtopologyspread.ErrReasonConstraintsNotMatch}, "", "")
		}
	}

	return nil
}

// getMinTopologyCount returns the smallest number of pods matching selector
// across all known domains of the given topology key. Returns 0 when the
// lister knows no domains for the key. Scanning stops early once a zero
// count is found, since no domain can have fewer matches.
func (p *SchedulerPluginRunner) getMinTopologyCount(topoKey string, selector labels.Selector, lister clustersnapshot.FastPredicateLister) int {
	domains := lister.TopologyDomains(topoKey)
	if len(domains) == 0 {
		return 0
	}

	// Seed the minimum with the first domain, then scan the rest.
	minCount := lister.PodAffinityCount(topoKey, domains[0], selector)
	for _, domain := range domains[1:] {
		if minCount == 0 {
			break
		}
		if c := lister.PodAffinityCount(topoKey, domain, selector); c < minCount {
			minCount = c
		}
	}
	return minCount
}
Loading
Loading