Add testing for consolidation budgets e2e (#5525)
jonathan-innis authored Jan 26, 2024
1 parent daeb5da commit 0be1ea7
Showing 5 changed files with 332 additions and 239 deletions.
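
For orientation before the diffs: the budgets exercised by the new tests are plain fields on the NodePool's disruption spec. A minimal sketch, assuming the corev1beta1, metav1, lo, and time imports used in suite_test.go below and an existing nodePool object; the values here are illustrative, not taken from the tests:

	nodePool.Spec.Disruption.Budgets = []corev1beta1.Budget{
		{
			// Allow at most 40% of this NodePool's nodes to be disrupted at once.
			Nodes: "40%",
		},
		{
			// Fully block disruption during a daily 30-minute window starting at 09:00 UTC.
			Nodes:    "0",
			Schedule: lo.ToPtr("0 9 * * *"),
			Duration: &metav1.Duration{Duration: 30 * time.Minute},
		},
	}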
99 changes: 89 additions & 10 deletions test/pkg/environment/common/expectations.go
@@ -19,6 +19,7 @@ import (
"encoding/base64"
"fmt"
"io"
"math"
"strings"
"time"

@@ -468,25 +469,44 @@ func (env *Environment) ExpectCreatedNodeCount(comparator string, count int) []*
return createdNodes
}

func (env *Environment) ExpectNodeCount(comparator string, count int) {
GinkgoHelper()

nodeList := &v1.NodeList{}
Expect(env.Client.List(env, nodeList, client.HasLabels{test.DiscoveryLabel})).To(Succeed())
Expect(len(nodeList.Items)).To(BeNumerically(comparator, count))
}

func (env *Environment) ExpectNodeClaimCount(comparator string, count int) {
GinkgoHelper()

nodeClaimList := &corev1beta1.NodeClaimList{}
Expect(env.Client.List(env, nodeClaimList, client.HasLabels{test.DiscoveryLabel})).To(Succeed())
Expect(len(nodeClaimList.Items)).To(BeNumerically(comparator, count))
}

func NodeNames(nodes []*v1.Node) []string {
return lo.Map(nodes, func(n *v1.Node, index int) string {
return n.Name
})
}

func (env *Environment) ConsistentlyExpectNodeCount(comparator string, count int, duration string) []*v1.Node {
func (env *Environment) ConsistentlyExpectNodeCount(comparator string, count int, duration time.Duration) []*v1.Node {
GinkgoHelper()
By(fmt.Sprintf("expecting nodes to be %s to %d for %s", comparator, count, duration))
nodeList := &v1.NodeList{}
Consistently(func(g Gomega) {
g.Expect(env.Client.List(env, nodeList, client.HasLabels{test.DiscoveryLabel})).To(Succeed())
g.Expect(len(nodeList.Items)).To(BeNumerically(comparator, count),
fmt.Sprintf("expected %d nodes, had %d (%v) for %s", count, len(nodeList.Items), NodeNames(lo.ToSlicePtr(nodeList.Items)), duration))
}, duration).Should(Succeed())
}, duration.String()).Should(Succeed())
return lo.ToSlicePtr(nodeList.Items)
}

func (env *Environment) ConsistentlyExpectNoDisruptions(nodeCount int, duration string) {
// ConsistentlyExpectNoDisruptions ensures that the state of the cluster is not changed for the passed duration.
// Specifically, we check that the number of nodes stays equal to the passed-in count and we validate
// that no disrupting taints are added throughout the window
func (env *Environment) ConsistentlyExpectNoDisruptions(nodeCount int, duration time.Duration) {
GinkgoHelper()
Consistently(func(g Gomega) {
// Ensure we don't change our NodeClaims
@@ -504,7 +524,20 @@ func (env *Environment) ConsistentlyExpectNoDisruptions(nodeCount int, duration
})
g.Expect(ok).To(BeFalse())
}
}, duration).Should(Succeed())
}, duration.String()).Should(Succeed())
}
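
// Editor's sketch (not part of this diff): a fully blocking budget is then verified by
// asserting that the node count and taints stay unchanged for a full window, as the
// consolidation suite below does:
//
//	env.ConsistentlyExpectNoDisruptions(5, time.Minute)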

func (env *Environment) ConsistentlyExpectTaintedNodeCount(comparator string, count int, duration time.Duration) []*v1.Node {
GinkgoHelper()

By(fmt.Sprintf("checking for tainted nodes to be %s to %d for %s", comparator, count, duration))
nodeList := &v1.NodeList{}
Consistently(func(g Gomega) {
g.Expect(env.Client.List(env, nodeList, client.MatchingFields{"spec.taints[*].karpenter.sh/disruption": "disrupting"})).To(Succeed())
g.Expect(len(nodeList.Items)).To(BeNumerically(comparator, count),
fmt.Sprintf("expected %d tainted nodes, had %d (%v)", count, len(nodeList.Items), NodeNames(lo.ToSlicePtr(nodeList.Items))))
}, duration.String()).Should(Succeed())
return lo.ToSlicePtr(nodeList.Items)
}
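
// Editor's note (not part of this diff): the MatchingFields selector above assumes a field
// index for "spec.taints[*].karpenter.sh/disruption" is registered elsewhere in the test
// environment. With controller-runtime, that registration might look roughly like this
// (mgr and ctx are assumed and are not shown in this commit):
//
//	_ = mgr.GetFieldIndexer().IndexField(ctx, &v1.Node{}, "spec.taints[*].karpenter.sh/disruption",
//		func(o client.Object) []string {
//			taint, ok := lo.Find(o.(*v1.Node).Spec.Taints, func(t v1.Taint) bool {
//				return t.Key == "karpenter.sh/disruption"
//			})
//			if !ok {
//				return nil
//			}
//			return []string{taint.Value}
//		})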

func (env *Environment) EventuallyExpectTaintedNodeCount(comparator string, count int) []*v1.Node {
@@ -751,17 +784,63 @@ func (env *Environment) ExpectDaemonSetEnvironmentVariableUpdated(obj client.Obj
Expect(env.Client.Patch(env.Context, ds, patch)).To(Succeed())
}

func (env *Environment) ExpectHealthyPodsForNode(nodeName string) []*v1.Pod {
// ForcePodsToSpread ensures that currently scheduled pods get spread evenly across all passed nodes by deleting pods from
// over-populated nodes and waiting for them to reschedule. This is useful for scenarios where you want to force the nodes
// to be underutilized but want to keep a consistent count of nodes rather than leaving empty ones around.
func (env *Environment) ForcePodsToSpread(nodes ...*v1.Node) {
GinkgoHelper()

// Get the total count of pods across all of the passed nodes
podCount := 0
for _, n := range nodes {
podCount += len(env.ExpectActivePodsForNode(n.Name))
}
maxPodsPerNode := int(math.Ceil(float64(podCount) / float64(len(nodes))))

By(fmt.Sprintf("forcing %d pods to spread across %d nodes", podCount, len(nodes)))
start := time.Now()
for {
var nodePods []*v1.Pod
node, found := lo.Find(nodes, func(n *v1.Node) bool {
nodePods = env.ExpectActivePodsForNode(n.Name)
return len(nodePods) > maxPodsPerNode
})
if !found {
break
}
// Set the node to unschedulable so that the replacement pods won't reschedule onto it.
Expect(env.Client.Get(env.Context, client.ObjectKeyFromObject(node), node)).To(Succeed())
stored := node.DeepCopy()
node.Spec.Unschedulable = true
Expect(env.Client.Patch(env.Context, node, client.MergeFrom(stored))).To(Succeed())
for _, pod := range nodePods[maxPodsPerNode:] {
env.ExpectDeleted(pod)
}
Eventually(func(g Gomega) {
g.Expect(len(env.ExpectActivePodsForNode(node.Name))).To(Or(Equal(maxPodsPerNode), Equal(maxPodsPerNode-1)))
}).WithTimeout(5 * time.Second).Should(Succeed())

// TODO: Consider moving this time check to an Eventually poll. This gets a little trickier with helper functions
// since you need to make sure that your Expectation helper functions are scoped to your "g Gomega" scope
// so that you don't fail the whole test the first time an expectation fails.
if time.Since(start) > time.Minute*15 {
Fail("forcing pods to spread failed due to a timeout")
}
}
for _, n := range nodes {
stored := n.DeepCopy()
n.Spec.Unschedulable = false
Expect(env.Client.Patch(env.Context, n, client.MergeFrom(stored))).To(Succeed())
}
}
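
// Editor's sketch (not part of this diff): ForcePodsToSpread is used by the consolidation
// suite below to leave every node underutilized without changing the node count, e.g.:
//
//	nodes := env.EventuallyExpectCreatedNodeCount("==", 3)
//	dep.Spec.Replicas = lo.ToPtr[int32](3)
//	env.ExpectUpdated(dep)
//	env.ForcePodsToSpread(nodes...)
//	env.EventuallyExpectHealthyPodCount(selector, 3)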

func (env *Environment) ExpectActivePodsForNode(nodeName string) []*v1.Pod {
GinkgoHelper()
podList := &v1.PodList{}
Expect(env.Client.List(env, podList, client.MatchingFields{"spec.nodeName": nodeName}, client.HasLabels{test.DiscoveryLabel})).To(Succeed())

// Return the healthy pods
return lo.Filter(lo.ToSlicePtr(podList.Items), func(p *v1.Pod, _ int) bool {
_, found := lo.Find(p.Status.Conditions, func(cond v1.PodCondition) bool {
return cond.Type == v1.PodReady && cond.Status == v1.ConditionTrue
})
return found
return p.DeletionTimestamp.IsZero()
})
}

220 changes: 220 additions & 0 deletions test/suites/consolidation/suite_test.go
@@ -22,10 +22,12 @@ import (

"github.com/aws/aws-sdk-go/aws"
"github.com/samber/lo"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"sigs.k8s.io/controller-runtime/pkg/client"

corev1beta1 "sigs.k8s.io/karpenter/pkg/apis/v1beta1"
"sigs.k8s.io/karpenter/pkg/test"
@@ -63,6 +65,224 @@ var _ = AfterEach(func() { env.Cleanup() })
var _ = AfterEach(func() { env.AfterEach() })

var _ = Describe("Consolidation", func() {
Context("Budgets", func() {
var nodePool *corev1beta1.NodePool
var dep *appsv1.Deployment
var selector labels.Selector
var numPods int32
BeforeEach(func() {
nodePool = env.DefaultNodePool(nodeClass)
nodePool.Spec.Disruption.ConsolidateAfter = nil

numPods = 5
dep = test.Deployment(test.DeploymentOptions{
Replicas: numPods,
PodOptions: test.PodOptions{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"app": "regular-app"},
},
ResourceRequirements: v1.ResourceRequirements{
Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1")},
},
},
})
selector = labels.SelectorFromSet(dep.Spec.Selector.MatchLabels)
})
It("should respect budgets for empty delete consolidation", func() {
nodePool.Spec.Disruption.Budgets = []corev1beta1.Budget{
{
Nodes: "40%",
},
}
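
// Editor's note (not part of this diff): with 5 nodes and a 40% budget, at most 2 nodes
// may be disrupted at once, which is why the assertions below expect nodes to be tainted
// and removed in batches of two.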

// Hostname anti-affinity to require one pod on each node
dep.Spec.Template.Spec.Affinity = &v1.Affinity{
PodAntiAffinity: &v1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: dep.Spec.Selector,
TopologyKey: v1.LabelHostname,
},
},
},
}
env.ExpectCreated(nodeClass, nodePool, dep)

env.EventuallyExpectCreatedNodeClaimCount("==", 5)
nodes := env.EventuallyExpectCreatedNodeCount("==", 5)
env.EventuallyExpectHealthyPodCount(selector, int(numPods))

By("adding finalizers to the nodes to prevent termination")
for _, node := range nodes {
Expect(env.Client.Get(env.Context, client.ObjectKeyFromObject(node), node)).To(Succeed())
node.Finalizers = append(node.Finalizers, common.TestingFinalizer)
env.ExpectUpdated(node)
}

dep.Spec.Replicas = lo.ToPtr[int32](1)
By("making the nodes empty")
// Update the deployment to only contain 1 replica.
env.ExpectUpdated(dep)

// Ensure that we get two nodes tainted, and that they overlap while being disrupted
env.EventuallyExpectTaintedNodeCount("==", 2)
nodes = env.ConsistentlyExpectTaintedNodeCount("==", 2, time.Second*5)

// Remove the finalizer from each node so that we can terminate
for _, node := range nodes {
Expect(env.ExpectTestingFinalizerRemoved(node)).To(Succeed())
}

// After the deletion timestamp is set and all pods are drained
// the node should be gone
env.EventuallyExpectNotFound(nodes[0], nodes[1])

// This check ensures that we are consolidating nodes at the same time
env.EventuallyExpectTaintedNodeCount("==", 2)
nodes = env.ConsistentlyExpectTaintedNodeCount("==", 2, time.Second*5)

for _, node := range nodes {
Expect(env.ExpectTestingFinalizerRemoved(node)).To(Succeed())
}
env.EventuallyExpectNotFound(nodes[0], nodes[1])

// Expect there to only be one node remaining for the last replica
env.ExpectNodeCount("==", 1)
})
It("should respect budgets for non-empty delete consolidation", func() {
// This test will hold consolidation until we are ready to execute it
nodePool.Spec.Disruption.ConsolidateAfter = &corev1beta1.NillableDuration{}

nodePool = test.ReplaceRequirements(nodePool,
v1.NodeSelectorRequirement{
Key: v1beta1.LabelInstanceSize,
Operator: v1.NodeSelectorOpIn,
Values: []string{"2xlarge"},
},
)
// We're expecting to create 3 nodes, so the 50% budget should allow at most 2 nodes to be deleting at one time.
nodePool.Spec.Disruption.Budgets = []corev1beta1.Budget{{
Nodes: "50%",
}}
numPods = 9
dep = test.Deployment(test.DeploymentOptions{
Replicas: numPods,
PodOptions: test.PodOptions{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"app": "large-app"},
},
// Each 2xlarge has 8 vCPU, so at 2100m per pod each node should fit no more than 3 pods.
ResourceRequirements: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2100m"),
},
},
},
})
selector = labels.SelectorFromSet(dep.Spec.Selector.MatchLabels)
env.ExpectCreated(nodeClass, nodePool, dep)

env.EventuallyExpectCreatedNodeClaimCount("==", 3)
nodes := env.EventuallyExpectCreatedNodeCount("==", 3)
env.EventuallyExpectHealthyPodCount(selector, int(numPods))

By("scaling down the deployment")
// Update the deployment to a third of the replicas.
dep.Spec.Replicas = lo.ToPtr[int32](3)
env.ExpectUpdated(dep)

env.ForcePodsToSpread(nodes...)
env.EventuallyExpectHealthyPodCount(selector, 3)

By("cordoning and adding finalizer to the nodes")
// Add a finalizer to each node so that we can stop termination disruptions
for _, node := range nodes {
Expect(env.Client.Get(env.Context, client.ObjectKeyFromObject(node), node)).To(Succeed())
node.Finalizers = append(node.Finalizers, common.TestingFinalizer)
env.ExpectUpdated(node)
}

By("enabling consolidation")
nodePool.Spec.Disruption.ConsolidateAfter = nil
env.ExpectUpdated(nodePool)

// Ensure that we get two nodes tainted, and that they overlap while being disrupted
env.EventuallyExpectTaintedNodeCount("==", 2)
nodes = env.ConsistentlyExpectTaintedNodeCount("==", 2, time.Second*5)

for _, node := range nodes {
Expect(env.ExpectTestingFinalizerRemoved(node)).To(Succeed())
}
env.EventuallyExpectNotFound(nodes[0], nodes[1])
env.ExpectNodeCount("==", 1)
})
It("should not allow consolidation if the budget is fully blocking", func() {
// We're going to define a budget that doesn't allow any consolidation to happen
nodePool.Spec.Disruption.Budgets = []corev1beta1.Budget{{
Nodes: "0",
}}

// Hostname anti-affinity to require one pod on each node
dep.Spec.Template.Spec.Affinity = &v1.Affinity{
PodAntiAffinity: &v1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: dep.Spec.Selector,
TopologyKey: v1.LabelHostname,
},
},
},
}
env.ExpectCreated(nodeClass, nodePool, dep)

env.EventuallyExpectCreatedNodeClaimCount("==", 5)
env.EventuallyExpectCreatedNodeCount("==", 5)
env.EventuallyExpectHealthyPodCount(selector, int(numPods))

dep.Spec.Replicas = lo.ToPtr[int32](1)
By("making the nodes empty")
// Update the deployment to only contain 1 replica.
env.ExpectUpdated(dep)

env.ConsistentlyExpectNoDisruptions(5, time.Minute)
})
It("should not allow consolidation if the budget is fully blocking during a scheduled time", func() {
// We're going to define a budget that doesn't allow any consolidation to happen,
// on a schedule whose 30-minute window starts 15 minutes before the current time
// and extends 15 minutes past it.
// Times need to be in UTC since Karpenter evaluates budget schedules in UTC.
windowStart := time.Now().Add(-time.Minute * 15).UTC()
nodePool.Spec.Disruption.Budgets = []corev1beta1.Budget{{
Nodes: "0",
Schedule: lo.ToPtr(fmt.Sprintf("%d %d * * *", windowStart.Minute(), windowStart.Hour())),
Duration: &metav1.Duration{Duration: time.Minute * 30},
}}
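
// Editor's worked example (not part of this diff): if the test runs at 14:20 UTC,
// windowStart is 14:05 UTC and the schedule renders as "5 14 * * *", i.e. the budget
// fires daily at 14:05 UTC and, with the 30-minute duration, stays active from 14:05
// to 14:35 UTC, covering the full minute observed by ConsistentlyExpectNoDisruptions below.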

// Hostname anti-affinity to require one pod on each node
dep.Spec.Template.Spec.Affinity = &v1.Affinity{
PodAntiAffinity: &v1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: dep.Spec.Selector,
TopologyKey: v1.LabelHostname,
},
},
},
}
env.ExpectCreated(nodeClass, nodePool, dep)

env.EventuallyExpectCreatedNodeClaimCount("==", 5)
env.EventuallyExpectCreatedNodeCount("==", 5)
env.EventuallyExpectHealthyPodCount(selector, int(numPods))

dep.Spec.Replicas = lo.ToPtr[int32](1)
By("making the nodes empty")
// Update the deployment to only contain 1 replica.
env.ExpectUpdated(dep)

env.ConsistentlyExpectNoDisruptions(5, time.Minute)
})
})
DescribeTable("should consolidate nodes (delete)", Label(debug.NoWatch), Label(debug.NoEvents),
func(spotToSpot bool) {
nodePool := test.NodePool(corev1beta1.NodePool{