130 changes: 126 additions & 4 deletions pkg/controllers/disruption/consolidation_test.go
@@ -4017,7 +4017,12 @@ var _ = Describe("Consolidation", func() {
rs := test.ReplicaSet()
ExpectApplied(ctx, env.Client, rs)

pods := test.Pods(3, test.PodOptions{
pods := test.Pods(2, test.PodOptions{
ResourceRequirements: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
},
},
ObjectMeta: metav1.ObjectMeta{Labels: labels,
OwnerReferences: []metav1.OwnerReference{
{
@@ -4028,7 +4033,26 @@ var _ = Describe("Consolidation", func() {
Controller: lo.ToPtr(true),
BlockOwnerDeletion: lo.ToPtr(true),
},
}}})
},
},
})
pods = append(pods, test.Pods(1, test.PodOptions{
ResourceRequirements: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"),
},
},
ObjectMeta: metav1.ObjectMeta{Labels: labels,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "apps/v1",
Kind: "ReplicaSet",
Name: rs.Name,
UID: rs.UID,
Controller: lo.ToPtr(true),
BlockOwnerDeletion: lo.ToPtr(true),
},
}}})...)

ExpectApplied(ctx, env.Client, rs, pods[0], pods[1], pods[2], nodePool)
ExpectApplied(ctx, env.Client, nodeClaims[0], nodes[0]) // ensure node1 is the oldest node
@@ -4055,15 +4079,113 @@ var _ = Describe("Consolidation", func() {
ExpectSingletonReconciled(ctx, queue)

// Cascade any deletion of the nodeclaim to the node
ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaims[0])
ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaims...)

// the second node has more pods, so it would normally not be picked for consolidation, except it has very little
// the first node has more pods, so it would normally not be picked for consolidation, except it has very little
// lifetime remaining, so it should be deleted
Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(1))
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
ExpectNotFound(ctx, env.Client, nodeClaims[0], nodes[0])
})
})
Context("Node Utilization Consideration", func() {
var nodeClaims []*v1.NodeClaim
var nodes []*corev1.Node

BeforeEach(func() {
nodePool.Spec.Template.Spec.ExpireAfter = v1.MustParseNillableDuration("3s")
nodeClaims, nodes = test.NodeClaimsAndNodes(2, v1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
v1.NodePoolLabelKey: nodePool.Name,
corev1.LabelInstanceTypeStable: leastExpensiveInstance.Name,
v1.CapacityTypeLabelKey: leastExpensiveOffering.Requirements.Get(v1.CapacityTypeLabelKey).Any(),
corev1.LabelTopologyZone: leastExpensiveOffering.Requirements.Get(corev1.LabelTopologyZone).Any(),
},
},
Status: v1.NodeClaimStatus{
Allocatable: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceCPU: resource.MustParse("32"),
corev1.ResourcePods: resource.MustParse("100"),
},
},
})
for _, nc := range nodeClaims {
nc.StatusConditions().SetTrue(v1.ConditionTypeConsolidatable)
}
})
It("should consider node utilization when calculating disruption cost", func() {
// create our RS so we can link a pod to it
rs := test.ReplicaSet()
ExpectApplied(ctx, env.Client, rs)

pods := test.Pods(1, test.PodOptions{
ResourceRequirements: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("30"),
},
},
ObjectMeta: metav1.ObjectMeta{Labels: labels,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "apps/v1",
Kind: "ReplicaSet",
Name: rs.Name,
UID: rs.UID,
Controller: lo.ToPtr(true),
BlockOwnerDeletion: lo.ToPtr(true),
},
}}})
pods = append(pods, test.Pods(2, test.PodOptions{
ResourceRequirements: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
},
},
ObjectMeta: metav1.ObjectMeta{Labels: labels,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "apps/v1",
Kind: "ReplicaSet",
Name: rs.Name,
UID: rs.UID,
Controller: lo.ToPtr(true),
BlockOwnerDeletion: lo.ToPtr(true),
},
}}})...)

ExpectApplied(ctx, env.Client, rs, pods[0], pods[1], pods[2], nodePool)
ExpectApplied(ctx, env.Client, nodeClaims[0], nodes[0])
ExpectApplied(ctx, env.Client, nodeClaims[1], nodes[1])

// one pod on node 1, two on node 2, but node 1 has higher utilization
ExpectManualBinding(ctx, env.Client, pods[0], nodes[0])
ExpectManualBinding(ctx, env.Client, pods[1], nodes[1])
ExpectManualBinding(ctx, env.Client, pods[2], nodes[1])

// inform cluster state about nodes and nodeclaims
ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{nodes[0], nodes[1]}, []*v1.NodeClaim{nodeClaims[0], nodeClaims[1]})

fakeClock.SetTime(time.Now())

var wg sync.WaitGroup
ExpectToWait(fakeClock, &wg)
ExpectSingletonReconciled(ctx, disruptionController)
wg.Wait()

// Process the item so that the nodes can be deleted.
ExpectSingletonReconciled(ctx, queue)

// Cascade any deletion of the nodeclaim to the node
ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaims...)

// the second node has more pods, so it would normally not be picked for consolidation, except it has
// much lower utilization (2 CPU requested vs 30 CPU), so it should be deleted
Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(1))
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
ExpectNotFound(ctx, env.Client, nodeClaims[1], nodes[1])
})
})
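// Rough arithmetic behind the expectation above: nodes[0] runs a single pod requesting 30 of its
// 32 allocatable CPU (~94% utilization), while nodes[1] runs two pods requesting 2 CPU in total (~6%).
// Although nodes[1] holds more reschedulable pods, its disruption cost is scaled down by the much
// lower Utilization() factor, so it is the node that gets consolidated.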
Context("Topology Consideration", func() {
var nodeClaims []*v1.NodeClaim
var nodes []*corev1.Node
2 changes: 1 addition & 1 deletion pkg/controllers/disruption/types.go
@@ -112,7 +112,7 @@ func NewCandidate(ctx context.Context, kubeClient client.Client, recorder events
zone: node.Labels()[corev1.LabelTopologyZone],
reschedulablePods: lo.Filter(pods, func(p *corev1.Pod, _ int) bool { return pod.IsReschedulable(p) }),
// We get the disruption cost from all pods in the candidate, not just the reschedulable pods
DisruptionCost: disruptionutils.ReschedulingCost(ctx, pods) * disruptionutils.LifetimeRemaining(clk, nodePool, node.NodeClaim),
DisruptionCost: disruptionutils.ReschedulingCost(ctx, pods) * disruptionutils.LifetimeRemaining(clk, nodePool, node.NodeClaim) * node.Utilization(),
}, nil
}
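For intuition, here is a minimal standalone sketch of how the three multiplicative factors interact. The numeric inputs are made-up placeholders (a per-pod rescheduling cost of 1 and an arbitrary lifetime fraction), not Karpenter's actual implementations of ReschedulingCost or LifetimeRemaining:

package main

import "fmt"

// disruptionCost mirrors the shape of the expression in NewCandidate above:
// the pod rescheduling cost, scaled by remaining lifetime and by node utilization.
func disruptionCost(reschedulingCost, lifetimeRemaining, utilization float64) float64 {
	return reschedulingCost * lifetimeRemaining * utilization
}

func main() {
	// Placeholder inputs loosely based on the test above: equal lifetime remaining,
	// node A runs one pod at ~94% CPU utilization, node B runs two pods at ~6%.
	costA := disruptionCost(1.0, 0.5, 30.0/32.0) // ≈ 0.47
	costB := disruptionCost(2.0, 0.5, 2.0/32.0)  // ≈ 0.06
	fmt.Printf("node A cost: %.3f, node B cost: %.3f\n", costA, costB)
	// Node B is cheaper to disrupt despite hosting more pods, so consolidation picks it.
}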

15 changes: 15 additions & 0 deletions pkg/controllers/state/statenode.go
@@ -365,6 +365,21 @@ func (in *StateNode) Available() corev1.ResourceList {
return resources.Subtract(in.Allocatable(), in.PodRequests())
}

// Utilization is the ratio of requested resources to allocatable resources,
// averaged across the resource types that pods on this node request
func (in *StateNode) Utilization() float64 {
	requested := in.PodRequests()
	if len(requested) == 0 {
		return 0
	}
	alloc := in.Allocatable()
	utilization := 0.0
	for resourceName, request := range requested {
		// only divide when the node actually allocates this resource, so a missing
		// allocatable entry contributes zero rather than a division by zero
		if allocatable := alloc[resourceName]; !allocatable.IsZero() {
			utilization += float64(request.MilliValue()) / float64(allocatable.MilliValue())
		}
	}
	return utilization / float64(len(requested))
}
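// Worked example (illustrative): with PodRequests of {cpu: 2, memory: 4Gi} on a node whose
// Allocatable is {cpu: 32, memory: 64Gi}, the per-resource ratios are 2/32 = 0.0625 and
// 4Gi/64Gi = 0.0625, so Utilization() returns their average, 0.0625. Resources that appear
// only in Allocatable (e.g. pods) are not part of the average, since the loop iterates over
// the requested resources.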

func (in *StateNode) DaemonSetRequests() corev1.ResourceList {
return resources.Merge(lo.Values(in.daemonSetRequests)...)
}