@@ -38,16 +38,14 @@ import (
38
38
"sigs.k8s.io/karpenter/pkg/operator/injection"
39
39
40
40
karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1"
41
- nodeclaimutils "sigs.k8s.io/karpenter/pkg/utils/nodeclaim"
42
41
"sigs.k8s.io/karpenter/pkg/utils/pretty"
43
42
43
+ "sigs.k8s.io/karpenter/pkg/events"
44
+
44
45
"github.com/aws/karpenter-provider-aws/pkg/cache"
45
46
interruptionevents "github.com/aws/karpenter-provider-aws/pkg/controllers/interruption/events"
46
47
"github.com/aws/karpenter-provider-aws/pkg/controllers/interruption/messages"
47
48
"github.com/aws/karpenter-provider-aws/pkg/providers/sqs"
48
- "github.com/aws/karpenter-provider-aws/pkg/utils"
49
-
50
- "sigs.k8s.io/karpenter/pkg/events"
51
49
)
52
50
53
51
type Action string
@@ -104,14 +102,7 @@ func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) {
104
102
if len (sqsMessages ) == 0 {
105
103
return reconcile.Result {RequeueAfter : singleton .RequeueImmediately }, nil
106
104
}
107
- nodeClaimInstanceIDMap , err := c .makeNodeClaimInstanceIDMap (ctx )
108
- if err != nil {
109
- return reconcile.Result {}, fmt .Errorf ("making nodeclaim instance id map, %w" , err )
110
- }
111
- nodeInstanceIDMap , err := c .makeNodeInstanceIDMap (ctx )
112
- if err != nil {
113
- return reconcile.Result {}, fmt .Errorf ("making node instance id map, %w" , err )
114
- }
105
+
115
106
errs := make ([]error , len (sqsMessages ))
116
107
workqueue .ParallelizeUntil (ctx , 10 , len (sqsMessages ), func (i int ) {
117
108
msg , e := c .parseMessage (sqsMessages [i ])
@@ -121,7 +112,7 @@ func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) {
121
112
errs [i ] = c .deleteMessage (ctx , sqsMessages [i ])
122
113
return
123
114
}
124
- if e = c .handleMessage (ctx , nodeClaimInstanceIDMap , nodeInstanceIDMap , msg ); e != nil {
115
+ if e = c .handleMessage (ctx , msg ); e != nil {
125
116
errs [i ] = fmt .Errorf ("handling message, %w" , e )
126
117
return
127
118
}
@@ -154,23 +145,35 @@ func (c *Controller) parseMessage(raw *sqstypes.Message) (messages.Message, erro
154
145
}
155
146
156
147
// handleMessage takes an action against every node involved in the message that is owned by a NodePool
157
- func (c * Controller ) handleMessage (ctx context.Context , nodeClaimInstanceIDMap map [string ]* karpv1.NodeClaim ,
158
- nodeInstanceIDMap map [string ]* corev1.Node , msg messages.Message ) (err error ) {
159
-
148
+ func (c * Controller ) handleMessage (ctx context.Context , msg messages.Message ) (err error ) {
160
149
ctx = log .IntoContext (ctx , log .FromContext (ctx ).WithValues ("messageKind" , msg .Kind ()))
161
150
ReceivedMessages .Inc (map [string ]string {messageTypeLabel : string (msg .Kind ())})
162
151
163
152
if msg .Kind () == messages .NoOpKind {
164
153
return nil
165
154
}
166
155
for _ , instanceID := range msg .EC2InstanceIDs () {
167
- nodeClaim , ok := nodeClaimInstanceIDMap [instanceID ]
168
- if ! ok {
156
+ nodeClaimList := & karpv1.NodeClaimList {}
157
+ if e := c .kubeClient .List (ctx , nodeClaimList , client.MatchingFields {"status.instanceID" : instanceID }); e != nil {
158
+ err = multierr .Append (err , e )
169
159
continue
170
160
}
171
- node := nodeInstanceIDMap [instanceID ]
172
- if e := c .handleNodeClaim (ctx , msg , nodeClaim , node ); e != nil {
173
- err = multierr .Append (err , e )
161
+ if len (nodeClaimList .Items ) == 0 {
162
+ continue
163
+ }
164
+ for _ , nodeClaim := range nodeClaimList .Items {
165
+ nodeList := & corev1.NodeList {}
166
+ if e := c .kubeClient .List (ctx , nodeList , client.MatchingFields {"spec.instanceID" : instanceID }); e != nil {
167
+ err = multierr .Append (err , e )
168
+ continue
169
+ }
170
+ var node * corev1.Node
171
+ if len (nodeList .Items ) > 0 {
172
+ node = & nodeList .Items [0 ]
173
+ }
174
+ if e := c .handleNodeClaim (ctx , msg , & nodeClaim , node ); e != nil {
175
+ err = multierr .Append (err , e )
176
+ }
174
177
}
175
178
}
176
179
MessageLatency .Observe (time .Since (msg .StartTime ()).Seconds (), nil )
@@ -254,48 +257,6 @@ func (c *Controller) notifyForMessage(msg messages.Message, nodeClaim *karpv1.No
254
257
}
255
258
}
256
259
257
- // makeNodeClaimInstanceIDMap builds a map between the instance id that is stored in the
258
- // NodeClaim .status.providerID and the NodeClaim
259
- func (c * Controller ) makeNodeClaimInstanceIDMap (ctx context.Context ) (map [string ]* karpv1.NodeClaim , error ) {
260
- m := map [string ]* karpv1.NodeClaim {}
261
- nodeClaims , err := nodeclaimutils .ListManaged (ctx , c .kubeClient , c .cloudProvider )
262
- if err != nil {
263
- return nil , err
264
- }
265
- for _ , nc := range nodeClaims {
266
- if nc .Status .ProviderID == "" {
267
- continue
268
- }
269
- id , err := utils .ParseInstanceID (nc .Status .ProviderID )
270
- if err != nil || id == "" {
271
- continue
272
- }
273
- m [id ] = nc
274
- }
275
- return m , nil
276
- }
277
-
278
- // makeNodeInstanceIDMap builds a map between the instance id that is stored in the
279
- // node .spec.providerID and the node
280
- func (c * Controller ) makeNodeInstanceIDMap (ctx context.Context ) (map [string ]* corev1.Node , error ) {
281
- m := map [string ]* corev1.Node {}
282
- nodeList := & corev1.NodeList {}
283
- if err := c .kubeClient .List (ctx , nodeList ); err != nil {
284
- return nil , fmt .Errorf ("listing nodes, %w" , err )
285
- }
286
- for i := range nodeList .Items {
287
- if nodeList .Items [i ].Spec .ProviderID == "" {
288
- continue
289
- }
290
- id , err := utils .ParseInstanceID (nodeList .Items [i ].Spec .ProviderID )
291
- if err != nil || id == "" {
292
- continue
293
- }
294
- m [id ] = & nodeList .Items [i ]
295
- }
296
- return m , nil
297
- }
298
-
299
260
func actionForMessage (msg messages.Message ) Action {
300
261
switch msg .Kind () {
301
262
case messages .ScheduledChangeKind , messages .SpotInterruptionKind , messages .InstanceStoppedKind , messages .InstanceTerminatedKind :
0 commit comments