Add pod sharding processor that allows sharding on user-specified node selectors #7309

Open · wants to merge 3 commits into master
4 changes: 4 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
@@ -307,6 +307,10 @@ type AutoscalingOptions struct {
CheckCapacityProvisioningRequestMaxBatchSize int
// CheckCapacityProvisioningRequestBatchTimebox is the maximum time to spend processing a batch of provisioning requests
CheckCapacityProvisioningRequestBatchTimebox time.Duration
// PodShardingEnabled indicates if pod sharding is enabled
PodShardingEnabled bool
// PodShardingNodeSelectors is a list of key=value node selector labels used to compute pod shards.
PodShardingNodeSelectors []string
}

// KubeClientOptions specify options for kube client
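For illustration, a minimal sketch of how the two new options might be populated (the selector values are hypothetical):

// Hypothetical options fragment; only the new fields are shown.
opts := config.AutoscalingOptions{
	PodShardingEnabled: true,
	// One key=value node selector label per entry.
	PodShardingNodeSelectors: []string{
		"topology.kubernetes.io/zone=us-east1-b",
		"cloud.google.com/gke-nodepool=gpu-pool",
	},
}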
27 changes: 27 additions & 0 deletions cluster-autoscaler/main.go
@@ -72,6 +72,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/podsharding"
scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
"k8s.io/autoscaler/cluster-autoscaler/utils/units"
"k8s.io/autoscaler/cluster-autoscaler/version"
@@ -279,6 +280,8 @@
checkCapacityBatchProcessing = flag.Bool("check-capacity-batch-processing", false, "Whether to enable batch processing for check capacity requests.")
checkCapacityProvisioningRequestMaxBatchSize = flag.Int("check-capacity-provisioning-request-max-batch-size", 10, "Maximum number of provisioning requests to process in a single batch.")
checkCapacityProvisioningRequestBatchTimebox = flag.Duration("check-capacity-provisioning-request-batch-timebox", 10*time.Second, "Maximum time to process a batch of provisioning requests.")
podShardingEnabled = flag.Bool("pod-sharding", false, "Enable sharding of pending pods into groups to be handled separately by the scale-up algorithm")
podShardingNodeSelector = multiStringFlag("pod-sharding-node-selector", "Node selector label (key=value) to use for sharding pods. Can be specified multiple times.")
)

func isFlagPassed(name string) bool {
@@ -457,6 +460,8 @@ func createAutoscalingOptions() config.AutoscalingOptions {
CheckCapacityBatchProcessing: *checkCapacityBatchProcessing,
CheckCapacityProvisioningRequestMaxBatchSize: *checkCapacityProvisioningRequestMaxBatchSize,
CheckCapacityProvisioningRequestBatchTimebox: *checkCapacityProvisioningRequestBatchTimebox,
PodShardingEnabled: *podShardingEnabled,
PodShardingNodeSelectors: *podShardingNodeSelector,
}
}

@@ -550,6 +555,28 @@ func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot
podListProcessor.AddProcessor(provreqProcesor)
}

// Add pod sharding related processors if pod sharding is enabled.
if autoscalingOptions.PodShardingEnabled {
klog.Info("Pod sharding is enabled")

podShardingNodeSelectors := make(map[string]string)
for _, label := range autoscalingOptions.PodShardingNodeSelectors {
parts := strings.Split(label, "=")
if len(parts) != 2 {
klog.Errorf("Invalid pod sharding label: %s", label)
continue
}

podShardingNodeSelectors[parts[0]] = parts[1]
}

podSharder := podsharding.NewOssPodSharder(podShardingNodeSelectors)
podShardSelector := podsharding.NewLruPodShardSelector()
podShardFilter := podsharding.NewPredicatePodShardFilter()
podShardingProcessor := podsharding.NewPodShardingProcessor(podSharder, podShardSelector, podShardFilter)
podListProcessor.AddProcessor(podShardingProcessor)
}

if *proactiveScaleupEnabled {
podInjectionBackoffRegistry := podinjectionbackoff.NewFakePodControllerRegistry()

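With this wiring, the feature would be enabled with, for example, --pod-sharding=true --pod-sharding-node-selector=topology.kubernetes.io/zone=us-east1-b --pod-sharding-node-selector=cloud.google.com/gke-nodepool=gpu-pool. A hedged sketch of the resulting construction, with the parsed selector map written out by hand (constructor names as in the diff; flag values hypothetical):

selectors := map[string]string{
	"topology.kubernetes.io/zone":   "us-east1-b",
	"cloud.google.com/gke-nodepool": "gpu-pool",
}
// Build the sharding pipeline the same way buildAutoscaler does: the
// sharder groups pending pods, the LRU selector is expected to pick the
// least recently handled shard, and the predicate filter extends the
// selected shard before it reaches the scale-up algorithm.
podSharder := podsharding.NewOssPodSharder(selectors)
podShardSelector := podsharding.NewLruPodShardSelector()
podShardFilter := podsharding.NewPredicatePodShardFilter()
processor := podsharding.NewPodShardingProcessor(podSharder, podShardSelector, podShardFilter)
podListProcessor.AddProcessor(processor)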
77 changes: 77 additions & 0 deletions cluster-autoscaler/utils/podsharding/composite_pod_sharder.go
@@ -0,0 +1,77 @@
/*
Copyright 2023 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package podsharding

import (
"k8s.io/api/core/v1"
apitypes "k8s.io/apimachinery/pkg/types"
)

// FeatureShardComputeFunction computes the part of the shard to which a pod belongs, looking at a specific feature.
// If the function does not detect that the pod should be separated from others based on the handled feature,
// it should leave the passed NodeGroupDescriptor unchanged.
type FeatureShardComputeFunction struct {
// Feature is feature name
Feature string

// Function is used to compute the shard value for the feature.
// The result of the computation is a mutation applied to the passed NodeGroupDescriptor.
// After the mutations for all features have been applied for a given pod, we end up with the final
// NodeGroupDescriptor, which represents the shard the pod belongs to.
// The NodeGroupDescriptor of the selected shard is also later used to build the NodeInfo used to extend
// the set of pods in the shard via predicate checking.
Function func(*v1.Pod, *NodeGroupDescriptor)
}

// CompositePodSharder is an implementation of PodSharder based on a list of features, each with a
// corresponding function that computes the shard value for that feature.
type CompositePodSharder struct {
featureShardComputeFunctions []FeatureShardComputeFunction
}

// NewCompositePodSharder creates a new instance of CompositePodSharder
func NewCompositePodSharder(computeFunctions []FeatureShardComputeFunction) *CompositePodSharder {
return &CompositePodSharder{
featureShardComputeFunctions: append([]FeatureShardComputeFunction{}, computeFunctions...),
}
}

// ComputePodShards computes the sharding for the given set of pods
func (sharder *CompositePodSharder) ComputePodShards(pods []*v1.Pod) []*PodShard {
podShards := make(map[ShardSignature]*PodShard)

for _, pod := range pods {
nodeGroupDescriptor := EmptyNodeGroupDescriptor()
for _, computeFunction := range sharder.featureShardComputeFunctions {
computeFunction.Function(pod, &nodeGroupDescriptor)
}

shardSignature := nodeGroupDescriptor.signature()
if _, found := podShards[shardSignature]; !found {
podShards[shardSignature] = &PodShard{
PodUids: make(map[apitypes.UID]bool),
NodeGroupDescriptor: nodeGroupDescriptor,
}
}
podShards[shardSignature].PodUids[pod.UID] = true
}

var result []*PodShard
for _, podShard := range podShards {
result = append(result, podShard)
}
return result
}
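
A hedged usage sketch of the sharder defined above, written as if it lived in an in-package test file: one FeatureShardComputeFunction per selector key copies the pod's matching node selector value into the shard descriptor, so pods that agree on that value land in the same PodShard. This assumes NodeGroupDescriptor exposes a Labels map that EmptyNodeGroupDescriptor initializes:

package podsharding

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// printZoneShards groups pods by the zone their node selector pins them to
// and prints each resulting shard.
func printZoneShards(pendingPods []*v1.Pod) {
	zoneShard := FeatureShardComputeFunction{
		Feature: "zone",
		Function: func(pod *v1.Pod, descriptor *NodeGroupDescriptor) {
			if zone, ok := pod.Spec.NodeSelector["topology.kubernetes.io/zone"]; ok {
				// Assumes Labels was initialized by EmptyNodeGroupDescriptor.
				descriptor.Labels["topology.kubernetes.io/zone"] = zone
			}
		},
	}

	sharder := NewCompositePodSharder([]FeatureShardComputeFunction{zoneShard})
	// Pods without a zone selector share the empty-signature shard; each
	// distinct zone value produces its own shard.
	for _, shard := range sharder.ComputePodShards(pendingPods) {
		fmt.Printf("shard with %d pods: %+v\n", len(shard.PodUids), shard.NodeGroupDescriptor)
	}
}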