From 6d8d793744027f02e3ac1407797967f905188512 Mon Sep 17 00:00:00 2001
From: leo-ryu
Date: Thu, 9 Jan 2025 16:37:39 +0800
Subject: [PATCH] block unmatched empty domains

---
 pkg/controllers/provisioning/provisioner.go   |  2 +-
 .../scheduling/scheduling_benchmark_test.go   |  3 +-
 .../provisioning/scheduling/topology.go       |  8 +-
 .../provisioning/scheduling/topologygroup.go  | 81 ++++++++++++++++---
 .../scheduling/topologynodefilter.go          |  1 +
 .../provisioning/scheduling/volumetopology.go |  8 +-
 6 files changed, 80 insertions(+), 23 deletions(-)

diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go
index 78f667106e..7711cefd25 100644
--- a/pkg/controllers/provisioning/provisioner.go
+++ b/pkg/controllers/provisioning/provisioner.go
@@ -471,7 +471,7 @@ func validateKarpenterManagedLabelCanExist(p *corev1.Pod) error {
 func (p *Provisioner) convertToPodVolumeRequirements(ctx context.Context, pods []*corev1.Pod) map[*corev1.Pod][]corev1.NodeSelectorRequirement {
     var schedulablePods = make(map[*corev1.Pod][]corev1.NodeSelectorRequirement)
     for _, pod := range pods {
-        if err, requirements := p.volumeTopology.GetVolumeRequirements(ctx, pod); err != nil {
+        if requirements, err := p.volumeTopology.GetVolumeRequirements(ctx, pod); err != nil {
             log.FromContext(ctx).WithValues("Pod", klog.KRef(pod.Namespace, pod.Name)).Error(err, "failed getting volume topology requirements")
         } else {
             schedulablePods[pod] = requirements
diff --git a/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go b/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go
index 2844b95362..16006899df 100644
--- a/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go
+++ b/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go
@@ -168,7 +168,8 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) {
     clock := &clock.RealClock{}
     cluster = state.NewCluster(clock, client, cloudProvider)
     domains := map[string]sets.Set[string]{}
-    topology, err := scheduling.NewTopology(ctx, client, cluster, domains, pods)
+    podsVolumeRequirements := make(map[*corev1.Pod][]corev1.NodeSelectorRequirement)
+    topology, err := scheduling.NewTopology(ctx, client, cluster, domains, podsVolumeRequirements)
     if err != nil {
         b.Fatalf("creating topology, %s", err)
     }
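
[Illustrative sketch, not part of the patch: the benchmark above now seeds scheduling.NewTopology with an explicit pod-to-volume-requirements map (empty in the benchmark) instead of the raw pod slice. The standalone snippet below shows what a non-empty entry could look like for a pod pinned to a zone by its volume; the pod name and zone value are invented for illustration.]

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// Hypothetical pod; in the real code path these requirements come from
	// (*VolumeTopology).GetVolumeRequirements via convertToPodVolumeRequirements.
	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "example", Namespace: "default"}}
	podsVolumeRequirements := map[*corev1.Pod][]corev1.NodeSelectorRequirement{
		pod: {{
			Key:      corev1.LabelTopologyZone, // "topology.kubernetes.io/zone"
			Operator: corev1.NodeSelectorOpIn,
			Values:   []string{"zone-a"}, // invented zone
		}},
	}
	// This is the signal that AddRequirements forwards to each topology group.
	fmt.Println(len(podsVolumeRequirements[pod]) != 0) // true
}
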
diff --git a/pkg/controllers/provisioning/scheduling/topology.go b/pkg/controllers/provisioning/scheduling/topology.go
index 9956eedd65..57caed3dc4 100644
--- a/pkg/controllers/provisioning/scheduling/topology.go
+++ b/pkg/controllers/provisioning/scheduling/topology.go
@@ -178,7 +178,7 @@ func (t *Topology) AddRequirements(podRequirements, nodeRequirements scheduling.
         if nodeRequirements.Has(topology.Key) {
             nodeDomains = nodeRequirements.Get(topology.Key)
         }
-        domains := topology.Get(p, podDomains, nodeDomains)
+        domains := topology.Get(p, podDomains, nodeDomains, len(t.podVolumeRequirements[p]) != 0)
         if domains.Len() == 0 {
             return nil, topologyError{
                 topology: topology,
@@ -249,7 +249,7 @@ func (t *Topology) updateInverseAntiAffinity(ctx context.Context, pod *corev1.Po
         return err
     }

-    tg := NewTopologyGroup(TopologyTypePodAntiAffinity, term.TopologyKey, pod, namespaces, term.LabelSelector, math.MaxInt32, nil, t.domains[term.TopologyKey])
+    tg := NewTopologyGroup(TopologyTypePodAntiAffinity, term.TopologyKey, pod, t.cluster, namespaces, term.LabelSelector, math.MaxInt32, nil, t.domains[term.TopologyKey])

     hash := tg.Hash()
     if existing, ok := t.inverseTopologies[hash]; !ok {
@@ -327,7 +327,7 @@ func (t *Topology) countDomains(ctx context.Context, tg *TopologyGroup) error {
 func (t *Topology) newForTopologies(p *corev1.Pod) []*TopologyGroup {
     var topologyGroups []*TopologyGroup
     for _, cs := range p.Spec.TopologySpreadConstraints {
-        topologyGroups = append(topologyGroups, NewTopologyGroup(TopologyTypeSpread, cs.TopologyKey, p, sets.New(p.Namespace), cs.LabelSelector, cs.MaxSkew, cs.MinDomains, t.domains[cs.TopologyKey]))
+        topologyGroups = append(topologyGroups, NewTopologyGroup(TopologyTypeSpread, cs.TopologyKey, p, t.cluster, sets.New(p.Namespace), cs.LabelSelector, cs.MaxSkew, cs.MinDomains, t.domains[cs.TopologyKey]))
     }
     return topologyGroups
 }
@@ -364,7 +364,7 @@ func (t *Topology) newForAffinities(ctx context.Context, p *corev1.Pod) ([]*Topo
             if err != nil {
                 return nil, err
             }
-            topologyGroups = append(topologyGroups, NewTopologyGroup(topologyType, term.TopologyKey, p, namespaces, term.LabelSelector, math.MaxInt32, nil, t.domains[term.TopologyKey]))
+            topologyGroups = append(topologyGroups, NewTopologyGroup(topologyType, term.TopologyKey, p, t.cluster, namespaces, term.LabelSelector, math.MaxInt32, nil, t.domains[term.TopologyKey]))
         }
     }
     return topologyGroups, nil
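
[Illustrative sketch, not part of the patch: AddRequirements above now passes len(t.podVolumeRequirements[p]) != 0 down to each topology group. The helper below is an invented condensation of the selection rule that the topologygroup.go changes implement: without volume requirements the spread may collapse to the least-loaded domain, while with volume requirements every viable domain, including first-time domains, is kept so the volume's zone can still be intersected.]

package main

import "fmt"

// spreadDomainsFor is a hypothetical stand-in for the tail of nextDomainTopologySpread.
func spreadDomainsFor(hasVolumeRequirements bool, minDomain string, firstDomains, candidateDomains []string) []string {
	if hasVolumeRequirements {
		// keep every viable domain; volume topology narrows the choice later
		return append(firstDomains, candidateDomains...)
	}
	// no volume constraints: the least-loaded candidate alone satisfies maxSkew
	return []string{minDomain}
}

func main() {
	fmt.Println(spreadDomainsFor(false, "zone-b", nil, []string{"zone-a", "zone-b"}))               // [zone-b]
	fmt.Println(spreadDomainsFor(true, "zone-b", []string{"zone-c"}, []string{"zone-a", "zone-b"})) // [zone-c zone-a zone-b]
}
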
diff --git a/pkg/controllers/provisioning/scheduling/topologygroup.go b/pkg/controllers/provisioning/scheduling/topologygroup.go
index fe7376b850..eae701eef5 100644
--- a/pkg/controllers/provisioning/scheduling/topologygroup.go
+++ b/pkg/controllers/provisioning/scheduling/topologygroup.go
@@ -29,6 +29,7 @@ import (
     "k8s.io/apimachinery/pkg/types"
     "k8s.io/apimachinery/pkg/util/sets"

+    "sigs.k8s.io/karpenter/pkg/controllers/state"
     "sigs.k8s.io/karpenter/pkg/scheduling"
 )

@@ -59,6 +60,7 @@ type TopologyGroup struct {
     Type        TopologyType
     maxSkew     int32
     minDomains  *int32
+    cluster     *state.Cluster
     namespaces  sets.Set[string]
     selector    labels.Selector
     rawSelector *metav1.LabelSelector
@@ -69,7 +71,7 @@ type TopologyGroup struct {
     emptyDomains sets.Set[string] // domains for which we know that no pod exists
 }

-func NewTopologyGroup(topologyType TopologyType, topologyKey string, pod *v1.Pod, namespaces sets.Set[string], labelSelector *metav1.LabelSelector, maxSkew int32, minDomains *int32, domains sets.Set[string]) *TopologyGroup {
+func NewTopologyGroup(topologyType TopologyType, topologyKey string, pod *v1.Pod, cluster *state.Cluster, namespaces sets.Set[string], labelSelector *metav1.LabelSelector, maxSkew int32, minDomains *int32, domains sets.Set[string]) *TopologyGroup {
     domainCounts := map[string]int32{}
     for domain := range domains {
         domainCounts[domain] = 0
@@ -86,6 +88,7 @@ func NewTopologyGroup(topologyType TopologyType, topologyKey string, pod *v1.Pod
     return &TopologyGroup{
         Type:        topologyType,
         Key:         topologyKey,
+        cluster:     cluster,
         namespaces:  namespaces,
         selector:    selector,
         rawSelector: labelSelector,
@@ -98,10 +101,10 @@ func NewTopologyGroup(topologyType TopologyType, topologyKey string, pod *v1.Pod
     }
 }

-func (t *TopologyGroup) Get(pod *v1.Pod, podDomains, nodeDomains *scheduling.Requirement) *scheduling.Requirement {
+func (t *TopologyGroup) Get(pod *v1.Pod, podDomains, nodeDomains *scheduling.Requirement, hasVolumeRequirements bool) *scheduling.Requirement {
     switch t.Type {
     case TopologyTypeSpread:
-        return t.nextDomainTopologySpread(pod, podDomains, nodeDomains)
+        return t.nextDomainTopologySpread(pod, podDomains, nodeDomains, hasVolumeRequirements)
     case TopologyTypePodAffinity:
         return t.nextDomainAffinity(pod, podDomains, nodeDomains)
     case TopologyTypePodAntiAffinity:
@@ -175,34 +178,78 @@ func (t *TopologyGroup) Hash() uint64 {
 }

 // nextDomainTopologySpread returns a scheduling.Requirement that includes a node domain that a pod should be scheduled to.
-// If there are multiple eligible domains, we return all eligible domains that satisfies the `maxSkew` configuration.
+// If there are multiple eligible domains, we return the domain with the fewest matching pods that satisfies the `maxSkew` configuration (or every eligible domain when the pod has volume requirements).
 // If there are no eligible domains, we return a `DoesNotExist` requirement, implying that we could not satisfy the topologySpread requirement.
 // nolint:gocyclo
-func (t *TopologyGroup) nextDomainTopologySpread(pod *v1.Pod, podDomains, nodeDomains *scheduling.Requirement) *scheduling.Requirement {
+func (t *TopologyGroup) nextDomainTopologySpread(pod *v1.Pod, podDomains, nodeDomains *scheduling.Requirement, hasVolumeRequirement bool) *scheduling.Requirement {
+    var nodes = make(map[string][]*v1.Node)
+    var blockedDomains = sets.New[string]()
+    var candidateDomains = []string{}
+    var firstDomains = []string{}
+
+    if t.cluster != nil {
+        for _, node := range t.cluster.Nodes() {
+            if node == nil || node.Node == nil {
+                continue
+            }
+            if _, ok := node.Node.GetLabels()[t.Key]; !ok {
+                continue
+            }
+            nodes[node.Node.GetLabels()[t.Key]] = append(nodes[node.Node.GetLabels()[t.Key]], node.Node)
+        }
+    }
+    // empty domains whose existing nodes all fail to match the pod should not take part in the skew calculations
+    for _, domain := range t.emptyDomains.UnsortedList() {
+        // no existing node has this domain, so it only exists on an in-flight nodeclaim and may be created for the first time
+        if len(nodes[domain]) == 0 {
+            // if the pod has volume requirements, block this first-time domain from the skew calculations (it is still offered via firstDomains), since its count of 0 would break the calculations
+            if hasVolumeRequirement {
+                firstDomains = append(firstDomains, domain)
+            } else {
+                continue
+            }
+        }
+        var needBlock = true
+        for _, node := range nodes[domain] {
+            if node.GetLabels()[t.Key] == domain && t.nodeFilter.Matches(node) {
+                needBlock = false
+                break
+            }
+        }
+        if needBlock {
+            blockedDomains.Insert(domain)
+        }
+    }
     // min count is calculated across all domains
-    min := t.domainMinCount(podDomains)
+    min := t.domainMinCount(podDomains, blockedDomains)
     selfSelecting := t.selects(pod)

-    candidateDomains := []string{}
+    minDomain := ""
+    minCount := int32(math.MaxInt32)
+
     // If we are explicitly selecting on specific node domains ("In" requirement),
     // this is going to be more efficient to iterate through
     // This is particularly useful when considering the hostname topology key that can have a
     // lot of t.domains but only a single nodeDomain
     if nodeDomains.Operator() == v1.NodeSelectorOpIn {
         for _, domain := range nodeDomains.Values() {
-            if count, ok := t.domains[domain]; ok {
+            if count, ok := t.domains[domain]; ok && !blockedDomains.Has(domain) {
                 if selfSelecting {
                     count++
                 }
                 if count-min <= t.maxSkew {
                     candidateDomains = append(candidateDomains, domain)
+                    if count < minCount {
+                        minDomain = domain
+                        minCount = count
+                    }
                 }
             }
         }
     } else {
         for domain := range t.domains {
             // but we can only choose from the node domains
-            if nodeDomains.Has(domain) {
+            if nodeDomains.Has(domain) && !blockedDomains.Has(domain) {
                 // comment from kube-scheduler regarding the viable choices to schedule to based on skew is:
                 // 'existing matching num' + 'if self-match (1 or 0)' - 'global min matching num' <= 'maxSkew'
                 count := t.domains[domain]
@@ -211,18 +258,26 @@ func (t *TopologyGroup) nextDomainTopologySpread(pod *v1.Pod, podDomains, nodeDo
                 }
                 if count-min <= t.maxSkew {
                     candidateDomains = append(candidateDomains, domain)
+                    if count < minCount {
+                        minDomain = domain
+                        minCount = count
+                    }
                 }
             }
         }
     }
-    if len(candidateDomains) == 0 {
+    if minDomain == "" {
         // avoids an error message about 'zone in [""]', preferring 'zone in []'
         return scheduling.NewRequirement(podDomains.Key, v1.NodeSelectorOpDoesNotExist)
     }
-    return scheduling.NewRequirement(podDomains.Key, v1.NodeSelectorOpIn, candidateDomains...)
+    // for pods with volume requirements, return every candidate domain (including first-time domains) so the volume topology can still be satisfied
+    if hasVolumeRequirement {
+        return scheduling.NewRequirement(podDomains.Key, v1.NodeSelectorOpIn, append(firstDomains, candidateDomains...)...)
+    }
+    return scheduling.NewRequirement(podDomains.Key, v1.NodeSelectorOpIn, minDomain)
 }

-func (t *TopologyGroup) domainMinCount(domains *scheduling.Requirement) int32 {
+func (t *TopologyGroup) domainMinCount(domains *scheduling.Requirement, blockedDomains sets.Set[string]) int32 {
     // hostname based topologies always have a min pod count of zero since we can create one
     if t.Key == v1.LabelHostname {
         return 0
@@ -232,7 +287,7 @@ func (t *TopologyGroup) domainMinCount(domains *scheduling.Requirement) int32 {
     var numPodSupportedDomains int32
     // determine our current min count
     for domain, count := range t.domains {
-        if domains.Has(domain) {
+        if domains.Has(domain) && !blockedDomains.Has(domain) {
             numPodSupportedDomains++
             if count < min {
                 min = count
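
[Illustrative sketch, not part of the patch: the comments in nextDomainTopologySpread above argue that an empty domain no schedulable node can actually provide drags the global minimum count to 0, after which count-min <= maxSkew fails for every real candidate. The standalone arithmetic below reproduces that effect with invented domain names and counts.]

package main

import "fmt"

// eligible mirrors the kube-scheduler rule quoted in the patch:
// 'existing matching num' + 'if self-match (1 or 0)' - 'global min matching num' <= 'maxSkew'.
func eligible(count, min, maxSkew int32) bool {
	return count-min <= maxSkew
}

func main() {
	const maxSkew = int32(1)
	counts := map[string]int32{"zone-a": 3, "zone-b": 2}

	// Unblocked: an empty "zone-c" that no matching node can provide keeps min at 0,
	// so only zone-c itself would be eligible even though it cannot host the pod.
	fmt.Println(eligible(counts["zone-a"], 0, maxSkew)) // false
	fmt.Println(eligible(counts["zone-b"], 0, maxSkew)) // false

	// Blocked: min is taken over real candidates only, and scheduling can proceed.
	min := counts["zone-b"]
	fmt.Println(eligible(counts["zone-a"], min, maxSkew)) // true
	fmt.Println(eligible(counts["zone-b"], min, maxSkew)) // true
}
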
diff --git a/pkg/controllers/provisioning/scheduling/topologynodefilter.go b/pkg/controllers/provisioning/scheduling/topologynodefilter.go
index d73b3b7936..0533880425 100644
--- a/pkg/controllers/provisioning/scheduling/topologynodefilter.go
+++ b/pkg/controllers/provisioning/scheduling/topologynodefilter.go
@@ -51,6 +51,7 @@ func MakeTopologyNodeFilter(p *v1.Pod) TopologyNodeFilter {
 }

 // Matches returns true if the TopologyNodeFilter doesn't prohibit node from the participating in the topology
+// TODO: Node filter should respect the nodeAffinityPolicy/nodeTaintsPolicy fields in the future.
 func (t TopologyNodeFilter) Matches(node *v1.Node) bool {
     return t.MatchesRequirements(scheduling.NewLabelRequirements(node.Labels))
 }
diff --git a/pkg/controllers/provisioning/scheduling/volumetopology.go b/pkg/controllers/provisioning/scheduling/volumetopology.go
index 6d0411e6d6..4417a89aa2 100644
--- a/pkg/controllers/provisioning/scheduling/volumetopology.go
+++ b/pkg/controllers/provisioning/scheduling/volumetopology.go
@@ -39,23 +39,23 @@ type VolumeTopology struct {
     kubeClient client.Client
 }

-func (v *VolumeTopology) GetVolumeRequirements(ctx context.Context, pod *v1.Pod) (error, []v1.NodeSelectorRequirement) {
+func (v *VolumeTopology) GetVolumeRequirements(ctx context.Context, pod *v1.Pod) ([]v1.NodeSelectorRequirement, error) {
     var requirements []v1.NodeSelectorRequirement
     for _, volume := range pod.Spec.Volumes {
         req, err := v.getRequirements(ctx, pod, volume)
         if err != nil {
-            return err, nil
+            return nil, err
         }
         requirements = append(requirements, req...)
     }
     if len(requirements) == 0 {
-        return nil, requirements
+        return requirements, nil
     }
     log.FromContext(ctx).
         WithValues("Pod", klog.KRef(pod.Namespace, pod.Name)).
         V(1).Info(fmt.Sprintf("found requirements from pod volumes, %s", requirements))
-    return nil, requirements
+    return requirements, nil
 }

 func (v *VolumeTopology) getRequirements(ctx context.Context, pod *v1.Pod, volume v1.Volume) ([]v1.NodeSelectorRequirement, error) {
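
[Illustrative sketch, not part of the patch: GetVolumeRequirements now returns (requirements, error) in Go's conventional order, matching the updated call site in provisioner.go. The hypothetical helper below mirrors that caller loop with a stand-in fetch function so the snippet stays self-contained.]

package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// collectVolumeRequirements mirrors convertToPodVolumeRequirements: pods whose volume
// requirements cannot be resolved are logged and skipped instead of being recorded.
func collectVolumeRequirements(
	ctx context.Context,
	pods []*corev1.Pod,
	fetch func(context.Context, *corev1.Pod) ([]corev1.NodeSelectorRequirement, error),
) map[*corev1.Pod][]corev1.NodeSelectorRequirement {
	out := make(map[*corev1.Pod][]corev1.NodeSelectorRequirement)
	for _, pod := range pods {
		requirements, err := fetch(ctx, pod)
		if err != nil {
			fmt.Printf("skipping pod %s/%s: %v\n", pod.Namespace, pod.Name, err)
			continue
		}
		out[pod] = requirements
	}
	return out
}

func main() {
	pod := &corev1.Pod{}
	pod.Name, pod.Namespace = "example", "default"
	// stand-in for (*VolumeTopology).GetVolumeRequirements
	fetch := func(context.Context, *corev1.Pod) ([]corev1.NodeSelectorRequirement, error) { return nil, nil }
	fmt.Println(len(collectVolumeRequirements(context.Background(), []*corev1.Pod{pod}, fetch))) // 1
}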