Skip to content

Commit

Permalink
add testing
Browse files Browse the repository at this point in the history
  • Loading branch information
rschalo committed Mar 6, 2025
1 parent c30678f commit b04f8fa
Show file tree
Hide file tree
Showing 5 changed files with 435 additions and 105 deletions.
202 changes: 109 additions & 93 deletions pkg/controllers/disruption/singlenodeconsolidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import (
"sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling"
)

const SingleNodeConsolidationTimeoutDuration = 3 * time.Minute
// SingleNodeConsolidationTimeoutDuration bounds how long a single-node
// consolidation pass may run before it is abandoned. Declared as a var
// (rather than a const) so tests can override it.
var SingleNodeConsolidationTimeoutDuration = 3 * time.Minute

// SingleNodeConsolidationType is the consolidation-type label value reported
// for this controller (e.g. in metrics).
const SingleNodeConsolidationType = "single"

// SingleNodeConsolidation is the consolidation controller that performs single-node consolidation.
Expand All @@ -45,85 +46,6 @@ func NewSingleNodeConsolidation(consolidation consolidation) *SingleNodeConsolid
}
}

// groupCandidatesByNodePool groups candidates by the name of the nodepool that
// owns them. It returns the grouping plus the nodepool names in first-seen
// order, so callers can iterate the map deterministically.
func (s *SingleNodeConsolidation) groupCandidatesByNodePool(candidates []*Candidate) (map[string][]*Candidate, []string) {
	nodePoolCandidates := make(map[string][]*Candidate)
	nodePoolNames := []string{}

	for _, candidate := range candidates {
		nodePoolName := candidate.nodePool.Name
		// Record each nodepool name only the first time it is seen to keep
		// encounter order.
		if _, exists := nodePoolCandidates[nodePoolName]; !exists {
			nodePoolNames = append(nodePoolNames, nodePoolName)
		}
		nodePoolCandidates[nodePoolName] = append(nodePoolCandidates[nodePoolName], candidate)
	}
	return nodePoolCandidates, nodePoolNames
}

// sortNodePoolsByTimeout reorders nodePoolNames in place so that nodepools
// recorded as timed out in previous runs are considered first.
func (s *SingleNodeConsolidation) sortNodePoolsByTimeout(ctx context.Context, nodePoolNames []string) {
	// Log the timed out nodepools that we're prioritizing
	timedOutNodePools := []string{}
	for np := range s.nodePoolsTimedOut {
		timedOutNodePools = append(timedOutNodePools, np)
	}
	if len(timedOutNodePools) > 0 {
		// NOTE(review): logr's Info takes a message plus key/value pairs, not
		// printf-style format args — the "%v" is emitted literally and the slice
		// becomes a dangling kv argument. Wrap the message with fmt.Sprintf.
		log.FromContext(ctx).V(1).Info("prioritizing nodepools that have not yet been considered due to timeouts in previous runs: %v", timedOutNodePools)
	}

	// Prioritize nodepools that timed out in previous runs
	sort.Slice(nodePoolNames, func(i, j int) bool {
		// If nodepool i timed out but j didn't, i comes first
		if s.nodePoolsTimedOut[nodePoolNames[i]] && !s.nodePoolsTimedOut[nodePoolNames[j]] {
			return true
		}
		// If nodepool j timed out but i didn't, j comes first
		if !s.nodePoolsTimedOut[nodePoolNames[i]] && s.nodePoolsTimedOut[nodePoolNames[j]] {
			return false
		}
		// If both or neither timed out, keep original order
		// NOTE(review): sort.Slice is unstable and i/j are current (not original)
		// positions, so this tiebreaker does not actually preserve the original
		// order; sort.SliceStable without the tiebreaker would.
		return i < j
	})
}

// shuffleCandidates flattens the per-nodepool buckets into one slice by
// interleaving them round-robin (first candidate of each nodepool, then the
// second of each, ...) so no single nodepool dominates the front of the list.
func (s *SingleNodeConsolidation) shuffleCandidates(nodePoolCandidates map[string][]*Candidate, nodePoolNames []string) []*Candidate {
	result := make([]*Candidate, 0)
	maxCandidatesPerNodePool := 0

	// Find the maximum number of candidates in any nodepool
	for _, nodePoolName := range nodePoolNames {
		if len(nodePoolCandidates[nodePoolName]) > maxCandidatesPerNodePool {
			maxCandidatesPerNodePool = len(nodePoolCandidates[nodePoolName])
		}
	}

	// Interweave candidates from different nodepools
	// (range-over-int requires Go 1.22+)
	for i := range maxCandidatesPerNodePool {
		for _, nodePoolName := range nodePoolNames {
			// Skip nodepools whose bucket has already been exhausted.
			if i < len(nodePoolCandidates[nodePoolName]) {
				result = append(result, nodePoolCandidates[nodePoolName][i])
			}
		}
	}

	return result
}

// sortCandidates interweaves candidates from different nodepools and prioritizes nodepools
// that timed out in previous runs. The base ordering within each nodepool is
// ascending disruption cost.
func (s *SingleNodeConsolidation) sortCandidates(candidates []*Candidate) []*Candidate {
	// NOTE(review): this drops any caller-supplied context (cancellation,
	// logger); consider threading ctx through from the caller.
	ctx := context.Background()

	// First sort by disruption cost as the base ordering
	sort.Slice(candidates, func(i int, j int) bool {
		return candidates[i].disruptionCost < candidates[j].disruptionCost
	})

	nodePoolCandidates, nodePoolNames := s.groupCandidatesByNodePool(candidates)

	s.sortNodePoolsByTimeout(ctx, nodePoolNames)

	return s.shuffleCandidates(nodePoolCandidates, nodePoolNames)
}

// ComputeCommand generates a disruption command given candidates
// nolint:gocyclo
func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[string]int, candidates ...*Candidate) (Command, scheduling.Results, error) {
Expand All @@ -146,6 +68,19 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption
}

for i, candidate := range candidates {
if s.clock.Now().After(timeout) {
ConsolidationTimeoutsTotal.Inc(map[string]string{consolidationTypeLabel: s.ConsolidationType()})
log.FromContext(ctx).V(1).Info(fmt.Sprintf("abandoning single-node consolidation due to timeout after evaluating %d candidates", i))

// Mark all nodepools that we haven't seen yet as timed out
for _, c := range candidates[i:] {
if !nodePoolsSeen[c.nodePool.Name] {
s.nodePoolsTimedOut[c.nodePool.Name] = true
}
}

return Command{}, scheduling.Results{}, nil
}
// Track that we've considered this nodepool
nodePoolsSeen[candidate.nodePool.Name] = true

Expand All @@ -162,19 +97,6 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption
if len(candidate.reschedulablePods) == 0 {
continue
}
if s.clock.Now().After(timeout) {
ConsolidationTimeoutsTotal.Inc(map[string]string{consolidationTypeLabel: s.ConsolidationType()})
log.FromContext(ctx).V(1).Info(fmt.Sprintf("abandoning single-node consolidation due to timeout after evaluating %d candidates", i))

// Mark all nodepools that we haven't seen yet as timed out
for _, c := range candidates[i:] {
if !nodePoolsSeen[c.nodePool.Name] {
s.nodePoolsTimedOut[c.nodePool.Name] = true
}
}

return Command{}, scheduling.Results{}, nil
}
// compute a possible consolidation option
cmd, results, err := s.computeConsolidation(ctx, candidate)
if err != nil {
Expand Down Expand Up @@ -229,3 +151,97 @@ func (s *SingleNodeConsolidation) Class() string {
func (s *SingleNodeConsolidation) ConsolidationType() string {
return SingleNodeConsolidationType
}

// groupCandidatesByNodePool buckets candidates by the name of their owning
// nodepool. It returns the buckets together with the nodepool names in the
// order they were first encountered, giving callers a deterministic iteration
// order over the map.
func (s *SingleNodeConsolidation) groupCandidatesByNodePool(candidates []*Candidate) (map[string][]*Candidate, []string) {
	grouped := make(map[string][]*Candidate)
	order := []string{}

	for _, c := range candidates {
		name := c.nodePool.Name
		// First time we see this nodepool, remember its position in the ordering.
		if _, seen := grouped[name]; !seen {
			order = append(order, name)
		}
		grouped[name] = append(grouped[name], c)
	}
	return grouped, order
}

// sortNodePoolsByTimeout reorders nodePoolNames in place so that nodepools
// that timed out in previous consolidation runs are evaluated first, giving
// them a chance to be considered before the timeout hits again. The relative
// order of nodepools with the same timed-out status is preserved.
func (s *SingleNodeConsolidation) sortNodePoolsByTimeout(ctx context.Context, nodePoolNames []string) {
	// Collect the previously timed-out nodepools so we can log what we're prioritizing.
	timedOutNodePools := []string{}
	for np, timedOut := range s.nodePoolsTimedOut {
		if timedOut {
			timedOutNodePools = append(timedOutNodePools, np)
		}
	}
	if len(timedOutNodePools) > 0 {
		// Sort for deterministic log output; map iteration order is randomized.
		sort.Strings(timedOutNodePools)
		// logr's Info takes a message plus key/value pairs, not printf-style
		// format args, so format the message up front (matches the
		// fmt.Sprintf style used elsewhere in this file).
		log.FromContext(ctx).V(1).Info(fmt.Sprintf("prioritizing nodepools that have not yet been considered due to timeouts in previous runs: %v", timedOutNodePools))
	}

	// Stable sort: timed-out nodepools first, everything else keeps its
	// original order. (An unstable sort.Slice with an index-based tiebreaker
	// does not guarantee that — i/j are current, not original, positions.)
	sort.SliceStable(nodePoolNames, func(i, j int) bool {
		return s.nodePoolsTimedOut[nodePoolNames[i]] && !s.nodePoolsTimedOut[nodePoolNames[j]]
	})
}

// shuffleCandidates flattens the per-nodepool candidate buckets into a single
// slice, interleaving them round-robin — the first candidate of each nodepool,
// then the second of each, and so on — so no single nodepool dominates the
// front of the list. Nodepools are visited in nodePoolNames order each round.
func (s *SingleNodeConsolidation) shuffleCandidates(nodePoolCandidates map[string][]*Candidate, nodePoolNames []string) []*Candidate {
	interleaved := make([]*Candidate, 0)

	// Keep taking one candidate per nodepool per round until every bucket is drained.
	for round := 0; ; round++ {
		took := false
		for _, name := range nodePoolNames {
			if bucket := nodePoolCandidates[name]; round < len(bucket) {
				interleaved = append(interleaved, bucket[round])
				took = true
			}
		}
		// A round that produced nothing means all buckets are exhausted.
		if !took {
			break
		}
	}

	return interleaved
}

// sortCandidates orders candidates for single-node consolidation: ascending
// disruption cost as the base ordering, then interleaved across nodepools,
// with nodepools that timed out in previous runs considered first.
func (s *SingleNodeConsolidation) sortCandidates(candidates []*Candidate) []*Candidate {
	ctx := context.Background()

	// Base ordering: cheapest disruption first.
	sort.Slice(candidates, func(a int, b int) bool {
		return candidates[a].disruptionCost < candidates[b].disruptionCost
	})

	grouped, names := s.groupCandidatesByNodePool(candidates)
	s.sortNodePoolsByTimeout(ctx, names)
	return s.shuffleCandidates(grouped, names)
}

// SortCandidates exposes the unexported sortCandidates ordering logic so that
// tests can exercise it directly.
func (s *SingleNodeConsolidation) SortCandidates(cands []*Candidate) []*Candidate {
	return s.sortCandidates(cands)
}

// MarkNodePoolTimedOut records the named nodepool as having timed out in a
// previous run; exported for use by tests.
func (s *SingleNodeConsolidation) MarkNodePoolTimedOut(name string) {
	s.nodePoolsTimedOut[name] = true
}

// IsNodePoolTimedOut reports whether the named nodepool was recorded as timed
// out in a previous run; exported for use by tests.
func (s *SingleNodeConsolidation) IsNodePoolTimedOut(name string) bool {
	return s.nodePoolsTimedOut[name]
}
Loading

0 comments on commit b04f8fa

Please sign in to comment.