-
Notifications
You must be signed in to change notification settings - Fork 145
Expand file tree
/
Copy pathclassify.go
More file actions
263 lines (234 loc) · 9.14 KB
/
classify.go
File metadata and controls
263 lines (234 loc) · 9.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
package clusterinfo
import (
"context"
"fmt"
"log"
"maps"
"regexp"
"strings"
"time"
"github.com/aws/aws-sdk-go-v2/service/autoscaling"
astypes "github.com/aws/aws-sdk-go-v2/service/autoscaling/types"
"github.com/samber/lo"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
const (
// nodeListChunkSize is the page size passed as ListOptions.Limit when
// listing Nodes, so the node inventory is fetched in chunks rather than
// in one response.
nodeListChunkSize = 100
// describeASGInstancesMaxIDs is the documented per-call limit of
// autoscaling:DescribeAutoScalingInstances. Sending more triggers a
// ValidationError at the API.
describeASGInstancesMaxIDs = 50
)
// awsProviderIDRegexp matches the AWS provider ID for EC2-backed nodes.
// Format: aws:///<az>/i-<hex>. The pattern is anchored at both ends and its
// single capture group holds the EC2 instance ID (i-<hex>), so a successful
// FindStringSubmatch yields exactly two elements. Fargate nodes use a
// different shape and must therefore be classified by label before reaching
// this regex.
var awsProviderIDRegexp = regexp.MustCompile(`^aws:///[^/]+/(i-[0-9a-f]+)$`)
// AutoscalingDescriber is the subset of *autoscaling.Client used by Classify.
// Defined as an interface so tests can substitute a fake without spinning up
// AWS SDK middleware.
type AutoscalingDescriber interface {
// DescribeAutoScalingInstances mirrors the SDK client method of the same
// name. In this file it is called with a batch of instance IDs to learn
// which Auto Scaling group, if any, each instance belongs to.
DescribeAutoScalingInstances(ctx context.Context, in *autoscaling.DescribeAutoScalingInstancesInput, opts ...func(*autoscaling.Options)) (*autoscaling.DescribeAutoScalingInstancesOutput, error)
}
// Classify builds a ClusterInfo snapshot for clusterName. It runs three
// steps in order: bucket the nodes that can be classified from their labels
// or name alone, resolve the remaining EC2-backed nodes against the Auto
// Scaling API, and record whether a cluster-autoscaler Deployment is
// present. A failure in any step aborts the snapshot and returns a nil
// *ClusterInfo together with the error.
func Classify(ctx context.Context, k8sClient kubernetes.Interface, asg AutoscalingDescriber, clusterName string) (*ClusterInfo, error) {
	snapshot := &ClusterInfo{
		APIVersion:     APIVersion,
		ClusterName:    clusterName,
		GeneratedAt:    time.Now().UTC(),
		NodeManagement: make(map[NodeManager]map[string][]string),
	}

	// Label-only classification; whatever is left needs an AWS lookup.
	pending, err := classifyByLabels(ctx, k8sClient, snapshot)
	if err != nil {
		return nil, err
	}

	if err := resolveASGs(ctx, asg, pending, snapshot); err != nil {
		return nil, err
	}

	autoscaler, err := detectClusterAutoscaler(ctx, k8sClient)
	if err != nil {
		return nil, fmt.Errorf("failed to detect cluster-autoscaler: %w", err)
	}
	snapshot.ClusterAutoscaler = autoscaler

	return snapshot, nil
}
// asgCandidate is a node that needs an AWS API call to determine whether
// it's in an ASG (asg bucket) or not (standalone bucket).
type asgCandidate struct {
// instanceID is the EC2 instance ID captured from the node's providerID.
instanceID string
// nodeName is the Kubernetes Node name recorded in the resulting bucket.
nodeName string
}
// classifyByLabels pages through every Node (nodeListChunkSize per request)
// and files each one as it goes:
//
//   - nodes that classifyNodeByLabel recognises are added to info directly;
//   - nodes whose providerID matches awsProviderIDRegexp are returned as ASG
//     candidates so resolveASGs can bucket them as asg or standalone;
//   - everything else goes to the unknown bucket, keyed by its providerID.
//
// A failed List call aborts the walk and returns the wrapped error.
func classifyByLabels(ctx context.Context, k8sClient kubernetes.Interface, info *ClusterInfo) ([]asgCandidate, error) {
	var pending []asgCandidate

	opts := metav1.ListOptions{Limit: nodeListChunkSize}
	for {
		page, err := k8sClient.CoreV1().Nodes().List(ctx, opts)
		if err != nil {
			return nil, fmt.Errorf("failed to list nodes: %w", err)
		}

		for i := range page.Items {
			n := &page.Items[i]

			if mgr, entity, ok := classifyNodeByLabel(n); ok {
				addToBucket(info, mgr, entity, n.Name)
				continue
			}

			// Element 0 is the whole match, element 1 the instance ID.
			m := awsProviderIDRegexp.FindStringSubmatch(n.Spec.ProviderID)
			if len(m) != 2 {
				addToBucket(info, NodeManagerUnknown, n.Spec.ProviderID, n.Name)
				continue
			}
			pending = append(pending, asgCandidate{instanceID: m[1], nodeName: n.Name})
		}

		// An empty continue token marks the last page.
		if page.Continue == "" {
			return pending, nil
		}
		opts.Continue = page.Continue
	}
}
// classifyNodeByLabel tries to classify a node from its labels and name
// alone. Checks run in this order and the first hit wins:
//
//  1. Fargate: compute-type label equal to "fargate" or a node name starting
//     with "fargate-ip-"; the entity is the fargate-profile label (which may
//     be empty).
//  2. Karpenter: the first non-empty label among nodepool, ec2nodeclass and
//     the legacy v0.x (pre-NodePool) provisioner-name.
//  3. EKS managed node group: a non-empty nodegroup label.
//
// The boolean is false when none of these apply, meaning the node needs an
// AWS API lookup or the unknown-bucket fallback.
func classifyNodeByLabel(node *corev1.Node) (NodeManager, string, bool) {
	labels := node.Labels

	isFargate := labels["eks.amazonaws.com/compute-type"] == "fargate" ||
		strings.HasPrefix(node.Name, "fargate-ip-")
	if isFargate {
		return NodeManagerFargate, labels["eks.amazonaws.com/fargate-profile"], true
	}

	// Ordered by precedence; the last key is the legacy Provisioner label.
	karpenterKeys := [...]string{
		"karpenter.sh/nodepool",
		"karpenter.k8s.aws/ec2nodeclass",
		"karpenter.sh/provisioner-name",
	}
	for _, key := range karpenterKeys {
		if owner := labels[key]; owner != "" {
			return NodeManagerKarpenter, owner, true
		}
	}

	if group := labels["eks.amazonaws.com/nodegroup"]; group != "" {
		return NodeManagerEKSManagedNodeGroup, group, true
	}

	return "", "", false
}
// resolveASGs decides, for every candidate, whether its EC2 instance belongs
// to an Auto Scaling group. Instance IDs are sent to
// DescribeAutoScalingInstances in chunks of describeASGInstancesMaxIDs (the
// documented per-call limit). A candidate whose instance comes back with a
// non-empty group name lands in the asg bucket under that name; every other
// candidate lands in the standalone bucket under the empty entity. The first
// API error aborts the resolution and is returned wrapped.
func resolveASGs(ctx context.Context, asg AutoscalingDescriber, candidates []asgCandidate, info *ClusterInfo) error {
	// pickGroup keeps only entries carrying both an instance ID and a group name.
	pickGroup := func(detail astypes.AutoScalingInstanceDetails) (string, string, bool) {
		if detail.InstanceId != nil && detail.AutoScalingGroupName != nil {
			return *detail.InstanceId, *detail.AutoScalingGroupName, true
		}
		return "", "", false
	}

	groupByInstance := make(map[string]string, len(candidates))
	for _, chunk := range lo.Chunk(candidates, describeASGInstancesMaxIDs) {
		input := &autoscaling.DescribeAutoScalingInstancesInput{
			InstanceIds: make([]string, 0, len(chunk)),
		}
		for _, cand := range chunk {
			input.InstanceIds = append(input.InstanceIds, cand.instanceID)
		}

		resp, err := asg.DescribeAutoScalingInstances(ctx, input)
		if err != nil {
			return fmt.Errorf("failed to describe autoscaling instances: %w", err)
		}
		maps.Copy(groupByInstance, lo.FilterSliceToMap(resp.AutoScalingInstances, pickGroup))
	}

	for _, cand := range candidates {
		group := groupByInstance[cand.instanceID]
		if group == "" {
			// Not reported, or reported without a group name.
			addToBucket(info, NodeManagerStandalone, "", cand.nodeName)
			continue
		}
		addToBucket(info, NodeManagerASG, group, cand.nodeName)
	}
	return nil
}
// addToBucket appends nodeName to info.NodeManagement[mgr][entity], creating
// the per-manager map on first use. info.NodeManagement itself must already
// be non-nil.
func addToBucket(info *ClusterInfo, mgr NodeManager, entity, nodeName string) {
	if info.NodeManagement[mgr] == nil {
		info.NodeManagement[mgr] = make(map[string][]string)
	}
	entities := info.NodeManagement[mgr]
	entities[entity] = append(entities[entity], nodeName)
}
// detectClusterAutoscaler scans Deployments cluster-wide and returns the
// first match. A match is any Deployment accepted by isClusterAutoscaler:
// one with name "cluster-autoscaler", a well-known label, or a container
// image referencing "cluster-autoscaler". Multiple matches yield a warning
// but only the first is recorded.
//
// Deployments are fetched in pages (Limit/Continue), the same way nodes are
// listed, so a cluster with many Deployments is not pulled in one response;
// only matching Deployments are retained between pages.
//
// Returns the zero ClusterAutoscaler (Present false) when nothing matches,
// and the unwrapped List error when a page cannot be fetched.
func detectClusterAutoscaler(ctx context.Context, k8sClient kubernetes.Interface) (ClusterAutoscaler, error) {
	// Page size for the Deployment listing; declared locally so this
	// function does not depend on the node-listing constant.
	const deploymentListChunkSize = 100

	var matches []appsv1.Deployment
	opts := metav1.ListOptions{Limit: deploymentListChunkSize}
	for {
		page, err := k8sClient.AppsV1().Deployments(corev1.NamespaceAll).List(ctx, opts)
		if err != nil {
			return ClusterAutoscaler{}, err
		}
		matches = append(matches, lo.Filter(page.Items, isClusterAutoscaler)...)

		// An empty continue token marks the last page.
		if page.Continue == "" {
			break
		}
		opts.Continue = page.Continue
	}

	if len(matches) == 0 {
		return ClusterAutoscaler{}, nil
	}
	if len(matches) > 1 {
		log.Printf("Warning: %d Deployments match cluster-autoscaler heuristics; recording the first one (%s/%s).",
			len(matches), matches[0].Namespace, matches[0].Name)
	}

	first := matches[0]
	return ClusterAutoscaler{
		Present:   true,
		Namespace: first.Namespace,
		Name:      first.Name,
		Version:   extractClusterAutoscalerVersion(first),
	}, nil
}
// isClusterAutoscaler reports whether d looks like an active
// cluster-autoscaler Deployment. The unused int parameter gives it the
// lo.Filter predicate signature so it can be passed without a wrapper.
//
// A Deployment explicitly scaled to zero replicas never matches: it is
// effectively disabled, and ignoring it lets users who already stopped CA
// (per the Karpenter migration guide) get `Present: false` in the snapshot.
// A nil Replicas defaults to 1 per the Kubernetes API, so it counts as
// active. Otherwise the Deployment matches when its name, its
// `app.kubernetes.io/name` label or its `k8s-app` label equals
// "cluster-autoscaler", or when any container image contains that string.
func isClusterAutoscaler(d appsv1.Deployment, _ int) bool {
	if replicas := d.Spec.Replicas; replicas != nil && *replicas == 0 {
		return false
	}

	const marker = "cluster-autoscaler"

	byName := d.Name == marker
	byLabel := d.Labels["app.kubernetes.io/name"] == marker || d.Labels["k8s-app"] == marker
	if byName || byLabel {
		return true
	}

	containers := d.Spec.Template.Spec.Containers
	for i := range containers {
		if strings.Contains(containers[i].Image, marker) {
			return true
		}
	}
	return false
}
// extractClusterAutoscalerVersion reports the cluster-autoscaler version of
// d. Sources are consulted in this order:
//
//  1. the image tag of the first container whose image contains
//     "cluster-autoscaler" and carries a non-empty tag (the source of truth);
//  2. the `app.kubernetes.io/version` label on the Deployment;
//  3. the same label on the pod template (set by most Helm charts).
//
// The result is empty when none of them is available, e.g. an image
// referenced by digest only and no version label.
func extractClusterAutoscalerVersion(d appsv1.Deployment) string {
	containers := d.Spec.Template.Spec.Containers
	for i := range containers {
		img := containers[i].Image
		if strings.Contains(img, "cluster-autoscaler") {
			if tag := imageTag(img); tag != "" {
				return tag
			}
		}
	}

	const versionLabel = "app.kubernetes.io/version"
	if fromDeployment := d.Labels[versionLabel]; fromDeployment != "" {
		return fromDeployment
	}
	return d.Spec.Template.Labels[versionLabel]
}
// imageTag returns the tag of an OCI image reference, or the empty string
// when the reference has none (digest-only references, bare image names).
// Anything from the first `@` onward (the `@sha256:...` digest) is dropped
// before the tag is looked up.
func imageTag(image string) string {
	ref, _, _ := strings.Cut(image, "@")

	sep := strings.LastIndexByte(ref, ':')
	if sep < 0 {
		return ""
	}

	// A colon followed by a path component belongs to a registry port
	// (e.g. `localhost:5000/foo`), not to a tag.
	tag := ref[sep+1:]
	if strings.ContainsRune(tag, '/') {
		return ""
	}
	return tag
}