diff --git a/cluster-autoscaler/builder/autoscaler.go b/cluster-autoscaler/builder/autoscaler.go index 93eef18d66fd..c0c54c3af9e8 100644 --- a/cluster-autoscaler/builder/autoscaler.go +++ b/cluster-autoscaler/builder/autoscaler.go @@ -151,18 +151,18 @@ func (b *AutoscalerBuilder) Build(ctx context.Context) (core.Autoscaler, *loop.L return nil, nil, fmt.Errorf("informerFactory is missing: ensure WithInformerFactory() is called") } - fwHandle, err := framework.NewHandle(ctx, b.informerFactory, autoscalingOptions.SchedulerConfig, autoscalingOptions.DynamicResourceAllocationEnabled, autoscalingOptions.CSINodeAwareSchedulingEnabled) + fwHandle, err := framework.NewHandle(ctx, b.informerFactory, autoscalingOptions.SchedulerConfig, autoscalingOptions.DynamicResourceAllocationEnabled, autoscalingOptions.CSINodeAwareSchedulingEnabled, autoscalingOptions.FastPredicatesEnabled) if err != nil { return nil, nil, err } deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions) drainabilityRules := rules.Default(deleteOptions) - var snapshotStore clustersnapshot.ClusterSnapshotStore = store.NewDeltaSnapshotStore(autoscalingOptions.ClusterSnapshotParallelism) + var snapshotStore clustersnapshot.ClusterSnapshotStore = store.NewDeltaSnapshotStore(autoscalingOptions.ClusterSnapshotParallelism, autoscalingOptions.FastPredicatesEnabled) opts := coreoptions.AutoscalerOptions{ AutoscalingOptions: autoscalingOptions, FrameworkHandle: fwHandle, - ClusterSnapshot: predicate.NewPredicateSnapshot(snapshotStore, fwHandle, autoscalingOptions.DynamicResourceAllocationEnabled, autoscalingOptions.PredicateParallelism, autoscalingOptions.CSINodeAwareSchedulingEnabled), + ClusterSnapshot: predicate.NewPredicateSnapshot(snapshotStore, fwHandle, autoscalingOptions.DynamicResourceAllocationEnabled, autoscalingOptions.PredicateParallelism, autoscalingOptions.CSINodeAwareSchedulingEnabled, autoscalingOptions.FastPredicatesEnabled), KubeClient: b.kubeClient, InformerFactory: b.informerFactory, 
AutoscalingKubeClients: b.kubeClients, diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go index f950ee2f6ddd..f603435878a3 100644 --- a/cluster-autoscaler/config/autoscaling_options.go +++ b/cluster-autoscaler/config/autoscaling_options.go @@ -327,6 +327,8 @@ type AutoscalingOptions struct { ClusterSnapshotParallelism int // PredicateParallelism is the number of goroutines to use for running scheduler predicates. PredicateParallelism int + // FastPredicatesEnabled enables the fast predicates feature. + FastPredicatesEnabled bool // CheckCapacityProcessorInstance is the name of the processor instance. // Only ProvisioningRequests that define this name in their parameters with the key "processorInstance" will be processed by this CA instance. // It only refers to check capacity ProvisioningRequests, but if not empty, best-effort atomic ProvisioningRequests processing is disabled in this instance. diff --git a/cluster-autoscaler/config/flags/flags.go b/cluster-autoscaler/config/flags/flags.go index 66aee39a6fd4..d83085be89e2 100644 --- a/cluster-autoscaler/config/flags/flags.go +++ b/cluster-autoscaler/config/flags/flags.go @@ -234,6 +234,7 @@ var ( enableCSINodeAwareScheduling = flag.Bool("enable-csi-node-aware-scheduling", false, "Whether logic for handling CSINode objects is enabled.") clusterSnapshotParallelism = flag.Int("cluster-snapshot-parallelism", 16, "Maximum parallelism of cluster snapshot creation.") predicateParallelism = flag.Int("predicate-parallelism", 4, "Maximum parallelism of scheduler predicate checking.") + fastPredicatesEnabled = flag.Bool("fast-predicates-enabled", false, "Whether logic for fast predicate checking is enabled.") checkCapacityProcessorInstance = flag.String("check-capacity-processor-instance", "", "Name of the processor instance. Only ProvisioningRequests that define this name in their parameters with the key \"processorInstance\" will be processed by this CA instance. 
It only refers to check capacity ProvisioningRequests, but if not empty, best-effort atomic ProvisioningRequests processing is disabled in this instance. Not recommended: Until CA 1.35, ProvisioningRequests with this name as prefix in their class will be also processed.") nodeDeletionCandidateTTL = flag.Duration("node-deletion-candidate-ttl", time.Duration(0), "Maximum time a node can be marked as removable before the marking becomes stale. This sets the TTL of Cluster-Autoscaler's state if the Cluste-Autoscaler deployment becomes inactive") capacitybufferControllerEnabled = flag.Bool("capacity-buffer-controller-enabled", false, "Whether to enable the default controller for capacity buffers or not") @@ -425,6 +426,7 @@ func createAutoscalingOptions() config.AutoscalingOptions { CSINodeAwareSchedulingEnabled: *enableCSINodeAwareScheduling, ClusterSnapshotParallelism: *clusterSnapshotParallelism, PredicateParallelism: *predicateParallelism, + FastPredicatesEnabled: *fastPredicatesEnabled, CheckCapacityProcessorInstance: *checkCapacityProcessorInstance, MaxInactivityTime: *maxInactivityTimeFlag, MaxFailingTime: *maxFailingTimeFlag, diff --git a/cluster-autoscaler/core/autoscaler.go b/cluster-autoscaler/core/autoscaler.go index 6cfc1eb2ee80..691a31ca2623 100644 --- a/cluster-autoscaler/core/autoscaler.go +++ b/cluster-autoscaler/core/autoscaler.go @@ -100,14 +100,14 @@ func initializeDefaultOptions(ctx context.Context, opts *coreoptions.AutoscalerO opts.AutoscalingKubeClients = ca_context.NewAutoscalingKubeClients(ctx, opts.AutoscalingOptions, opts.KubeClient, opts.InformerFactory) } if opts.FrameworkHandle == nil { - fwHandle, err := framework.NewHandle(ctx, opts.InformerFactory, opts.SchedulerConfig, opts.DynamicResourceAllocationEnabled, opts.CSINodeAwareSchedulingEnabled) + fwHandle, err := framework.NewHandle(ctx, opts.InformerFactory, opts.SchedulerConfig, opts.DynamicResourceAllocationEnabled, opts.CSINodeAwareSchedulingEnabled, opts.FastPredicatesEnabled) if 
err != nil { return err } opts.FrameworkHandle = fwHandle } if opts.ClusterSnapshot == nil { - opts.ClusterSnapshot = predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), opts.FrameworkHandle, opts.DynamicResourceAllocationEnabled, opts.PredicateParallelism, opts.CSINodeAwareSchedulingEnabled) + opts.ClusterSnapshot = predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(opts.FastPredicatesEnabled), opts.FrameworkHandle, opts.DynamicResourceAllocationEnabled, opts.PredicateParallelism, opts.CSINodeAwareSchedulingEnabled, opts.FastPredicatesEnabled) } if opts.RemainingPdbTracker == nil { opts.RemainingPdbTracker = pdb.NewBasicRemainingPdbTracker() diff --git a/cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go b/cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go index 3ddb5a488409..9671f2eb6980 100644 --- a/cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go +++ b/cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go @@ -250,10 +250,10 @@ func BenchmarkFilterOutSchedulable(b *testing.B) { } snapshots := map[string]func() clustersnapshot.ClusterSnapshot{ "basic": func() clustersnapshot.ClusterSnapshot { - return testsnapshot.NewCustomTestSnapshotOrDie(b, store.NewBasicSnapshotStore()) + return testsnapshot.NewCustomTestSnapshotOrDie(b, store.NewBasicSnapshotStore(false)) }, "delta": func() clustersnapshot.ClusterSnapshot { - return testsnapshot.NewCustomTestSnapshotOrDie(b, store.NewDeltaSnapshotStore(16)) + return testsnapshot.NewCustomTestSnapshotOrDie(b, store.NewDeltaSnapshotStore(16, false)) }, } for snapshotName, snapshotFactory := range snapshots { diff --git a/cluster-autoscaler/core/scaledown/actuation/actuator.go b/cluster-autoscaler/core/scaledown/actuation/actuator.go index 3dfbda177b91..0f9274e36a20 100644 --- a/cluster-autoscaler/core/scaledown/actuation/actuator.go +++ b/cluster-autoscaler/core/scaledown/actuation/actuator.go @@ -408,7 +408,7 @@ func (a 
*Actuator) taintNode(node *apiv1.Node) error { } func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterSnapshot, error) { - snapshot := predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), a.autoscalingCtx.FrameworkHandle, a.autoscalingCtx.DynamicResourceAllocationEnabled, a.autoscalingCtx.PredicateParallelism, a.autoscalingCtx.CSINodeAwareSchedulingEnabled) + snapshot := predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(a.autoscalingCtx.FastPredicatesEnabled), a.autoscalingCtx.FrameworkHandle, a.autoscalingCtx.DynamicResourceAllocationEnabled, a.autoscalingCtx.PredicateParallelism, a.autoscalingCtx.CSINodeAwareSchedulingEnabled, a.autoscalingCtx.FastPredicatesEnabled) pods, err := a.autoscalingCtx.AllPodLister().List() if err != nil { return nil, err diff --git a/cluster-autoscaler/core/static_autoscaler_csi_test.go b/cluster-autoscaler/core/static_autoscaler_csi_test.go index 8410b2326783..f9bc0669df85 100644 --- a/cluster-autoscaler/core/static_autoscaler_csi_test.go +++ b/cluster-autoscaler/core/static_autoscaler_csi_test.go @@ -223,7 +223,7 @@ func TestStaticAutoscalerCSI(t *testing.T) { // Create a framework handle with informer-backed listers for StorageClass/PVC/CSIDriver. client := clientsetfake.NewSimpleClientset(k8sObjects...) informerFactory := informers.NewSharedInformerFactory(client, 0) - fwHandle, err := framework.NewHandle(context.Background(), informerFactory, nil, false, true) + fwHandle, err := framework.NewHandle(context.Background(), informerFactory, nil, false, true, false) require.NoError(t, err) stopCh := make(chan struct{}) t.Cleanup(func() { close(stopCh) }) @@ -279,7 +279,7 @@ func TestStaticAutoscalerCSI(t *testing.T) { // Replace framework handle + snapshot with CSI-aware snapshot using the handle that has PVC/SC/CSIDriver informers. 
autoscaler.AutoscalingContext.FrameworkHandle = fwHandle - autoscaler.AutoscalingContext.ClusterSnapshot = predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), fwHandle, true, 1, true) + autoscaler.AutoscalingContext.ClusterSnapshot = predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(false), fwHandle, true, 1, true, false) // Provide CSI nodes snapshotting for the real nodes. autoscaler.AutoscalingContext.CsiProvider = csinodeprovider.NewCSINodeProvider(&fakeCSINodeLister{nodes: csiNodes}) diff --git a/cluster-autoscaler/processors/customresources/csi_processor_test.go b/cluster-autoscaler/processors/customresources/csi_processor_test.go index 757ac75cd4ab..407f2c2a5758 100644 --- a/cluster-autoscaler/processors/customresources/csi_processor_test.go +++ b/cluster-autoscaler/processors/customresources/csi_processor_test.go @@ -301,7 +301,7 @@ func TestFilterOutNodesWithUnreadyCSIResources(t *testing.T) { csiSnapshot = tc.csiSnapshot } - clusterSnapshotStore := store.NewBasicSnapshotStore() + clusterSnapshotStore := store.NewBasicSnapshotStore(false) clusterSnapshot, _, _ := testsnapshot.NewCustomTestSnapshotAndHandle(clusterSnapshotStore) clusterSnapshot.SetClusterState([]*apiv1.Node{}, []*apiv1.Pod{}, nil, csiSnapshot) diff --git a/cluster-autoscaler/processors/customresources/dra_processor_test.go b/cluster-autoscaler/processors/customresources/dra_processor_test.go index 229c9fc53926..144ff168487f 100644 --- a/cluster-autoscaler/processors/customresources/dra_processor_test.go +++ b/cluster-autoscaler/processors/customresources/dra_processor_test.go @@ -395,7 +395,7 @@ func TestFilterOutNodesWithUnreadyDRAResources(t *testing.T) { } provider.SetMachineTemplates(machineTemplates) draSnapshot := drasnapshot.NewSnapshot(nil, tc.nodesSlices, nil, nil) - clusterSnapshotStore := store.NewBasicSnapshotStore() + clusterSnapshotStore := store.NewBasicSnapshotStore(false) clusterSnapshot, _, _ := 
testsnapshot.NewCustomTestSnapshotAndHandle(clusterSnapshotStore) clusterSnapshot.SetClusterState([]*apiv1.Node{}, []*apiv1.Pod{}, draSnapshot, nil) diff --git a/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go b/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go index ff14d377637b..c3b02bd08207 100644 --- a/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go +++ b/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go @@ -135,7 +135,7 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { p := NewPodInjectionPodListProcessor(podinjectionbackoff.NewFakePodControllerRegistry()) - clusterSnapshot := testsnapshot.NewCustomTestSnapshotOrDie(t, store.NewDeltaSnapshotStore(16)) + clusterSnapshot := testsnapshot.NewCustomTestSnapshotOrDie(t, store.NewDeltaSnapshotStore(16, false)) err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, tc.scheduledPods...)) assert.NoError(t, err) allPodsLister := fakeAllPodsLister{podsToList: append(append(tc.scheduledPods, tc.unschedulablePods...), tc.otherPods...)} diff --git a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go index 1884da814257..4c40d1aa5293 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go +++ b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go @@ -20,6 +20,7 @@ import ( "errors" apiv1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" csisnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/csi/snapshot" drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot" "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" @@ -111,6 +112,10 @@ type ClusterSnapshotStore interface { schedulerinterface.SharedLister Forkable + // FastPredicateLister returns an interface that allows querying 
internal state for fast predicate checking. + // The lister may return nil if the internal state is not available (fast predicates are disabled). + FastPredicateLister() FastPredicateLister + // StorePodInfo adds the given PodInfo to the Node with the given nodeName inside the snapshot. StorePodInfo(podInfo *framework.PodInfo, nodeName string) error // RemovePodInfo removes the given Pod from the snapshot. @@ -127,6 +132,16 @@ type ClusterSnapshotStore interface { Clear() } +// FastPredicateLister allows querying precomputed internal state for fast predicate checking. +type FastPredicateLister interface { + // PodAffinityCount returns the number of pods matching the given selector in the given topology domain. + PodAffinityCount(topologyKey, topologyValue string, selector labels.Selector) int + // TopologyValueCount returns the number of unique values for the given topology key in the snapshot. + TopologyValueCount(topologyKey string) int + // TopologyDomains returns all unique values for the given topology key in the snapshot. + TopologyDomains(topologyKey string) []string +} + // ErrNodeNotFound means that a node wasn't found in the snapshot. var ErrNodeNotFound = errors.New("node not found") diff --git a/cluster-autoscaler/simulator/clustersnapshot/predicate/fast_predicates.go b/cluster-autoscaler/simulator/clustersnapshot/predicate/fast_predicates.go new file mode 100644 index 000000000000..08699c0fad08 --- /dev/null +++ b/cluster-autoscaler/simulator/clustersnapshot/predicate/fast_predicates.go @@ -0,0 +1,217 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package predicate + +import ( + apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" + "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread" +) + +type FastPredicateState struct { + podAffinitySelectors []labels.Selector + podAntiAffinitySelectors []labels.Selector + topologySpreadSelectors []labels.Selector +} + +func (p *SchedulerPluginRunner) computeFastPredicateState(pod *apiv1.Pod) (*FastPredicateState, error) { + state := &FastPredicateState{} + + affinity := pod.Spec.Affinity + if affinity != nil { + if affinity.PodAffinity != nil { + for _, term := range affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution { + sel, err := metav1.LabelSelectorAsSelector(term.LabelSelector) + if err != nil { + return nil, err + } + state.podAffinitySelectors = append(state.podAffinitySelectors, sel) + } + } + if affinity.PodAntiAffinity != nil { + for _, term := range affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution { + sel, err := metav1.LabelSelectorAsSelector(term.LabelSelector) + if err != nil { + return nil, err + } + state.podAntiAffinitySelectors = append(state.podAntiAffinitySelectors, sel) + } + } + } + + for _, constraint := range pod.Spec.TopologySpreadConstraints { + if constraint.WhenUnsatisfiable == apiv1.DoNotSchedule { + sel, err := 
metav1.LabelSelectorAsSelector(constraint.LabelSelector) + if err != nil { + return nil, err + } + state.topologySpreadSelectors = append(state.topologySpreadSelectors, sel) + } + } + + return state, nil +} + +func (p *SchedulerPluginRunner) fastCheckPredicates(pod *apiv1.Pod, nodeInfo *framework.NodeInfo, state *FastPredicateState) clustersnapshot.SchedulingError { + lister := p.snapshot.FastPredicateLister() + if lister == nil { + klog.V(4).Infof("FastPredicateLister is nil, skipping fast predicates") + return nil + } + + if err := p.fastCheckPodAffinity(pod, nodeInfo, lister, state); err != nil { + klog.V(4).Infof("fastCheckPodAffinity failed: %v", err) + return err + } + + if err := p.fastCheckPodTopologySpread(pod, nodeInfo, lister, state); err != nil { + klog.V(4).Infof("fastCheckPodTopologySpread failed: %v", err) + return err + } + + return nil +} + +func (p *SchedulerPluginRunner) fastCheckPodAffinity(pod *apiv1.Pod, nodeInfo *framework.NodeInfo, lister clustersnapshot.FastPredicateLister, state *FastPredicateState) clustersnapshot.SchedulingError { + affinity := pod.Spec.Affinity + if affinity == nil || (affinity.PodAffinity == nil && affinity.PodAntiAffinity == nil) { + return nil + } + + node := nodeInfo.Node() + + if affinity.PodAffinity != nil { + for i, term := range affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution { + topoValue, ok := node.Labels[term.TopologyKey] + if !ok { + continue + } + var selector labels.Selector + var err error + if state != nil && i < len(state.podAffinitySelectors) { + selector = state.podAffinitySelectors[i] + } else { + selector, err = metav1.LabelSelectorAsSelector(term.LabelSelector) + if err != nil { + return clustersnapshot.NewSchedulingInternalError(pod, err.Error()) + } + } + count := lister.PodAffinityCount(term.TopologyKey, topoValue, selector) + if count == 0 { + return clustersnapshot.NewFailingPredicateError(pod, "InterPodAffinity", []string{interpodaffinity.ErrReasonAffinityRulesNotMatch}, 
"", "") + } + } + } + + if affinity.PodAntiAffinity != nil { + for i, term := range affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution { + topoValue, ok := node.Labels[term.TopologyKey] + if !ok { + continue + } + var selector labels.Selector + var err error + if state != nil && i < len(state.podAntiAffinitySelectors) { + selector = state.podAntiAffinitySelectors[i] + } else { + selector, err = metav1.LabelSelectorAsSelector(term.LabelSelector) + if err != nil { + return clustersnapshot.NewSchedulingInternalError(pod, err.Error()) + } + } + count := lister.PodAffinityCount(term.TopologyKey, topoValue, selector) + if count > 0 { + return clustersnapshot.NewFailingPredicateError(pod, "InterPodAffinity", []string{interpodaffinity.ErrReasonAntiAffinityRulesNotMatch}, "", "") + } + } + } + + return nil +} + +func (p *SchedulerPluginRunner) fastCheckPodTopologySpread(pod *apiv1.Pod, nodeInfo *framework.NodeInfo, lister clustersnapshot.FastPredicateLister, state *FastPredicateState) clustersnapshot.SchedulingError { + if len(pod.Spec.TopologySpreadConstraints) == 0 { + return nil + } + + node := nodeInfo.Node() + + spreadIdx := 0 + for _, constraint := range pod.Spec.TopologySpreadConstraints { + if constraint.WhenUnsatisfiable != apiv1.DoNotSchedule { + continue + } + + topoValue, ok := node.Labels[constraint.TopologyKey] + if !ok { + // If node doesn't have the topology key, it's usually skipped or fails depending on configuration. 
+ spreadIdx++ + continue + } + + var selector labels.Selector + var err error + if state != nil && spreadIdx < len(state.topologySpreadSelectors) { + selector = state.topologySpreadSelectors[spreadIdx] + } else { + selector, err = metav1.LabelSelectorAsSelector(constraint.LabelSelector) + if err != nil { + return clustersnapshot.NewSchedulingInternalError(pod, err.Error()) + } + } + spreadIdx++ + + count := lister.PodAffinityCount(constraint.TopologyKey, topoValue, selector) + + // To implement maxSkew, we need minCount across all domains. + minCount := p.getMinTopologyCount(constraint.TopologyKey, selector, lister) + + if int32(count+1-minCount) > constraint.MaxSkew { + return clustersnapshot.NewFailingPredicateError(pod, "PodTopologySpread", []string{podtopologyspread.ErrReasonConstraintsNotMatch}, "", "") + } + } + + return nil +} + +func (p *SchedulerPluginRunner) getMinTopologyCount(topoKey string, selector labels.Selector, lister clustersnapshot.FastPredicateLister) int { + domains := lister.TopologyDomains(topoKey) + if len(domains) == 0 { + return 0 + } + + minCount := -1 + for _, domain := range domains { + count := lister.PodAffinityCount(topoKey, domain, selector) + if minCount == -1 || count < minCount { + minCount = count + } + if minCount == 0 { + break + } + } + + if minCount == -1 { + return 0 + } + return minCount +} diff --git a/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go index 778820df5789..4deb152b424c 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go +++ b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go @@ -19,6 +19,7 @@ package predicate import ( "context" "fmt" + "math" "strings" "sync" @@ -32,19 +33,21 @@ import ( // SchedulerPluginRunner can be used to run various phases of scheduler plugins through the scheduler framework. 
type SchedulerPluginRunner struct { - fwHandle *framework.Handle - snapshot clustersnapshot.ClusterSnapshot - defaultNodeOrdering clustersnapshot.NodeOrderMapping - parallelism int + fwHandle *framework.Handle + snapshot clustersnapshot.ClusterSnapshot + defaultNodeOrdering clustersnapshot.NodeOrderMapping + parallelism int + fastPredicatesEnabled bool } // NewSchedulerPluginRunner builds a SchedulerPluginRunner. -func NewSchedulerPluginRunner(fwHandle *framework.Handle, snapshot clustersnapshot.ClusterSnapshot, parallelism int) *SchedulerPluginRunner { +func NewSchedulerPluginRunner(fwHandle *framework.Handle, snapshot clustersnapshot.ClusterSnapshot, parallelism int, fastPredicatesEnabled bool) *SchedulerPluginRunner { return &SchedulerPluginRunner{ - fwHandle: fwHandle, - snapshot: snapshot, - defaultNodeOrdering: clustersnapshot.NewLastIndexOrderMapping(1), - parallelism: parallelism, + fwHandle: fwHandle, + snapshot: snapshot, + defaultNodeOrdering: clustersnapshot.NewLastIndexOrderMapping(1), + parallelism: parallelism, + fastPredicatesEnabled: fastPredicatesEnabled, } } @@ -83,6 +86,15 @@ func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, opts nodeOrdering.Reset(nodeInfosList) + var fastState *FastPredicateState + if p.fastPredicatesEnabled { + var err error + fastState, err = p.computeFastPredicateState(pod) + if err != nil { + return nil, nil, clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error computing fast predicate state: %v", err)) + } + } + ctx, cancel := context.WithCancel(context.Background()) defer cancel() earliestMatch := len(nodeInfosList) @@ -114,6 +126,13 @@ func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, opts return } + if p.fastPredicatesEnabled { + if err := p.fastCheckPredicates(pod, nodeInfo, fastState); err != nil { + // Fast check failed, so this Node won't work. + return + } + } + // Run the Filter phase of the framework. 
Plugins retrieve the state they saved during PreFilter from CycleState, and answer whether the // given Pod can be scheduled on the given Node. filterStatus := p.fwHandle.Framework.RunFilterPlugins(context.TODO(), state, pod, nodeInfo) @@ -131,7 +150,8 @@ func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, opts // Filter didn't pass for some plugin, so this Node won't work - move on to the next one. } - workqueue.ParallelizeUntil(ctx, p.parallelism, len(nodeInfosList), checkNode) + chunkSize := chunkSizeFor(len(nodeInfosList), p.parallelism) + workqueue.ParallelizeUntil(ctx, p.parallelism, len(nodeInfosList), checkNode, workqueue.WithChunkSize(chunkSize)) if foundNode != nil { nodeOrdering.MarkMatch(foundIndex) @@ -163,6 +183,16 @@ func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string return nil, nil, clustersnapshot.NewFailingPredicateError(pod, strings.Join(nodeFilteringPlugins.UnsortedList(), ", "), nil, "PreFilter filtered the Node out", "") } + if p.fastPredicatesEnabled { + fastState, err := p.computeFastPredicateState(pod) + if err != nil { + return nil, nil, clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error computing fast predicate state: %v", err)) + } + if err := p.fastCheckPredicates(pod, nodeInfo, fastState); err != nil { + return nil, nil, err + } + } + // Run the Filter phase of the framework for the Pod and the Node and check the results. See the corresponding comments in RunFiltersUntilPassingNode() for more info. filterStatus := p.fwHandle.Framework.RunFilterPlugins(context.TODO(), state, pod, nodeInfo) if !filterStatus.IsSuccess() { @@ -201,3 +231,17 @@ func (p *SchedulerPluginRunner) failingFilterDebugInfo(filterName string, nodeIn return strings.Join(infoParts, ", ") } + +// chunkSizeFor returns a chunk size for the given number of items to use for +// parallel work. The size aims to produce good CPU utilization. 
+// It returns max(1, min(sqrt(n), n/parallelism+1)) and requires parallelism > 0. +func chunkSizeFor(n, parallelism int) int { + s := int(math.Sqrt(float64(n))) + + if r := n/parallelism + 1; s > r { + s = r + } else if s < 1 { + s = 1 + } + return s +} diff --git a/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go index b8ed5e4fa3b7..ff2488b00fd7 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go +++ b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go @@ -506,12 +506,29 @@ func newTestPluginRunnerAndSnapshot(schedConfig *config.KubeSchedulerConfigurati schedConfig = defaultConfig } - fwHandle, err := framework.NewHandle(context.Background(), informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(), 0), schedConfig, true, false) + fwHandle, err := framework.NewHandle(context.Background(), informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(), 0), schedConfig, true, false, false) if err != nil { return nil, nil, err } - snapshot := NewPredicateSnapshot(store.NewBasicSnapshotStore(), fwHandle, true, 1, false) - return NewSchedulerPluginRunner(fwHandle, snapshot, 1), snapshot, nil + snapshot := NewPredicateSnapshot(store.NewBasicSnapshotStore(false), fwHandle, true, 1, false, false) + return NewSchedulerPluginRunner(fwHandle, snapshot, 1, false), snapshot, nil +} + +func newTestPluginRunnerAndSnapshotFast(schedConfig *config.KubeSchedulerConfiguration) (*SchedulerPluginRunner, clustersnapshot.ClusterSnapshot, error) { + if schedConfig == nil { + defaultConfig, err := scheduler_config_latest.Default() + if err != nil { + return nil, nil, err + } + schedConfig = defaultConfig + } + + fwHandle, err := framework.NewHandle(context.Background(), informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(), 0), schedConfig, true, false, true) + if err != nil { + return nil, nil, err + } + snapshot := 
NewPredicateSnapshot(store.NewDeltaSnapshotStore(16, true), fwHandle, true, 1, false, true) + return NewSchedulerPluginRunner(fwHandle, snapshot, 1, true), snapshot, nil } func BenchmarkRunFiltersUntilPassingNode(b *testing.B) { @@ -535,7 +552,7 @@ func BenchmarkRunFiltersUntilPassingNode(b *testing.B) { lastNode := BuildTestNode(lastNodeName, 1000, 1000) nodes = append(nodes, lastNode) - pluginRunner, snapshot, err := newTestPluginRunnerAndSnapshot(nil) + pluginRunner, snapshot, err := newTestPluginRunnerAndSnapshotFast(nil) assert.NoError(b, err) for _, node := range nodes { diff --git a/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go b/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go index caae92b58fa6..a2d9a4dc3a8c 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go +++ b/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go @@ -41,17 +41,19 @@ type PredicateSnapshot struct { draEnabled bool enableCSINodeAwareScheduling bool parallelism int + fastPredicatesEnabled bool draSnapshot *drasnapshot.Snapshot csiSnapshot *csisnapshot.Snapshot } // NewPredicateSnapshot builds a PredicateSnapshot. 
-func NewPredicateSnapshot(snapshotStore clustersnapshot.ClusterSnapshotStore, fwHandle *framework.Handle, draEnabled bool, parallelism int, enableCSINodeAwareScheduling bool) *PredicateSnapshot { +func NewPredicateSnapshot(snapshotStore clustersnapshot.ClusterSnapshotStore, fwHandle *framework.Handle, draEnabled bool, parallelism int, enableCSINodeAwareScheduling bool, fastPredicatesEnabled bool) *PredicateSnapshot { snapshot := &PredicateSnapshot{ ClusterSnapshotStore: snapshotStore, draEnabled: draEnabled, enableCSINodeAwareScheduling: enableCSINodeAwareScheduling, parallelism: parallelism, + fastPredicatesEnabled: fastPredicatesEnabled, draSnapshot: drasnapshot.NewEmptySnapshot(), csiSnapshot: csisnapshot.NewEmptySnapshot(), } @@ -59,7 +61,7 @@ func NewPredicateSnapshot(snapshotStore clustersnapshot.ClusterSnapshotStore, fw // which operate on *framework.NodeInfo. The only object that allows obtaining *framework.NodeInfos is PredicateSnapshot, so we have an ugly circular // dependency between PluginRunner and PredicateSnapshot. // TODO: Refactor PluginRunner so that it doesn't depend on PredicateSnapshot (e.g. move retrieving NodeInfos out of PluginRunner, to PredicateSnapshot). 
- snapshot.pluginRunner = NewSchedulerPluginRunner(fwHandle, snapshot, parallelism) + snapshot.pluginRunner = NewSchedulerPluginRunner(fwHandle, snapshot, parallelism, fastPredicatesEnabled) return snapshot } @@ -165,12 +167,13 @@ func (s *PredicateSnapshot) setClusterStatePodsParallelized(nodeInfos []*framewo } ctx := context.Background() + chunkSize := chunkSizeFor(len(nodeInfos), s.parallelism) workqueue.ParallelizeUntil(ctx, s.parallelism, len(nodeInfos), func(nodeIdx int) { nodeInfo := nodeInfos[nodeIdx] for _, pi := range podInfosForNode[nodeIdx] { nodeInfo.AddPodInfo(pi) } - }) + }, workqueue.WithChunkSize(chunkSize)) return nil } @@ -386,6 +389,11 @@ func (s *PredicateSnapshot) CheckPredicates(pod *apiv1.Pod, nodeName string) clu return err } +// FastPredicateLister returns an interface that allows querying internal state for fast predicate checking. +func (s *PredicateSnapshot) FastPredicateLister() clustersnapshot.FastPredicateLister { + return s.ClusterSnapshotStore.FastPredicateLister() +} + // DraSnapshot returns an interface that allows accessing and modifying the DRA objects in the snapshot. func (s *PredicateSnapshot) DraSnapshot() *drasnapshot.Snapshot { return s.draSnapshot diff --git a/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot_test.go b/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot_test.go index a379e1ed8c42..55381efc9077 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot_test.go +++ b/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot_test.go @@ -17,6 +17,7 @@ limitations under the License. package predicate import ( + "context" "fmt" "maps" "math/rand" @@ -40,6 +41,8 @@ import ( drautils "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/utils" "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" . 
"k8s.io/autoscaler/cluster-autoscaler/utils/test" + "k8s.io/client-go/informers" + clientsetfake "k8s.io/client-go/kubernetes/fake" featuretesting "k8s.io/component-base/featuregate/testing" "k8s.io/kubernetes/pkg/features" schedulerimpl "k8s.io/kubernetes/pkg/scheduler/framework" @@ -51,14 +54,21 @@ var snapshots = map[string]func() (clustersnapshot.ClusterSnapshot, error){ if err != nil { return nil, err } - return NewPredicateSnapshot(store.NewBasicSnapshotStore(), fwHandle, true, 1, true), nil + return NewPredicateSnapshot(store.NewBasicSnapshotStore(false), fwHandle, true, 1, true, false), nil }, "delta": func() (clustersnapshot.ClusterSnapshot, error) { fwHandle, err := framework.NewTestFrameworkHandle() if err != nil { return nil, err } - return NewPredicateSnapshot(store.NewDeltaSnapshotStore(16), fwHandle, true, 1, true), nil + return NewPredicateSnapshot(store.NewDeltaSnapshotStore(16, false), fwHandle, true, 1, true, false), nil + }, + "fast": func() (clustersnapshot.ClusterSnapshot, error) { + fwHandle, err := framework.NewHandle(context.Background(), informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(), 0), nil, true, true, true) + if err != nil { + return nil, err + } + return NewPredicateSnapshot(store.NewDeltaSnapshotStore(16, true), fwHandle, true, 1, true, true), nil }, } @@ -2042,8 +2052,119 @@ func TestSetClusterStateConcurrentDRA(t *testing.T) { assert.NoError(t, err) // Set parallelism to 8 to ensure the workqueue utilizes multiple goroutines. 
- snapshot := NewPredicateSnapshot(store.NewBasicSnapshotStore(), fwHandle, true, 8, false) + snapshot := NewPredicateSnapshot(store.NewBasicSnapshotStore(false), fwHandle, true, 8, false, false) err = snapshot.SetClusterState(nodes, pods, draSnap, nil) assert.NoError(t, err) } + +func TestSchedulingPredicatesMatch(t *testing.T) { + for name, snapshotFactory := range snapshots { + t.Run(name, func(t *testing.T) { + snapshot, err := snapshotFactory() + assert.NoError(t, err) + + node1 := BuildTestNode("node1", 100, 1000) + node1.Labels = map[string]string{"zone": "z1", "region": "r1"} + + node2 := BuildTestNode("node2", 100, 1000) + node2.Labels = map[string]string{"zone": "z1", "region": "r1"} + + node3 := BuildTestNode("node3", 100, 1000) + node3.Labels = map[string]string{"zone": "z2", "region": "r1"} + + pod1 := BuildTestPod("pod1", 1, 1) + pod1.Labels = map[string]string{"app": "foo", "tier": "backend"} + pod1.Spec.NodeName = "node1" + + pod2 := BuildTestPod("pod2", 1, 1) + pod2.Labels = map[string]string{"app": "foo", "tier": "frontend"} + pod2.Spec.NodeName = "node1" + + pod3 := BuildTestPod("pod3", 1, 1) + pod3.Labels = map[string]string{"app": "bar", "tier": "backend"} + pod3.Spec.NodeName = "node3" + + // 1. 
Initial State + err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(node1, pod1, pod2)) + assert.NoError(t, err) + err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(node2)) + assert.NoError(t, err) + err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(node3, pod3)) + assert.NoError(t, err) + + // Test PodAntiAffinity + podAntiAffinity := BuildTestPod("pod-anti-affinity", 1, 1) + podAntiAffinity.Spec.Affinity = &apiv1.Affinity{ + PodAntiAffinity: &apiv1.PodAntiAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: []apiv1.PodAffinityTerm{ + { + LabelSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": "foo"}}, + TopologyKey: "zone", + }, + }, + }, + } + + // Should fail on node1 and node2 (zone z1 has app=foo pods) + err = snapshot.SchedulePod(podAntiAffinity, "node1") + assert.Error(t, err) + err = snapshot.SchedulePod(podAntiAffinity, "node2") + assert.Error(t, err) + + // Should succeed on node3 (zone z2 has no app=foo pods) + err = snapshot.SchedulePod(podAntiAffinity, "node3") + assert.NoError(t, err) + err = snapshot.UnschedulePod("default", "pod-anti-affinity", "node3") + assert.NoError(t, err) + + // Test TopologySpreadConstraints + podSpread := BuildTestPod("pod-spread", 1, 1) + podSpread.Labels = map[string]string{"app": "foo"} + podSpread.Spec.TopologySpreadConstraints = []apiv1.TopologySpreadConstraint{ + { + MaxSkew: 1, + TopologyKey: "zone", + WhenUnsatisfiable: apiv1.DoNotSchedule, + LabelSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": "foo"}}, + }, + } + + // Currently app=foo pods: 2 in z1, 0 in z2. Skew if scheduled to z1 = 3-0=3 (fails). 
+ err = snapshot.SchedulePod(podSpread, "node1") + assert.Error(t, err) + + // Skew if scheduled to z2 = 2-1=1 (succeeds) + err = snapshot.SchedulePod(podSpread, "node3") + assert.NoError(t, err) + err = snapshot.UnschedulePod("default", "pod-spread", "node3") + assert.NoError(t, err) + + // Forking behavior + snapshot.Fork() + + // Add an app=foo pod to z2 (node3) manually. + pod4 := BuildTestPod("pod4", 1, 1) + pod4.Labels = map[string]string{"app": "foo"} + err = snapshot.ForceAddPod(pod4, "node3") + assert.NoError(t, err) + + // Now app=foo pods: 2 in z1, 1 in z2. + // Scheduling podSpread on z1 -> 3 in z1 vs 1 in z2 -> skew 2 -> fails. + err = snapshot.SchedulePod(podSpread, "node1") + assert.Error(t, err) + + // Scheduling podAntiAffinity on z2 -> 1 app=foo pod in z2 -> fails. + err = snapshot.SchedulePod(podAntiAffinity, "node3") + assert.Error(t, err) + + snapshot.Revert() + + // Back to 2 in z1, 0 in z2. + err = snapshot.SchedulePod(podAntiAffinity, "node3") + assert.NoError(t, err) + err = snapshot.UnschedulePod("default", "pod-anti-affinity", "node3") + assert.NoError(t, err) + }) + } +} diff --git a/cluster-autoscaler/simulator/clustersnapshot/store/basic.go b/cluster-autoscaler/simulator/clustersnapshot/store/basic.go index 4f488499218e..b0ecfaf8e936 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/store/basic.go +++ b/cluster-autoscaler/simulator/clustersnapshot/store/basic.go @@ -30,12 +30,14 @@ import ( // BasicSnapshotStore is simple, reference implementation of ClusterSnapshotStore. // It is inefficient. But hopefully bug-free and good for initial testing. 
type BasicSnapshotStore struct { - data []*internalBasicSnapshotData + data []*internalBasicSnapshotData + fastPredicatesEnabled bool } type internalBasicSnapshotData struct { nodeInfoMap map[string]schedulerinterface.NodeInfo pvcNamespacePodMap map[string]map[string]bool + fastPredicateIndex *fastPredicateIndex } func (data *internalBasicSnapshotData) listNodeInfos() []schedulerinterface.NodeInfo { @@ -120,11 +122,15 @@ func (data *internalBasicSnapshotData) removePvcUsedByPod(pod *apiv1.Pod) { } } -func newInternalBasicSnapshotData() *internalBasicSnapshotData { - return &internalBasicSnapshotData{ +func newInternalBasicSnapshotData(fastPredicatesEnabled bool) *internalBasicSnapshotData { + data := &internalBasicSnapshotData{ nodeInfoMap: make(map[string]schedulerinterface.NodeInfo), pvcNamespacePodMap: make(map[string]map[string]bool), } + if fastPredicatesEnabled { + data.fastPredicateIndex = newFastPredicateIndex() + } + return data } func (data *internalBasicSnapshotData) clone() *internalBasicSnapshotData { @@ -139,10 +145,14 @@ func (data *internalBasicSnapshotData) clone() *internalBasicSnapshotData { clonedPvcNamespaceNodeMap[k][k1] = v1 } } - return &internalBasicSnapshotData{ + clonedData := &internalBasicSnapshotData{ nodeInfoMap: clonedNodeInfoMap, pvcNamespacePodMap: clonedPvcNamespaceNodeMap, } + if data.fastPredicateIndex != nil { + clonedData.fastPredicateIndex = data.fastPredicateIndex.clone() + } + return clonedData } func (data *internalBasicSnapshotData) addNodeInfo(nodeInfo schedulerinterface.NodeInfo) error { @@ -154,26 +164,43 @@ func (data *internalBasicSnapshotData) addNodeInfo(nodeInfo schedulerinterface.N for _, podInfo := range nodeInfo.GetPods() { data.addPvcUsedByPod(podInfo.GetPod()) } + if data.fastPredicateIndex != nil { + data.fastPredicateIndex.addNode(nodeInfo.Node()) + for _, podInfo := range nodeInfo.GetPods() { + data.fastPredicateIndex.addPod(podInfo.GetPod(), nodeInfo.Node()) + } + } return nil } func (data 
*internalBasicSnapshotData) removeNodeInfo(nodeName string) error { - if _, found := data.nodeInfoMap[nodeName]; !found { + nodeInfo, found := data.nodeInfoMap[nodeName] + if !found { return clustersnapshot.ErrNodeNotFound } - for _, pod := range data.nodeInfoMap[nodeName].GetPods() { + for _, pod := range nodeInfo.GetPods() { data.removePvcUsedByPod(pod.GetPod()) } + if data.fastPredicateIndex != nil { + for _, pod := range nodeInfo.GetPods() { + data.fastPredicateIndex.removePod(pod.GetPod(), nodeInfo.Node()) + } + data.fastPredicateIndex.removeNode(nodeInfo.Node()) + } delete(data.nodeInfoMap, nodeName) return nil } func (data *internalBasicSnapshotData) addPodInfo(podInfo schedulerinterface.PodInfo, nodeName string) error { - if _, found := data.nodeInfoMap[nodeName]; !found { + nodeInfo, found := data.nodeInfoMap[nodeName] + if !found { return clustersnapshot.ErrNodeNotFound } - data.nodeInfoMap[nodeName].AddPodInfo(podInfo) + nodeInfo.AddPodInfo(podInfo) data.addPvcUsedByPod(podInfo.GetPod()) + if data.fastPredicateIndex != nil { + data.fastPredicateIndex.addPod(podInfo.GetPod(), nodeInfo.Node()) + } return nil } @@ -186,9 +213,15 @@ func (data *internalBasicSnapshotData) removePod(namespace, podName, nodeName st for _, podInfo := range nodeInfo.GetPods() { if podInfo.GetPod().Namespace == namespace && podInfo.GetPod().Name == podName { data.removePvcUsedByPod(podInfo.GetPod()) + if data.fastPredicateIndex != nil { + data.fastPredicateIndex.removePod(podInfo.GetPod(), nodeInfo.Node()) + } err := nodeInfo.RemovePod(logger, podInfo.GetPod()) if err != nil { data.addPvcUsedByPod(podInfo.GetPod()) + if data.fastPredicateIndex != nil { + data.fastPredicateIndex.addPod(podInfo.GetPod(), nodeInfo.Node()) + } return fmt.Errorf("cannot remove pod; %v", err) } return nil @@ -198,8 +231,10 @@ func (data *internalBasicSnapshotData) removePod(namespace, podName, nodeName st } // NewBasicSnapshotStore creates instances of BasicSnapshotStore. 
-func NewBasicSnapshotStore() *BasicSnapshotStore { - snapshot := &BasicSnapshotStore{} +func NewBasicSnapshotStore(fastPredicatesEnabled bool) *BasicSnapshotStore { + snapshot := &BasicSnapshotStore{ + fastPredicatesEnabled: fastPredicatesEnabled, + } snapshot.Clear() return snapshot } @@ -213,6 +248,15 @@ func (snapshot *BasicSnapshotStore) RemoveNodeInfo(nodeName string) error { return snapshot.getInternalData().removeNodeInfo(nodeName) } +// FastPredicateLister returns an interface that allows querying internal state for fast predicate checking. +func (snapshot *BasicSnapshotStore) FastPredicateLister() clustersnapshot.FastPredicateLister { + data := snapshot.getInternalData() + if data.fastPredicateIndex != nil { + return data.fastPredicateIndex + } + return nil +} + // StoreNodeInfo adds the given *framework.NodeInfo to the snapshot without checking scheduler predicates. func (snapshot *BasicSnapshotStore) StoreNodeInfo(nodeInfo *framework.NodeInfo) error { return snapshot.getInternalData().addNodeInfo(nodeInfo) @@ -259,7 +303,7 @@ func (snapshot *BasicSnapshotStore) Commit() error { // Clear reset cluster snapshot to empty, unforked state func (snapshot *BasicSnapshotStore) Clear() { - baseData := newInternalBasicSnapshotData() + baseData := newInternalBasicSnapshotData(snapshot.fastPredicatesEnabled) snapshot.data = []*internalBasicSnapshotData{baseData} } diff --git a/cluster-autoscaler/simulator/clustersnapshot/store/delta.go b/cluster-autoscaler/simulator/clustersnapshot/store/delta.go index bf08d70c33a0..87d10916a0d2 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/store/delta.go +++ b/cluster-autoscaler/simulator/clustersnapshot/store/delta.go @@ -48,8 +48,9 @@ import ( // memory and time complexities of DeltaSnapshotStore - they are optimized for // cluster autoscaler operations type DeltaSnapshotStore struct { - data *internalDeltaSnapshotData - parallelism int + data *internalDeltaSnapshotData + parallelism int + fastPredicatesEnabled bool 
} type deltaSnapshotStoreNodeLister DeltaSnapshotStore @@ -66,14 +67,19 @@ type internalDeltaSnapshotData struct { havePodsWithAffinity []schedulerinterface.NodeInfo havePodsWithRequiredAntiAffinity []schedulerinterface.NodeInfo pvcNamespaceMap map[string]int + fastPredicateIndex *fastPredicateIndex } -func newInternalDeltaSnapshotData() *internalDeltaSnapshotData { - return &internalDeltaSnapshotData{ +func newInternalDeltaSnapshotData(fastPredicatesEnabled bool) *internalDeltaSnapshotData { + data := &internalDeltaSnapshotData{ addedNodeInfoMap: make(map[string]schedulerinterface.NodeInfo), modifiedNodeInfoMap: make(map[string]schedulerinterface.NodeInfo), deletedNodeInfos: make(map[string]bool), } + if fastPredicatesEnabled { + data.fastPredicateIndex = newFastPredicateIndex() + } + return data } func (data *internalDeltaSnapshotData) getNodeInfo(name string) (schedulerinterface.NodeInfo, bool) { @@ -162,6 +168,13 @@ func (data *internalDeltaSnapshotData) addNodeInfo(nodeInfo schedulerinterface.N data.clearPodCaches() } + if data.fastPredicateIndex != nil { + data.fastPredicateIndex.addNode(nodeInfo.Node()) + for _, podInfo := range nodeInfo.GetPods() { + data.fastPredicateIndex.addPod(podInfo.GetPod(), nodeInfo.Node()) + } + } + return nil } @@ -178,6 +191,14 @@ func (data *internalDeltaSnapshotData) clearPodCaches() { } func (data *internalDeltaSnapshotData) removeNodeInfo(nodeName string) error { + nodeInfo, found := data.getNodeInfoLocal(nodeName) + if !found { + ni, foundInBase := data.baseData.getNodeInfo(nodeName) + if foundInBase { + nodeInfo = ni + } + } + _, foundInDelta := data.addedNodeInfoMap[nodeName] if foundInDelta { // If node was added within this delta, delete this change. 
@@ -205,6 +226,13 @@ func (data *internalDeltaSnapshotData) removeNodeInfo(nodeName string) error { return clustersnapshot.ErrNodeNotFound } + if data.fastPredicateIndex != nil && nodeInfo != nil { + for _, pod := range nodeInfo.GetPods() { + data.fastPredicateIndex.removePod(pod.GetPod(), nodeInfo.Node()) + } + data.fastPredicateIndex.removeNode(nodeInfo.Node()) + } + // Maybe consider deleting from the lists instead. Maybe not. data.clearCaches() return nil @@ -235,6 +263,10 @@ func (data *internalDeltaSnapshotData) addPodInfo(podInfo schedulerinterface.Pod ni.AddPodInfo(podInfo) + if data.fastPredicateIndex != nil { + data.fastPredicateIndex.addPod(podInfo.GetPod(), ni.Node()) + } + // Maybe consider deleting from the list in the future. Maybe not. data.clearCaches() return nil @@ -253,7 +285,13 @@ func (data *internalDeltaSnapshotData) removePod(namespace, name, nodeName strin logger := klog.Background() for _, podInfo := range ni.GetPods() { if podInfo.GetPod().Namespace == namespace && podInfo.GetPod().Name == name { + if data.fastPredicateIndex != nil { + data.fastPredicateIndex.removePod(podInfo.GetPod(), ni.Node()) + } if err := ni.RemovePod(logger, podInfo.GetPod()); err != nil { + if data.fastPredicateIndex != nil { + data.fastPredicateIndex.addPod(podInfo.GetPod(), ni.Node()) + } return fmt.Errorf("cannot remove pod; %v", err) } podFound = true @@ -285,8 +323,15 @@ func (data *internalDeltaSnapshotData) isPVCUsedByPods(key string) bool { } func (data *internalDeltaSnapshotData) fork() *internalDeltaSnapshotData { - forkedData := newInternalDeltaSnapshotData() + fastPredicatesEnabled := data.fastPredicateIndex != nil + forkedData := newInternalDeltaSnapshotData(fastPredicatesEnabled) forkedData.baseData = data + if fastPredicatesEnabled { + // Reuse the same index object and call Fork on it. + // fastPredicateIndex uses PatchSet, which handles O(1) Fork/Revert/Commit. 
+ forkedData.fastPredicateIndex = data.fastPredicateIndex + forkedData.fastPredicateIndex.Fork() + } return forkedData } @@ -295,6 +340,11 @@ func (data *internalDeltaSnapshotData) commit() (*internalDeltaSnapshotData, err // do nothing... as in basic snapshot. return data, nil } + + if data.fastPredicateIndex != nil { + data.fastPredicateIndex.Commit() + } + for node := range data.deletedNodeInfos { if err := data.baseData.removeNodeInfo(node); err != nil { return nil, err @@ -388,9 +438,10 @@ func (snapshot *DeltaSnapshotStore) StorageInfos() schedulerinterface.StorageInf } // NewDeltaSnapshotStore creates instances of DeltaSnapshotStore. -func NewDeltaSnapshotStore(parallelism int) *DeltaSnapshotStore { +func NewDeltaSnapshotStore(parallelism int, fastPredicatesEnabled bool) *DeltaSnapshotStore { snapshot := &DeltaSnapshotStore{ - parallelism: parallelism, + parallelism: parallelism, + fastPredicatesEnabled: fastPredicatesEnabled, } snapshot.Clear() return snapshot @@ -401,6 +452,14 @@ func (snapshot *DeltaSnapshotStore) RemoveNodeInfo(nodeName string) error { return snapshot.data.removeNodeInfo(nodeName) } +// FastPredicateLister returns an interface that allows querying internal state for fast predicate checking. +func (snapshot *DeltaSnapshotStore) FastPredicateLister() clustersnapshot.FastPredicateLister { + if snapshot.data.fastPredicateIndex != nil { + return snapshot.data.fastPredicateIndex + } + return nil +} + // StoreNodeInfo adds the given *framework.NodeInfo to the snapshot without checking scheduler predicates. 
func (snapshot *DeltaSnapshotStore) StoreNodeInfo(nodeInfo *framework.NodeInfo) error { return snapshot.data.addNodeInfo(nodeInfo) @@ -431,6 +490,9 @@ func (snapshot *DeltaSnapshotStore) Fork() { // Time: O(1) func (snapshot *DeltaSnapshotStore) Revert() { if snapshot.data.baseData != nil { + if snapshot.data.fastPredicateIndex != nil { + snapshot.data.fastPredicateIndex.Revert() + } snapshot.data = snapshot.data.baseData } } @@ -449,5 +511,5 @@ func (snapshot *DeltaSnapshotStore) Commit() error { // Clear reset cluster snapshot to empty, unforked state // Time: O(1) func (snapshot *DeltaSnapshotStore) Clear() { - snapshot.data = newInternalDeltaSnapshotData() + snapshot.data = newInternalDeltaSnapshotData(snapshot.fastPredicatesEnabled) } diff --git a/cluster-autoscaler/simulator/clustersnapshot/store/delta_benchmark_test.go b/cluster-autoscaler/simulator/clustersnapshot/store/delta_benchmark_test.go index 6ac97c7abd2e..186a5c9ffbfe 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/store/delta_benchmark_test.go +++ b/cluster-autoscaler/simulator/clustersnapshot/store/delta_benchmark_test.go @@ -46,7 +46,7 @@ func BenchmarkBuildNodeInfoList(b *testing.B) { for _, tc := range testCases { b.Run(fmt.Sprintf("fork add 1000 to %d", tc.nodeCount), func(b *testing.B) { nodes := clustersnapshot.CreateTestNodes(tc.nodeCount + 1000) - deltaStore := NewDeltaSnapshotStore(16) + deltaStore := NewDeltaSnapshotStore(16, false) for _, node := range nodes[:tc.nodeCount] { nodeInfo := framework.NewNodeInfo(node, nil) if err := deltaStore.StoreNodeInfo(nodeInfo); err != nil { @@ -70,7 +70,7 @@ func BenchmarkBuildNodeInfoList(b *testing.B) { for _, tc := range testCases { b.Run(fmt.Sprintf("base %d", tc.nodeCount), func(b *testing.B) { nodes := clustersnapshot.CreateTestNodes(tc.nodeCount) - deltaStore := NewDeltaSnapshotStore(16) + deltaStore := NewDeltaSnapshotStore(16, false) for _, node := range nodes { nodeInfo := framework.NewNodeInfo(node, nil) if err := 
deltaStore.StoreNodeInfo(nodeInfo); err != nil { diff --git a/cluster-autoscaler/simulator/clustersnapshot/store/fast_predicate_index.go b/cluster-autoscaler/simulator/clustersnapshot/store/fast_predicate_index.go new file mode 100644 index 000000000000..54ab94174c1a --- /dev/null +++ b/cluster-autoscaler/simulator/clustersnapshot/store/fast_predicate_index.go @@ -0,0 +1,283 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package store + +import ( + apiv1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" + "k8s.io/autoscaler/cluster-autoscaler/simulator/common" + "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" +) + +type podCountKey struct { + labelSetHash string + topologyKey string + topologyValue string +} + +type nodeCountKey struct { + topologyKey string + topologyValue string +} + +type fastPredicateIndex struct { + // (labelSetHash, topologyKey, topologyValue) -> count + podCounts *common.PatchSet[podCountKey, int] + // labelSetHash -> labels (immutable) + // Ref-counted: entries are removed when no more pods use them. 
+ hashToLabels *common.PatchSet[string, map[string]string] + // labelSetHash -> total count of pods across all nodes + labelSetCounts *common.PatchSet[string, int] + // (topologyKey, topologyValue) -> count of nodes + nodeCounts *common.PatchSet[nodeCountKey, int] + // topologyKey -> set of topology values + topologyDomains *common.PatchSet[string, map[string]bool] +} + +func newFastPredicateIndex() *fastPredicateIndex { + return &fastPredicateIndex{ + podCounts: common.NewPatchSet(common.NewPatch[podCountKey, int]()), + hashToLabels: common.NewPatchSet(common.NewPatch[string, map[string]string]()), + labelSetCounts: common.NewPatchSet(common.NewPatch[string, int]()), + nodeCounts: common.NewPatchSet(common.NewPatch[nodeCountKey, int]()), + topologyDomains: common.NewPatchSet(common.NewPatch[string, map[string]bool]()), + } +} + +func (idx *fastPredicateIndex) Fork() { + idx.podCounts.Fork() + idx.hashToLabels.Fork() + idx.labelSetCounts.Fork() + idx.nodeCounts.Fork() + idx.topologyDomains.Fork() +} + +func (idx *fastPredicateIndex) Revert() { + idx.podCounts.Revert() + idx.hashToLabels.Revert() + idx.labelSetCounts.Revert() + idx.nodeCounts.Revert() + idx.topologyDomains.Revert() +} + +func (idx *fastPredicateIndex) Commit() { + idx.podCounts.Commit() + idx.hashToLabels.Commit() + idx.labelSetCounts.Commit() + idx.nodeCounts.Commit() + idx.topologyDomains.Commit() +} + +func (idx *fastPredicateIndex) addPod(pod *apiv1.Pod, node *apiv1.Node) { + if pod == nil || node == nil { + return + } + labelSetHash := framework.GetPodLabelSetHash(pod) + + // Global ref-counting for labels to avoid unbounded growth. 
+ globalCount, _ := idx.labelSetCounts.FindValue(labelSetHash) + if globalCount == 0 { + labels := make(map[string]string) + for k, v := range pod.Labels { + labels[k] = v + } + idx.hashToLabels.SetCurrent(labelSetHash, labels) + } + idx.labelSetCounts.SetCurrent(labelSetHash, globalCount+1) + + topologyKeys := make(map[string]string) + for k, v := range node.Labels { + topologyKeys[k] = v + } + topologyKeys[apiv1.LabelHostname] = node.Name + + for tk, tv := range topologyKeys { + key := podCountKey{labelSetHash: labelSetHash, topologyKey: tk, topologyValue: tv} + count, _ := idx.podCounts.FindValue(key) + idx.podCounts.SetCurrent(key, count+1) + } +} + +func (idx *fastPredicateIndex) removePod(pod *apiv1.Pod, node *apiv1.Node) { + if pod == nil || node == nil { + return + } + labelSetHash := framework.GetPodLabelSetHash(pod) + + // Global ref-counting for labels. + globalCount, found := idx.labelSetCounts.FindValue(labelSetHash) + if found && globalCount > 0 { + if globalCount == 1 { + idx.labelSetCounts.DeleteCurrent(labelSetHash) + idx.hashToLabels.DeleteCurrent(labelSetHash) + } else { + idx.labelSetCounts.SetCurrent(labelSetHash, globalCount-1) + } + } + + topologyKeys := make(map[string]string) + for k, v := range node.Labels { + topologyKeys[k] = v + } + topologyKeys[apiv1.LabelHostname] = node.Name + + for tk, tv := range topologyKeys { + key := podCountKey{labelSetHash: labelSetHash, topologyKey: tk, topologyValue: tv} + count, found := idx.podCounts.FindValue(key) + if found && count > 0 { + if count == 1 { + idx.podCounts.DeleteCurrent(key) + } else { + idx.podCounts.SetCurrent(key, count-1) + } + } + } +} + +func (idx *fastPredicateIndex) addNode(node *apiv1.Node) { + if node == nil { + return + } + + topologyKeys := make(map[string]string) + for k, v := range node.Labels { + topologyKeys[k] = v + } + topologyKeys[apiv1.LabelHostname] = node.Name + + for tk, tv := range topologyKeys { + key := nodeCountKey{topologyKey: tk, topologyValue: tv} + count, _ 
:= idx.nodeCounts.FindValue(key) + idx.nodeCounts.SetCurrent(key, count+1) + + if count == 0 { + domains, _ := idx.topologyDomains.FindValue(tk) + newDomains := make(map[string]bool) + for d := range domains { + newDomains[d] = true + } + newDomains[tv] = true + idx.topologyDomains.SetCurrent(tk, newDomains) + } + } +} + +func (idx *fastPredicateIndex) removeNode(node *apiv1.Node) { + if node == nil { + return + } + + topologyKeys := make(map[string]string) + for k, v := range node.Labels { + topologyKeys[k] = v + } + topologyKeys[apiv1.LabelHostname] = node.Name + + for tk, tv := range topologyKeys { + key := nodeCountKey{topologyKey: tk, topologyValue: tv} + count, found := idx.nodeCounts.FindValue(key) + if found && count > 0 { + if count == 1 { + idx.nodeCounts.DeleteCurrent(key) + + domains, _ := idx.topologyDomains.FindValue(tk) + newDomains := make(map[string]bool) + for d := range domains { + newDomains[d] = true + } + delete(newDomains, tv) + if len(newDomains) == 0 { + idx.topologyDomains.DeleteCurrent(tk) + } else { + idx.topologyDomains.SetCurrent(tk, newDomains) + } + } else { + idx.nodeCounts.SetCurrent(key, count-1) + } + } + } +} + +func (idx *fastPredicateIndex) PodAffinityCount(topologyKey, topologyValue string, selector labels.Selector) int { + count := 0 + // We iterate over the current view of unique label sets. + // Size is O(UniqueLabelSets), which is usually small. 
+ for hash, labelsMap := range idx.hashToLabels.AsMap() { + if selector.Matches(labels.Set(labelsMap)) { + key := podCountKey{labelSetHash: hash, topologyKey: topologyKey, topologyValue: topologyValue} + c, _ := idx.podCounts.FindValue(key) + count += c + } + } + return count +} + +func (idx *fastPredicateIndex) TopologyValueCount(topologyKey string) int { + domains, found := idx.topologyDomains.FindValue(topologyKey) + if !found { + return 0 + } + return len(domains) +} + +func (idx *fastPredicateIndex) TopologyDomains(topologyKey string) []string { + domainsMap, found := idx.topologyDomains.FindValue(topologyKey) + if !found { + return nil + } + domains := make([]string, 0, len(domainsMap)) + for d := range domainsMap { + domains = append(domains, d) + } + return domains +} + +func (idx *fastPredicateIndex) clone() *fastPredicateIndex { + newIdx := &fastPredicateIndex{ + podCounts: common.ClonePatchSet(idx.podCounts, + func(k podCountKey) podCountKey { return k }, + func(v int) int { return v }), + hashToLabels: common.ClonePatchSet(idx.hashToLabels, + func(k string) string { return k }, + func(v map[string]string) map[string]string { + newMap := make(map[string]string) + for k1, v1 := range v { + newMap[k1] = v1 + } + return newMap + }), + labelSetCounts: common.ClonePatchSet(idx.labelSetCounts, + func(k string) string { return k }, + func(v int) int { return v }), + nodeCounts: common.ClonePatchSet(idx.nodeCounts, + func(k nodeCountKey) nodeCountKey { return k }, + func(v int) int { return v }), + topologyDomains: common.ClonePatchSet(idx.topologyDomains, + func(k string) string { return k }, + func(v map[string]bool) map[string]bool { + newMap := make(map[string]bool) + for k1, v1 := range v { + newMap[k1] = v1 + } + return newMap + }), + } + return newIdx +} + +var _ clustersnapshot.FastPredicateLister = &fastPredicateIndex{} diff --git a/cluster-autoscaler/simulator/clustersnapshot/testsnapshot/test_snapshot.go 
b/cluster-autoscaler/simulator/clustersnapshot/testsnapshot/test_snapshot.go index 37105fdd9b1b..0c452a14044b 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/testsnapshot/test_snapshot.go +++ b/cluster-autoscaler/simulator/clustersnapshot/testsnapshot/test_snapshot.go @@ -48,7 +48,7 @@ func NewCustomTestSnapshotOrDie(t testFailer, snapshotStore clustersnapshot.Clus // NewTestSnapshotAndHandle returns an instance of ClusterSnapshot and a framework handle that can be used in tests. func NewTestSnapshotAndHandle() (clustersnapshot.ClusterSnapshot, *framework.Handle, error) { - return NewCustomTestSnapshotAndHandle(store.NewBasicSnapshotStore()) + return NewCustomTestSnapshotAndHandle(store.NewBasicSnapshotStore(false)) } // NewCustomTestSnapshotAndHandle returns an instance of ClusterSnapshot with a specific ClusterSnapshotStore that can be used in tests. @@ -57,5 +57,5 @@ func NewCustomTestSnapshotAndHandle(snapshotStore clustersnapshot.ClusterSnapsho if err != nil { return nil, nil, err } - return predicate.NewPredicateSnapshot(snapshotStore, testFwHandle, true, 1, false), testFwHandle, nil + return predicate.NewPredicateSnapshot(snapshotStore, testFwHandle, true, 1, false, false), testFwHandle, nil } diff --git a/cluster-autoscaler/simulator/framework/handle.go b/cluster-autoscaler/simulator/framework/handle.go index 425320d17386..3991141e7a95 100644 --- a/cluster-autoscaler/simulator/framework/handle.go +++ b/cluster-autoscaler/simulator/framework/handle.go @@ -41,7 +41,7 @@ type Handle struct { } // NewHandle builds a framework Handle based on the provided informers and scheduler config. 
-func NewHandle(ctx context.Context, informerFactory informers.SharedInformerFactory, schedConfig *schedulerconfig.KubeSchedulerConfiguration, draEnabled bool, csiEnabled bool) (*Handle, error) { +func NewHandle(ctx context.Context, informerFactory informers.SharedInformerFactory, schedConfig *schedulerconfig.KubeSchedulerConfiguration, draEnabled bool, csiEnabled bool, fastPredicatesEnabled bool) (*Handle, error) { if schedConfig == nil { var err error schedConfig, err = schedulerconfiglatest.Default() @@ -49,6 +49,9 @@ func NewHandle(ctx context.Context, informerFactory informers.SharedInformerFact return nil, fmt.Errorf("couldn't create scheduler config: %v", err) } } + if fastPredicatesEnabled { + DisableFastPredicates(schedConfig) + } if len(schedConfig.Profiles) != 1 { return nil, fmt.Errorf("unexpected scheduler config: expected one scheduler profile only (found %d profiles)", len(schedConfig.Profiles)) } @@ -90,3 +93,24 @@ func NewHandle(ctx context.Context, informerFactory informers.SharedInformerFact DelegatingLister: sharedLister, }, nil } + +// DisableFastPredicates disables InterPodAffinity and PodTopologySpread plugins in the given configuration. 
+func DisableFastPredicates(cfg *schedulerconfig.KubeSchedulerConfiguration) { + if cfg == nil { + return + } + for i := range cfg.Profiles { + profile := &cfg.Profiles[i] + if profile.Plugins == nil { + profile.Plugins = &schedulerconfig.Plugins{} + } + profile.Plugins.PreFilter.Disabled = append(profile.Plugins.PreFilter.Disabled, + schedulerconfig.Plugin{Name: "InterPodAffinity"}, + schedulerconfig.Plugin{Name: "PodTopologySpread"}, + ) + profile.Plugins.Filter.Disabled = append(profile.Plugins.Filter.Disabled, + schedulerconfig.Plugin{Name: "InterPodAffinity"}, + schedulerconfig.Plugin{Name: "PodTopologySpread"}, + ) + } +} diff --git a/cluster-autoscaler/simulator/framework/infos.go b/cluster-autoscaler/simulator/framework/infos.go index 0d58735f3e8c..19ebf67930bd 100644 --- a/cluster-autoscaler/simulator/framework/infos.go +++ b/cluster-autoscaler/simulator/framework/infos.go @@ -18,6 +18,8 @@ package framework import ( "fmt" + "sort" + "strings" apiv1 "k8s.io/api/core/v1" resourceapi "k8s.io/api/resource/v1" @@ -178,3 +180,23 @@ func NewNodeInfo(node *apiv1.Node, slices []*resourceapi.ResourceSlice, pods ... } return result } + +// GetPodLabelSetHash returns a deterministic hash of the pod's labels. 
+func GetPodLabelSetHash(pod *apiv1.Pod) string { + if pod == nil || len(pod.Labels) == 0 { + return "" + } + keys := make([]string, 0, len(pod.Labels)) + for k := range pod.Labels { + keys = append(keys, k) + } + sort.Strings(keys) + var sb strings.Builder + for _, k := range keys { + sb.WriteString(k) + sb.WriteString("=") + sb.WriteString(pod.Labels[k]) + sb.WriteString(",") + } + return sb.String() +} diff --git a/cluster-autoscaler/simulator/framework/test_utils.go b/cluster-autoscaler/simulator/framework/test_utils.go index ce9b745f3789..91623edffdac 100644 --- a/cluster-autoscaler/simulator/framework/test_utils.go +++ b/cluster-autoscaler/simulator/framework/test_utils.go @@ -96,7 +96,7 @@ func NewTestFrameworkHandle() (*Handle, error) { if err != nil { return nil, err } - fwHandle, err := NewHandle(context.Background(), informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(), 0), defaultConfig, true, true) + fwHandle, err := NewHandle(context.Background(), informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(), 0), defaultConfig, true, true, false) if err != nil { return nil, err }