Commit 63523ea

Merge branch 'main' into feature/support-tz-for-disruption-budgets-schedule
2 parents: 2693c3e + 76aaae9

20 files changed: +589 -99 lines changed

go.mod

Lines changed: 8 additions & 8 deletions
@@ -5,7 +5,7 @@ go 1.24.0
 require (
     github.com/Pallinder/go-randomdata v1.2.0
     github.com/avast/retry-go v3.0.0+incompatible
-    github.com/awslabs/operatorpkg v0.0.0-20241205163410-0fff9f28d115
+    github.com/awslabs/operatorpkg v0.0.0-20250320000002-b05af0f15c68
     github.com/docker/docker v28.0.1+incompatible
     github.com/go-logr/logr v1.4.2
     github.com/imdario/mergo v0.3.16
@@ -21,13 +21,13 @@ require (
     go.uber.org/zap v1.27.0
     golang.org/x/text v0.23.0
     golang.org/x/time v0.11.0
-    k8s.io/api v0.32.2
-    k8s.io/apiextensions-apiserver v0.32.2
-    k8s.io/apimachinery v0.32.2
-    k8s.io/client-go v0.32.2
-    k8s.io/cloud-provider v0.32.2
-    k8s.io/component-base v0.32.2
-    k8s.io/csi-translation-lib v0.32.2
+    k8s.io/api v0.32.3
+    k8s.io/apiextensions-apiserver v0.32.3
+    k8s.io/apimachinery v0.32.3
+    k8s.io/client-go v0.32.3
+    k8s.io/cloud-provider v0.32.3
+    k8s.io/component-base v0.32.3
+    k8s.io/csi-translation-lib v0.32.3
     k8s.io/klog/v2 v2.130.1
     k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738
     sigs.k8s.io/controller-runtime v0.20.3

go.sum

Lines changed: 16 additions & 16 deletions
@@ -2,8 +2,8 @@ github.com/Pallinder/go-randomdata v1.2.0 h1:DZ41wBchNRb/0GfsePLiSwb0PHZmT67XY00
 github.com/Pallinder/go-randomdata v1.2.0/go.mod h1:yHmJgulpD2Nfrm0cR9tI/+oAgRqCQQixsA8HyRZfV9Y=
 github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHSxpiH9JdtuBj0=
 github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY=
-github.com/awslabs/operatorpkg v0.0.0-20241205163410-0fff9f28d115 h1:9nhjY3dzCpEmhpQ0vMlhB7wqucAiftLjAIEQu8uT2J4=
-github.com/awslabs/operatorpkg v0.0.0-20241205163410-0fff9f28d115/go.mod h1:TTs6HGuqmgdNyNlbdv29v1OoON+kQKVPojZgJaJVtNk=
+github.com/awslabs/operatorpkg v0.0.0-20250320000002-b05af0f15c68 h1:llLoYu7EeqtFrCGCJzzXIyDxvCwn/Zr+aX+sRyabXgw=
+github.com/awslabs/operatorpkg v0.0.0-20250320000002-b05af0f15c68/go.mod h1:Uu2TsiIC3jUXRxMiDXOsiz3ZuBLTsCj1j4B858r51bs=
 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
 github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
 github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
@@ -195,20 +195,20 @@ gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
 gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
-k8s.io/api v0.32.2 h1:bZrMLEkgizC24G9eViHGOPbW+aRo9duEISRIJKfdJuw=
-k8s.io/api v0.32.2/go.mod h1:hKlhk4x1sJyYnHENsrdCWw31FEmCijNGPJO5WzHiJ6Y=
-k8s.io/apiextensions-apiserver v0.32.2 h1:2YMk285jWMk2188V2AERy5yDwBYrjgWYggscghPCvV4=
-k8s.io/apiextensions-apiserver v0.32.2/go.mod h1:GPwf8sph7YlJT3H6aKUWtd0E+oyShk/YHWQHf/OOgCA=
-k8s.io/apimachinery v0.32.2 h1:yoQBR9ZGkA6Rgmhbp/yuT9/g+4lxtsGYwW6dR6BDPLQ=
-k8s.io/apimachinery v0.32.2/go.mod h1:GpHVgxoKlTxClKcteaeuF1Ul/lDVb74KpZcxcmLDElE=
-k8s.io/client-go v0.32.2 h1:4dYCD4Nz+9RApM2b/3BtVvBHw54QjMFUl1OLcJG5yOA=
-k8s.io/client-go v0.32.2/go.mod h1:fpZ4oJXclZ3r2nDOv+Ux3XcJutfrwjKTCHz2H3sww94=
-k8s.io/cloud-provider v0.32.2 h1:8EC+fCYo0r0REczSjOZcVuQPCMxXxCKlgxDbYMrzC30=
-k8s.io/cloud-provider v0.32.2/go.mod h1:2s8TeAXhVezp5VISaTxM6vW3yDonOZXoN4Aryz1p1PQ=
-k8s.io/component-base v0.32.2 h1:1aUL5Vdmu7qNo4ZsE+569PV5zFatM9hl+lb3dEea2zU=
-k8s.io/component-base v0.32.2/go.mod h1:PXJ61Vx9Lg+P5mS8TLd7bCIr+eMJRQTyXe8KvkrvJq0=
-k8s.io/csi-translation-lib v0.32.2 h1:aLzAyaoJUc5rgtLi8Xd4No1tet6UpvUsGIgRoGnPSSE=
-k8s.io/csi-translation-lib v0.32.2/go.mod h1:PlOKan6Vc0G6a+giQbm36plJ+E1LH+GPRLAVMQMSMcY=
+k8s.io/api v0.32.3 h1:Hw7KqxRusq+6QSplE3NYG4MBxZw1BZnq4aP4cJVINls=
+k8s.io/api v0.32.3/go.mod h1:2wEDTXADtm/HA7CCMD8D8bK4yuBUptzaRhYcYEEYA3k=
+k8s.io/apiextensions-apiserver v0.32.3 h1:4D8vy+9GWerlErCwVIbcQjsWunF9SUGNu7O7hiQTyPY=
+k8s.io/apiextensions-apiserver v0.32.3/go.mod h1:8YwcvVRMVzw0r1Stc7XfGAzB/SIVLunqApySV5V7Dss=
+k8s.io/apimachinery v0.32.3 h1:JmDuDarhDmA/Li7j3aPrwhpNBA94Nvk5zLeOge9HH1U=
+k8s.io/apimachinery v0.32.3/go.mod h1:GpHVgxoKlTxClKcteaeuF1Ul/lDVb74KpZcxcmLDElE=
+k8s.io/client-go v0.32.3 h1:RKPVltzopkSgHS7aS98QdscAgtgah/+zmpAogooIqVU=
+k8s.io/client-go v0.32.3/go.mod h1:3v0+3k4IcT9bXTc4V2rt+d2ZPPG700Xy6Oi0Gdl2PaY=
+k8s.io/cloud-provider v0.32.3 h1:WC7KhWrqXsU4b0E4tjS+nBectGiJbr1wuc1TpWXvtZM=
+k8s.io/cloud-provider v0.32.3/go.mod h1:/fwBfgRPuh16n8vLHT+PPT+Bc4LAEaJYj38opO2wsYY=
+k8s.io/component-base v0.32.3 h1:98WJvvMs3QZ2LYHBzvltFSeJjEx7t5+8s71P7M74u8k=
+k8s.io/component-base v0.32.3/go.mod h1:LWi9cR+yPAv7cu2X9rZanTiFKB2kHA+JjmhkKjCZRpI=
+k8s.io/csi-translation-lib v0.32.3 h1:fKdc9LMVEMk18xsgoPm1Ga8GjfhI7AM3UX8gnIeXZKs=
+k8s.io/csi-translation-lib v0.32.3/go.mod h1:VX6+hCKgQyFnUX3VrnXZAgYYBXkrqx4BZk9vxr9qRcE=
 k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=
 k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
 k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f h1:GA7//TjRY9yWGy1poLzYYJJ4JRdzg3+O6e8I+e+8T5Y=

pkg/controllers/disruption/consolidation.go

Lines changed: 5 additions & 5 deletions
@@ -104,15 +104,15 @@ func (c *consolidation) ShouldDisrupt(_ context.Context, cn *Candidate) bool {
         c.recorder.Publish(disruptionevents.Unconsolidatable(cn.Node, cn.NodeClaim, fmt.Sprintf("Node does not have label %q", corev1.LabelTopologyZone))...)
         return false
     }
-    if cn.nodePool.Spec.Disruption.ConsolidateAfter.Duration == nil {
-        c.recorder.Publish(disruptionevents.Unconsolidatable(cn.Node, cn.NodeClaim, fmt.Sprintf("NodePool %q has consolidation disabled", cn.nodePool.Name))...)
+    if cn.NodePool.Spec.Disruption.ConsolidateAfter.Duration == nil {
+        c.recorder.Publish(disruptionevents.Unconsolidatable(cn.Node, cn.NodeClaim, fmt.Sprintf("NodePool %q has consolidation disabled", cn.NodePool.Name))...)
         return false
     }
     // If we don't have the "WhenEmptyOrUnderutilized" policy set, we should not do any of the consolidation methods, but
     // we should also not fire an event here to users since this can be confusing when the field on the NodePool
     // is named "consolidationPolicy"
-    if cn.nodePool.Spec.Disruption.ConsolidationPolicy != v1.ConsolidationPolicyWhenEmptyOrUnderutilized {
-        c.recorder.Publish(disruptionevents.Unconsolidatable(cn.Node, cn.NodeClaim, fmt.Sprintf("NodePool %q has non-empty consolidation disabled", cn.nodePool.Name))...)
+    if cn.NodePool.Spec.Disruption.ConsolidationPolicy != v1.ConsolidationPolicyWhenEmptyOrUnderutilized {
+        c.recorder.Publish(disruptionevents.Unconsolidatable(cn.Node, cn.NodeClaim, fmt.Sprintf("NodePool %q has non-empty consolidation disabled", cn.NodePool.Name))...)
         return false
     }
     // return true if consolidatable
@@ -122,7 +122,7 @@ func (c *consolidation) ShouldDisrupt(_ context.Context, cn *Candidate) bool {
 // sortCandidates sorts candidates by disruption cost (where the lowest disruption cost is first) and returns the result
 func (c *consolidation) sortCandidates(candidates []*Candidate) []*Candidate {
     sort.Slice(candidates, func(i int, j int) bool {
-        return candidates[i].disruptionCost < candidates[j].disruptionCost
+        return candidates[i].DisruptionCost < candidates[j].DisruptionCost
     })
     return candidates
 }
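
As context for the renames above (cn.nodePool to cn.NodePool, disruptionCost to DisruptionCost), the Candidate fields used by consolidation are now accessed through exported names; the ordering contract of sortCandidates is unchanged, with the cheapest-to-disrupt candidate first. A minimal standalone sketch of that ordering contract (the candidate type and cost values here are illustrative, not the package's actual types):

```go
package main

import (
	"fmt"
	"sort"
)

// illustrative stand-in for the package's Candidate; only the cost matters here
type candidate struct {
	name           string
	disruptionCost float64
}

func main() {
	candidates := []candidate{
		{name: "node-a", disruptionCost: 2.5},
		{name: "node-b", disruptionCost: 0.3},
		{name: "node-c", disruptionCost: 1.1},
	}
	// lowest disruption cost first, mirroring sortCandidates above
	sort.Slice(candidates, func(i, j int) bool {
		return candidates[i].disruptionCost < candidates[j].disruptionCost
	})
	fmt.Println(candidates) // node-b, node-c, node-a
}
```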

pkg/controllers/disruption/drift.go

Lines changed: 3 additions & 3 deletions
@@ -72,9 +72,9 @@ func (d *Drift) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[
         }
         // If there's disruptions allowed for the candidate's nodepool,
         // add it to the list of candidates, and decrement the budget.
-        if disruptionBudgetMapping[candidate.nodePool.Name] > 0 {
+        if disruptionBudgetMapping[candidate.NodePool.Name] > 0 {
             empty = append(empty, candidate)
-            disruptionBudgetMapping[candidate.nodePool.Name]--
+            disruptionBudgetMapping[candidate.NodePool.Name]--
         }
     }
     // Disrupt all empty drifted candidates, as they require no scheduling simulations.
@@ -88,7 +88,7 @@ func (d *Drift) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[
         // If the disruption budget doesn't allow this candidate to be disrupted,
         // continue to the next candidate. We don't need to decrement any budget
         // counter since drift commands can only have one candidate.
-        if disruptionBudgetMapping[candidate.nodePool.Name] == 0 {
+        if disruptionBudgetMapping[candidate.NodePool.Name] == 0 {
             continue
         }
         // Check if we need to create any NodeClaims.
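
The hunks above, and the emptiness changes below, share the same disruption-budget bookkeeping: each NodePool carries a counter of disruptions still allowed in this pass, a candidate whose counter is zero is skipped, and selecting a candidate decrements the counter. A minimal sketch of that pattern with illustrative names:

```go
package main

import "fmt"

func main() {
	// remaining allowed disruptions per NodePool for this pass
	disruptionBudget := map[string]int{"pool-a": 1, "pool-b": 0}
	candidates := []string{"pool-a", "pool-a", "pool-b"}

	var selected []string
	for _, pool := range candidates {
		if disruptionBudget[pool] == 0 {
			continue // budget exhausted, skip this candidate
		}
		selected = append(selected, pool)
		disruptionBudget[pool]--
	}
	fmt.Println(selected) // [pool-a]: the second pool-a candidate and pool-b are skipped
}
```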

pkg/controllers/disruption/emptiness.go

Lines changed: 4 additions & 4 deletions
@@ -43,8 +43,8 @@ func NewEmptiness(c consolidation) *Emptiness {
 // ShouldDisrupt is a predicate used to filter candidates
 func (e *Emptiness) ShouldDisrupt(_ context.Context, c *Candidate) bool {
     // If consolidation is disabled, don't do anything. This emptiness should run for both WhenEmpty and WhenEmptyOrUnderutilized
-    if c.nodePool.Spec.Disruption.ConsolidateAfter.Duration == nil {
-        e.recorder.Publish(disruptionevents.Unconsolidatable(c.Node, c.NodeClaim, fmt.Sprintf("NodePool %q has consolidation disabled", c.nodePool.Name))...)
+    if c.NodePool.Spec.Disruption.ConsolidateAfter.Duration == nil {
+        e.recorder.Publish(disruptionevents.Unconsolidatable(c.Node, c.NodeClaim, fmt.Sprintf("NodePool %q has consolidation disabled", c.NodePool.Name))...)
         return false
     }
     // return true if there are no pods and the nodeclaim is consolidatable
@@ -66,15 +66,15 @@ func (e *Emptiness) ComputeCommand(ctx context.Context, disruptionBudgetMapping
         if len(candidate.reschedulablePods) > 0 {
             continue
         }
-        if disruptionBudgetMapping[candidate.nodePool.Name] == 0 {
+        if disruptionBudgetMapping[candidate.NodePool.Name] == 0 {
             // set constrainedByBudgets to true if any node was a candidate but was constrained by a budget
             constrainedByBudgets = true
             continue
         }
         // If there's disruptions allowed for the candidate's nodepool,
         // add it to the list of candidates, and decrement the budget.
         empty = append(empty, candidate)
-        disruptionBudgetMapping[candidate.nodePool.Name]--
+        disruptionBudgetMapping[candidate.NodePool.Name]--
     }
     // none empty, so do nothing
     if len(empty) == 0 {

pkg/controllers/disruption/helpers.go

Lines changed: 5 additions & 1 deletion
@@ -88,7 +88,11 @@ func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster *
         return client.ObjectKeyFromObject(p), nil
     })

-    results := scheduler.Solve(log.IntoContext(ctx, operatorlogging.NopLogger), pods).TruncateInstanceTypes(scheduling.MaxInstanceTypes)
+    results, err := scheduler.Solve(log.IntoContext(ctx, operatorlogging.NopLogger), pods)
+    if err != nil {
+        return scheduling.Results{}, fmt.Errorf("scheduling pods, %w", err)
+    }
+    results = results.TruncateInstanceTypes(scheduling.MaxInstanceTypes)
     for _, n := range results.ExistingNodes {
         // We consider existing nodes for scheduling. When these nodes are unmanaged, their taint logic should
         // tell us if we can schedule to them or not; however, if these nodes are managed, we will still schedule to them
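
scheduler.Solve now returns an error alongside the results, and SimulateScheduling wraps it with %w before propagating, so callers can still detect the underlying cause (for example a canceled or timed-out context) with errors.Is. A small self-contained sketch of that wrap-and-inspect pattern; the solve and simulate functions below are placeholders, not the package's API:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// placeholder for a call that can fail the way scheduler.Solve now can
func solve(ctx context.Context) error {
	return ctx.Err() // e.g. context.DeadlineExceeded when the context has timed out
}

func simulate(ctx context.Context) error {
	if err := solve(ctx); err != nil {
		// wrap with %w so the original error stays inspectable
		return fmt.Errorf("scheduling pods, %w", err)
	}
	return nil
}

func main() {
	// an already-expired context, so solve fails deterministically
	ctx, cancel := context.WithTimeout(context.Background(), -time.Millisecond)
	defer cancel()

	err := simulate(ctx)
	// errors.Is sees through the fmt.Errorf wrapping
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true
}
```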

pkg/controllers/disruption/multinodeconsolidation.go

Lines changed: 19 additions & 14 deletions
@@ -18,6 +18,7 @@ package disruption

 import (
     "context"
+    "errors"
     "fmt"
     "math"
     "time"
@@ -61,7 +62,7 @@ func (m *MultiNodeConsolidation) ComputeCommand(ctx context.Context, disruptionB
     for _, candidate := range candidates {
         // If there's disruptions allowed for the candidate's nodepool,
         // add it to the list of candidates, and decrement the budget.
-        if disruptionBudgetMapping[candidate.nodePool.Name] == 0 {
+        if disruptionBudgetMapping[candidate.NodePool.Name] == 0 {
             constrainedByBudgets = true
             continue
         }
@@ -73,7 +74,7 @@ func (m *MultiNodeConsolidation) ComputeCommand(ctx context.Context, disruptionB
         }
         // set constrainedByBudgets to true if any node was a candidate but was constrained by a budget
         disruptableCandidates = append(disruptableCandidates, candidate)
-        disruptionBudgetMapping[candidate.nodePool.Name]--
+        disruptionBudgetMapping[candidate.NodePool.Name]--
     }

     // Only consider a maximum batch of 100 NodeClaims to save on computation.
@@ -107,6 +108,7 @@ func (m *MultiNodeConsolidation) ComputeCommand(ctx context.Context, disruptionB

 // firstNConsolidationOption looks at the first N NodeClaims to determine if they can all be consolidated at once. The
 // NodeClaims are sorted by increasing disruption order which correlates to likelihood of being able to consolidate the node
+// nolint:gocyclo
 func (m *MultiNodeConsolidation) firstNConsolidationOption(ctx context.Context, candidates []*Candidate, max int) (Command, scheduling.Results, error) {
     // we always operate on at least two NodeClaims at once, for single NodeClaims standard consolidation will find all solutions
     if len(candidates) < 2 {
@@ -120,23 +122,26 @@ func (m *MultiNodeConsolidation) firstNConsolidationOption(ctx context.Context,
     lastSavedCommand := Command{}
     lastSavedResults := scheduling.Results{}
     // Set a timeout
-    timeout := m.clock.Now().Add(MultiNodeConsolidationTimeoutDuration)
-    // binary search to find the maximum number of NodeClaims we can terminate
+    timeoutCtx, cancel := context.WithTimeout(ctx, MultiNodeConsolidationTimeoutDuration)
+    defer cancel()
     for min <= max {
-        if m.clock.Now().After(timeout) {
-            ConsolidationTimeoutsTotal.Inc(map[string]string{consolidationTypeLabel: m.ConsolidationType()})
-            if lastSavedCommand.candidates == nil {
-                log.FromContext(ctx).V(1).Info(fmt.Sprintf("failed to find a multi-node consolidation after timeout, last considered batch had %d", (min+max)/2))
-            } else {
-                log.FromContext(ctx).V(1).WithValues(lastSavedCommand.LogValues()...).Info("stopping multi-node consolidation after timeout, returning last valid command")
-            }
-            return lastSavedCommand, lastSavedResults, nil
-        }
         mid := (min + max) / 2
         candidatesToConsolidate := candidates[0 : mid+1]

-        cmd, results, err := m.computeConsolidation(ctx, candidatesToConsolidate...)
+        // Pass the timeout context to ensure sub-operations can be canceled
+        cmd, results, err := m.computeConsolidation(timeoutCtx, candidatesToConsolidate...)
+        // context deadline exceeded will return to the top of the loop and either return nothing or the last saved command
         if err != nil {
+            if errors.Is(err, context.DeadlineExceeded) {
+                ConsolidationTimeoutsTotal.Inc(map[string]string{consolidationTypeLabel: m.ConsolidationType()})
+                if lastSavedCommand.candidates == nil {
+                    log.FromContext(ctx).V(1).Info(fmt.Sprintf("failed to find a multi-node consolidation after timeout, last considered batch had %d", (min+max)/2))
+                    return Command{}, scheduling.Results{}, nil
+                }
+                log.FromContext(ctx).V(1).WithValues(lastSavedCommand.LogValues()...).Info("stopping multi-node consolidation after timeout, returning last valid command")
+                return lastSavedCommand, lastSavedResults, nil
+
+            }
             return Command{}, scheduling.Results{}, err
         }
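
The timeout handling above changes from polling a wall-clock deadline at the top of each binary-search iteration to deriving a child context with context.WithTimeout and recognizing context.DeadlineExceeded on the error path, which lets the timeout also interrupt work inside computeConsolidation. A minimal sketch of that pattern, independent of this codebase:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// stand-in for a sub-operation that respects context cancellation
func compute(ctx context.Context) error {
	select {
	case <-time.After(100 * time.Millisecond): // pretend work
		return nil
	case <-ctx.Done():
		return ctx.Err() // context.DeadlineExceeded once the timeout fires
	}
}

func main() {
	timeoutCtx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()

	for i := 0; i < 5; i++ {
		if err := compute(timeoutCtx); err != nil {
			if errors.Is(err, context.DeadlineExceeded) {
				fmt.Println("stopping after timeout, returning best result so far")
				return
			}
			fmt.Println("unexpected error:", err)
			return
		}
	}
}
```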

pkg/controllers/disruption/singlenodeconsolidation.go

Lines changed: 66 additions & 5 deletions
@@ -19,24 +19,33 @@ package disruption
 import (
     "context"
     "fmt"
+    "sort"
+    "strings"
     "time"

+    "github.com/samber/lo"
+    "k8s.io/apimachinery/pkg/util/sets"
     "sigs.k8s.io/controller-runtime/pkg/log"

     v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
     "sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling"
 )

-const SingleNodeConsolidationTimeoutDuration = 3 * time.Minute
+var SingleNodeConsolidationTimeoutDuration = 3 * time.Minute
+
 const SingleNodeConsolidationType = "single"

 // SingleNodeConsolidation is the consolidation controller that performs single-node consolidation.
 type SingleNodeConsolidation struct {
     consolidation
+    PreviouslyUnseenNodePools sets.Set[string]
 }

 func NewSingleNodeConsolidation(consolidation consolidation) *SingleNodeConsolidation {
-    return &SingleNodeConsolidation{consolidation: consolidation}
+    return &SingleNodeConsolidation{
+        consolidation:             consolidation,
+        PreviouslyUnseenNodePools: sets.New[string](),
+    }
 }

 // ComputeCommand generates a disruption command given candidates
@@ -45,20 +54,21 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption
     if s.IsConsolidated() {
         return Command{}, scheduling.Results{}, nil
     }
-    candidates = s.sortCandidates(candidates)
+    candidates = s.SortCandidates(ctx, candidates)

     v := NewValidation(s.clock, s.cluster, s.kubeClient, s.provisioner, s.cloudProvider, s.recorder, s.queue, s.Reason())

     // Set a timeout
     timeout := s.clock.Now().Add(SingleNodeConsolidationTimeoutDuration)
     constrainedByBudgets := false

-    // binary search to find the maximum number of NodeClaims we can terminate
+    unseenNodePools := sets.New(lo.Map(candidates, func(c *Candidate, _ int) string { return c.NodePool.Name })...)
+
     for i, candidate := range candidates {
         // If the disruption budget doesn't allow this candidate to be disrupted,
         // continue to the next candidate. We don't need to decrement any budget
         // counter since single node consolidation commands can only have one candidate.
-        if disruptionBudgetMapping[candidate.nodePool.Name] == 0 {
+        if disruptionBudgetMapping[candidate.NodePool.Name] == 0 {
             constrainedByBudgets = true
             continue
         }
@@ -71,8 +81,15 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption
         if s.clock.Now().After(timeout) {
             ConsolidationTimeoutsTotal.Inc(map[string]string{consolidationTypeLabel: s.ConsolidationType()})
             log.FromContext(ctx).V(1).Info(fmt.Sprintf("abandoning single-node consolidation due to timeout after evaluating %d candidates", i))
+
+            s.PreviouslyUnseenNodePools = unseenNodePools
+
             return Command{}, scheduling.Results{}, nil
         }
+
+        // Track that we've seen this nodepool
+        unseenNodePools.Delete(candidate.NodePool.Name)
+
         // compute a possible consolidation option
         cmd, results, err := s.computeConsolidation(ctx, candidate)
         if err != nil {
@@ -91,12 +108,16 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption
         }
         return cmd, results, nil
     }
+
     if !constrainedByBudgets {
         // if there are no candidates because of a budget, don't mark
         // as consolidated, as it's possible it should be consolidatable
         // the next time we try to disrupt.
         s.markConsolidated()
     }
+
+    s.PreviouslyUnseenNodePools = unseenNodePools
+
     return Command{}, scheduling.Results{}, nil
 }

@@ -111,3 +132,43 @@ func (s *SingleNodeConsolidation) Class() string {
 func (s *SingleNodeConsolidation) ConsolidationType() string {
     return SingleNodeConsolidationType
 }
+
+// sortCandidates interweaves candidates from different nodepools and prioritizes nodepools
+// that timed out in previous runs
+func (s *SingleNodeConsolidation) SortCandidates(ctx context.Context, candidates []*Candidate) []*Candidate {
+
+    // First sort by disruption cost as the base ordering
+    sort.Slice(candidates, func(i int, j int) bool {
+        return candidates[i].DisruptionCost < candidates[j].DisruptionCost
+    })
+
+    return s.shuffleCandidates(ctx, lo.GroupBy(candidates, func(c *Candidate) string { return c.NodePool.Name }))
+}
+
+func (s *SingleNodeConsolidation) shuffleCandidates(ctx context.Context, nodePoolCandidates map[string][]*Candidate) []*Candidate {
+    var result []*Candidate
+    // Log any timed out nodepools that we're prioritizing
+    if s.PreviouslyUnseenNodePools.Len() != 0 {
+        log.FromContext(ctx).V(1).Info(fmt.Sprintf("prioritizing nodepools that have not yet been considered due to timeouts in previous runs: %s", strings.Join(s.PreviouslyUnseenNodePools.UnsortedList(), ", ")))
+    }
+    sortedNodePools := s.PreviouslyUnseenNodePools.UnsortedList()
+    sortedNodePools = append(sortedNodePools, lo.Filter(lo.Keys(nodePoolCandidates), func(nodePoolName string, _ int) bool {
+        return !s.PreviouslyUnseenNodePools.Has(nodePoolName)
+    })...)

+    // Find the maximum number of candidates in any nodepool
+    maxCandidatesPerNodePool := lo.MaxBy(lo.Values(nodePoolCandidates), func(a, b []*Candidate) bool {
+        return len(a) > len(b)
+    })
+
+    // Interweave candidates from different nodepools
+    for i := range maxCandidatesPerNodePool {
+        for _, nodePoolName := range sortedNodePools {
+            if i < len(nodePoolCandidates[nodePoolName]) {
+                result = append(result, nodePoolCandidates[nodePoolName][i])
+            }
+        }
+    }
+
+    return result
+}
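
The new SortCandidates orders candidates by disruption cost, then shuffleCandidates groups them by NodePool and emits them round-robin, placing NodePools that went unseen in a previous timed-out run at the front of the rotation so one large NodePool cannot monopolize the three-minute single-node consolidation budget. A minimal sketch of the interleaving step on plain strings (the helper name and grouping are illustrative):

```go
package main

import "fmt"

// interleave takes per-pool candidate lists and an ordered pool rotation,
// and emits one candidate per pool per round until all lists are drained.
func interleave(byPool map[string][]string, rotation []string) []string {
	var out []string
	longest := 0
	for _, c := range byPool {
		if len(c) > longest {
			longest = len(c)
		}
	}
	for i := 0; i < longest; i++ {
		for _, pool := range rotation {
			if i < len(byPool[pool]) {
				out = append(out, byPool[pool][i])
			}
		}
	}
	return out
}

func main() {
	byPool := map[string][]string{
		"pool-a": {"a1", "a2", "a3"},
		"pool-b": {"b1"},
		"pool-c": {"c1", "c2"},
	}
	// pool-c was unseen last run, so it goes first in the rotation
	rotation := []string{"pool-c", "pool-a", "pool-b"}
	fmt.Println(interleave(byPool, rotation)) // [c1 a1 b1 c2 a2 a3]
}
```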
