perf: shuffle disruption candidates by nodepool #2035

Merged
13 commits merged on Mar 18, 2025
10 changes: 5 additions & 5 deletions pkg/controllers/disruption/consolidation.go
@@ -104,15 +104,15 @@ func (c *consolidation) ShouldDisrupt(_ context.Context, cn *Candidate) bool {
c.recorder.Publish(disruptionevents.Unconsolidatable(cn.Node, cn.NodeClaim, fmt.Sprintf("Node does not have label %q", corev1.LabelTopologyZone))...)
return false
}
if cn.nodePool.Spec.Disruption.ConsolidateAfter.Duration == nil {
c.recorder.Publish(disruptionevents.Unconsolidatable(cn.Node, cn.NodeClaim, fmt.Sprintf("NodePool %q has consolidation disabled", cn.nodePool.Name))...)
if cn.NodePool.Spec.Disruption.ConsolidateAfter.Duration == nil {
c.recorder.Publish(disruptionevents.Unconsolidatable(cn.Node, cn.NodeClaim, fmt.Sprintf("NodePool %q has consolidation disabled", cn.NodePool.Name))...)
return false
}
// If we don't have the "WhenEmptyOrUnderutilized" policy set, we should not do any of the consolidation methods, but
// we should also not fire an event here to users since this can be confusing when the field on the NodePool
// is named "consolidationPolicy"
if cn.nodePool.Spec.Disruption.ConsolidationPolicy != v1.ConsolidationPolicyWhenEmptyOrUnderutilized {
c.recorder.Publish(disruptionevents.Unconsolidatable(cn.Node, cn.NodeClaim, fmt.Sprintf("NodePool %q has non-empty consolidation disabled", cn.nodePool.Name))...)
if cn.NodePool.Spec.Disruption.ConsolidationPolicy != v1.ConsolidationPolicyWhenEmptyOrUnderutilized {
c.recorder.Publish(disruptionevents.Unconsolidatable(cn.Node, cn.NodeClaim, fmt.Sprintf("NodePool %q has non-empty consolidation disabled", cn.NodePool.Name))...)
return false
}
// return true if consolidatable
@@ -122,7 +122,7 @@ func (c *consolidation) ShouldDisrupt(_ context.Context, cn *Candidate) bool {
// sortCandidates sorts candidates by disruption cost (where the lowest disruption cost is first) and returns the result
func (c *consolidation) sortCandidates(candidates []*Candidate) []*Candidate {
sort.Slice(candidates, func(i int, j int) bool {
return candidates[i].disruptionCost < candidates[j].disruptionCost
return candidates[i].DisruptionCost < candidates[j].DisruptionCost
})
return candidates
}
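The consolidation.go hunks are purely the rename of Candidate's nodePool and disruptionCost fields to the exported NodePool and DisruptionCost. Within the disruption package the lowercase names were already accessible, so the practical effect of capitalizing them is visibility from other packages (an external test package, for instance). A toy illustration of that Go rule, with hypothetical fields unrelated to karpenter's real struct:

```go
// Package disruption here is only a toy stand-in for the real package.
package disruption

// Candidate sketches the visibility difference; the field set is hypothetical.
type Candidate struct {
	NodePool       string  // exported: readable as c.NodePool from any importing package
	DisruptionCost float64 // exported: same rule applies
	zone           string  // unexported: only code inside this package can read it
}
```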
6 changes: 3 additions & 3 deletions pkg/controllers/disruption/drift.go
@@ -72,9 +72,9 @@ func (d *Drift) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[
}
// If there's disruptions allowed for the candidate's nodepool,
// add it to the list of candidates, and decrement the budget.
if disruptionBudgetMapping[candidate.nodePool.Name] > 0 {
if disruptionBudgetMapping[candidate.NodePool.Name] > 0 {
empty = append(empty, candidate)
disruptionBudgetMapping[candidate.nodePool.Name]--
disruptionBudgetMapping[candidate.NodePool.Name]--
}
}
// Disrupt all empty drifted candidates, as they require no scheduling simulations.
@@ -88,7 +88,7 @@ func (d *Drift) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[
// If the disruption budget doesn't allow this candidate to be disrupted,
// continue to the next candidate. We don't need to decrement any budget
// counter since drift commands can only have one candidate.
if disruptionBudgetMapping[candidate.nodePool.Name] == 0 {
if disruptionBudgetMapping[candidate.NodePool.Name] == 0 {
continue
}
// Check if we need to create any NodeClaims.
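The drift change above, and the emptiness and multi-node diffs below, all gate candidates on the same per-NodePool disruption budget map, now read through the exported `NodePool` field: a candidate is taken only if its NodePool's budget is non-zero, and the budget is decremented as candidates are queued (single-candidate commands such as drift only need the zero check). A minimal sketch of that bookkeeping, using simplified, hypothetical types rather than karpenter's actual signatures:

```go
package main

import "fmt"

// Candidate is a trimmed-down, hypothetical stand-in for karpenter's candidate type.
type Candidate struct {
	Name     string
	NodePool string // name of the owning NodePool
}

// takeWithinBudget keeps a candidate only while its NodePool still has
// disruption budget left, decrementing the budget as candidates are taken;
// constrained reports whether any candidate was blocked purely by an
// exhausted budget.
func takeWithinBudget(candidates []*Candidate, budget map[string]int) (taken []*Candidate, constrained bool) {
	for _, c := range candidates {
		if budget[c.NodePool] == 0 {
			constrained = true
			continue
		}
		taken = append(taken, c)
		budget[c.NodePool]--
	}
	return taken, constrained
}

func main() {
	budget := map[string]int{"pool-a": 1, "pool-b": 0}
	candidates := []*Candidate{
		{Name: "node-1", NodePool: "pool-a"},
		{Name: "node-2", NodePool: "pool-a"},
		{Name: "node-3", NodePool: "pool-b"},
	}
	taken, constrained := takeWithinBudget(candidates, budget)
	fmt.Println(len(taken), constrained) // prints: 1 true
}
```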
8 changes: 4 additions & 4 deletions pkg/controllers/disruption/emptiness.go
@@ -43,8 +43,8 @@ func NewEmptiness(c consolidation) *Emptiness {
// ShouldDisrupt is a predicate used to filter candidates
func (e *Emptiness) ShouldDisrupt(_ context.Context, c *Candidate) bool {
// If consolidation is disabled, don't do anything. This emptiness should run for both WhenEmpty and WhenEmptyOrUnderutilized
if c.nodePool.Spec.Disruption.ConsolidateAfter.Duration == nil {
e.recorder.Publish(disruptionevents.Unconsolidatable(c.Node, c.NodeClaim, fmt.Sprintf("NodePool %q has consolidation disabled", c.nodePool.Name))...)
if c.NodePool.Spec.Disruption.ConsolidateAfter.Duration == nil {
e.recorder.Publish(disruptionevents.Unconsolidatable(c.Node, c.NodeClaim, fmt.Sprintf("NodePool %q has consolidation disabled", c.NodePool.Name))...)
return false
}
// return true if there are no pods and the nodeclaim is consolidatable
@@ -66,15 +66,15 @@ func (e *Emptiness) ComputeCommand(ctx context.Context, disruptionBudgetMapping
if len(candidate.reschedulablePods) > 0 {
continue
}
if disruptionBudgetMapping[candidate.nodePool.Name] == 0 {
if disruptionBudgetMapping[candidate.NodePool.Name] == 0 {
// set constrainedByBudgets to true if any node was a candidate but was constrained by a budget
constrainedByBudgets = true
continue
}
// If there's disruptions allowed for the candidate's nodepool,
// add it to the list of candidates, and decrement the budget.
empty = append(empty, candidate)
disruptionBudgetMapping[candidate.nodePool.Name]--
disruptionBudgetMapping[candidate.NodePool.Name]--
}
// none empty, so do nothing
if len(empty) == 0 {
4 changes: 2 additions & 2 deletions pkg/controllers/disruption/multinodeconsolidation.go
@@ -61,7 +61,7 @@ func (m *MultiNodeConsolidation) ComputeCommand(ctx context.Context, disruptionB
for _, candidate := range candidates {
// If there's disruptions allowed for the candidate's nodepool,
// add it to the list of candidates, and decrement the budget.
if disruptionBudgetMapping[candidate.nodePool.Name] == 0 {
if disruptionBudgetMapping[candidate.NodePool.Name] == 0 {
constrainedByBudgets = true
continue
}
@@ -73,7 +73,7 @@ func (m *MultiNodeConsolidation) ComputeCommand(ctx context.Context, disruptionB
}
// set constrainedByBudgets to true if any node was a candidate but was constrained by a budget
disruptableCandidates = append(disruptableCandidates, candidate)
disruptionBudgetMapping[candidate.nodePool.Name]--
disruptionBudgetMapping[candidate.NodePool.Name]--
}

// Only consider a maximum batch of 100 NodeClaims to save on computation.
71 changes: 66 additions & 5 deletions pkg/controllers/disruption/singlenodeconsolidation.go
@@ -19,24 +19,33 @@ package disruption
import (
"context"
"fmt"
"sort"
"strings"
"time"

"github.com/samber/lo"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/log"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling"
)

const SingleNodeConsolidationTimeoutDuration = 3 * time.Minute
var SingleNodeConsolidationTimeoutDuration = 3 * time.Minute

const SingleNodeConsolidationType = "single"

// SingleNodeConsolidation is the consolidation controller that performs single-node consolidation.
type SingleNodeConsolidation struct {
consolidation
PreviouslyUnseenNodePools sets.Set[string]
}

func NewSingleNodeConsolidation(consolidation consolidation) *SingleNodeConsolidation {
return &SingleNodeConsolidation{consolidation: consolidation}
return &SingleNodeConsolidation{
consolidation: consolidation,
PreviouslyUnseenNodePools: sets.New[string](),
}
}

// ComputeCommand generates a disruption command given candidates
@@ -45,20 +54,21 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption
if s.IsConsolidated() {
return Command{}, scheduling.Results{}, nil
}
candidates = s.sortCandidates(candidates)
candidates = s.SortCandidates(ctx, candidates)

v := NewValidation(s.clock, s.cluster, s.kubeClient, s.provisioner, s.cloudProvider, s.recorder, s.queue, s.Reason())

// Set a timeout
timeout := s.clock.Now().Add(SingleNodeConsolidationTimeoutDuration)
constrainedByBudgets := false

// binary search to find the maximum number of NodeClaims we can terminate
unseenNodePools := sets.New(lo.Map(candidates, func(c *Candidate, _ int) string { return c.NodePool.Name })...)

for i, candidate := range candidates {
// If the disruption budget doesn't allow this candidate to be disrupted,
// continue to the next candidate. We don't need to decrement any budget
// counter since single node consolidation commands can only have one candidate.
if disruptionBudgetMapping[candidate.nodePool.Name] == 0 {
if disruptionBudgetMapping[candidate.NodePool.Name] == 0 {
constrainedByBudgets = true
continue
}
@@ -71,8 +81,15 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption
if s.clock.Now().After(timeout) {
ConsolidationTimeoutsTotal.Inc(map[string]string{consolidationTypeLabel: s.ConsolidationType()})
log.FromContext(ctx).V(1).Info(fmt.Sprintf("abandoning single-node consolidation due to timeout after evaluating %d candidates", i))

s.PreviouslyUnseenNodePools = unseenNodePools

return Command{}, scheduling.Results{}, nil
}

// Track that we've seen this nodepool
unseenNodePools.Delete(candidate.NodePool.Name)

// compute a possible consolidation option
cmd, results, err := s.computeConsolidation(ctx, candidate)
if err != nil {
@@ -91,12 +108,16 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption
}
return cmd, results, nil
}

if !constrainedByBudgets {
// if there are no candidates because of a budget, don't mark
// as consolidated, as it's possible it should be consolidatable
// the next time we try to disrupt.
s.markConsolidated()
}

s.PreviouslyUnseenNodePools = unseenNodePools

return Command{}, scheduling.Results{}, nil
}

@@ -111,3 +132,43 @@ func (s *SingleNodeConsolidation) Class() string {
func (s *SingleNodeConsolidation) ConsolidationType() string {
return SingleNodeConsolidationType
}

// sortCandidates interweaves candidates from different nodepools and prioritizes nodepools
// that timed out in previous runs
func (s *SingleNodeConsolidation) SortCandidates(ctx context.Context, candidates []*Candidate) []*Candidate {

// First sort by disruption cost as the base ordering
sort.Slice(candidates, func(i int, j int) bool {
return candidates[i].DisruptionCost < candidates[j].DisruptionCost
})

return s.shuffleCandidates(ctx, lo.GroupBy(candidates, func(c *Candidate) string { return c.NodePool.Name }))
}

func (s *SingleNodeConsolidation) shuffleCandidates(ctx context.Context, nodePoolCandidates map[string][]*Candidate) []*Candidate {
var result []*Candidate
// Log any timed out nodepools that we're prioritizing
if s.PreviouslyUnseenNodePools.Len() != 0 {
log.FromContext(ctx).V(1).Info(fmt.Sprintf("prioritizing nodepools that have not yet been considered due to timeouts in previous runs: %s", strings.Join(s.PreviouslyUnseenNodePools.UnsortedList(), ", ")))
}
sortedNodePools := s.PreviouslyUnseenNodePools.UnsortedList()
sortedNodePools = append(sortedNodePools, lo.Filter(lo.Keys(nodePoolCandidates), func(nodePoolName string, _ int) bool {
return !s.PreviouslyUnseenNodePools.Has(nodePoolName)
})...)

// Find the maximum number of candidates in any nodepool
maxCandidatesPerNodePool := lo.MaxBy(lo.Values(nodePoolCandidates), func(a, b []*Candidate) bool {
return len(a) > len(b)
})

// Interweave candidates from different nodepools
for i := range maxCandidatesPerNodePool {
for _, nodePoolName := range sortedNodePools {
if i < len(nodePoolCandidates[nodePoolName]) {
result = append(result, nodePoolCandidates[nodePoolName][i])
}
}
}

return result
}
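The new SortCandidates/shuffleCandidates pair is the heart of the PR: candidates are ordered by disruption cost, grouped by NodePool, and then emitted round-robin across NodePools, with NodePools left unvisited by a previous timed-out run moved to the front of the rotation. A self-contained sketch of that interleaving under simplified types (plain maps and sort instead of samber/lo and sets; not the actual karpenter code):

```go
package main

import (
	"fmt"
	"sort"
)

// Candidate is a simplified stand-in for karpenter's candidate type.
type Candidate struct {
	Name           string
	NodePool       string
	DisruptionCost float64
}

// interleaveByNodePool orders candidates by disruption cost, groups them by
// NodePool, then takes one candidate per NodePool per round so that a single
// large NodePool cannot starve the others. NodePools flagged in
// previouslyUnseen are visited first in each round.
func interleaveByNodePool(candidates []*Candidate, previouslyUnseen map[string]bool) []*Candidate {
	// Base ordering: cheapest-to-disrupt first, preserved within each group.
	sort.Slice(candidates, func(i, j int) bool {
		return candidates[i].DisruptionCost < candidates[j].DisruptionCost
	})

	groups := map[string][]*Candidate{}
	var pools []string
	for _, c := range candidates {
		if _, ok := groups[c.NodePool]; !ok {
			pools = append(pools, c.NodePool)
		}
		groups[c.NodePool] = append(groups[c.NodePool], c)
	}

	// Previously unseen NodePools move to the front of the rotation.
	sort.SliceStable(pools, func(i, j int) bool {
		return previouslyUnseen[pools[i]] && !previouslyUnseen[pools[j]]
	})

	// Round-robin across NodePools until every group is exhausted.
	var result []*Candidate
	for i := 0; ; i++ {
		took := false
		for _, p := range pools {
			if i < len(groups[p]) {
				result = append(result, groups[p][i])
				took = true
			}
		}
		if !took {
			return result
		}
	}
}

func main() {
	cs := []*Candidate{
		{"a1", "pool-a", 0.1}, {"a2", "pool-a", 0.2}, {"a3", "pool-a", 0.3},
		{"b1", "pool-b", 0.5},
	}
	for _, c := range interleaveByNodePool(cs, map[string]bool{"pool-b": true}) {
		fmt.Print(c.Name, " ") // b1 a1 a2 a3
	}
}
```

With the three-minute timeout still in place (now a `var` rather than a `const`, presumably so tests can override it), the effect is that one NodePool with many low-cost candidates can no longer monopolize a single-node consolidation pass, and any NodePools it crowded out are evaluated first on the next pass.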