Skip to content

Commit 78cb8ae

Browse files
perf: Remove calling List on NodeClaims and Nodes in interruption controller (#7707)
1 parent eeac9df commit 78cb8ae

File tree

4 files changed

+88
-65
lines changed

4 files changed

+88
-65
lines changed

pkg/controllers/interruption/controller.go

Lines changed: 24 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,14 @@ import (
3838
"sigs.k8s.io/karpenter/pkg/operator/injection"
3939

4040
karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1"
41-
nodeclaimutils "sigs.k8s.io/karpenter/pkg/utils/nodeclaim"
4241
"sigs.k8s.io/karpenter/pkg/utils/pretty"
4342

43+
"sigs.k8s.io/karpenter/pkg/events"
44+
4445
"github.com/aws/karpenter-provider-aws/pkg/cache"
4546
interruptionevents "github.com/aws/karpenter-provider-aws/pkg/controllers/interruption/events"
4647
"github.com/aws/karpenter-provider-aws/pkg/controllers/interruption/messages"
4748
"github.com/aws/karpenter-provider-aws/pkg/providers/sqs"
48-
"github.com/aws/karpenter-provider-aws/pkg/utils"
49-
50-
"sigs.k8s.io/karpenter/pkg/events"
5149
)
5250

5351
type Action string
@@ -104,14 +102,7 @@ func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) {
104102
if len(sqsMessages) == 0 {
105103
return reconcile.Result{RequeueAfter: singleton.RequeueImmediately}, nil
106104
}
107-
nodeClaimInstanceIDMap, err := c.makeNodeClaimInstanceIDMap(ctx)
108-
if err != nil {
109-
return reconcile.Result{}, fmt.Errorf("making nodeclaim instance id map, %w", err)
110-
}
111-
nodeInstanceIDMap, err := c.makeNodeInstanceIDMap(ctx)
112-
if err != nil {
113-
return reconcile.Result{}, fmt.Errorf("making node instance id map, %w", err)
114-
}
105+
115106
errs := make([]error, len(sqsMessages))
116107
workqueue.ParallelizeUntil(ctx, 10, len(sqsMessages), func(i int) {
117108
msg, e := c.parseMessage(sqsMessages[i])
@@ -121,7 +112,7 @@ func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) {
121112
errs[i] = c.deleteMessage(ctx, sqsMessages[i])
122113
return
123114
}
124-
if e = c.handleMessage(ctx, nodeClaimInstanceIDMap, nodeInstanceIDMap, msg); e != nil {
115+
if e = c.handleMessage(ctx, msg); e != nil {
125116
errs[i] = fmt.Errorf("handling message, %w", e)
126117
return
127118
}
@@ -154,23 +145,35 @@ func (c *Controller) parseMessage(raw *sqstypes.Message) (messages.Message, erro
154145
}
155146

156147
// handleMessage takes an action against every node involved in the message that is owned by a NodePool
157-
func (c *Controller) handleMessage(ctx context.Context, nodeClaimInstanceIDMap map[string]*karpv1.NodeClaim,
158-
nodeInstanceIDMap map[string]*corev1.Node, msg messages.Message) (err error) {
159-
148+
func (c *Controller) handleMessage(ctx context.Context, msg messages.Message) (err error) {
160149
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("messageKind", msg.Kind()))
161150
ReceivedMessages.Inc(map[string]string{messageTypeLabel: string(msg.Kind())})
162151

163152
if msg.Kind() == messages.NoOpKind {
164153
return nil
165154
}
166155
for _, instanceID := range msg.EC2InstanceIDs() {
167-
nodeClaim, ok := nodeClaimInstanceIDMap[instanceID]
168-
if !ok {
156+
nodeClaimList := &karpv1.NodeClaimList{}
157+
if e := c.kubeClient.List(ctx, nodeClaimList, client.MatchingFields{"status.instanceID": instanceID}); e != nil {
158+
err = multierr.Append(err, e)
169159
continue
170160
}
171-
node := nodeInstanceIDMap[instanceID]
172-
if e := c.handleNodeClaim(ctx, msg, nodeClaim, node); e != nil {
173-
err = multierr.Append(err, e)
161+
if len(nodeClaimList.Items) == 0 {
162+
continue
163+
}
164+
for _, nodeClaim := range nodeClaimList.Items {
165+
nodeList := &corev1.NodeList{}
166+
if e := c.kubeClient.List(ctx, nodeList, client.MatchingFields{"spec.instanceID": instanceID}); e != nil {
167+
err = multierr.Append(err, e)
168+
continue
169+
}
170+
var node *corev1.Node
171+
if len(nodeList.Items) > 0 {
172+
node = &nodeList.Items[0]
173+
}
174+
if e := c.handleNodeClaim(ctx, msg, &nodeClaim, node); e != nil {
175+
err = multierr.Append(err, e)
176+
}
174177
}
175178
}
176179
MessageLatency.Observe(time.Since(msg.StartTime()).Seconds(), nil)
@@ -254,48 +257,6 @@ func (c *Controller) notifyForMessage(msg messages.Message, nodeClaim *karpv1.No
254257
}
255258
}
256259

257-
// makeNodeClaimInstanceIDMap builds a map between the instance id that is stored in the
258-
// NodeClaim .status.providerID and the NodeClaim
259-
func (c *Controller) makeNodeClaimInstanceIDMap(ctx context.Context) (map[string]*karpv1.NodeClaim, error) {
260-
m := map[string]*karpv1.NodeClaim{}
261-
nodeClaims, err := nodeclaimutils.ListManaged(ctx, c.kubeClient, c.cloudProvider)
262-
if err != nil {
263-
return nil, err
264-
}
265-
for _, nc := range nodeClaims {
266-
if nc.Status.ProviderID == "" {
267-
continue
268-
}
269-
id, err := utils.ParseInstanceID(nc.Status.ProviderID)
270-
if err != nil || id == "" {
271-
continue
272-
}
273-
m[id] = nc
274-
}
275-
return m, nil
276-
}
277-
278-
// makeNodeInstanceIDMap builds a map between the instance id that is stored in the
279-
// node .spec.providerID and the node
280-
func (c *Controller) makeNodeInstanceIDMap(ctx context.Context) (map[string]*corev1.Node, error) {
281-
m := map[string]*corev1.Node{}
282-
nodeList := &corev1.NodeList{}
283-
if err := c.kubeClient.List(ctx, nodeList); err != nil {
284-
return nil, fmt.Errorf("listing nodes, %w", err)
285-
}
286-
for i := range nodeList.Items {
287-
if nodeList.Items[i].Spec.ProviderID == "" {
288-
continue
289-
}
290-
id, err := utils.ParseInstanceID(nodeList.Items[i].Spec.ProviderID)
291-
if err != nil || id == "" {
292-
continue
293-
}
294-
m[id] = &nodeList.Items[i]
295-
}
296-
return m, nil
297-
}
298-
299260
func actionForMessage(msg messages.Message) Action {
300261
switch msg.Kind() {
301262
case messages.ScheduledChangeKind, messages.SpotInterruptionKind, messages.InstanceStoppedKind, messages.InstanceTerminatedKind:

pkg/controllers/interruption/suite_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func TestAPIs(t *testing.T) {
8484

8585
var _ = BeforeSuite(func() {
8686
ctx = options.ToContext(ctx, test.Options())
87-
env = coretest.NewEnvironment(coretest.WithCRDs(apis.CRDs...), coretest.WithCRDs(v1alpha1.CRDs...))
87+
env = coretest.NewEnvironment(coretest.WithCRDs(apis.CRDs...), coretest.WithCRDs(v1alpha1.CRDs...), coretest.WithFieldIndexers(test.NodeInstanceIDFieldIndexer(ctx), test.NodeClaimInstanceIDFieldIndexer(ctx)))
8888
awsEnv = test.NewEnvironment(ctx, env)
8989
fakeClock = &clock.FakeClock{}
9090
unavailableOfferingsCache = awscache.NewUnavailableOfferings()

pkg/operator/operator.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,13 @@ import (
2626

2727
"github.com/aws/aws-sdk-go-v2/aws"
2828
"github.com/aws/aws-sdk-go-v2/aws/middleware"
29-
config "github.com/aws/aws-sdk-go-v2/config"
29+
"github.com/aws/aws-sdk-go-v2/config"
3030
"github.com/aws/aws-sdk-go-v2/feature/ec2/imds"
3131
"github.com/aws/aws-sdk-go-v2/service/ec2"
3232
"github.com/aws/aws-sdk-go-v2/service/eks"
3333
"github.com/aws/aws-sdk-go-v2/service/iam"
3434
"github.com/aws/aws-sdk-go-v2/service/ssm"
35+
"sigs.k8s.io/controller-runtime/pkg/manager"
3536

3637
"github.com/aws/smithy-go"
3738
"github.com/patrickmn/go-cache"
@@ -66,6 +67,7 @@ import (
6667
ssmp "github.com/aws/karpenter-provider-aws/pkg/providers/ssm"
6768
"github.com/aws/karpenter-provider-aws/pkg/providers/subnet"
6869
"github.com/aws/karpenter-provider-aws/pkg/providers/version"
70+
"github.com/aws/karpenter-provider-aws/pkg/utils"
6971
)
7072

7173
func init() {
@@ -185,6 +187,10 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont
185187
launchTemplateProvider,
186188
)
187189

190+
// Set up field indexers on instanceID -- registered only when an interruption queue is configured, since they exist specifically for the interruption controller
191+
if options.FromContext(ctx).InterruptionQueue != "" {
192+
SetupIndexers(ctx, operator.Manager)
193+
}
188194
return ctx, &Operator{
189195
Operator: operator,
190196
Config: cfg,
@@ -273,3 +279,26 @@ func KubeDNSIP(ctx context.Context, kubernetesInterface kubernetes.Interface) (n
273279
}
274280
return kubeDNSIP, nil
275281
}
282+
283+
func SetupIndexers(ctx context.Context, mgr manager.Manager) {
284+
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &karpv1.NodeClaim{}, "status.instanceID", func(o client.Object) []string {
285+
if o.(*karpv1.NodeClaim).Status.ProviderID == "" {
286+
return nil
287+
}
288+
id, e := utils.ParseInstanceID(o.(*karpv1.NodeClaim).Status.ProviderID)
289+
if e != nil || id == "" {
290+
return nil
291+
}
292+
return []string{id}
293+
}), "failed to setup nodeclaim instanceID indexer")
294+
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &corev1.Node{}, "spec.instanceID", func(o client.Object) []string {
295+
if o.(*corev1.Node).Spec.ProviderID == "" {
296+
return nil
297+
}
298+
id, e := utils.ParseInstanceID(o.(*corev1.Node).Spec.ProviderID)
299+
if e != nil || id == "" {
300+
return nil
301+
}
302+
return []string{id}
303+
}), "failed to setup node instanceID indexer")
304+
}

pkg/test/environment.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import (
2323
"github.com/samber/lo"
2424
corev1 "k8s.io/api/core/v1"
2525
clock "k8s.io/utils/clock/testing"
26+
ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache"
27+
"sigs.k8s.io/controller-runtime/pkg/client"
2628

2729
karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1"
2830

@@ -39,6 +41,7 @@ import (
3941
ssmp "github.com/aws/karpenter-provider-aws/pkg/providers/ssm"
4042
"github.com/aws/karpenter-provider-aws/pkg/providers/subnet"
4143
"github.com/aws/karpenter-provider-aws/pkg/providers/version"
44+
"github.com/aws/karpenter-provider-aws/pkg/utils"
4245

4346
coretest "sigs.k8s.io/karpenter/pkg/test"
4447

@@ -216,3 +219,33 @@ func (env *Environment) Reset() {
216219
}
217220
}
218221
}
222+
223+
func NodeInstanceIDFieldIndexer(ctx context.Context) func(ctrlcache.Cache) error {
224+
return func(c ctrlcache.Cache) error {
225+
return c.IndexField(ctx, &corev1.Node{}, "spec.instanceID", func(obj client.Object) []string {
226+
if obj.(*corev1.Node).Spec.ProviderID == "" {
227+
return nil
228+
}
229+
id, e := utils.ParseInstanceID(obj.(*corev1.Node).Spec.ProviderID)
230+
if e != nil || id == "" {
231+
return nil
232+
}
233+
return []string{id}
234+
})
235+
}
236+
}
237+
238+
func NodeClaimInstanceIDFieldIndexer(ctx context.Context) func(ctrlcache.Cache) error {
239+
return func(c ctrlcache.Cache) error {
240+
return c.IndexField(ctx, &karpv1.NodeClaim{}, "status.instanceID", func(obj client.Object) []string {
241+
if obj.(*karpv1.NodeClaim).Status.ProviderID == "" {
242+
return nil
243+
}
244+
id, e := utils.ParseInstanceID(obj.(*karpv1.NodeClaim).Status.ProviderID)
245+
if e != nil || id == "" {
246+
return nil
247+
}
248+
return []string{id}
249+
})
250+
}
251+
}

0 commit comments

Comments
 (0)