
Commit 1dbfa0a

block unmatched empty domains
1 parent 31a62ea commit 1dbfa0a

6 files changed: +82 −23 lines

pkg/controllers/provisioning/provisioner.go

Lines changed: 1 addition & 1 deletion
@@ -471,7 +471,7 @@ func validateKarpenterManagedLabelCanExist(p *corev1.Pod) error {
 func (p *Provisioner) convertToPodVolumeRequirements(ctx context.Context, pods []*corev1.Pod) map[*corev1.Pod][]corev1.NodeSelectorRequirement {
 	var schedulablePods = make(map[*corev1.Pod][]corev1.NodeSelectorRequirement)
 	for _, pod := range pods {
-		if err, requirements := p.volumeTopology.GetVolumeRequirements(ctx, pod); err != nil {
+		if requirements, err := p.volumeTopology.GetVolumeRequirements(ctx, pod); err != nil {
 			log.FromContext(ctx).WithValues("Pod", klog.KRef(pod.Namespace, pod.Name)).Error(err, "failed getting volume topology requirements")
 		} else {
 			schedulablePods[pod] = requirements
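This call site only adapts to the reordered GetVolumeRequirements signature introduced in volumetopology.go below, which moves the error to the conventional final return position; the log-and-skip behavior for pods whose volume lookup fails is unchanged.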

pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go

Lines changed: 2 additions & 1 deletion
@@ -168,7 +168,8 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) {
 	clock := &clock.RealClock{}
 	cluster = state.NewCluster(clock, client, cloudProvider)
 	domains := map[string]sets.Set[string]{}
-	topology, err := scheduling.NewTopology(ctx, client, cluster, domains, pods)
+	podsVolumeRequirements := make(map[*corev1.Pod][]corev1.NodeSelectorRequirement)
+	topology, err := scheduling.NewTopology(ctx, client, cluster, domains, podsVolumeRequirements)
 	if err != nil {
 		b.Fatalf("creating topology, %s", err)
 	}
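The benchmark satisfies NewTopology's new parameter with an empty map. A test that wanted to exercise the volume-aware path could pre-populate it instead; a minimal sketch, assuming the benchmark's existing pods slice and corev1 import (the zone value is purely illustrative):

podsVolumeRequirements := map[*corev1.Pod][]corev1.NodeSelectorRequirement{
	pods[0]: {{
		Key:      corev1.LabelTopologyZone,
		Operator: corev1.NodeSelectorOpIn,
		Values:   []string{"test-zone-1"},
	}},
}
topology, err := scheduling.NewTopology(ctx, client, cluster, domains, podsVolumeRequirements)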

pkg/controllers/provisioning/scheduling/topology.go

Lines changed: 4 additions & 4 deletions
@@ -178,7 +178,7 @@ func (t *Topology) AddRequirements(podRequirements, nodeRequirements scheduling.
 		if nodeRequirements.Has(topology.Key) {
 			nodeDomains = nodeRequirements.Get(topology.Key)
 		}
-		domains := topology.Get(p, podDomains, nodeDomains)
+		domains := topology.Get(p, podDomains, nodeDomains, len(t.podVolumeRequirements[p]) != 0)
 		if domains.Len() == 0 {
 			return nil, topologyError{
 				topology: topology,
@@ -249,7 +249,7 @@ func (t *Topology) updateInverseAntiAffinity(ctx context.Context, pod *corev1.Po
 			return err
 		}

-		tg := NewTopologyGroup(TopologyTypePodAntiAffinity, term.TopologyKey, pod, namespaces, term.LabelSelector, math.MaxInt32, nil, t.domains[term.TopologyKey])
+		tg := NewTopologyGroup(TopologyTypePodAntiAffinity, term.TopologyKey, pod, t.cluster, namespaces, term.LabelSelector, math.MaxInt32, nil, t.domains[term.TopologyKey])

 		hash := tg.Hash()
 		if existing, ok := t.inverseTopologies[hash]; !ok {
@@ -327,7 +327,7 @@ func (t *Topology) countDomains(ctx context.Context, tg *TopologyGroup) error {
 func (t *Topology) newForTopologies(p *corev1.Pod) []*TopologyGroup {
 	var topologyGroups []*TopologyGroup
 	for _, cs := range p.Spec.TopologySpreadConstraints {
-		topologyGroups = append(topologyGroups, NewTopologyGroup(TopologyTypeSpread, cs.TopologyKey, p, sets.New(p.Namespace), cs.LabelSelector, cs.MaxSkew, cs.MinDomains, t.domains[cs.TopologyKey]))
+		topologyGroups = append(topologyGroups, NewTopologyGroup(TopologyTypeSpread, cs.TopologyKey, p, t.cluster, sets.New(p.Namespace), cs.LabelSelector, cs.MaxSkew, cs.MinDomains, t.domains[cs.TopologyKey]))
 	}
 	return topologyGroups
 }
@@ -364,7 +364,7 @@ func (t *Topology) newForAffinities(ctx context.Context, p *corev1.Pod) ([]*Topo
 			if err != nil {
 				return nil, err
 			}
-			topologyGroups = append(topologyGroups, NewTopologyGroup(topologyType, term.TopologyKey, p, namespaces, term.LabelSelector, math.MaxInt32, nil, t.domains[term.TopologyKey]))
+			topologyGroups = append(topologyGroups, NewTopologyGroup(topologyType, term.TopologyKey, p, t.cluster, namespaces, term.LabelSelector, math.MaxInt32, nil, t.domains[term.TopologyKey]))
 		}
 	}
 	return topologyGroups, nil
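These call sites assume the Topology struct now also carries the cluster state and the per-pod volume requirements threaded through from the provisioner; a hedged sketch of the fields implied by the diff (the remaining fields are unchanged and omitted):

type Topology struct {
	cluster               *state.Cluster
	podVolumeRequirements map[*corev1.Pod][]corev1.NodeSelectorRequirement
	// ... existing fields such as domains, topologies, and inverseTopologies
}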

pkg/controllers/provisioning/scheduling/topologygroup.go

Lines changed: 70 additions & 13 deletions
@@ -21,6 +21,7 @@ import (
 	"math"

 	"github.com/awslabs/operatorpkg/option"
+	"github.com/emicklei/go-restful/v3/log"
 	"github.com/mitchellh/hashstructure/v2"
 	"github.com/samber/lo"
 	v1 "k8s.io/api/core/v1"
@@ -29,6 +30,7 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"

+	"sigs.k8s.io/karpenter/pkg/controllers/state"
 	"sigs.k8s.io/karpenter/pkg/scheduling"
 )

@@ -59,6 +61,7 @@ type TopologyGroup struct {
 	Type        TopologyType
 	maxSkew     int32
 	minDomains  *int32
+	cluster     *state.Cluster
 	namespaces  sets.Set[string]
 	selector    labels.Selector
 	rawSelector *metav1.LabelSelector
@@ -69,7 +72,7 @@ type TopologyGroup struct {
 	emptyDomains sets.Set[string] // domains for which we know that no pod exists
 }

-func NewTopologyGroup(topologyType TopologyType, topologyKey string, pod *v1.Pod, namespaces sets.Set[string], labelSelector *metav1.LabelSelector, maxSkew int32, minDomains *int32, domains sets.Set[string]) *TopologyGroup {
+func NewTopologyGroup(topologyType TopologyType, topologyKey string, pod *v1.Pod, cluster *state.Cluster, namespaces sets.Set[string], labelSelector *metav1.LabelSelector, maxSkew int32, minDomains *int32, domains sets.Set[string]) *TopologyGroup {
 	domainCounts := map[string]int32{}
 	for domain := range domains {
 		domainCounts[domain] = 0
@@ -86,6 +89,7 @@ func NewTopologyGroup(topologyType TopologyType, topologyKey string, pod *v1.Pod
 	return &TopologyGroup{
 		Type:        topologyType,
 		Key:         topologyKey,
+		cluster:     cluster,
 		namespaces:  namespaces,
 		selector:    selector,
 		rawSelector: labelSelector,
@@ -98,10 +102,10 @@ func NewTopologyGroup(topologyType TopologyType, topologyKey string, pod *v1.Pod
 	}
 }

-func (t *TopologyGroup) Get(pod *v1.Pod, podDomains, nodeDomains *scheduling.Requirement) *scheduling.Requirement {
+func (t *TopologyGroup) Get(pod *v1.Pod, podDomains, nodeDomains *scheduling.Requirement, hasVolumeRequirements bool) *scheduling.Requirement {
 	switch t.Type {
 	case TopologyTypeSpread:
-		return t.nextDomainTopologySpread(pod, podDomains, nodeDomains)
+		return t.nextDomainTopologySpread(pod, podDomains, nodeDomains, hasVolumeRequirements)
 	case TopologyTypePodAffinity:
 		return t.nextDomainAffinity(pod, podDomains, nodeDomains)
 	case TopologyTypePodAntiAffinity:
@@ -175,34 +179,79 @@ func (t *TopologyGroup) Hash() uint64 {
 }

 // nextDomainTopologySpread returns a scheduling.Requirement that includes a node domain that a pod should be scheduled to.
-// If there are multiple eligible domains, we return all eligible domains that satisfies the `maxSkew` configuration.
+// If there are multiple eligible domains, we return the least-populated domain that satisfies the `maxSkew` configuration (or every eligible domain when the pod has volume requirements).
 // If there are no eligible domains, we return a `DoesNotExist` requirement, implying that we could not satisfy the topologySpread requirement.
 // nolint:gocyclo
-func (t *TopologyGroup) nextDomainTopologySpread(pod *v1.Pod, podDomains, nodeDomains *scheduling.Requirement) *scheduling.Requirement {
+func (t *TopologyGroup) nextDomainTopologySpread(pod *v1.Pod, podDomains, nodeDomains *scheduling.Requirement, hasVolumeRequirement bool) *scheduling.Requirement {
+	var nodes = make(map[string][]*v1.Node)
+	var blockedDomains = sets.New[string]()
+	var candidateDomains = []string{}
+	var firstDomains = []string{}
+
+	if t.cluster != nil {
+		for _, node := range t.cluster.Nodes() {
+			if node == nil || node.Node == nil {
+				continue
+			}
+			if _, ok := node.Node.GetLabels()[t.Key]; !ok {
+				continue
+			}
+			nodes[node.Node.GetLabels()[t.Key]] = append(nodes[node.Node.GetLabels()[t.Key]], node.Node)
+		}
+	}
+	// Empty domains whose existing nodes all fail to match the pod should be left out of the calculations.
+	for _, domain := range t.emptyDomains.UnsortedList() {
+		// No existing node has this domain, so it comes from a NodeClaim and may be created for the first time.
+		if len(nodes[domain]) == 0 {
+			// If the pod has a volume requirement, block the first-time domain, since its skew is always 0 and may break the skew calculations.
+			if hasVolumeRequirement {
+				firstDomains = append(firstDomains, domain)
+				log.Printf("first domain %s", domain)
+			} else {
+				continue
+			}
+		}
+		var needBlock = true
+		for _, node := range nodes[domain] {
+			if node.GetLabels()[t.Key] == domain && t.nodeFilter.Matches(node) {
+				needBlock = false
+				break
+			}
+		}
+		if needBlock {
+			blockedDomains.Insert(domain)
+		}
+	}
 	// min count is calculated across all domains
-	min := t.domainMinCount(podDomains)
+	min := t.domainMinCount(podDomains, blockedDomains)
 	selfSelecting := t.selects(pod)

-	candidateDomains := []string{}
+	minDomain := ""
+	minCount := int32(math.MaxInt32)
+
 	// If we are explicitly selecting on specific node domains ("In" requirement),
 	// this is going to be more efficient to iterate through
 	// This is particularly useful when considering the hostname topology key that can have a
 	// lot of t.domains but only a single nodeDomain
 	if nodeDomains.Operator() == v1.NodeSelectorOpIn {
 		for _, domain := range nodeDomains.Values() {
-			if count, ok := t.domains[domain]; ok {
+			if count, ok := t.domains[domain]; ok && !blockedDomains.Has(domain) {
 				if selfSelecting {
 					count++
 				}
 				if count-min <= t.maxSkew {
 					candidateDomains = append(candidateDomains, domain)
+					if count < minCount {
+						minDomain = domain
+						minCount = count
+					}
 				}
 			}
 		}
 	} else {
 		for domain := range t.domains {
 			// but we can only choose from the node domains
-			if nodeDomains.Has(domain) {
+			if nodeDomains.Has(domain) && !blockedDomains.Has(domain) {
 				// comment from kube-scheduler regarding the viable choices to schedule to based on skew is:
 				// 'existing matching num' + 'if self-match (1 or 0)' - 'global min matching num' <= 'maxSkew'
 				count := t.domains[domain]
@@ -211,18 +260,26 @@ func (t *TopologyGroup) nextDomainTopologySpread(pod *v1.Pod, podDomains, nodeDo
 				}
 				if count-min <= t.maxSkew {
 					candidateDomains = append(candidateDomains, domain)
+					if count < minCount {
+						minDomain = domain
+						minCount = count
+					}
 				}
 			}
 		}
 	}
-	if len(candidateDomains) == 0 {
+	if minDomain == "" && len(firstDomains) == 0 {
 		// avoids an error message about 'zone in [""]', preferring 'zone in []'
 		return scheduling.NewRequirement(podDomains.Key, v1.NodeSelectorOpDoesNotExist)
 	}
-	return scheduling.NewRequirement(podDomains.Key, v1.NodeSelectorOpIn, candidateDomains...)
+	// For pods with volume requirements, return every candidate domain (plus any first-time domains).
+	if hasVolumeRequirement {
+		return scheduling.NewRequirement(podDomains.Key, v1.NodeSelectorOpIn, append(firstDomains, candidateDomains...)...)
+	}
+	return scheduling.NewRequirement(podDomains.Key, v1.NodeSelectorOpIn, minDomain)
 }

-func (t *TopologyGroup) domainMinCount(domains *scheduling.Requirement) int32 {
+func (t *TopologyGroup) domainMinCount(domains *scheduling.Requirement, blockedDomains sets.Set[string]) int32 {
 	// hostname based topologies always have a min pod count of zero since we can create one
 	if t.Key == v1.LabelHostname {
 		return 0
@@ -232,7 +289,7 @@ func (t *TopologyGroup) domainMinCount(domains *scheduling.Requirement) int32 {
 	var numPodSupportedDomains int32
 	// determine our current min count
 	for domain, count := range t.domains {
-		if domains.Has(domain) {
+		if domains.Has(domain) && !blockedDomains.Has(domain) {
 			numPodSupportedDomains++
 			if count < min {
 				min = count
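The heart of the change is the blocking rule: an empty domain whose existing nodes all fail the pod's topology node filter is excluded from both the min-count calculation and candidate selection, while a domain with no existing node at all (a "first-time" domain) is left alone unless the pod has volume requirements, in which case it is kept out of the skew math but still offered as a candidate. Below is a simplified, self-contained restatement of that classification, not the actual implementation; the node type and matches predicate stand in for Karpenter's StateNode and TopologyNodeFilter:

package main

import "fmt"

type node struct {
	labels map[string]string
}

// classifyEmptyDomains mirrors the commit's rule in simplified form.
// nodesByDomain maps a domain value to the existing nodes labeled with it.
func classifyEmptyDomains(emptyDomains []string, nodesByDomain map[string][]node,
	matches func(node) bool, hasVolumeRequirement bool) (blocked, firstTime []string) {
	for _, domain := range emptyDomains {
		if len(nodesByDomain[domain]) == 0 {
			if !hasVolumeRequirement {
				// A brand-new domain (backed only by a NodeClaim) stays a normal candidate.
				continue
			}
			// With volume requirements, it is tracked separately and excluded from the
			// skew math, because its count of zero would otherwise distort the minimum.
			firstTime = append(firstTime, domain)
			blocked = append(blocked, domain)
			continue
		}
		matched := false
		for _, n := range nodesByDomain[domain] {
			if matches(n) {
				matched = true
				break
			}
		}
		if !matched {
			blocked = append(blocked, domain)
		}
	}
	return blocked, firstTime
}

func main() {
	nodesByDomain := map[string][]node{
		"zone-a": {{labels: map[string]string{"team": "other"}}}, // existing node the pod cannot use
	}
	matches := func(n node) bool { return n.labels["team"] == "workload" }
	blocked, firstTime := classifyEmptyDomains([]string{"zone-a", "zone-b"}, nodesByDomain, matches, true)
	fmt.Println(blocked, firstTime) // [zone-a zone-b] [zone-b]
}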

pkg/controllers/provisioning/scheduling/topologynodefilter.go

Lines changed: 1 addition & 0 deletions
@@ -51,6 +51,7 @@ func MakeTopologyNodeFilter(p *v1.Pod) TopologyNodeFilter {
 }

 // Matches returns true if the TopologyNodeFilter doesn't prohibit node from the participating in the topology
+// TODO: The node filter should respect the nodeAffinityPolicy/nodeTaintsPolicy fields in the future.
 func (t TopologyNodeFilter) Matches(node *v1.Node) bool {
 	return t.MatchesRequirements(scheduling.NewLabelRequirements(node.Labels))
 }
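For context on the TODO, the referenced policies are fields on the pod's topology spread constraint that the node filter does not yet consult; an illustrative constraint (values are examples only), assuming the usual corev1 (k8s.io/api/core/v1) and samber/lo imports:

constraint := corev1.TopologySpreadConstraint{
	MaxSkew:            1,
	TopologyKey:        corev1.LabelTopologyZone,
	WhenUnsatisfiable:  corev1.DoNotSchedule,
	NodeAffinityPolicy: lo.ToPtr(corev1.NodeInclusionPolicyHonor),  // not yet honored by Matches
	NodeTaintsPolicy:   lo.ToPtr(corev1.NodeInclusionPolicyIgnore), // not yet honored by Matches
}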

pkg/controllers/provisioning/scheduling/volumetopology.go

Lines changed: 4 additions & 4 deletions
@@ -39,23 +39,23 @@ type VolumeTopology struct {
 	kubeClient client.Client
 }

-func (v *VolumeTopology) GetVolumeRequirements(ctx context.Context, pod *v1.Pod) (error, []v1.NodeSelectorRequirement) {
+func (v *VolumeTopology) GetVolumeRequirements(ctx context.Context, pod *v1.Pod) ([]v1.NodeSelectorRequirement, error) {
 	var requirements []v1.NodeSelectorRequirement
 	for _, volume := range pod.Spec.Volumes {
 		req, err := v.getRequirements(ctx, pod, volume)
 		if err != nil {
-			return err, nil
+			return nil, err
 		}
 		requirements = append(requirements, req...)
 	}
 	if len(requirements) == 0 {
-		return nil, requirements
+		return requirements, nil
 	}

 	log.FromContext(ctx).
 		WithValues("Pod", klog.KRef(pod.Namespace, pod.Name)).
 		V(1).Info(fmt.Sprintf("found requirements from pod volumes, %s", requirements))
-	return nil, requirements
+	return requirements, nil
 }

 func (v *VolumeTopology) getRequirements(ctx context.Context, pod *v1.Pod, volume v1.Volume) ([]v1.NodeSelectorRequirement, error) {
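With the returns swapped, call sites read in the standard Go value-then-error shape; a minimal sketch of a caller, with the surrounding volumeTopology, ctx, and pod variables assumed from context:

requirements, err := volumeTopology.GetVolumeRequirements(ctx, pod)
if err != nil {
	return fmt.Errorf("getting volume requirements for %s/%s, %w", pod.Namespace, pod.Name, err)
}
// an empty slice is valid and simply adds no node selector requirements
for _, req := range requirements {
	fmt.Printf("volume-derived requirement: %s %s %v\n", req.Key, req.Operator, req.Values)
}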
