
Commit cec1814

fix topology spread constraints with zonal volume
1 parent 75f5bc3

9 files changed: +259, -57 lines

pkg/controllers/provisioning/provisioner.go

Lines changed: 7 additions & 7 deletions
@@ -254,11 +254,11 @@ func (p *Provisioner) NewScheduler(
 		instanceTypes[np.Name] = its
 	}
 
-	// inject topology constraints
-	pods = p.injectVolumeTopologyRequirements(ctx, pods)
+	// Link volume requirements to pods
+	podsVolumeRequirements := p.convertToPodVolumeRequirements(ctx, pods)
 
 	// Calculate cluster topology
-	topology, err := scheduler.NewTopology(ctx, p.kubeClient, p.cluster, stateNodes, nodePools, instanceTypes, pods)
+	topology, err := scheduler.NewTopology(ctx, p.kubeClient, p.cluster, stateNodes, nodePools, instanceTypes, pods, podsVolumeRequirements)
 	if err != nil {
 		return nil, fmt.Errorf("tracking topology counts, %w", err)
 	}
@@ -464,13 +464,13 @@ func validateKarpenterManagedLabelCanExist(p *corev1.Pod) error {
 	return nil
 }
 
-func (p *Provisioner) injectVolumeTopologyRequirements(ctx context.Context, pods []*corev1.Pod) []*corev1.Pod {
-	var schedulablePods []*corev1.Pod
+func (p *Provisioner) convertToPodVolumeRequirements(ctx context.Context, pods []*corev1.Pod) map[*corev1.Pod][]corev1.NodeSelectorRequirement {
+	var schedulablePods = make(map[*corev1.Pod][]corev1.NodeSelectorRequirement)
 	for _, pod := range pods {
-		if err := p.volumeTopology.Inject(ctx, pod); err != nil {
+		if requirements, err := p.volumeTopology.GetVolumeRequirements(ctx, pod); err != nil {
 			log.FromContext(ctx).WithValues("Pod", klog.KObj(pod)).Error(err, "failed getting volume topology requirements")
 		} else {
-			schedulablePods = append(schedulablePods, pod)
+			schedulablePods[pod] = requirements
 		}
 	}
 	return schedulablePods
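
The new helper stops mutating pod specs and instead returns a map keyed by pod pointer. Below is a minimal, self-contained sketch of that shape; lookupZone is a hypothetical stand-in for what volumeTopology.GetVolumeRequirements derives from a pod's PVCs and their StorageClass/PV topology, not Karpenter's actual implementation.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// lookupZone is a made-up helper: in Karpenter the zone comes from the pod's PVCs,
// their StorageClass allowed topologies, or an already-bound PV's node affinity.
func lookupZone(pod *corev1.Pod) (string, error) {
	for _, v := range pod.Spec.Volumes {
		if v.Name == "data" { // placeholder condition for the sketch
			return "test-zone-2", nil
		}
	}
	return "", nil // no zonal volume: no extra requirement
}

// convertToPodVolumeRequirements mirrors the shape of the new provisioner helper:
// pods are left untouched and their volume-derived zone requirements are returned in
// a map. In the real code, pods whose volume topology cannot be resolved are logged
// and left out of the map; here they are simply skipped.
func convertToPodVolumeRequirements(pods []*corev1.Pod) map[*corev1.Pod][]corev1.NodeSelectorRequirement {
	out := make(map[*corev1.Pod][]corev1.NodeSelectorRequirement)
	for _, pod := range pods {
		zone, err := lookupZone(pod)
		if err != nil {
			continue
		}
		var reqs []corev1.NodeSelectorRequirement
		if zone != "" {
			reqs = append(reqs, corev1.NodeSelectorRequirement{
				Key:      corev1.LabelTopologyZone,
				Operator: corev1.NodeSelectorOpIn,
				Values:   []string{zone},
			})
		}
		out[pod] = reqs
	}
	return out
}

func main() {
	pod := &corev1.Pod{Spec: corev1.PodSpec{Volumes: []corev1.Volume{{Name: "data"}}}}
	reqs := convertToPodVolumeRequirements([]*corev1.Pod{pod})
	fmt.Println(reqs[pod]) // the single zone requirement derived for the pod
}

Keeping the pod spec intact (rather than injecting node affinity, as the old Inject path did) lets the downstream topology code tell volume-derived constraints apart from constraints the user actually wrote.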

pkg/controllers/provisioning/scheduling/existingnode.go

Lines changed: 8 additions & 1 deletion
@@ -65,7 +65,7 @@ func NewExistingNode(n *state.StateNode, topology *Topology, taints []v1.Taint,
 	return node
 }
 
-func (n *ExistingNode) Add(ctx context.Context, kubeClient client.Client, pod *v1.Pod, podData *PodData) error {
+func (n *ExistingNode) Add(ctx context.Context, kubeClient client.Client, pod *v1.Pod, podData *PodData, volumeRequirements []v1.NodeSelectorRequirement) error {
 	// Check Taints
 	if err := scheduling.Taints(n.cachedTaints).ToleratesPod(pod); err != nil {
 		return err
@@ -111,6 +111,13 @@ func (n *ExistingNode) Add(ctx context.Context, kubeClient client.Client, pod *v
 	}
 	nodeRequirements.Add(topologyRequirements.Values()...)
 
+	podVolumeRequirements := scheduling.NewNodeSelectorRequirements(volumeRequirements...)
+	// Check Pod Volume Requirements
+	if err = nodeRequirements.Compatible(podVolumeRequirements); err != nil {
+		return err
+	}
+	nodeRequirements.Add(podVolumeRequirements.Values()...)
+
 	// Update node
 	n.Pods = append(n.Pods, pod)
 	n.requests = requests
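
Both ExistingNode.Add here and NodeClaim.Add in the next file follow the same pattern: turn the pod's volume-derived selector terms into a requirement set, reject the node if it is incompatible, and otherwise fold the terms in. The sketch below shows the idea with a deliberately simplified compatibility check over plain node labels; Karpenter's scheduling.Requirements type handles the full operator set and, for NodeClaims, undefined well-known labels via scheduling.AllowUndefinedWellKnownLabels.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// compatible is a simplified stand-in for scheduling.Requirements.Compatible: it only
// understands the In operator, which is all a zonal volume requirement needs.
func compatible(nodeLabels map[string]string, reqs []corev1.NodeSelectorRequirement) error {
	for _, r := range reqs {
		if r.Operator != corev1.NodeSelectorOpIn {
			continue
		}
		have := nodeLabels[r.Key]
		found := false
		for _, v := range r.Values {
			if v == have {
				found = true
				break
			}
		}
		if !found {
			return fmt.Errorf("node %s=%q is not in %v", r.Key, have, r.Values)
		}
	}
	return nil
}

func main() {
	volumeRequirements := []corev1.NodeSelectorRequirement{{
		Key:      corev1.LabelTopologyZone,
		Operator: corev1.NodeSelectorOpIn,
		Values:   []string{"test-zone-2"},
	}}
	nodeInZone3 := map[string]string{corev1.LabelTopologyZone: "test-zone-3"}
	// An incompatible node makes Add return an error, so the scheduler moves on to the
	// next candidate instead of stranding the pod away from its volume.
	fmt.Println(compatible(nodeInZone3, volumeRequirements))
}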

pkg/controllers/provisioning/scheduling/nodeclaim.go

Lines changed: 8 additions & 1 deletion
@@ -108,7 +108,7 @@ func NewNodeClaim(
 	}
 }
 
-func (n *NodeClaim) Add(ctx context.Context, pod *corev1.Pod, podData *PodData) error {
+func (n *NodeClaim) Add(ctx context.Context, pod *corev1.Pod, podData *PodData, volumeRequirements []corev1.NodeSelectorRequirement) error {
 	// Check Taints
 	if err := scheduling.Taints(n.Spec.Taints).ToleratesPod(pod); err != nil {
 		return err
@@ -137,6 +137,13 @@ func (n *NodeClaim) Add(ctx context.Context, pod *corev1.Pod, podData *PodData)
 	}
 	nodeClaimRequirements.Add(topologyRequirements.Values()...)
 
+	podVolumeRequirements := scheduling.NewNodeSelectorRequirements(volumeRequirements...)
+	// Check Pod Volume Requirements
+	if err = nodeClaimRequirements.Compatible(podVolumeRequirements, scheduling.AllowUndefinedWellKnownLabels); err != nil {
+		return err
+	}
+	nodeClaimRequirements.Add(podVolumeRequirements.Values()...)
+
 	// Check instance type combinations
 	requests := resources.Merge(n.Spec.Resources.Requests, podData.Requests)

pkg/controllers/provisioning/scheduling/scheduler.go

Lines changed: 7 additions & 3 deletions
@@ -350,9 +350,13 @@ func (s *Scheduler) updateCachedPodData(p *corev1.Pod) {
 
 //nolint:gocyclo
 func (s *Scheduler) add(ctx context.Context, pod *corev1.Pod) error {
+	var volumeRequirements []corev1.NodeSelectorRequirement
+	if _, ok := s.topology.podVolumeRequirements[pod]; ok {
+		volumeRequirements = s.topology.podVolumeRequirements[pod]
+	}
 	// first try to schedule against an in-flight real node
 	for _, node := range s.existingNodes {
-		if err := node.Add(ctx, s.kubeClient, pod, s.cachedPodData[pod.UID]); err == nil {
+		if err := node.Add(ctx, s.kubeClient, pod, s.cachedPodData[pod.UID], volumeRequirements); err == nil {
 			return nil
 		}
 	}
@@ -362,7 +366,7 @@ func (s *Scheduler) add(ctx context.Context, pod *corev1.Pod) error {
 
 	// Pick existing node that we are about to create
 	for _, nodeClaim := range s.newNodeClaims {
-		if err := nodeClaim.Add(ctx, pod, s.cachedPodData[pod.UID]); err == nil {
+		if err := nodeClaim.Add(ctx, pod, s.cachedPodData[pod.UID], volumeRequirements); err == nil {
 			return nil
 		}
 	}
@@ -389,7 +393,7 @@ func (s *Scheduler) add(ctx context.Context, pod *corev1.Pod) error {
 	}
 
 	nodeClaim := NewNodeClaim(nodeClaimTemplate, s.topology, s.daemonOverhead[nodeClaimTemplate], instanceTypes, s.reservationManager, s.reservedOfferingMode)
-	if err := nodeClaim.Add(ctx, pod, s.cachedPodData[pod.UID]); err != nil {
+	if err := nodeClaim.Add(ctx, pod, s.cachedPodData[pod.UID], volumeRequirements); err != nil {
 		nodeClaim.Destroy()
 		if IsReservedOfferingError(err) {
 			errs = multierr.Append(errs, fmt.Errorf(
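
The scheduler resolves the pod's volume requirements once, up front, and hands the same slice to every placement attempt (existing node, in-flight NodeClaim, new NodeClaim). Since indexing a Go map with a missing key yields the zero value, a pod with no recorded volume requirements simply passes a nil slice through, as this small sketch illustrates.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

func main() {
	podVolumeRequirements := map[*corev1.Pod][]corev1.NodeSelectorRequirement{}
	pod := &corev1.Pod{}

	// A missing key yields a nil slice, so every Add call still receives a usable
	// (empty) set of volume requirements for pods without zonal volumes.
	volumeRequirements := podVolumeRequirements[pod]
	fmt.Println(volumeRequirements == nil, len(volumeRequirements)) // true 0
}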

pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go

Lines changed: 2 additions & 1 deletion
@@ -155,9 +155,10 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) {
 	pods := makeDiversePods(podCount)
 	clock := &clock.RealClock{}
 	cluster = state.NewCluster(clock, client, cloudProvider)
+	podsVolumeRequirements := make(map[*corev1.Pod][]corev1.NodeSelectorRequirement)
 	topology, err := scheduling.NewTopology(ctx, client, cluster, nil, []*v1.NodePool{nodePool}, map[string][]*cloudprovider.InstanceType{
 		nodePool.Name: instanceTypes,
-	}, pods)
+	}, pods, podsVolumeRequirements)
 	if err != nil {
 		b.Fatalf("creating topology, %s", err)
 	}

pkg/controllers/provisioning/scheduling/suite_test.go

Lines changed: 142 additions & 0 deletions
@@ -3443,6 +3443,148 @@ var _ = Context("Scheduling", func() {
 			Expect(node.Name).ToNot(Equal(node2.Name))
 		})
 	})
+	Context("Pods with Zonal Volume and Topology Spread", func() {
+		var labels = map[string]string{"test": "test"}
+		var pvcs []*corev1.PersistentVolumeClaim
+		var pods []*corev1.Pod
+		var sc1 *storagev1.StorageClass
+		var sc2 *storagev1.StorageClass
+		var tsc = corev1.TopologySpreadConstraint{
+			MaxSkew:           1,
+			TopologyKey:       corev1.LabelTopologyZone,
+			WhenUnsatisfiable: corev1.DoNotSchedule,
+			LabelSelector:     &metav1.LabelSelector{MatchLabels: labels},
+		}
+		BeforeEach(func() {
+			pvcs = []*corev1.PersistentVolumeClaim{}
+			pods = []*corev1.Pod{}
+			sc1 = test.StorageClass(test.StorageClassOptions{
+				ObjectMeta: metav1.ObjectMeta{Name: "my-storage-class-1"},
+				Zones:      []string{"test-zone-1"},
+			})
+			sc2 = test.StorageClass(test.StorageClassOptions{
+				ObjectMeta: metav1.ObjectMeta{Name: "my-storage-class-2"},
+				Zones:      []string{"test-zone-2"},
+			})
+			for i := 0; i < 3; i++ {
+				// one is in test-zone-1 and the others are in test-zone-2
+				scname := sc1.Name
+				if i > 0 {
+					scname = sc2.Name
+				}
+				pvc := test.PersistentVolumeClaim(test.PersistentVolumeClaimOptions{
+					ObjectMeta:       metav1.ObjectMeta{Name: fmt.Sprintf("my-claim-%d", i)},
+					StorageClassName: lo.ToPtr(scname),
+				})
+				pod := test.UnschedulablePod(test.PodOptions{
+					// to ensure one node with one pod
+					PodAntiRequirements: []corev1.PodAffinityTerm{
+						{
+							LabelSelector: &metav1.LabelSelector{MatchLabels: labels},
+							TopologyKey:   corev1.LabelHostname,
+						},
+					},
+					TopologySpreadConstraints: []corev1.TopologySpreadConstraint{tsc},
+					PersistentVolumeClaims:    []string{pvc.Name},
+					ObjectMeta:                metav1.ObjectMeta{Labels: labels},
+				})
+				pvcs = append(pvcs, pvc)
+				pods = append(pods, pod)
+			}
+		})
+		It("should launch nodes when volume zone is compatible with topology spread", func() {
+			node1 := test.Node(test.NodeOptions{
+				ObjectMeta: metav1.ObjectMeta{
+					Labels: map[string]string{corev1.LabelTopologyZone: "test-zone-1"},
+				},
+			})
+			node2 := test.Node(test.NodeOptions{
+				ObjectMeta: metav1.ObjectMeta{
+					Labels: map[string]string{corev1.LabelTopologyZone: "test-zone-2"},
+				},
+			})
+			ExpectApplied(ctx, env.Client, nodePool, sc1, sc2)
+			ExpectApplied(ctx, env.Client, pvcs[0], pvcs[1], pvcs[2])
+			ExpectApplied(ctx, env.Client, pods[0], pods[1], node1, node2)
+			ExpectManualBinding(ctx, env.Client, pods[0], node1)
+			ExpectManualBinding(ctx, env.Client, pods[1], node2)
+
+			ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node1, node2}, nil)
+
+			ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pods[2])
+			ExpectScheduled(ctx, env.Client, pods[2])
+		})
+		It("should not launch nodes when volume zone is not compatible with topology spread", func() {
+			node1 := test.Node(test.NodeOptions{
+				ObjectMeta: metav1.ObjectMeta{
+					Labels: map[string]string{corev1.LabelTopologyZone: "test-zone-1"},
+				},
+			})
+			node2 := test.Node(test.NodeOptions{
+				ObjectMeta: metav1.ObjectMeta{
+					Labels: map[string]string{corev1.LabelTopologyZone: "test-zone-2"},
+				},
+			})
+			node3 := test.Node(test.NodeOptions{
+				ObjectMeta: metav1.ObjectMeta{
+					Labels: map[string]string{corev1.LabelTopologyZone: "test-zone-3"},
+				},
+			})
+
+			ExpectApplied(ctx, env.Client, nodePool, sc1, sc2)
+			ExpectApplied(ctx, env.Client, pvcs[0], pvcs[1], pvcs[2])
+			ExpectApplied(ctx, env.Client, pods[0], pods[1], node1, node2, node3)
+			ExpectManualBinding(ctx, env.Client, pods[0], node1)
+			ExpectManualBinding(ctx, env.Client, pods[1], node2)
+
+			ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node1, node2, node3}, nil)
+
+			ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pods[2])
+			// topology spread would place the 3rd pod in test-zone-3, but its volume must be in test-zone-2
+			ExpectNotScheduled(ctx, env.Client, pods[2])
+		})
+		It("only nodes matching nodeAffinity/nodeSelector are included in the calculations by default", func() {
+			node1 := test.Node(test.NodeOptions{
+				ObjectMeta: metav1.ObjectMeta{
+					Labels: map[string]string{corev1.LabelTopologyZone: "test-zone-1", "test": "test"},
+				},
+			})
+			node2 := test.Node(test.NodeOptions{
+				ObjectMeta: metav1.ObjectMeta{
+					Labels: map[string]string{corev1.LabelTopologyZone: "test-zone-2", "test": "test"},
+				},
+			})
+			node3 := test.Node(test.NodeOptions{
+				ObjectMeta: metav1.ObjectMeta{
+					Labels: map[string]string{corev1.LabelTopologyZone: "test-zone-3"},
+				},
+			})
+			nodePool.Spec.Template.Spec.Requirements = []v1.NodeSelectorRequirementWithMinValues{
+				{
+					NodeSelectorRequirement: corev1.NodeSelectorRequirement{
+						Key:      "test",
+						Operator: corev1.NodeSelectorOpIn,
+						Values:   []string{"test"},
+					},
+				},
+			}
+			pods[2].Spec.NodeSelector = map[string]string{"test": "test"}
+
+			ExpectApplied(ctx, env.Client, nodePool, sc1, sc2)
+			ExpectApplied(ctx, env.Client, pvcs[0], pvcs[1], pvcs[2])
+			ExpectApplied(ctx, env.Client, pods[0], pods[1], node1, node2, node3)
+			ExpectManualBinding(ctx, env.Client, pods[0], node1)
+			ExpectManualBinding(ctx, env.Client, pods[1], node2)
+
+			ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node1, node2, node3}, nil)
+
+			ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pods[2])
+			// since no node in test-zone-3 has the "test" label, only test-zone-1 and test-zone-2 are included in the calculations
+			ExpectScheduled(ctx, env.Client, pods[2])
+		})
+	})
 })
 
 	Describe("Deleting Nodes", func() {

pkg/controllers/provisioning/scheduling/topology.go

Lines changed: 14 additions & 6 deletions
@@ -60,6 +60,9 @@ type Topology struct {
 	excludedPods          sets.Set[string]
 	cluster               *state.Cluster
 	stateNodes            []*state.StateNode
+	// podVolumeRequirements links volume requirements to pods. This is used so we
+	// can track the volume requirements in the simulated scheduler
+	podVolumeRequirements map[*corev1.Pod][]corev1.NodeSelectorRequirement
 }
 
 func NewTopology(
@@ -70,6 +73,9 @@
 	nodePools []*v1.NodePool,
 	instanceTypes map[string][]*cloudprovider.InstanceType,
 	pods []*corev1.Pod,
+	// podsVolumeRequirements links volume requirements to pods. This is used so we
+	// can track the volume requirements in the simulated scheduler
+	podsVolumeRequirements map[*corev1.Pod][]corev1.NodeSelectorRequirement,
 ) (*Topology, error) {
 	t := &Topology{
 		kubeClient: kubeClient,
@@ -79,17 +85,18 @@
 		topologyGroups:        map[uint64]*TopologyGroup{},
 		inverseTopologyGroups: map[uint64]*TopologyGroup{},
 		excludedPods:          sets.New[string](),
+		podVolumeRequirements: podsVolumeRequirements,
 	}
 
 	// these are the pods that we intend to schedule, so if they are currently in the cluster we shouldn't count them for
 	// topology purposes
-	for _, p := range pods {
+	for p := range podsVolumeRequirements {
 		t.excludedPods.Insert(string(p.UID))
 	}
 
 	errs := t.updateInverseAffinities(ctx)
-	for i := range pods {
-		errs = multierr.Append(errs, t.Update(ctx, pods[i]))
+	for p := range podsVolumeRequirements {
+		errs = multierr.Append(errs, t.Update(ctx, p))
 	}
 	if errs != nil {
 		return nil, errs
@@ -228,7 +235,7 @@ func (t *Topology) AddRequirements(p *corev1.Pod, taints []corev1.Taint, podRequ
 		if nodeRequirements.Has(topology.Key) {
 			nodeDomains = nodeRequirements.Get(topology.Key)
 		}
-		domains := topology.Get(p, podDomains, nodeDomains)
+		domains := topology.Get(p, podDomains, nodeDomains, len(t.podVolumeRequirements[p]) != 0)
 		if domains.Len() == 0 {
 			return nil, topologyError{
 				topology: topology,
@@ -299,7 +306,7 @@ func (t *Topology) updateInverseAntiAffinity(ctx context.Context, pod *corev1.Po
 		return err
 	}
 
-	tg := NewTopologyGroup(TopologyTypePodAntiAffinity, term.TopologyKey, pod, namespaces, term.LabelSelector, math.MaxInt32, nil, nil, nil, t.domainGroups[term.TopologyKey])
+	tg := NewTopologyGroup(TopologyTypePodAntiAffinity, term.TopologyKey, pod, namespaces, term.LabelSelector, math.MaxInt32, nil, nil, nil, t.domainGroups[term.TopologyKey], t.cluster)
 
 	hash := tg.Hash()
 	if existing, ok := t.inverseTopologyGroups[hash]; !ok {
@@ -442,6 +449,7 @@ func (t *Topology) newForTopologies(p *corev1.Pod) []*TopologyGroup {
 			tsc.NodeTaintsPolicy,
 			tsc.NodeAffinityPolicy,
 			t.domainGroups[tsc.TopologyKey],
+			t.cluster,
 		))
 	}
 	return topologyGroups
@@ -479,7 +487,7 @@ func (t *Topology) newForAffinities(ctx context.Context, p *corev1.Pod) ([]*Topo
 		if err != nil {
 			return nil, err
 		}
-		topologyGroups = append(topologyGroups, NewTopologyGroup(topologyType, term.TopologyKey, p, namespaces, term.LabelSelector, math.MaxInt32, nil, nil, nil, t.domainGroups[term.TopologyKey]))
+		topologyGroups = append(topologyGroups, NewTopologyGroup(topologyType, term.TopologyKey, p, namespaces, term.LabelSelector, math.MaxInt32, nil, nil, nil, t.domainGroups[term.TopologyKey], t.cluster))
 		}
 	}
 	return topologyGroups, nil