From 8afb11a09ec767641cfb1ad58612da4735ff0102 Mon Sep 17 00:00:00 2001 From: Reed Schalo Date: Wed, 26 Feb 2025 13:41:47 -0800 Subject: [PATCH 1/7] perf: shuffle disruption candidates by nodepool --- .../disruption/singlenodeconsolidation.go | 152 +++++++++++++++++- 1 file changed, 149 insertions(+), 3 deletions(-) diff --git a/pkg/controllers/disruption/singlenodeconsolidation.go b/pkg/controllers/disruption/singlenodeconsolidation.go index ad0196e15e..e53f3fbeb4 100644 --- a/pkg/controllers/disruption/singlenodeconsolidation.go +++ b/pkg/controllers/disruption/singlenodeconsolidation.go @@ -19,6 +19,7 @@ package disruption import ( "context" "fmt" + "sort" "time" "sigs.k8s.io/controller-runtime/pkg/log" @@ -33,10 +34,104 @@ const SingleNodeConsolidationType = "single" // SingleNodeConsolidation is the consolidation controller that performs single-node consolidation. type SingleNodeConsolidation struct { consolidation + // nodePoolsTimedOut tracks which nodepools were not fully considered due to timeout + nodePoolsTimedOut map[string]bool } func NewSingleNodeConsolidation(consolidation consolidation) *SingleNodeConsolidation { - return &SingleNodeConsolidation{consolidation: consolidation} + return &SingleNodeConsolidation{ + consolidation: consolidation, + nodePoolsTimedOut: make(map[string]bool), + } +} + +// Helper function to group candidates by nodepool +func (s *SingleNodeConsolidation) groupCandidatesByNodePool(candidates []*Candidate) (map[string][]*Candidate, []string) { + nodePoolCandidates := make(map[string][]*Candidate) + nodePoolNames := []string{} + + for _, candidate := range candidates { + nodePoolName := candidate.nodePool.Name + if _, exists := nodePoolCandidates[nodePoolName]; !exists { + nodePoolNames = append(nodePoolNames, nodePoolName) + } + nodePoolCandidates[nodePoolName] = append(nodePoolCandidates[nodePoolName], candidate) + } + return nodePoolCandidates, nodePoolNames +} + +// Helper function to sort nodepools with timed out ones prioritized +func (s *SingleNodeConsolidation) sortNodePoolsByTimeout(ctx context.Context, nodePoolNames []string) { + logger := log.FromContext(ctx) + + // Log the timed out nodepools that we're prioritizing + timedOutNodePools := []string{} + for np := range s.nodePoolsTimedOut { + timedOutNodePools = append(timedOutNodePools, np) + } + if len(timedOutNodePools) > 0 { + logger.V(1).Info("Prioritizing nodepools that timed out in previous runs", "nodepools", timedOutNodePools) + } + + // Prioritize nodepools that timed out in previous runs + sort.Slice(nodePoolNames, func(i, j int) bool { + // If nodepool i timed out but j didn't, i comes first + if s.nodePoolsTimedOut[nodePoolNames[i]] && !s.nodePoolsTimedOut[nodePoolNames[j]] { + return true + } + // If nodepool j timed out but i didn't, j comes first + if !s.nodePoolsTimedOut[nodePoolNames[i]] && s.nodePoolsTimedOut[nodePoolNames[j]] { + return false + } + // If both or neither timed out, keep original order + return i < j + }) + + logger.V(1).Info("Nodepool order after prioritization", "nodepools", nodePoolNames) +} + +// Helper function to interweave candidates from different nodepools +func (s *SingleNodeConsolidation) interweaveCandidates(nodePoolCandidates map[string][]*Candidate, nodePoolNames []string) []*Candidate { + result := make([]*Candidate, 0) + maxCandidatesPerNodePool := 0 + + // Find the maximum number of candidates in any nodepool + for _, nodePoolName := range nodePoolNames { + if len(nodePoolCandidates[nodePoolName]) > maxCandidatesPerNodePool { + 
maxCandidatesPerNodePool = len(nodePoolCandidates[nodePoolName]) + } + } + + // Interweave candidates from different nodepools + for i := range maxCandidatesPerNodePool { + for _, nodePoolName := range nodePoolNames { + if i < len(nodePoolCandidates[nodePoolName]) { + result = append(result, nodePoolCandidates[nodePoolName][i]) + } + } + } + + return result +} + +// sortCandidates interweaves candidates from different nodepools and prioritizes nodepools +// that timed out in previous runs +func (s *SingleNodeConsolidation) sortCandidates(candidates []*Candidate) []*Candidate { + ctx := context.Background() + + // First sort by disruption cost as the base ordering + sort.Slice(candidates, func(i int, j int) bool { + return candidates[i].disruptionCost < candidates[j].disruptionCost + }) + + // Group candidates by nodepool + nodePoolCandidates, nodePoolNames := s.groupCandidatesByNodePool(candidates) + + // Sort nodepools with timed out ones prioritized + s.sortNodePoolsByTimeout(ctx, nodePoolNames) + + // Interweave candidates from different nodepools + return s.interweaveCandidates(nodePoolCandidates, nodePoolNames) } // ComputeCommand generates a disruption command given candidates @@ -53,8 +148,19 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption timeout := s.clock.Now().Add(SingleNodeConsolidationTimeoutDuration) constrainedByBudgets := false - // binary search to find the maximum number of NodeClaims we can terminate + // Track which nodepools we've seen in this run + nodePoolsSeen := make(map[string]bool) + + // Get all unique nodepool names from candidates + allNodePools := make(map[string]bool) + for _, candidate := range candidates { + allNodePools[candidate.nodePool.Name] = true + } + for i, candidate := range candidates { + // Track that we've considered this nodepool + nodePoolsSeen[candidate.nodePool.Name] = true + // If the disruption budget doesn't allow this candidate to be disrupted, // continue to the next candidate. We don't need to decrement any budget // counter since single node consolidation commands can only have one candidate. 
@@ -70,7 +176,23 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption } if s.clock.Now().After(timeout) { ConsolidationTimeoutsTotal.Inc(map[string]string{consolidationTypeLabel: s.ConsolidationType()}) - log.FromContext(ctx).V(1).Info(fmt.Sprintf("abandoning single-node consolidation due to timeout after evaluating %d candidates", i)) + logger := log.FromContext(ctx) + logger.V(1).Info(fmt.Sprintf("abandoning single-node consolidation due to timeout after evaluating %d candidates", i)) + + // Mark all nodepools that we haven't seen yet as timed out + timedOutNodePools := []string{} + for _, c := range candidates[i:] { + if !nodePoolsSeen[c.nodePool.Name] { + s.nodePoolsTimedOut[c.nodePool.Name] = true + timedOutNodePools = append(timedOutNodePools, c.nodePool.Name) + } + } + + // Log the nodepools that were timed out + if len(timedOutNodePools) > 0 { + logger.V(1).Info("Marking nodepools as timed out for prioritization in next run", "nodepools", timedOutNodePools) + } + return Command{}, scheduling.Results{}, nil } // compute a possible consolidation option @@ -82,6 +204,7 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption if cmd.Decision() == NoOpDecision { continue } + // might have some edge cases where if there is an error, we should remove the nodepool from the list of "seen" nodepools if err := v.IsValid(ctx, cmd, consolidationTTL); err != nil { if IsValidationError(err) { log.FromContext(ctx).V(1).WithValues(cmd.LogValues()...).Info("abandoning single-node consolidation attempt due to pod churn, command is no longer valid") @@ -91,6 +214,29 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption } return cmd, results, nil } + + // Check if we've considered all nodepools + allNodePoolsConsidered := true + for nodePool := range allNodePools { + if !nodePoolsSeen[nodePool] { + allNodePoolsConsidered = false + break + } + } + + // If we've considered all nodepools, reset the timed out nodepools + if allNodePoolsConsidered { + logger := log.FromContext(ctx) + timedOutNodePools := []string{} + for np := range s.nodePoolsTimedOut { + timedOutNodePools = append(timedOutNodePools, np) + } + if len(timedOutNodePools) > 0 { + logger.V(1).Info("Resetting timed out nodepools as all nodepools have been considered", "nodepools", timedOutNodePools) + } + s.nodePoolsTimedOut = make(map[string]bool) + } + if !constrainedByBudgets { // if there are no candidates because of a budget, don't mark // as consolidated, as it's possible it should be consolidatable From 88dbdfa5e9ee17cf1ec09e0cbc5cea0b668fdbd0 Mon Sep 17 00:00:00 2001 From: Reed Schalo Date: Wed, 26 Feb 2025 16:14:37 -0800 Subject: [PATCH 2/7] reduce logging --- .../disruption/singlenodeconsolidation.go | 34 +++---------------- 1 file changed, 4 insertions(+), 30 deletions(-) diff --git a/pkg/controllers/disruption/singlenodeconsolidation.go b/pkg/controllers/disruption/singlenodeconsolidation.go index e53f3fbeb4..4cfd19c3c1 100644 --- a/pkg/controllers/disruption/singlenodeconsolidation.go +++ b/pkg/controllers/disruption/singlenodeconsolidation.go @@ -45,7 +45,6 @@ func NewSingleNodeConsolidation(consolidation consolidation) *SingleNodeConsolid } } -// Helper function to group candidates by nodepool func (s *SingleNodeConsolidation) groupCandidatesByNodePool(candidates []*Candidate) (map[string][]*Candidate, []string) { nodePoolCandidates := make(map[string][]*Candidate) nodePoolNames := []string{} @@ -60,17 +59,14 @@ func (s 
*SingleNodeConsolidation) groupCandidatesByNodePool(candidates []*Candid return nodePoolCandidates, nodePoolNames } -// Helper function to sort nodepools with timed out ones prioritized func (s *SingleNodeConsolidation) sortNodePoolsByTimeout(ctx context.Context, nodePoolNames []string) { - logger := log.FromContext(ctx) - // Log the timed out nodepools that we're prioritizing timedOutNodePools := []string{} for np := range s.nodePoolsTimedOut { timedOutNodePools = append(timedOutNodePools, np) } if len(timedOutNodePools) > 0 { - logger.V(1).Info("Prioritizing nodepools that timed out in previous runs", "nodepools", timedOutNodePools) + log.FromContext(ctx).V(1).Info("prioritizing nodepools that have not yet been considered due to timeouts in previous runs: %v", timedOutNodePools) } // Prioritize nodepools that timed out in previous runs @@ -86,12 +82,9 @@ func (s *SingleNodeConsolidation) sortNodePoolsByTimeout(ctx context.Context, no // If both or neither timed out, keep original order return i < j }) - - logger.V(1).Info("Nodepool order after prioritization", "nodepools", nodePoolNames) } -// Helper function to interweave candidates from different nodepools -func (s *SingleNodeConsolidation) interweaveCandidates(nodePoolCandidates map[string][]*Candidate, nodePoolNames []string) []*Candidate { +func (s *SingleNodeConsolidation) shuffleCandidates(nodePoolCandidates map[string][]*Candidate, nodePoolNames []string) []*Candidate { result := make([]*Candidate, 0) maxCandidatesPerNodePool := 0 @@ -124,14 +117,11 @@ func (s *SingleNodeConsolidation) sortCandidates(candidates []*Candidate) []*Can return candidates[i].disruptionCost < candidates[j].disruptionCost }) - // Group candidates by nodepool nodePoolCandidates, nodePoolNames := s.groupCandidatesByNodePool(candidates) - // Sort nodepools with timed out ones prioritized s.sortNodePoolsByTimeout(ctx, nodePoolNames) - // Interweave candidates from different nodepools - return s.interweaveCandidates(nodePoolCandidates, nodePoolNames) + return s.shuffleCandidates(nodePoolCandidates, nodePoolNames) } // ComputeCommand generates a disruption command given candidates @@ -176,23 +166,15 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption } if s.clock.Now().After(timeout) { ConsolidationTimeoutsTotal.Inc(map[string]string{consolidationTypeLabel: s.ConsolidationType()}) - logger := log.FromContext(ctx) - logger.V(1).Info(fmt.Sprintf("abandoning single-node consolidation due to timeout after evaluating %d candidates", i)) + log.FromContext(ctx).V(1).Info(fmt.Sprintf("abandoning single-node consolidation due to timeout after evaluating %d candidates", i)) // Mark all nodepools that we haven't seen yet as timed out - timedOutNodePools := []string{} for _, c := range candidates[i:] { if !nodePoolsSeen[c.nodePool.Name] { s.nodePoolsTimedOut[c.nodePool.Name] = true - timedOutNodePools = append(timedOutNodePools, c.nodePool.Name) } } - // Log the nodepools that were timed out - if len(timedOutNodePools) > 0 { - logger.V(1).Info("Marking nodepools as timed out for prioritization in next run", "nodepools", timedOutNodePools) - } - return Command{}, scheduling.Results{}, nil } // compute a possible consolidation option @@ -226,14 +208,6 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption // If we've considered all nodepools, reset the timed out nodepools if allNodePoolsConsidered { - logger := log.FromContext(ctx) - timedOutNodePools := []string{} - for np := range s.nodePoolsTimedOut { - 
timedOutNodePools = append(timedOutNodePools, np) - } - if len(timedOutNodePools) > 0 { - logger.V(1).Info("Resetting timed out nodepools as all nodepools have been considered", "nodepools", timedOutNodePools) - } s.nodePoolsTimedOut = make(map[string]bool) } From c30678ff0f7282138f40e9a9490881fbd5c0c9ac Mon Sep 17 00:00:00 2001 From: Reed Schalo Date: Wed, 26 Feb 2025 20:58:06 -0800 Subject: [PATCH 3/7] remove comments --- pkg/controllers/disruption/singlenodeconsolidation.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/controllers/disruption/singlenodeconsolidation.go b/pkg/controllers/disruption/singlenodeconsolidation.go index 4cfd19c3c1..083e38ad12 100644 --- a/pkg/controllers/disruption/singlenodeconsolidation.go +++ b/pkg/controllers/disruption/singlenodeconsolidation.go @@ -138,10 +138,8 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption timeout := s.clock.Now().Add(SingleNodeConsolidationTimeoutDuration) constrainedByBudgets := false - // Track which nodepools we've seen in this run nodePoolsSeen := make(map[string]bool) - // Get all unique nodepool names from candidates allNodePools := make(map[string]bool) for _, candidate := range candidates { allNodePools[candidate.nodePool.Name] = true From b04f8fa419907bffab1928aa97a30234830078dc Mon Sep 17 00:00:00 2001 From: Reed Schalo Date: Thu, 6 Mar 2025 13:08:48 -0800 Subject: [PATCH 4/7] add testing --- .../disruption/singlenodeconsolidation.go | 202 ++++++------ .../singlenodeconsolidation_test.go | 292 ++++++++++++++++++ pkg/controllers/disruption/suite_test.go | 20 +- pkg/controllers/disruption/types.go | 15 + pkg/test/expectations/expectations.go | 11 + 5 files changed, 435 insertions(+), 105 deletions(-) create mode 100644 pkg/controllers/disruption/singlenodeconsolidation_test.go diff --git a/pkg/controllers/disruption/singlenodeconsolidation.go b/pkg/controllers/disruption/singlenodeconsolidation.go index 083e38ad12..a375a76cb2 100644 --- a/pkg/controllers/disruption/singlenodeconsolidation.go +++ b/pkg/controllers/disruption/singlenodeconsolidation.go @@ -28,7 +28,8 @@ import ( "sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling" ) -const SingleNodeConsolidationTimeoutDuration = 3 * time.Minute +var SingleNodeConsolidationTimeoutDuration = 3 * time.Minute + const SingleNodeConsolidationType = "single" // SingleNodeConsolidation is the consolidation controller that performs single-node consolidation. 
@@ -45,85 +46,6 @@ func NewSingleNodeConsolidation(consolidation consolidation) *SingleNodeConsolid } } -func (s *SingleNodeConsolidation) groupCandidatesByNodePool(candidates []*Candidate) (map[string][]*Candidate, []string) { - nodePoolCandidates := make(map[string][]*Candidate) - nodePoolNames := []string{} - - for _, candidate := range candidates { - nodePoolName := candidate.nodePool.Name - if _, exists := nodePoolCandidates[nodePoolName]; !exists { - nodePoolNames = append(nodePoolNames, nodePoolName) - } - nodePoolCandidates[nodePoolName] = append(nodePoolCandidates[nodePoolName], candidate) - } - return nodePoolCandidates, nodePoolNames -} - -func (s *SingleNodeConsolidation) sortNodePoolsByTimeout(ctx context.Context, nodePoolNames []string) { - // Log the timed out nodepools that we're prioritizing - timedOutNodePools := []string{} - for np := range s.nodePoolsTimedOut { - timedOutNodePools = append(timedOutNodePools, np) - } - if len(timedOutNodePools) > 0 { - log.FromContext(ctx).V(1).Info("prioritizing nodepools that have not yet been considered due to timeouts in previous runs: %v", timedOutNodePools) - } - - // Prioritize nodepools that timed out in previous runs - sort.Slice(nodePoolNames, func(i, j int) bool { - // If nodepool i timed out but j didn't, i comes first - if s.nodePoolsTimedOut[nodePoolNames[i]] && !s.nodePoolsTimedOut[nodePoolNames[j]] { - return true - } - // If nodepool j timed out but i didn't, j comes first - if !s.nodePoolsTimedOut[nodePoolNames[i]] && s.nodePoolsTimedOut[nodePoolNames[j]] { - return false - } - // If both or neither timed out, keep original order - return i < j - }) -} - -func (s *SingleNodeConsolidation) shuffleCandidates(nodePoolCandidates map[string][]*Candidate, nodePoolNames []string) []*Candidate { - result := make([]*Candidate, 0) - maxCandidatesPerNodePool := 0 - - // Find the maximum number of candidates in any nodepool - for _, nodePoolName := range nodePoolNames { - if len(nodePoolCandidates[nodePoolName]) > maxCandidatesPerNodePool { - maxCandidatesPerNodePool = len(nodePoolCandidates[nodePoolName]) - } - } - - // Interweave candidates from different nodepools - for i := range maxCandidatesPerNodePool { - for _, nodePoolName := range nodePoolNames { - if i < len(nodePoolCandidates[nodePoolName]) { - result = append(result, nodePoolCandidates[nodePoolName][i]) - } - } - } - - return result -} - -// sortCandidates interweaves candidates from different nodepools and prioritizes nodepools -// that timed out in previous runs -func (s *SingleNodeConsolidation) sortCandidates(candidates []*Candidate) []*Candidate { - ctx := context.Background() - - // First sort by disruption cost as the base ordering - sort.Slice(candidates, func(i int, j int) bool { - return candidates[i].disruptionCost < candidates[j].disruptionCost - }) - - nodePoolCandidates, nodePoolNames := s.groupCandidatesByNodePool(candidates) - - s.sortNodePoolsByTimeout(ctx, nodePoolNames) - - return s.shuffleCandidates(nodePoolCandidates, nodePoolNames) -} - // ComputeCommand generates a disruption command given candidates // nolint:gocyclo func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[string]int, candidates ...*Candidate) (Command, scheduling.Results, error) { @@ -146,6 +68,19 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption } for i, candidate := range candidates { + if s.clock.Now().After(timeout) { + ConsolidationTimeoutsTotal.Inc(map[string]string{consolidationTypeLabel: 
s.ConsolidationType()}) + log.FromContext(ctx).V(1).Info(fmt.Sprintf("abandoning single-node consolidation due to timeout after evaluating %d candidates", i)) + + // Mark all nodepools that we haven't seen yet as timed out + for _, c := range candidates[i:] { + if !nodePoolsSeen[c.nodePool.Name] { + s.nodePoolsTimedOut[c.nodePool.Name] = true + } + } + + return Command{}, scheduling.Results{}, nil + } // Track that we've considered this nodepool nodePoolsSeen[candidate.nodePool.Name] = true @@ -162,19 +97,6 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption if len(candidate.reschedulablePods) == 0 { continue } - if s.clock.Now().After(timeout) { - ConsolidationTimeoutsTotal.Inc(map[string]string{consolidationTypeLabel: s.ConsolidationType()}) - log.FromContext(ctx).V(1).Info(fmt.Sprintf("abandoning single-node consolidation due to timeout after evaluating %d candidates", i)) - - // Mark all nodepools that we haven't seen yet as timed out - for _, c := range candidates[i:] { - if !nodePoolsSeen[c.nodePool.Name] { - s.nodePoolsTimedOut[c.nodePool.Name] = true - } - } - - return Command{}, scheduling.Results{}, nil - } // compute a possible consolidation option cmd, results, err := s.computeConsolidation(ctx, candidate) if err != nil { @@ -229,3 +151,97 @@ func (s *SingleNodeConsolidation) Class() string { func (s *SingleNodeConsolidation) ConsolidationType() string { return SingleNodeConsolidationType } + +func (s *SingleNodeConsolidation) groupCandidatesByNodePool(candidates []*Candidate) (map[string][]*Candidate, []string) { + nodePoolCandidates := make(map[string][]*Candidate) + nodePoolNames := []string{} + + for _, candidate := range candidates { + nodePoolName := candidate.nodePool.Name + if _, exists := nodePoolCandidates[nodePoolName]; !exists { + nodePoolNames = append(nodePoolNames, nodePoolName) + } + nodePoolCandidates[nodePoolName] = append(nodePoolCandidates[nodePoolName], candidate) + } + return nodePoolCandidates, nodePoolNames +} + +func (s *SingleNodeConsolidation) sortNodePoolsByTimeout(ctx context.Context, nodePoolNames []string) { + // Log the timed out nodepools that we're prioritizing + timedOutNodePools := []string{} + for np := range s.nodePoolsTimedOut { + timedOutNodePools = append(timedOutNodePools, np) + } + if len(timedOutNodePools) > 0 { + log.FromContext(ctx).V(1).Info("prioritizing nodepools that have not yet been considered due to timeouts in previous runs: %v", timedOutNodePools) + } + + // Prioritize nodepools that timed out in previous runs + sort.Slice(nodePoolNames, func(i, j int) bool { + // If nodepool i timed out but j didn't, i comes first + if s.nodePoolsTimedOut[nodePoolNames[i]] && !s.nodePoolsTimedOut[nodePoolNames[j]] { + return true + } + // If nodepool j timed out but i didn't, j comes first + if !s.nodePoolsTimedOut[nodePoolNames[i]] && s.nodePoolsTimedOut[nodePoolNames[j]] { + return false + } + // If both or neither timed out, keep original order + return i < j + }) +} + +func (s *SingleNodeConsolidation) shuffleCandidates(nodePoolCandidates map[string][]*Candidate, nodePoolNames []string) []*Candidate { + result := make([]*Candidate, 0) + maxCandidatesPerNodePool := 0 + + // Find the maximum number of candidates in any nodepool + for _, nodePoolName := range nodePoolNames { + if len(nodePoolCandidates[nodePoolName]) > maxCandidatesPerNodePool { + maxCandidatesPerNodePool = len(nodePoolCandidates[nodePoolName]) + } + } + + // Interweave candidates from different nodepools + for i := range 
maxCandidatesPerNodePool { + for _, nodePoolName := range nodePoolNames { + if i < len(nodePoolCandidates[nodePoolName]) { + result = append(result, nodePoolCandidates[nodePoolName][i]) + } + } + } + + return result +} + +// sortCandidates interweaves candidates from different nodepools and prioritizes nodepools +// that timed out in previous runs +func (s *SingleNodeConsolidation) sortCandidates(candidates []*Candidate) []*Candidate { + ctx := context.Background() + + // First sort by disruption cost as the base ordering + sort.Slice(candidates, func(i int, j int) bool { + return candidates[i].disruptionCost < candidates[j].disruptionCost + }) + + nodePoolCandidates, nodePoolNames := s.groupCandidatesByNodePool(candidates) + + s.sortNodePoolsByTimeout(ctx, nodePoolNames) + + return s.shuffleCandidates(nodePoolCandidates, nodePoolNames) +} + +// SortCandidates is a public wrapper around sortCandidates for testing +func (s *SingleNodeConsolidation) SortCandidates(candidates []*Candidate) []*Candidate { + return s.sortCandidates(candidates) +} + +// MarkNodePoolTimedOut marks a nodepool as timed out for testing +func (s *SingleNodeConsolidation) MarkNodePoolTimedOut(nodePoolName string) { + s.nodePoolsTimedOut[nodePoolName] = true +} + +// IsNodePoolTimedOut checks if a nodepool is marked as timed out for testing +func (s *SingleNodeConsolidation) IsNodePoolTimedOut(nodePoolName string) bool { + return s.nodePoolsTimedOut[nodePoolName] +} diff --git a/pkg/controllers/disruption/singlenodeconsolidation_test.go b/pkg/controllers/disruption/singlenodeconsolidation_test.go new file mode 100644 index 0000000000..f9742b9266 --- /dev/null +++ b/pkg/controllers/disruption/singlenodeconsolidation_test.go @@ -0,0 +1,292 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package disruption_test + +import ( + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + v1 "sigs.k8s.io/karpenter/pkg/apis/v1" + "sigs.k8s.io/karpenter/pkg/cloudprovider" + "sigs.k8s.io/karpenter/pkg/controllers/disruption" + "sigs.k8s.io/karpenter/pkg/test" + . 
"sigs.k8s.io/karpenter/pkg/test/expectations" + "sigs.k8s.io/karpenter/pkg/utils/pdb" +) + +var nodePool1, nodePool2, nodePool3 *v1.NodePool +var consolidation *disruption.SingleNodeConsolidation +var nodePoolMap map[string]*v1.NodePool +var nodePoolInstanceTypeMap map[string]map[string]*cloudprovider.InstanceType + +var _ = Describe("SingleNodeConsolidation", func() { + BeforeEach(func() { + nodePool1 = test.NodePool(v1.NodePool{ + ObjectMeta: metav1.ObjectMeta{ + Name: "nodepool-1", + }, + Spec: v1.NodePoolSpec{ + Disruption: v1.Disruption{ + ConsolidationPolicy: v1.ConsolidationPolicyWhenEmptyOrUnderutilized, + ConsolidateAfter: v1.MustParseNillableDuration("0s"), + }, + }, + }) + nodePool2 = test.NodePool(v1.NodePool{ + ObjectMeta: metav1.ObjectMeta{ + Name: "nodepool-2", + }, + Spec: v1.NodePoolSpec{ + Disruption: v1.Disruption{ + ConsolidationPolicy: v1.ConsolidationPolicyWhenEmptyOrUnderutilized, + ConsolidateAfter: v1.MustParseNillableDuration("0s"), + }, + }, + }) + nodePool3 = test.NodePool(v1.NodePool{ + ObjectMeta: metav1.ObjectMeta{ + Name: "nodepool-3", + }, + Spec: v1.NodePoolSpec{ + Disruption: v1.Disruption{ + ConsolidationPolicy: v1.ConsolidationPolicyWhenEmptyOrUnderutilized, + ConsolidateAfter: v1.MustParseNillableDuration("0s"), + }, + }, + }) + ExpectApplied(ctx, env.Client, nodePool1, nodePool2, nodePool3) + + // Set up NodePool maps for candidate creation + nodePoolMap = map[string]*v1.NodePool{ + nodePool1.Name: nodePool1, + nodePool2.Name: nodePool2, + nodePool3.Name: nodePool3, + } + nodePoolInstanceTypeMap = map[string]map[string]*cloudprovider.InstanceType{ + nodePool1.Name: {leastExpensiveInstance.Name: leastExpensiveInstance}, + nodePool2.Name: {leastExpensiveInstance.Name: leastExpensiveInstance}, + nodePool3.Name: {leastExpensiveInstance.Name: leastExpensiveInstance}, + } + + // Create a base consolidation + baseConsolidation := disruption.MakeConsolidation(fakeClock, cluster, env.Client, prov, cloudProvider, recorder, queue) + consolidation = disruption.NewSingleNodeConsolidation(baseConsolidation) + }) + + AfterEach(func() { + disruption.SingleNodeConsolidationTimeoutDuration = 3 * time.Minute + ExpectCleanedUp(ctx, env.Client) + }) + + Context("candidate shuffling", func() { + It("should sort candidates by disruption cost", func() { + candidates, err := createCandidates(1.0, 3) + Expect(err).To(BeNil()) + + sortedCandidates := consolidation.SortCandidates(candidates) + + // Verify candidates are sorted by disruption cost + Expect(sortedCandidates).To(HaveLen(9)) + for i := 0; i < len(sortedCandidates)-1; i++ { + Expect(sortedCandidates[i].DisruptionCost()).To(BeNumerically("<=", sortedCandidates[i+1].DisruptionCost())) + } + }) + + It("should prioritize nodepools that timed out in previous runs", func() { + candidates, err := createCandidates(1.0, 3) + Expect(err).To(BeNil()) + + consolidation.MarkNodePoolTimedOut(nodePool2.Name) + + sortedCandidates := consolidation.SortCandidates(candidates) + + Expect(sortedCandidates).To(HaveLen(9)) + Expect(sortedCandidates[0].NodePool().Name).To(Equal(nodePool2.Name)) + }) + + It("should interweave candidates from different nodepools", func() { + // Create candidates with different disruption costs + // We'll create 3 sets of candidates with costs 1.0, 2.0, and 3.0 + // Use 1 node per nodepool to make the test more predictable + candidates1, err := createCandidates(1.0, 1) + Expect(err).To(BeNil()) + + candidates2, err := createCandidates(2.0, 1) + Expect(err).To(BeNil()) + + candidates3, err := 
createCandidates(3.0, 1) + Expect(err).To(BeNil()) + + // Combine all candidates + allCandidates := append(candidates1, append(candidates2, candidates3...)...) + + // Sort candidates + sortedCandidates := consolidation.SortCandidates(allCandidates) + + // Verify candidates are interweaved from different nodepools + // First we should have all candidates with disruption cost 1, then 2, then 3 + // Within each cost group, we should have one from each nodepool + Expect(sortedCandidates).To(HaveLen(9)) + + // Check first three candidates (all with cost 1) + nodePoolsInFirstGroup := []string{ + sortedCandidates[0].NodePool().Name, + sortedCandidates[1].NodePool().Name, + sortedCandidates[2].NodePool().Name, + } + Expect(nodePoolsInFirstGroup).To(ConsistOf(nodePool1.Name, nodePool2.Name, nodePool3.Name)) + + // Check next three candidates (all with cost 2) + nodePoolsInSecondGroup := []string{ + sortedCandidates[3].NodePool().Name, + sortedCandidates[4].NodePool().Name, + sortedCandidates[5].NodePool().Name, + } + Expect(nodePoolsInSecondGroup).To(ConsistOf(nodePool1.Name, nodePool2.Name, nodePool3.Name)) + + // Check last three candidates (all with cost 3) + nodePoolsInThirdGroup := []string{ + sortedCandidates[6].NodePool().Name, + sortedCandidates[7].NodePool().Name, + sortedCandidates[8].NodePool().Name, + } + Expect(nodePoolsInThirdGroup).To(ConsistOf(nodePool1.Name, nodePool2.Name, nodePool3.Name)) + }) + + It("should reset timed out nodepools when all nodepools are evaluated", func() { + // Create candidates from different nodepools + candidates, err := createCandidates(1.0, 1) + Expect(err).To(BeNil()) + + // Mark nodePool2 as timed out + consolidation.MarkNodePoolTimedOut(nodePool2.Name) + Expect(consolidation.IsNodePoolTimedOut(nodePool2.Name)).To(BeTrue()) + + // Create a budget mapping that allows all disruptions + budgetMapping := map[string]int{ + nodePool1.Name: 1, + nodePool2.Name: 1, + nodePool3.Name: 1, + } + + // Call ComputeCommand which should process all nodepools + _, _, _ = consolidation.ComputeCommand(ctx, budgetMapping, candidates...) + + // Verify nodePool2 is no longer marked as timed out + Expect(consolidation.IsNodePoolTimedOut(nodePool2.Name)).To(BeFalse()) + }) + + It("should mark nodepools as timed out when timeout occurs", func() { + disruption.SingleNodeConsolidationTimeoutDuration = -5 * time.Second + // Create many candidates to trigger timeout + candidates, err := createCandidates(1.0, 10) + Expect(err).To(BeNil()) + + // Create a budget mapping that allows all disruptions + budgetMapping := map[string]int{ + nodePool1.Name: 30, + nodePool2.Name: 30, + nodePool3.Name: 30, + } + + _, _, _ = consolidation.ComputeCommand(ctx, budgetMapping, candidates...) 
+ + // Verify all nodepools are marked as timed out + // since we timed out before processing any candidates + Expect(consolidation.IsNodePoolTimedOut(nodePool1.Name)).To(BeTrue()) + Expect(consolidation.IsNodePoolTimedOut(nodePool2.Name)).To(BeTrue()) + Expect(consolidation.IsNodePoolTimedOut(nodePool3.Name)).To(BeTrue()) + }) + }) +}) + +func createCandidates(disruptionCost float64, nodesPerNodePool ...int) ([]*disruption.Candidate, error) { + // Default to 3 nodes per nodepool if not specified + numNodesPerNodePool := 3 + if len(nodesPerNodePool) > 0 && nodesPerNodePool[0] > 0 { + numNodesPerNodePool = nodesPerNodePool[0] + } + + // Create NodeClaims for each NodePool + nodeClaims := []*v1.NodeClaim{} + + // Create NodeClaims and Nodes for each NodePool + for _, nodePool := range []*v1.NodePool{nodePool1, nodePool2, nodePool3} { + for i := 0; i < numNodesPerNodePool; i++ { + nodeClaim, node := test.NodeClaimAndNode(v1.NodeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1.NodePoolLabelKey: nodePool.Name, + corev1.LabelInstanceTypeStable: leastExpensiveInstance.Name, + v1.CapacityTypeLabelKey: leastExpensiveOffering.Requirements.Get(v1.CapacityTypeLabelKey).Any(), + corev1.LabelTopologyZone: leastExpensiveOffering.Requirements.Get(corev1.LabelTopologyZone).Any(), + }, + }, + Status: v1.NodeClaimStatus{ + Allocatable: map[corev1.ResourceName]resource.Quantity{corev1.ResourceCPU: resource.MustParse("32")}, + }, + }) + pod := test.Pod() + ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node, pod) + ExpectManualBinding(ctx, env.Client, pod, node) + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) + nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeConsolidatable) + ExpectApplied(ctx, env.Client, nodeClaim) + + // Ensure the state is updated after all changes + ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node)) + ExpectReconcileSucceeded(ctx, nodeClaimStateController, client.ObjectKeyFromObject(nodeClaim)) + + nodeClaims = append(nodeClaims, nodeClaim) + } + } + + limits, err := pdb.NewLimits(ctx, fakeClock, env.Client) + if err != nil { + return nil, err + } + + candidates := []*disruption.Candidate{} + for _, nodeClaim := range nodeClaims { + stateNode := ExpectStateNodeExistsForNodeClaim(cluster, nodeClaim) + candidate, err := disruption.NewCandidate( + ctx, + env.Client, + recorder, + fakeClock, + stateNode, + limits, + nodePoolMap, + nodePoolInstanceTypeMap, + queue, + disruption.GracefulDisruptionClass, + ) + if err != nil { + return nil, err + } + candidate.SetDisruptionCost(disruptionCost) + candidates = append(candidates, candidate) + } + + return candidates, nil +} diff --git a/pkg/controllers/disruption/suite_test.go b/pkg/controllers/disruption/suite_test.go index 42cd1afd38..4a3232bef9 100644 --- a/pkg/controllers/disruption/suite_test.go +++ b/pkg/controllers/disruption/suite_test.go @@ -24,10 +24,6 @@ import ( "testing" "time" - "k8s.io/client-go/util/workqueue" - clockiface "k8s.io/utils/clock" - - "sigs.k8s.io/karpenter/pkg/events" "sigs.k8s.io/karpenter/pkg/metrics" "sigs.k8s.io/karpenter/pkg/test/v1alpha1" @@ -97,7 +93,7 @@ var _ = BeforeSuite(func() { nodeClaimStateController = informer.NewNodeClaimController(env.Client, cloudProvider, cluster) recorder = test.NewEventRecorder() prov = provisioning.NewProvisioner(env.Client, recorder, cloudProvider, cluster, fakeClock) - queue = 
NewTestingQueue(env.Client, recorder, cluster, fakeClock, prov) + queue = ExpectNewTestingQueue(env.Client, recorder, cluster, fakeClock, prov) disruptionController = disruption.NewController(fakeClock, env.Client, prov, cloudProvider, recorder, cluster, queue) }) @@ -117,7 +113,7 @@ var _ = BeforeEach(func() { } fakeClock.SetTime(time.Now()) cluster.Reset() - *queue = lo.FromPtr(NewTestingQueue(env.Client, recorder, cluster, fakeClock, prov)) + *queue = lo.FromPtr(ExpectNewTestingQueue(env.Client, recorder, cluster, fakeClock, prov)) cluster.MarkUnconsolidated() // Reset Feature Flags to test defaults @@ -2147,10 +2143,10 @@ func ExpectMakeNewNodeClaimsReady(ctx context.Context, c client.Client, wg *sync }() } -func NewTestingQueue(kubeClient client.Client, recorder events.Recorder, cluster *state.Cluster, clock clockiface.Clock, - provisioner *provisioning.Provisioner) *orchestration.Queue { +// func NewTestingQueue(kubeClient client.Client, recorder events.Recorder, cluster *state.Cluster, clock clockiface.Clock, +// provisioner *provisioning.Provisioner) *orchestration.Queue { - q := orchestration.NewQueue(kubeClient, recorder, cluster, clock, provisioner) - q.TypedRateLimitingInterface = test.NewTypedRateLimitingInterface[*orchestration.Command](workqueue.TypedQueueConfig[*orchestration.Command]{Name: "disruption.workqueue"}) - return q -} +// q := orchestration.NewQueue(kubeClient, recorder, cluster, clock, provisioner) +// q.TypedRateLimitingInterface = test.NewTypedRateLimitingInterface[*orchestration.Command](workqueue.TypedQueueConfig[*orchestration.Command]{Name: "disruption.workqueue"}) +// return q +// } diff --git a/pkg/controllers/disruption/types.go b/pkg/controllers/disruption/types.go index 7ef8059d0c..4ae575d4c4 100644 --- a/pkg/controllers/disruption/types.go +++ b/pkg/controllers/disruption/types.go @@ -171,3 +171,18 @@ func (c Command) LogValues() []any { "replacement-nodes", replacementNodes, } } + +// SetDisruptionCost sets the disruption cost of a candidate for testing +func (c *Candidate) SetDisruptionCost(cost float64) { + c.disruptionCost = cost +} + +// DisruptionCost returns the disruption cost of a candidate for testing +func (c *Candidate) DisruptionCost() float64 { + return c.disruptionCost +} + +// NodePool returns the nodepool of a candidate for testing +func (c *Candidate) NodePool() *v1.NodePool { + return c.nodePool +} diff --git a/pkg/test/expectations/expectations.go b/pkg/test/expectations/expectations.go index 060ec78b43..6333d3436f 100644 --- a/pkg/test/expectations/expectations.go +++ b/pkg/test/expectations/expectations.go @@ -48,6 +48,8 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/util/workqueue" + clockiface "k8s.io/utils/clock" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -56,11 +58,13 @@ import ( v1 "sigs.k8s.io/karpenter/pkg/apis/v1" "sigs.k8s.io/karpenter/pkg/cloudprovider" + "sigs.k8s.io/karpenter/pkg/controllers/disruption/orchestration" "sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/lifecycle" "sigs.k8s.io/karpenter/pkg/controllers/provisioning" "sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling" "sigs.k8s.io/karpenter/pkg/controllers/state" "sigs.k8s.io/karpenter/pkg/controllers/state/informer" + "sigs.k8s.io/karpenter/pkg/events" "sigs.k8s.io/karpenter/pkg/metrics" pscheduling "sigs.k8s.io/karpenter/pkg/scheduling" 
"sigs.k8s.io/karpenter/pkg/test" @@ -734,3 +738,10 @@ func ConsistentlyExpectNotTerminating(ctx context.Context, c client.Client, objs } }, time.Second).Should(Succeed()) } + +func ExpectNewTestingQueue(c client.Client, recorder events.Recorder, cluster *state.Cluster, clock clockiface.Clock, provisioner *provisioning.Provisioner) *orchestration.Queue { + GinkgoHelper() + queue := orchestration.NewQueue(c, recorder, cluster, clock, provisioner) + queue.TypedRateLimitingInterface = test.NewTypedRateLimitingInterface(workqueue.TypedQueueConfig[*orchestration.Command]{Name: "disruption.workqueue"}) + return queue +} From 4d94c4cf9283ecc3c20e33ad92aa8d2b680bbab5 Mon Sep 17 00:00:00 2001 From: Reed Schalo Date: Thu, 6 Mar 2025 13:24:39 -0800 Subject: [PATCH 5/7] cleaning up --- .../disruption/singlenodeconsolidation.go | 37 ++++++++++--------- .../singlenodeconsolidation_test.go | 2 +- 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/pkg/controllers/disruption/singlenodeconsolidation.go b/pkg/controllers/disruption/singlenodeconsolidation.go index a375a76cb2..c942b02872 100644 --- a/pkg/controllers/disruption/singlenodeconsolidation.go +++ b/pkg/controllers/disruption/singlenodeconsolidation.go @@ -35,7 +35,7 @@ const SingleNodeConsolidationType = "single" // SingleNodeConsolidation is the consolidation controller that performs single-node consolidation. type SingleNodeConsolidation struct { consolidation - // nodePoolsTimedOut tracks which nodepools were not fully considered due to timeout + // nodePoolsTimedOut tracks which nodepools were not considered due to the timeout nodePoolsTimedOut map[string]bool } @@ -68,6 +68,19 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption } for i, candidate := range candidates { + // If the disruption budget doesn't allow this candidate to be disrupted, + // continue to the next candidate. We don't need to decrement any budget + // counter since single node consolidation commands can only have one candidate. + if disruptionBudgetMapping[candidate.nodePool.Name] == 0 { + constrainedByBudgets = true + continue + } + // Filter out empty candidates. If there was an empty node that wasn't consolidated before this, we should + // assume that it was due to budgets. If we don't filter out budgets, users who set a budget for `empty` + // can find their nodes disrupted here. + if len(candidate.reschedulablePods) == 0 { + continue + } if s.clock.Now().After(timeout) { ConsolidationTimeoutsTotal.Inc(map[string]string{consolidationTypeLabel: s.ConsolidationType()}) log.FromContext(ctx).V(1).Info(fmt.Sprintf("abandoning single-node consolidation due to timeout after evaluating %d candidates", i)) @@ -81,22 +94,10 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption return Command{}, scheduling.Results{}, nil } + // Track that we've considered this nodepool nodePoolsSeen[candidate.nodePool.Name] = true - // If the disruption budget doesn't allow this candidate to be disrupted, - // continue to the next candidate. We don't need to decrement any budget - // counter since single node consolidation commands can only have one candidate. - if disruptionBudgetMapping[candidate.nodePool.Name] == 0 { - constrainedByBudgets = true - continue - } - // Filter out empty candidates. If there was an empty node that wasn't consolidated before this, we should - // assume that it was due to budgets. If we don't filter out budgets, users who set a budget for `empty` - // can find their nodes disrupted here. 
- if len(candidate.reschedulablePods) == 0 { - continue - } // compute a possible consolidation option cmd, results, err := s.computeConsolidation(ctx, candidate) if err != nil { @@ -106,7 +107,6 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption if cmd.Decision() == NoOpDecision { continue } - // might have some edge cases where if there is an error, we should remove the nodepool from the list of "seen" nodepools if err := v.IsValid(ctx, cmd, consolidationTTL); err != nil { if IsValidationError(err) { log.FromContext(ctx).V(1).WithValues(cmd.LogValues()...).Info("abandoning single-node consolidation attempt due to pod churn, command is no longer valid") @@ -167,14 +167,15 @@ func (s *SingleNodeConsolidation) groupCandidatesByNodePool(candidates []*Candid } func (s *SingleNodeConsolidation) sortNodePoolsByTimeout(ctx context.Context, nodePoolNames []string) { + if len(s.nodePoolsTimedOut) == 0 { + return + } // Log the timed out nodepools that we're prioritizing timedOutNodePools := []string{} for np := range s.nodePoolsTimedOut { timedOutNodePools = append(timedOutNodePools, np) } - if len(timedOutNodePools) > 0 { - log.FromContext(ctx).V(1).Info("prioritizing nodepools that have not yet been considered due to timeouts in previous runs: %v", timedOutNodePools) - } + log.FromContext(ctx).V(1).Info("prioritizing nodepools that have not yet been considered due to timeouts in previous runs: %v", timedOutNodePools) // Prioritize nodepools that timed out in previous runs sort.Slice(nodePoolNames, func(i, j int) bool { diff --git a/pkg/controllers/disruption/singlenodeconsolidation_test.go b/pkg/controllers/disruption/singlenodeconsolidation_test.go index f9742b9266..60829dfec6 100644 --- a/pkg/controllers/disruption/singlenodeconsolidation_test.go +++ b/pkg/controllers/disruption/singlenodeconsolidation_test.go @@ -88,7 +88,7 @@ var _ = Describe("SingleNodeConsolidation", func() { nodePool3.Name: {leastExpensiveInstance.Name: leastExpensiveInstance}, } - // Create a base consolidation + // Create a single node consolidation controller baseConsolidation := disruption.MakeConsolidation(fakeClock, cluster, env.Client, prov, cloudProvider, recorder, queue) consolidation = disruption.NewSingleNodeConsolidation(baseConsolidation) }) From f1dd03fee8e8783448a5da26833b796f6340602a Mon Sep 17 00:00:00 2001 From: Reed Schalo Date: Thu, 6 Mar 2025 13:35:25 -0800 Subject: [PATCH 6/7] test addition --- .../disruption/singlenodeconsolidation_test.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/pkg/controllers/disruption/singlenodeconsolidation_test.go b/pkg/controllers/disruption/singlenodeconsolidation_test.go index 60829dfec6..c0b7ef5c59 100644 --- a/pkg/controllers/disruption/singlenodeconsolidation_test.go +++ b/pkg/controllers/disruption/singlenodeconsolidation_test.go @@ -95,6 +95,7 @@ var _ = Describe("SingleNodeConsolidation", func() { AfterEach(func() { disruption.SingleNodeConsolidationTimeoutDuration = 3 * time.Minute + fakeClock.SetTime(time.Now()) ExpectCleanedUp(ctx, env.Client) }) @@ -138,7 +139,7 @@ var _ = Describe("SingleNodeConsolidation", func() { Expect(err).To(BeNil()) // Combine all candidates - allCandidates := append(candidates1, append(candidates2, candidates3...)...) + allCandidates := append(candidates3, append(candidates2, candidates1...)...) 
// Sort candidates sortedCandidates := consolidation.SortCandidates(allCandidates) @@ -155,6 +156,9 @@ var _ = Describe("SingleNodeConsolidation", func() { sortedCandidates[2].NodePool().Name, } Expect(nodePoolsInFirstGroup).To(ConsistOf(nodePool1.Name, nodePool2.Name, nodePool3.Name)) + for i := 0; i < 3; i++ { + Expect(sortedCandidates[i].DisruptionCost()).To(Equal(1.0)) + } // Check next three candidates (all with cost 2) nodePoolsInSecondGroup := []string{ @@ -163,6 +167,9 @@ var _ = Describe("SingleNodeConsolidation", func() { sortedCandidates[5].NodePool().Name, } Expect(nodePoolsInSecondGroup).To(ConsistOf(nodePool1.Name, nodePool2.Name, nodePool3.Name)) + for i := 3; i < 6; i++ { + Expect(sortedCandidates[i].DisruptionCost()).To(Equal(2.0)) + } // Check last three candidates (all with cost 3) nodePoolsInThirdGroup := []string{ @@ -171,6 +178,9 @@ var _ = Describe("SingleNodeConsolidation", func() { sortedCandidates[8].NodePool().Name, } Expect(nodePoolsInThirdGroup).To(ConsistOf(nodePool1.Name, nodePool2.Name, nodePool3.Name)) + for i := 6; i < 9; i++ { + Expect(sortedCandidates[i].DisruptionCost()).To(Equal(3.0)) + } }) It("should reset timed out nodepools when all nodepools are evaluated", func() { From d1008d85f5b4d6a01ea19917ba7f1fb81e705458 Mon Sep 17 00:00:00 2001 From: Reed Schalo Date: Thu, 6 Mar 2025 14:14:17 -0800 Subject: [PATCH 7/7] clean out commented code --- pkg/controllers/disruption/suite_test.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/pkg/controllers/disruption/suite_test.go b/pkg/controllers/disruption/suite_test.go index 4a3232bef9..b16143f51e 100644 --- a/pkg/controllers/disruption/suite_test.go +++ b/pkg/controllers/disruption/suite_test.go @@ -2142,11 +2142,3 @@ func ExpectMakeNewNodeClaimsReady(ctx context.Context, c client.Client, wg *sync } }() } - -// func NewTestingQueue(kubeClient client.Client, recorder events.Recorder, cluster *state.Cluster, clock clockiface.Clock, -// provisioner *provisioning.Provisioner) *orchestration.Queue { - -// q := orchestration.NewQueue(kubeClient, recorder, cluster, clock, provisioner) -// q.TypedRateLimitingInterface = test.NewTypedRateLimitingInterface[*orchestration.Command](workqueue.TypedQueueConfig[*orchestration.Command]{Name: "disruption.workqueue"}) -// return q -// }