14 changes: 7 additions & 7 deletions pkg/controllers/provisioning/provisioner.go
@@ -254,11 +254,11 @@ func (p *Provisioner) NewScheduler(
instanceTypes[np.Name] = its
}

// inject topology constraints
pods = p.injectVolumeTopologyRequirements(ctx, pods)
// Link volume requirements to pods
podsVolumeRequirements := p.convertToPodVolumeRequirements(ctx, pods)

// Calculate cluster topology
topology, err := scheduler.NewTopology(ctx, p.kubeClient, p.cluster, stateNodes, nodePools, instanceTypes, pods)
topology, err := scheduler.NewTopology(ctx, p.kubeClient, p.cluster, stateNodes, nodePools, instanceTypes, pods, podsVolumeRequirements)
if err != nil {
return nil, fmt.Errorf("tracking topology counts, %w", err)
}
@@ -464,13 +464,13 @@ func validateKarpenterManagedLabelCanExist(p *corev1.Pod) error {
return nil
}

func (p *Provisioner) injectVolumeTopologyRequirements(ctx context.Context, pods []*corev1.Pod) []*corev1.Pod {
var schedulablePods []*corev1.Pod
func (p *Provisioner) convertToPodVolumeRequirements(ctx context.Context, pods []*corev1.Pod) map[*corev1.Pod][]corev1.NodeSelectorRequirement {
Reviewer comment (Member):
We're not really converting anything here, right? We're just creating a mapping between pods and their volume requirements. I think something along these lines is more accurate.

Suggested change
func (p *Provisioner) convertToPodVolumeRequirements(ctx context.Context, pods []*corev1.Pod) map[*corev1.Pod][]corev1.NodeSelectorRequirement {
func (p *Provisioner) volumeRequirementsForPods(ctx context.Context, pods []*corev1.Pod) map[*corev1.Pod][]corev1.NodeSelectorRequirement {

var schedulablePods = make(map[*corev1.Pod][]corev1.NodeSelectorRequirement)
Reviewer comment (Member):
nit: this project uses this style for map initialization. Also, I think this name is more representative of what we're storing.

Suggested change
var schedulablePods = make(map[*corev1.Pod][]corev1.NodeSelectorRequirement)
podVolumeRequirements := map[*corev1.Pod][]corev1.NodeSelectorRequirement{}

for _, pod := range pods {
if err := p.volumeTopology.Inject(ctx, pod); err != nil {
if requirements, err := p.volumeTopology.GetVolumeRequirements(ctx, pod); err != nil {
log.FromContext(ctx).WithValues("Pod", klog.KObj(pod)).Error(err, "failed getting volume topology requirements")
} else {
schedulablePods = append(schedulablePods, pod)
schedulablePods[pod] = requirements
}
}
return schedulablePods
9 changes: 8 additions & 1 deletion pkg/controllers/provisioning/scheduling/existingnode.go
@@ -65,7 +65,7 @@ func NewExistingNode(n *state.StateNode, topology *Topology, taints []v1.Taint,
return node
}

func (n *ExistingNode) Add(ctx context.Context, kubeClient client.Client, pod *v1.Pod, podData *PodData) error {
func (n *ExistingNode) Add(ctx context.Context, kubeClient client.Client, pod *v1.Pod, podData *PodData, volumeRequirements []v1.NodeSelectorRequirement) error {
// Check Taints
if err := scheduling.Taints(n.cachedTaints).ToleratesPod(pod); err != nil {
return err
@@ -111,6 +111,13 @@ func (n *ExistingNode) Add(ctx context.Context, kubeClient client.Client, pod *v
}
nodeRequirements.Add(topologyRequirements.Values()...)

podVolumeRequirements := scheduling.NewNodeSelectorRequirements(volumeRequirements...)
// Check Pod Volume Requirements
if err = nodeRequirements.Compatible(podVolumeRequirements); err != nil {
return err
}
nodeRequirements.Add(podVolumeRequirements.Values()...)

// Update node
n.Pods = append(n.Pods, pod)
n.requests = requests
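An aside on the new volumeRequirements parameter threaded through both Add methods: the values are plain NodeSelectorRequirement entries derived from the pod's PVCs via the StorageClass/PV topology. A minimal sketch of what a zonal requirement might look like, using the v1 alias from existingnode.go; the zone value is illustrative, not taken from the PR:

// Illustrative only: a volume-derived zonal requirement, e.g. from a
// StorageClass whose allowed topology pins volumes to a single zone.
volumeRequirements := []v1.NodeSelectorRequirement{{
	Key:      v1.LabelTopologyZone, // "topology.kubernetes.io/zone"
	Operator: v1.NodeSelectorOpIn,
	Values:   []string{"test-zone-2"}, // assumed zone, mirrors the tests below
}}
// Add folds these into the node's requirements and rejects the node when they
// are incompatible, so a pod cannot land in a zone its volume cannot reach.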
9 changes: 8 additions & 1 deletion pkg/controllers/provisioning/scheduling/nodeclaim.go
@@ -108,7 +108,7 @@ func NewNodeClaim(
}
}

func (n *NodeClaim) Add(ctx context.Context, pod *corev1.Pod, podData *PodData) error {
func (n *NodeClaim) Add(ctx context.Context, pod *corev1.Pod, podData *PodData, volumeRequirements []corev1.NodeSelectorRequirement) error {
// Check Taints
if err := scheduling.Taints(n.Spec.Taints).ToleratesPod(pod); err != nil {
return err
@@ -137,6 +137,13 @@ func (n *NodeClaim) Add(ctx context.Context, pod *corev1.Pod, podData *PodData)
}
nodeClaimRequirements.Add(topologyRequirements.Values()...)

podVolumeRequirements := scheduling.NewNodeSelectorRequirements(volumeRequirements...)
// Check Pod Volume Requirements
if err = nodeClaimRequirements.Compatible(podVolumeRequirements, scheduling.AllowUndefinedWellKnownLabels); err != nil {
return err
Reviewer comment (Member):
We should wrap this error; it will be propagated out to the user if we fail to schedule the pod.
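A minimal sketch of the wrapping being asked for; the message text is an assumption, not part of the PR:

if err = nodeClaimRequirements.Compatible(podVolumeRequirements, scheduling.AllowUndefinedWellKnownLabels); err != nil {
	// Wrapping with %w keeps the underlying incompatibility while telling the
	// user that the pod's volume requirements are what blocked scheduling.
	return fmt.Errorf("incompatible with pod volume requirements, %w", err)
}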

}
nodeClaimRequirements.Add(podVolumeRequirements.Values()...)

// Check instance type combinations
requests := resources.Merge(n.Spec.Resources.Requests, podData.Requests)

10 changes: 7 additions & 3 deletions pkg/controllers/provisioning/scheduling/scheduler.go
@@ -350,9 +350,13 @@ func (s *Scheduler) updateCachedPodData(p *corev1.Pod) {

//nolint:gocyclo
func (s *Scheduler) add(ctx context.Context, pod *corev1.Pod) error {
var volumeRequirements []corev1.NodeSelectorRequirement
if _, ok := s.topology.podVolumeRequirements[pod]; ok {
volumeRequirements = s.topology.podVolumeRequirements[pod]
}
// first try to schedule against an in-flight real node
for _, node := range s.existingNodes {
if err := node.Add(ctx, s.kubeClient, pod, s.cachedPodData[pod.UID]); err == nil {
if err := node.Add(ctx, s.kubeClient, pod, s.cachedPodData[pod.UID], volumeRequirements); err == nil {
return nil
}
}
@@ -362,7 +366,7 @@ func (s *Scheduler) add(ctx context.Context, pod *corev1.Pod) error {

// Pick existing node that we are about to create
for _, nodeClaim := range s.newNodeClaims {
if err := nodeClaim.Add(ctx, pod, s.cachedPodData[pod.UID]); err == nil {
if err := nodeClaim.Add(ctx, pod, s.cachedPodData[pod.UID], volumeRequirements); err == nil {
return nil
}
}
@@ -389,7 +393,7 @@ func (s *Scheduler) add(ctx context.Context, pod *corev1.Pod) error {
}

nodeClaim := NewNodeClaim(nodeClaimTemplate, s.topology, s.daemonOverhead[nodeClaimTemplate], instanceTypes, s.reservationManager, s.reservedOfferingMode)
if err := nodeClaim.Add(ctx, pod, s.cachedPodData[pod.UID]); err != nil {
if err := nodeClaim.Add(ctx, pod, s.cachedPodData[pod.UID], volumeRequirements); err != nil {
nodeClaim.Destroy()
if IsReservedOfferingError(err) {
errs = multierr.Append(errs, fmt.Errorf(
@@ -155,9 +155,10 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) {
pods := makeDiversePods(podCount)
clock := &clock.RealClock{}
cluster = state.NewCluster(clock, client, cloudProvider)
podsVolumeRequirements := make(map[*corev1.Pod][]corev1.NodeSelectorRequirement)
topology, err := scheduling.NewTopology(ctx, client, cluster, nil, []*v1.NodePool{nodePool}, map[string][]*cloudprovider.InstanceType{
nodePool.Name: instanceTypes,
}, pods)
}, pods, podsVolumeRequirements)
if err != nil {
b.Fatalf("creating topology, %s", err)
}
142 changes: 142 additions & 0 deletions pkg/controllers/provisioning/scheduling/suite_test.go
@@ -3443,6 +3443,148 @@ var _ = Context("Scheduling", func() {
Expect(node.Name).ToNot(Equal(node2.Name))
})
})
Context("Pods with Zonal Volume and Topology Spread", func() {
var labels = map[string]string{"test": "test"}
var pvcs []*corev1.PersistentVolumeClaim
var pods []*corev1.Pod
var sc1 *storagev1.StorageClass
var sc2 *storagev1.StorageClass
var tsc = corev1.TopologySpreadConstraint{
MaxSkew: 1,
TopologyKey: corev1.LabelTopologyZone,
WhenUnsatisfiable: corev1.DoNotSchedule,
LabelSelector: &metav1.LabelSelector{MatchLabels: labels},
}
BeforeEach(func() {
pvcs = []*corev1.PersistentVolumeClaim{}
pods = []*corev1.Pod{}
sc1 = test.StorageClass(test.StorageClassOptions{
ObjectMeta: metav1.ObjectMeta{Name: "my-storage-class-1"},
Zones: []string{"test-zone-1"},
})
sc2 = test.StorageClass(test.StorageClassOptions{
ObjectMeta: metav1.ObjectMeta{Name: "my-storage-class-2"},
Zones: []string{"test-zone-2"},
})
for i := 0; i < 3; i++ {
// one is in test-zone-1 and the others are in test-zone-2
scname := sc1.Name
if i > 0 {
scname = sc2.Name
}
pvc := test.PersistentVolumeClaim(test.PersistentVolumeClaimOptions{
ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("my-claim-%d", i)},
StorageClassName: lo.ToPtr(scname),
})
pod := test.UnschedulablePod(test.PodOptions{
// pod anti-affinity ensures one of these pods per node
PodAntiRequirements: []corev1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{MatchLabels: labels},
TopologyKey: corev1.LabelHostname,
},
},
TopologySpreadConstraints: []corev1.TopologySpreadConstraint{tsc},
PersistentVolumeClaims: []string{pvc.Name},
ObjectMeta: metav1.ObjectMeta{Labels: labels},
})
pvcs = append(pvcs, pvc)
pods = append(pods, pod)
}
})
It("should launch nodes when volume zone is compatible with topology spread", func() {
node1 := test.Node(test.NodeOptions{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{corev1.LabelTopologyZone: "test-zone-1"},
},
})
node2 := test.Node(test.NodeOptions{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{corev1.LabelTopologyZone: "test-zone-2"},
},
})
ExpectApplied(ctx, env.Client, nodePool, sc1, sc2)
ExpectApplied(ctx, env.Client, pvcs[0], pvcs[1], pvcs[2])
ExpectApplied(ctx, env.Client, pods[0], pods[1], node1, node2)
ExpectManualBinding(ctx, env.Client, pods[0], node1)
ExpectManualBinding(ctx, env.Client, pods[1], node2)

ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node1, node2}, nil)

ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pods[2])
ExpectScheduled(ctx, env.Client, pods[2])
})
It("should not launch nodes when volume zone is not compatible with topology spread", func() {
node1 := test.Node(test.NodeOptions{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{corev1.LabelTopologyZone: "test-zone-1"},
},
})
node2 := test.Node(test.NodeOptions{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{corev1.LabelTopologyZone: "test-zone-2"},
},
})
node3 := test.Node(test.NodeOptions{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{corev1.LabelTopologyZone: "test-zone-3"},
},
})

ExpectApplied(ctx, env.Client, nodePool, sc1, sc2)
ExpectApplied(ctx, env.Client, pvcs[0], pvcs[1], pvcs[2])
ExpectApplied(ctx, env.Client, pods[0], pods[1], node1, node2, node3)
ExpectManualBinding(ctx, env.Client, pods[0], node1)
ExpectManualBinding(ctx, env.Client, pods[1], node2)

ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node1, node2, node3}, nil)

ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pods[2])
// for topology spread the 3rd pod should be scheduled to test-zone-3, but its volume needs to be in test-zone-2
ExpectNotScheduled(ctx, env.Client, pods[2])

})
It("only nodes matching nodeAffinity/nodeSelector are included in the calculations by default", func() {
node1 := test.Node(test.NodeOptions{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{corev1.LabelTopologyZone: "test-zone-1", "test": "test"},
},
})
node2 := test.Node(test.NodeOptions{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{corev1.LabelTopologyZone: "test-zone-2", "test": "test"},
},
})
node3 := test.Node(test.NodeOptions{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{corev1.LabelTopologyZone: "test-zone-3"},
},
})
nodePool.Spec.Template.Spec.Requirements = []v1.NodeSelectorRequirementWithMinValues{
{
NodeSelectorRequirement: corev1.NodeSelectorRequirement{
Key: "test",
Operator: corev1.NodeSelectorOpIn,
Values: []string{"test"},
},
},
}
pods[2].Spec.NodeSelector = map[string]string{"test": "test"}

ExpectApplied(ctx, env.Client, nodePool, sc1, sc2)
ExpectApplied(ctx, env.Client, pvcs[0], pvcs[1], pvcs[2])
ExpectApplied(ctx, env.Client, pods[0], pods[1], node1, node2, node3)
ExpectManualBinding(ctx, env.Client, pods[0], node1)
ExpectManualBinding(ctx, env.Client, pods[1], node2)

ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node1, node2, node3}, nil)

ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pods[2])
// since no node in test-zone-3 has the label "test", only test-zone-1 and test-zone-2 are included in the calculations.
ExpectScheduled(ctx, env.Client, pods[2])

})
})
})

Describe("Deleting Nodes", func() {
20 changes: 14 additions & 6 deletions pkg/controllers/provisioning/scheduling/topology.go
@@ -60,6 +60,9 @@ type Topology struct {
excludedPods sets.Set[string]
cluster *state.Cluster
stateNodes []*state.StateNode
// podVolumeRequirements links volume requirements to pods. This is used so we
// can track the volume requirements in the scheduling simulation
podVolumeRequirements map[*corev1.Pod][]corev1.NodeSelectorRequirement
Reviewer comment (Member):
I would use the pod's UID as the key here rather than a pointer to the pod object. We still use the pod object as a key elsewhere in the project, but we've moved to pod UID here (see excludedPods) and I think it would be wise for us to move to using it elsewhere when possible since we don't need to worry about copies preventing us from indexing.
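A sketch of the UID-keyed shape the reviewer is describing; the field and lookup spellings are assumptions, not code from the PR:

// Keyed by UID (types is "k8s.io/apimachinery/pkg/types"), mirroring excludedPods,
// so copies of the pod object cannot prevent indexing:
podVolumeRequirements map[types.UID][]corev1.NodeSelectorRequirement
// Lookups would then use the pod's UID, e.g.:
// requirements := t.podVolumeRequirements[pod.UID]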

}

func NewTopology(
@@ -70,6 +73,9 @@ func NewTopology(
nodePools []*v1.NodePool,
instanceTypes map[string][]*cloudprovider.InstanceType,
pods []*corev1.Pod,
// podVolumeRequirements links volume requirements to pods. This is used so we
// can track the volume requirements in the scheduling simulation
podsVolumeRequirements map[*corev1.Pod][]corev1.NodeSelectorRequirement,
) (*Topology, error) {
t := &Topology{
kubeClient: kubeClient,
@@ -79,17 +85,18 @@
topologyGroups: map[uint64]*TopologyGroup{},
inverseTopologyGroups: map[uint64]*TopologyGroup{},
excludedPods: sets.New[string](),
podVolumeRequirements: podsVolumeRequirements,
}

// these are the pods that we intend to schedule, so if they are currently in the cluster we shouldn't count them for
// topology purposes
for _, p := range pods {
for p := range podsVolumeRequirements {
Reviewer comment (Member):
Why are we iterating over the pods stored as keys in podVolumeRequirements rather than all pods? I believe this should be the same set in this implementation, but the intention is to exclude all pods we're attempting to schedule, not just those which have an associated volume requirement. Even if it's the same in practice today, this obfuscates intent.

t.excludedPods.Insert(string(p.UID))
}

errs := t.updateInverseAffinities(ctx)
for i := range pods {
errs = multierr.Append(errs, t.Update(ctx, pods[i]))
for p := range podsVolumeRequirements {
Reviewer comment (Member):
Same comment here - we should still be using pods rather than the pods stored as keys in the podVolumeRequirements. Let me know if this is intentional and I'm missing the rationale, but as far as I can tell we should be updating the topology for all pods we're attempting to schedule.

Author reply (@leoryu, Jul 16, 2025):
@jmdeal Since we need to know whether the pod has volume requirements, pods was replaced by podVolumeRequirements here. Please check line 238.
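For reference, a sketch of the alternative the reviewer describes: iterate all pods for exclusion and topology updates, and consult the volume-requirements map only where it is actually needed (identifiers as in the surrounding diff; illustrative, not the PR's code):

for _, p := range pods {
	t.excludedPods.Insert(string(p.UID))
}
errs := t.updateInverseAffinities(ctx)
for _, p := range pods {
	errs = multierr.Append(errs, t.Update(ctx, p))
}
// Whether a given pod carries volume requirements can still be checked where
// needed, e.g. len(t.podVolumeRequirements[p]) != 0 as in AddRequirements.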

errs = multierr.Append(errs, t.Update(ctx, p))
}
if errs != nil {
return nil, errs
@@ -228,7 +235,7 @@ func (t *Topology) AddRequirements(p *corev1.Pod, taints []corev1.Taint, podRequ
if nodeRequirements.Has(topology.Key) {
nodeDomains = nodeRequirements.Get(topology.Key)
}
domains := topology.Get(p, podDomains, nodeDomains)
domains := topology.Get(p, podDomains, nodeDomains, len(t.podVolumeRequirements[p]) != 0)
if domains.Len() == 0 {
return nil, topologyError{
topology: topology,
@@ -299,7 +306,7 @@ func (t *Topology) updateInverseAntiAffinity(ctx context.Context, pod *corev1.Po
return err
}

tg := NewTopologyGroup(TopologyTypePodAntiAffinity, term.TopologyKey, pod, namespaces, term.LabelSelector, math.MaxInt32, nil, nil, nil, t.domainGroups[term.TopologyKey])
tg := NewTopologyGroup(TopologyTypePodAntiAffinity, term.TopologyKey, pod, namespaces, term.LabelSelector, math.MaxInt32, nil, nil, nil, t.domainGroups[term.TopologyKey], t.cluster)

hash := tg.Hash()
if existing, ok := t.inverseTopologyGroups[hash]; !ok {
@@ -442,6 +449,7 @@ func (t *Topology) newForTopologies(p *corev1.Pod) []*TopologyGroup {
tsc.NodeTaintsPolicy,
tsc.NodeAffinityPolicy,
t.domainGroups[tsc.TopologyKey],
t.cluster,
))
}
return topologyGroups
@@ -479,7 +487,7 @@ func (t *Topology) newForAffinities(ctx context.Context, p *corev1.Pod) ([]*Topo
if err != nil {
return nil, err
}
topologyGroups = append(topologyGroups, NewTopologyGroup(topologyType, term.TopologyKey, p, namespaces, term.LabelSelector, math.MaxInt32, nil, nil, nil, t.domainGroups[term.TopologyKey]))
topologyGroups = append(topologyGroups, NewTopologyGroup(topologyType, term.TopologyKey, p, namespaces, term.LabelSelector, math.MaxInt32, nil, nil, nil, t.domainGroups[term.TopologyKey], t.cluster))
}
}
return topologyGroups, nil