emptynodeconsolidation.go
/*
Copyright The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package disruption

import (
	"context"
	"errors"

	"github.com/samber/lo"
	"knative.dev/pkg/logging"

	"sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling"
	"sigs.k8s.io/karpenter/pkg/metrics"
)

// EmptyNodeConsolidation is the consolidation controller that performs
// multi-nodeclaim consolidation of entirely empty nodes.
type EmptyNodeConsolidation struct {
	consolidation
}
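
// NewEmptyNodeConsolidation constructs an EmptyNodeConsolidation from the shared
// consolidation helper, which carries the clock, cluster state, and clients used below.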
func NewEmptyNodeConsolidation(consolidation consolidation) *EmptyNodeConsolidation {
	return &EmptyNodeConsolidation{consolidation: consolidation}
}

// ComputeCommand generates a disruption command given candidates
//
//nolint:gocyclo
func (c *EmptyNodeConsolidation) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[string]int, candidates ...*Candidate) (Command, scheduling.Results, error) {
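	// The flow below is: (1) keep only fully empty candidates that fit within their
	// nodepool's disruption budget, (2) wait out consolidationTTL, and (3) re-validate
	// that the surviving candidates are still empty before emitting the command.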
	if c.IsConsolidated() {
		return Command{}, scheduling.Results{}, nil
	}
	candidates = c.sortCandidates(candidates)
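	// Report how many candidates this method considers eligible; the gauge is
	// labeled by disruption method and consolidation type.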
	EligibleNodesGauge.With(map[string]string{
		methodLabel:            c.Type(),
		consolidationTypeLabel: c.ConsolidationType(),
	}).Set(float64(len(candidates)))
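	// Keep only fully empty candidates, spending one unit of the per-nodepool
	// disruption budget for each candidate we keep.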
	empty := make([]*Candidate, 0, len(candidates))
	constrainedByBudgets := false
	for _, candidate := range candidates {
		if len(candidate.reschedulablePods) > 0 {
			continue
		}
		if disruptionBudgetMapping[candidate.nodePool.Name] == 0 {
			// set constrainedByBudgets to true if any node was a candidate but was constrained by a budget
			constrainedByBudgets = true
			continue
		}
		// If there are disruptions allowed for the candidate's nodepool,
		// add it to the list of candidates and decrement the budget.
		empty = append(empty, candidate)
		disruptionBudgetMapping[candidate.nodePool.Name]--
	}
	// none empty, so do nothing
	if len(empty) == 0 {
		// if there are no candidates, but a nodepool had a fully blocking budget,
		// don't mark the cluster as consolidated, as it's possible this nodepool
		// should be consolidated the next time we try to disrupt.
		if !constrainedByBudgets {
			c.markConsolidated()
		}
		return Command{}, scheduling.Results{}, nil
	}
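	// Every candidate in empty is now fully empty and within its nodepool's budget;
	// wrap them all in a single multi-node disruption command.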
	cmd := Command{
		candidates: empty,
	}

	// Empty Node Consolidation doesn't use Validation as we get to take advantage of cluster.IsNodeNominated. This
	// lets us avoid a scheduling simulation (which is performed periodically while pending pods exist and drives
	// cluster.IsNodeNominated already).
	select {
	case <-ctx.Done():
		return Command{}, scheduling.Results{}, errors.New("interrupted")
	case <-c.clock.After(consolidationTTL):
	}
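	// consolidationTTL has now elapsed; re-resolve the candidates, since pods may have
	// scheduled or been nominated onto them while we waited, invalidating the command.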
	v := NewValidation(c.clock, c.cluster, c.kubeClient, c.provisioner, c.cloudProvider, c.recorder, c.queue)
	validatedCandidates, err := v.ValidateCandidates(ctx, cmd.candidates...)
	if err != nil {
		if IsValidationError(err) {
			logging.FromContext(ctx).Debugf("abandoning empty node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
			return Command{}, scheduling.Results{}, nil
		}
		return Command{}, scheduling.Results{}, err
	}
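	// Even after validation, double-check that no candidate picked up reschedulable
	// pods during the TTL window; a single non-empty node invalidates the whole command.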
	// TODO (jmdeal@): better encapsulate within validation
	if lo.ContainsBy(validatedCandidates, func(c *Candidate) bool {
		return len(c.reschedulablePods) != 0
	}) {
		logging.FromContext(ctx).Debugf("abandoning empty node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
		return Command{}, scheduling.Results{}, nil
	}
	return cmd, scheduling.Results{}, nil
}
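
// Type returns the disruption method label reported in metrics for this controller.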
func (c *EmptyNodeConsolidation) Type() string {
	return metrics.ConsolidationReason
}
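
// ConsolidationType returns the consolidation subtype label ("empty") reported in metrics.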
func (c *EmptyNodeConsolidation) ConsolidationType() string {
	return "empty"
}
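
// The sketch below shows how this controller might be driven from a disruption
// loop. It is a hypothetical usage example, not part of this file: the
// newConsolidation helper, the budgets map, and the candidates slice are assumed
// to be constructed elsewhere in the disruption package.
//
//	emptiness := NewEmptyNodeConsolidation(newConsolidation(clk, cluster, kubeClient, provisioner, cloudProvider, recorder, queue))
//	cmd, _, err := emptiness.ComputeCommand(ctx, budgets, candidates...)
//	if err != nil {
//		return err
//	}
//	if len(cmd.candidates) > 0 {
//		// Execute cmd: taint, drain, and terminate the empty nodes.
//	}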