
Commit 45f73ec

jmdeal and tzneal authored
feat: support new topologySpread scheduling constraints (#852)
Co-authored-by: Todd Neal <[email protected]>
1 parent 057e1ed commit 45f73ec

13 files changed (+871 −158 lines)
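The commit title doesn't spell out which topologySpread fields are meant, but since node taints and NodePool/instance-type data are now threaded into the topology tracker (see the diffs below), the "new" constraints are presumably the nodeAffinityPolicy/nodeTaintsPolicy inclusion policies on topologySpreadConstraints. A minimal sketch of a pod using them, built from the standard corev1 API rather than code from this commit:

package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
    honor := corev1.NodeInclusionPolicyHonor
    pod := &corev1.Pod{
        ObjectMeta: metav1.ObjectMeta{Name: "spread-example", Labels: map[string]string{"app": "web"}},
        Spec: corev1.PodSpec{
            TopologySpreadConstraints: []corev1.TopologySpreadConstraint{{
                MaxSkew:           1,
                TopologyKey:       corev1.LabelTopologyZone,
                WhenUnsatisfiable: corev1.DoNotSchedule,
                LabelSelector:     &metav1.LabelSelector{MatchLabels: map[string]string{"app": "web"}},
                // Newer inclusion-policy fields: with Honor, only nodes whose taints the
                // pod tolerates (and that match its node affinity) count toward the spread.
                NodeTaintsPolicy:   &honor,
                NodeAffinityPolicy: &honor,
            }},
            Containers: []corev1.Container{{Name: "web", Image: "nginx"}},
        },
    }
    fmt.Println(pod.Spec.TopologySpreadConstraints[0].TopologyKey)
}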

pkg/controllers/provisioning/provisioner.go

+2 −38

@@ -31,7 +31,6 @@ import (
     appsv1 "k8s.io/api/apps/v1"
     corev1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/types"
-    "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/client-go/util/workqueue"
     "k8s.io/klog/v2"
     "k8s.io/utils/clock"
@@ -236,8 +235,8 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat
     nodepoolutils.OrderByWeight(nodePools)

     instanceTypes := map[string][]*cloudprovider.InstanceType{}
-    domains := map[string]sets.Set[string]{}
     for _, np := range nodePools {
+        // Get instance type options
         its, err := p.cloudProvider.GetInstanceTypes(ctx, np)
         if err != nil {
             log.FromContext(ctx).WithValues("NodePool", klog.KRef("", np.Name)).Error(err, "skipping, unable to resolve instance types")
@@ -247,49 +246,14 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat
             log.FromContext(ctx).WithValues("NodePool", klog.KRef("", np.Name)).Info("skipping, no resolved instance types found")
             continue
         }
-
         instanceTypes[np.Name] = its
-
-        // Construct Topology Domains
-        for _, it := range its {
-            // We need to intersect the instance type requirements with the current nodePool requirements. This
-            // ensures that something like zones from an instance type don't expand the universe of valid domains.
-            requirements := scheduling.NewNodeSelectorRequirementsWithMinValues(np.Spec.Template.Spec.Requirements...)
-            requirements.Add(scheduling.NewLabelRequirements(np.Spec.Template.Labels).Values()...)
-            requirements.Add(it.Requirements.Values()...)
-
-            for key, requirement := range requirements {
-                // This code used to execute a Union between domains[key] and requirement.Values().
-                // The downside of this is that Union is immutable and takes a copy of the set it is executed upon.
-                // This resulted in a lot of memory pressure on the heap and poor performance
-                // https://github.com/aws/karpenter/issues/3565
-                if domains[key] == nil {
-                    domains[key] = sets.New(requirement.Values()...)
-                } else {
-                    domains[key].Insert(requirement.Values()...)
-                }
-            }
-        }
-
-        requirements := scheduling.NewNodeSelectorRequirementsWithMinValues(np.Spec.Template.Spec.Requirements...)
-        requirements.Add(scheduling.NewLabelRequirements(np.Spec.Template.Labels).Values()...)
-        for key, requirement := range requirements {
-            if requirement.Operator() == corev1.NodeSelectorOpIn {
-                // The following is a performance optimisation, for the explanation see the comment above
-                if domains[key] == nil {
-                    domains[key] = sets.New(requirement.Values()...)
-                } else {
-                    domains[key].Insert(requirement.Values()...)
-                }
-            }
-        }
     }

     // inject topology constraints
     pods = p.injectVolumeTopologyRequirements(ctx, pods)

     // Calculate cluster topology
-    topology, err := scheduler.NewTopology(ctx, p.kubeClient, p.cluster, domains, pods)
+    topology, err := scheduler.NewTopology(ctx, p.kubeClient, p.cluster, stateNodes, nodePools, instanceTypes, pods)
     if err != nil {
         return nil, fmt.Errorf("tracking topology counts, %w", err)
     }
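The deleted loop is the domain-construction logic that previously lived in the provisioner; after this change scheduler.NewTopology receives the stateNodes, NodePools, and their instance types directly, so the topology tracker can derive domains itself. The Insert-vs-Union pattern called out in the removed comments is still the relevant performance point. A minimal standalone sketch of that pattern (the addDomains helper is illustrative, not code from this commit; it only uses the upstream sets package):

package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/util/sets"
)

// addDomains accumulates label values per topology key in place.
// Inserting into an existing set avoids the copy that Union makes
// (Union allocates a new set on every call), which is the memory-pressure
// issue referenced in https://github.com/aws/karpenter/issues/3565.
func addDomains(domains map[string]sets.Set[string], key string, values ...string) {
    if domains[key] == nil {
        domains[key] = sets.New(values...)
    } else {
        domains[key].Insert(values...)
    }
}

func main() {
    domains := map[string]sets.Set[string]{}
    addDomains(domains, "topology.kubernetes.io/zone", "us-west-2a", "us-west-2b")
    addDomains(domains, "topology.kubernetes.io/zone", "us-west-2c")
    fmt.Println(sets.List(domains["topology.kubernetes.io/zone"])) // [us-west-2a us-west-2b us-west-2c]
}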

pkg/controllers/provisioning/scheduling/existingnode.go

+3 −3

@@ -67,7 +67,7 @@ func NewExistingNode(n *state.StateNode, topology *Topology, taints []v1.Taint,

 func (n *ExistingNode) Add(ctx context.Context, kubeClient client.Client, pod *v1.Pod, podData *PodData) error {
     // Check Taints
-    if err := scheduling.Taints(n.cachedTaints).Tolerates(pod); err != nil {
+    if err := scheduling.Taints(n.cachedTaints).ToleratesPod(pod); err != nil {
         return err
     }
     // determine the volumes that will be mounted if the pod schedules
@@ -100,7 +100,7 @@ func (n *ExistingNode) Add(ctx context.Context, kubeClient client.Client, pod *v
     nodeRequirements.Add(podData.Requirements.Values()...)

     // Check Topology Requirements
-    topologyRequirements, err := n.topology.AddRequirements(podData.StrictRequirements, nodeRequirements, pod)
+    topologyRequirements, err := n.topology.AddRequirements(pod, n.cachedTaints, podData.StrictRequirements, nodeRequirements)
     if err != nil {
         return err
     }
@@ -113,7 +113,7 @@ func (n *ExistingNode) Add(ctx context.Context, kubeClient client.Client, pod *v
     n.Pods = append(n.Pods, pod)
     n.requests = requests
     n.requirements = nodeRequirements
-    n.topology.Record(pod, nodeRequirements)
+    n.topology.Record(pod, n.cachedTaints, nodeRequirements)
     n.HostPortUsage().Add(pod, hostPorts)
     n.VolumeUsage().Add(pod, volumes)
     return nil
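The signature changes here pass the node's cached taints into AddRequirements and Record, which the topology tracker presumably needs to evaluate constraints with nodeTaintsPolicy: Honor, i.e. a node's topology domain should only count toward spread if the pod tolerates that node's taints. A hypothetical illustration of that check, reusing the ToleratesPod helper visible in this diff (the domainEligible function and the import path are assumptions, not the commit's actual implementation):

package example

import (
    corev1 "k8s.io/api/core/v1"

    // Assumed import path for the karpenter scheduling helpers used in this diff.
    "sigs.k8s.io/karpenter/pkg/scheduling"
)

// domainEligible sketches why taints now flow into Topology.AddRequirements/Record:
// under nodeTaintsPolicy: Honor, only nodes whose taints the pod tolerates should
// contribute their topology domain to the spread calculation.
func domainEligible(pod *corev1.Pod, taints []corev1.Taint) bool {
    return scheduling.Taints(taints).ToleratesPod(pod) == nil
}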

pkg/controllers/provisioning/scheduling/nodeclaim.go

+3 −3

@@ -66,7 +66,7 @@ func NewNodeClaim(nodeClaimTemplate *NodeClaimTemplate, topology *Topology, daem

 func (n *NodeClaim) Add(pod *v1.Pod, podData *PodData) error {
     // Check Taints
-    if err := scheduling.Taints(n.Spec.Taints).Tolerates(pod); err != nil {
+    if err := scheduling.Taints(n.Spec.Taints).ToleratesPod(pod); err != nil {
         return err
     }

@@ -84,7 +84,7 @@ func (n *NodeClaim) Add(pod *v1.Pod, podData *PodData) error {
     nodeClaimRequirements.Add(podData.Requirements.Values()...)

     // Check Topology Requirements
-    topologyRequirements, err := n.topology.AddRequirements(podData.StrictRequirements, nodeClaimRequirements, pod, scheduling.AllowUndefinedWellKnownLabels)
+    topologyRequirements, err := n.topology.AddRequirements(pod, n.NodeClaimTemplate.Spec.Taints, podData.StrictRequirements, nodeClaimRequirements, scheduling.AllowUndefinedWellKnownLabels)
     if err != nil {
         return err
     }
@@ -108,7 +108,7 @@ func (n *NodeClaim) Add(pod *v1.Pod, podData *PodData) error {
     n.InstanceTypeOptions = remaining
     n.Spec.Resources.Requests = requests
     n.Requirements = nodeClaimRequirements
-    n.topology.Record(pod, nodeClaimRequirements, scheduling.AllowUndefinedWellKnownLabels)
+    n.topology.Record(pod, n.NodeClaim.Spec.Taints, nodeClaimRequirements, scheduling.AllowUndefinedWellKnownLabels)
     n.hostPortUsage.Add(pod, hostPorts)
     return nil
 }

pkg/controllers/provisioning/scheduling/scheduler.go

+2 −2

@@ -345,7 +345,7 @@ func (s *Scheduler) calculateExistingNodeClaims(stateNodes []*state.StateNode, d
     taints := node.Taints()
     var daemons []*corev1.Pod
     for _, p := range daemonSetPods {
-        if err := scheduling.Taints(taints).Tolerates(p); err != nil {
+        if err := scheduling.Taints(taints).ToleratesPod(p); err != nil {
             continue
         }
         if err := scheduling.NewLabelRequirements(node.Labels()).Compatible(scheduling.NewPodRequirements(p)); err != nil {
@@ -388,7 +388,7 @@ func isDaemonPodCompatible(nodeClaimTemplate *NodeClaimTemplate, pod *corev1.Pod
     preferences := &Preferences{}
     // Add a toleration for PreferNoSchedule since a daemon pod shouldn't respect the preference
     _ = preferences.toleratePreferNoScheduleTaints(pod)
-    if err := scheduling.Taints(nodeClaimTemplate.Spec.Taints).Tolerates(pod); err != nil {
+    if err := scheduling.Taints(nodeClaimTemplate.Spec.Taints).ToleratesPod(pod); err != nil {
         return false
     }
     for {

pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go

+3 −21

@@ -33,7 +33,6 @@ import (
     corev1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-    "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/apimachinery/pkg/util/uuid"
     "k8s.io/client-go/tools/record"
     "k8s.io/utils/clock"
@@ -156,8 +155,9 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) {
     pods := makeDiversePods(podCount)
     clock := &clock.RealClock{}
     cluster = state.NewCluster(clock, client, cloudProvider)
-
-    topology, err := scheduling.NewTopology(ctx, client, cluster, getDomains(instanceTypes), pods)
+    topology, err := scheduling.NewTopology(ctx, client, cluster, nil, []*v1.NodePool{nodePool}, map[string][]*cloudprovider.InstanceType{
+        nodePool.Name: instanceTypes,
+    }, pods)
     if err != nil {
         b.Fatalf("creating topology, %s", err)
     }
@@ -221,24 +221,6 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) {
     }
 }

-func getDomains(instanceTypes []*cloudprovider.InstanceType) map[string]sets.Set[string] {
-    domains := map[string]sets.Set[string]{}
-    for _, it := range instanceTypes {
-        for key, requirement := range it.Requirements {
-            // This code used to execute a Union between domains[key] and requirement.Values().
-            // The downside of this is that Union is immutable and takes a copy of the set it is executed upon.
-            // This resulted in a lot of memory pressure on the heap and poor performance
-            // https://github.com/aws/karpenter/issues/3565
-            if domains[key] == nil {
-                domains[key] = sets.New(requirement.Values()...)
-            } else {
-                domains[key].Insert(requirement.Values()...)
-            }
-        }
-    }
-    return domains
-}
-
 func makeDiversePods(count int) []*corev1.Pod {
     var pods []*corev1.Pod
     numTypes := 5
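Callers of scheduling.NewTopology follow the same migration as this benchmark: drop any locally built domains map and pass the NodePools plus a map of instance types keyed by NodePool name (and the state nodes, nil here). A rough caller-side sketch, mirroring the provisioner.go loop above (the buildInstanceTypes helper and the karpenter import paths are illustrative assumptions, not code from this commit):

package example

import (
    "context"

    // Assumed karpenter core import paths.
    v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
    "sigs.k8s.io/karpenter/pkg/cloudprovider"
)

// buildInstanceTypes resolves instance types per NodePool and keys them by
// NodePool name, the shape NewTopology now expects instead of a domains map.
func buildInstanceTypes(ctx context.Context, cp cloudprovider.CloudProvider, nodePools []*v1.NodePool) map[string][]*cloudprovider.InstanceType {
    instanceTypes := map[string][]*cloudprovider.InstanceType{}
    for _, np := range nodePools {
        its, err := cp.GetInstanceTypes(ctx, np)
        if err != nil || len(its) == 0 {
            continue // skip NodePools whose instance types can't be resolved
        }
        instanceTypes[np.Name] = its
    }
    return instanceTypes
}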
