Skip to content

Commit 15781e3

Browse files
cherrypick: fix: skip EC2 API calls for instances in zonally shifted AZs (#9112) (#9139)
Co-authored-by: Derek Frank <dfrankmn@protonmail.com>
1 parent c9beee2 commit 15781e3

9 files changed

Lines changed: 231 additions & 18 deletions

File tree

kwok/operator/operator.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,16 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont
218218
launchTemplateProvider,
219219
capacityReservationProvider,
220220
placementGroupProvider,
221-
cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval),
221+
zsProvider,
222+
// Instance cache entries never expire. The cache is actively refreshed by the garbage
223+
// collection controller's List() every 2 minutes, and entries are explicitly removed
224+
// when Get() returns NotFound from EC2. NoExpiration ensures cached instances in zonally
225+
// shifted AZs remain available for the zonal shift guards in Get(), Delete(), and
226+
// CreateTags(), even if List() cannot return instances from the impaired AZ.
227+
// TODO: Add a cache garbage collector that reconciles entries against EC2 and the API
228+
// server, evicting entries for instances that no longer exist in either. Note: removing
229+
// NodeClaim finalizers to bypass Karpenter's lifecycle management is not supported.
230+
cache.New(cache.NoExpiration, cache.NoExpiration),
222231
)
223232

224233
instanceStatusProvider := instancestatus.NewDefaultProvider(ec2api, operator.Clock)

pkg/fake/arczonalshiftapi.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
Licensed under the Apache License, Version 2.0 (the "License");
3+
you may not use this file except in compliance with the License.
4+
You may obtain a copy of the License at
5+
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
8+
Unless required by applicable law or agreed to in writing, software
9+
distributed under the License is distributed on an "AS IS" BASIS,
10+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
See the License for the specific language governing permissions and
12+
limitations under the License.
13+
*/
14+
15+
package fake
16+
17+
import (
18+
"context"
19+
20+
"github.com/aws/aws-sdk-go-v2/service/arczonalshift"
21+
22+
sdk "github.com/aws/karpenter-provider-aws/pkg/aws"
23+
)
24+
25+
type ARCZonalShiftAPI struct {
26+
sdk.ARCZonalShiftAPI
27+
GetManagedResourceBehavior MockedFunction[arczonalshift.GetManagedResourceInput, arczonalshift.GetManagedResourceOutput]
28+
}
29+
30+
func NewARCZonalShiftAPI() *ARCZonalShiftAPI {
31+
return &ARCZonalShiftAPI{}
32+
}
33+
34+
func (a *ARCZonalShiftAPI) GetManagedResource(ctx context.Context, input *arczonalshift.GetManagedResourceInput, _ ...func(*arczonalshift.Options)) (*arczonalshift.GetManagedResourceOutput, error) {
35+
return a.GetManagedResourceBehavior.Invoke(input, func(*arczonalshift.GetManagedResourceInput) (*arczonalshift.GetManagedResourceOutput, error) {
36+
return &arczonalshift.GetManagedResourceOutput{}, nil
37+
})
38+
}
39+
40+
func (a *ARCZonalShiftAPI) Reset() {
41+
a.GetManagedResourceBehavior.Reset()
42+
}

pkg/operator/operator.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,16 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont
228228
launchTemplateProvider,
229229
capacityReservationProvider,
230230
placementGroupProvider,
231-
cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval),
231+
zsProvider,
232+
// Instance cache entries never expire. The cache is actively refreshed by the garbage
233+
// collection controller's List() every 2 minutes, and entries are explicitly removed
234+
// when Get() returns NotFound from EC2. NoExpiration ensures cached instances in zonally
235+
// shifted AZs remain available for the zonal shift guards in Get(), Delete(), and
236+
// CreateTags(), even if List() cannot return instances from the impaired AZ.
237+
// TODO: Add a cache garbage collector that reconciles entries against EC2 and the API
238+
// server, evicting entries for instances that no longer exist in either. Note: removing
239+
// NodeClaim finalizers to bypass Karpenter's lifecycle management is not supported.
240+
cache.New(cache.NoExpiration, cache.NoExpiration),
232241
)
233242
instanceStatusProvider := instancestatus.NewDefaultProvider(ec2api, operator.Clock)
234243

pkg/providers/arczonalshift/arczonalshift.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"time"
2222

2323
"github.com/aws/aws-sdk-go-v2/service/arczonalshift"
24+
arczonalshifttypes "github.com/aws/aws-sdk-go-v2/service/arczonalshift/types"
2425
"k8s.io/utils/clock"
2526
"sigs.k8s.io/controller-runtime/pkg/log"
2627

@@ -70,7 +71,7 @@ func (p *DefaultProvider) UpdateZonalShifts(ctx context.Context) error {
7071
for _, shift := range activeZonalShifts {
7172
shiftStatuses[*shift.AwayFrom] = shiftStatus{
7273
shiftExpiry: *shift.ExpiryTime,
73-
applied: shift.AppliedStatus == "APPLIED",
74+
applied: shift.AppliedStatus == arczonalshifttypes.AppliedStatusApplied,
7475
}
7576
}
7677
if len(shiftStatuses) == 0 {
@@ -85,6 +86,12 @@ func (p *DefaultProvider) UpdateZonalShifts(ctx context.Context) error {
8586
return nil
8687
}
8788

89+
func (p *DefaultProvider) Reset() {
90+
p.Lock()
91+
defer p.Unlock()
92+
p.zonalShiftStatuses = make(map[string]shiftStatus)
93+
}
94+
8895
func (p *DefaultProvider) IsZonalShifted(ctx context.Context, zoneId string) bool {
8996
p.RLock()
9097
defer p.RUnlock()

pkg/providers/instance/instance.go

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"sigs.k8s.io/karpenter/pkg/utils/resources"
2929

3030
sdk "github.com/aws/karpenter-provider-aws/pkg/aws"
31+
"github.com/aws/karpenter-provider-aws/pkg/providers/arczonalshift"
3132
"github.com/aws/karpenter-provider-aws/pkg/utils"
3233

3334
"github.com/aws/aws-sdk-go-v2/aws"
@@ -106,6 +107,7 @@ type DefaultProvider struct {
106107
ec2Batcher *batcher.EC2API
107108
capacityReservationProvider capacityreservation.Provider
108109
placementGroupProvider placementgroup.Provider
110+
zonalshiftProvider arczonalshift.Provider
109111
instanceCache *cache.Cache
110112
}
111113

@@ -119,6 +121,7 @@ func NewDefaultProvider(
119121
launchTemplateProvider launchtemplate.Provider,
120122
capacityReservationProvider capacityreservation.Provider,
121123
placementGroupProvider placementgroup.Provider,
124+
zonalshiftProvider arczonalshift.Provider,
122125
instanceCache *cache.Cache,
123126
) *DefaultProvider {
124127
return &DefaultProvider{
@@ -131,6 +134,7 @@ func NewDefaultProvider(
131134
ec2Batcher: batcher.EC2(ctx, ec2api),
132135
capacityReservationProvider: capacityReservationProvider,
133136
placementGroupProvider: placementGroupProvider,
137+
zonalshiftProvider: zonalshiftProvider,
134138
instanceCache: instanceCache,
135139
}
136140
}
@@ -177,9 +181,15 @@ func (p *DefaultProvider) Create(ctx context.Context, nodeClass *v1.EC2NodeClass
177181

178182
func (p *DefaultProvider) Get(ctx context.Context, id string, opts ...Options) (*Instance, error) {
179183
skipCache := option.Resolve(opts...).SkipCache
180-
if !skipCache {
181-
if i, ok := p.instanceCache.Get(id); ok {
182-
return i.(*Instance), nil
184+
if i, ok := p.instanceCache.Get(id); ok {
185+
inst := i.(*Instance)
186+
// During a zonal shift, return cached data for instances in the shifted zone to avoid
187+
// DescribeInstances calls that could cause retry storms against the impaired AZ
188+
if inst.ZoneID != "" && p.zonalshiftProvider.IsZonalShifted(ctx, inst.ZoneID) {
189+
return inst, nil
190+
}
191+
if !skipCache {
192+
return inst, nil
183193
}
184194
}
185195
out, err := p.ec2Batcher.DescribeInstances(ctx, &ec2.DescribeInstancesInput{
@@ -246,6 +256,11 @@ func (p *DefaultProvider) Delete(ctx context.Context, id string) error {
246256
if err != nil {
247257
return err
248258
}
259+
// During a zonal shift, Get() returns cached data without calling DescribeInstances.
260+
// We also skip TerminateInstances to avoid retry storms against the impaired AZ.
261+
if out.ZoneID != "" && p.zonalshiftProvider.IsZonalShifted(ctx, out.ZoneID) {
262+
return fmt.Errorf("instance %s is in zonally shifted availability zone %s (%s), skipping termination", id, out.Zone, out.ZoneID)
263+
}
249264
// Check if the instance is already shutting-down to reduce the number of terminate-instance calls we make thereby
250265
// reducing our overall QPS. Due to EC2's eventual consistency model, the result of the terminate-instance or
251266
// describe-instance call may return a not found error even when the instance is not terminated -
@@ -262,6 +277,13 @@ func (p *DefaultProvider) Delete(ctx context.Context, id string) error {
262277
}
263278

264279
func (p *DefaultProvider) CreateTags(ctx context.Context, id string, tags map[string]string) error {
280+
// Check the instance cache for zonal shift status before making the API call.
281+
// During a zonal shift, CreateTags calls to the impaired AZ can cause retry storms.
282+
if i, ok := p.instanceCache.Get(id); ok {
283+
if inst := i.(*Instance); inst.ZoneID != "" && p.zonalshiftProvider.IsZonalShifted(ctx, inst.ZoneID) {
284+
return fmt.Errorf("instance %s is in zonally shifted availability zone %s (%s), skipping tag creation", id, inst.Zone, inst.ZoneID)
285+
}
286+
}
265287
ec2Tags := lo.MapToSlice(tags, func(key, value string) ec2types.Tag {
266288
return ec2types.Tag{Key: aws.String(key), Value: aws.String(value)}
267289
})

pkg/providers/instance/suite_test.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import (
2323
testv1alpha1 "sigs.k8s.io/karpenter/pkg/test/v1alpha1"
2424

2525
"github.com/aws/aws-sdk-go-v2/aws"
26+
"github.com/aws/aws-sdk-go-v2/service/arczonalshift"
27+
arczonalshifttypes "github.com/aws/aws-sdk-go-v2/service/arczonalshift/types"
2628
"github.com/aws/aws-sdk-go-v2/service/ec2"
2729
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
2830
"github.com/awslabs/operatorpkg/object"
@@ -665,4 +667,90 @@ var _ = Describe("InstanceProvider", func() {
665667
Expect(createdInstance.EFACount).To(Equal(0))
666668
})
667669
})
670+
Context("Zonal Shift", func() {
671+
var instanceID string
672+
BeforeEach(func() {
673+
// Store an instance in the shifted zone and populate the cache
674+
ec2Instance := test.EC2Instance(ec2types.Instance{
675+
Placement: &ec2types.Placement{
676+
AvailabilityZone: aws.String("test-zone-1a"),
677+
AvailabilityZoneId: aws.String("tstz1-1a"),
678+
},
679+
})
680+
instanceID = aws.ToString(ec2Instance.InstanceId)
681+
awsEnv.EC2API.Instances.Store(instanceID, ec2Instance)
682+
683+
_, err := awsEnv.InstanceProvider.Get(ctx, instanceID)
684+
Expect(err).ToNot(HaveOccurred())
685+
awsEnv.EC2API.DescribeInstancesBehavior.CalledWithInput.Reset()
686+
awsEnv.EC2API.TerminateInstancesBehavior.CalledWithInput.Reset()
687+
awsEnv.EC2API.CreateTagsBehavior.CalledWithInput.Reset()
688+
689+
// Activate a zonal shift for tstz1-1a
690+
awsEnv.ARCZonalShiftAPI.GetManagedResourceBehavior.Output.Set(&arczonalshift.GetManagedResourceOutput{
691+
ZonalShifts: []arczonalshifttypes.ZonalShiftInResource{
692+
{
693+
AwayFrom: aws.String("tstz1-1a"),
694+
ExpiryTime: aws.Time(time.Now().Add(time.Hour)),
695+
AppliedStatus: arczonalshifttypes.AppliedStatusApplied,
696+
},
697+
},
698+
})
699+
Expect(awsEnv.ZonalShiftProvider.UpdateZonalShifts(ctx)).To(Succeed())
700+
})
701+
It("should not call DescribeInstances for instances in a zonally shifted AZ", func() {
702+
inst, err := awsEnv.InstanceProvider.Get(ctx, instanceID, instance.SkipCache)
703+
Expect(err).ToNot(HaveOccurred())
704+
Expect(inst.ID).To(Equal(instanceID))
705+
Expect(inst.ZoneID).To(Equal("tstz1-1a"))
706+
Expect(awsEnv.EC2API.DescribeInstancesBehavior.CalledWithInput.Len()).To(Equal(0))
707+
})
708+
It("should not call TerminateInstances for instances in a zonally shifted AZ", func() {
709+
err := awsEnv.InstanceProvider.Delete(ctx, instanceID)
710+
Expect(err).To(HaveOccurred())
711+
Expect(err.Error()).To(ContainSubstring("zonally shifted"))
712+
Expect(awsEnv.EC2API.DescribeInstancesBehavior.CalledWithInput.Len()).To(Equal(0))
713+
Expect(awsEnv.EC2API.TerminateInstancesBehavior.CalledWithInput.Len()).To(Equal(0))
714+
})
715+
It("should not call CreateTags for instances in a zonally shifted AZ", func() {
716+
err := awsEnv.InstanceProvider.CreateTags(ctx, instanceID, map[string]string{"test-key": "test-value"})
717+
Expect(err).To(HaveOccurred())
718+
Expect(err.Error()).To(ContainSubstring("zonally shifted"))
719+
Expect(awsEnv.EC2API.CreateTagsBehavior.CalledWithInput.Len()).To(Equal(0))
720+
})
721+
Context("Cache Miss", func() {
722+
// When the instance cache is cold, Get() calls DescribeInstances which populates
723+
// the cache with zone information. Delete() benefits from this since it calls Get()
724+
// first, so the zonal shift guard still applies. CreateTags() does not call Get(),
725+
// so its guard depends on a warm cache.
726+
var uncachedInstanceID string
727+
BeforeEach(func() {
728+
ec2Instance := test.EC2Instance(ec2types.Instance{
729+
Placement: &ec2types.Placement{
730+
AvailabilityZone: aws.String("test-zone-1a"),
731+
AvailabilityZoneId: aws.String("tstz1-1a"),
732+
},
733+
})
734+
uncachedInstanceID = aws.ToString(ec2Instance.InstanceId)
735+
// Store in EC2 but do NOT call Get() to populate the instance cache
736+
awsEnv.EC2API.Instances.Store(uncachedInstanceID, ec2Instance)
737+
awsEnv.EC2API.DescribeInstancesBehavior.CalledWithInput.Reset()
738+
awsEnv.EC2API.TerminateInstancesBehavior.CalledWithInput.Reset()
739+
awsEnv.EC2API.CreateTagsBehavior.CalledWithInput.Reset()
740+
})
741+
It("should not call TerminateInstances even with a cold cache since Delete calls Get first", func() {
742+
err := awsEnv.InstanceProvider.Delete(ctx, uncachedInstanceID)
743+
Expect(err).To(HaveOccurred())
744+
Expect(err.Error()).To(ContainSubstring("zonally shifted"))
745+
// DescribeInstances is called by Get() inside Delete() to fetch instance data
746+
Expect(awsEnv.EC2API.DescribeInstancesBehavior.CalledWithInput.Len()).To(Equal(1))
747+
Expect(awsEnv.EC2API.TerminateInstancesBehavior.CalledWithInput.Len()).To(Equal(0))
748+
})
749+
It("should proceed with CreateTags when instance is not in the cache", func() {
750+
err := awsEnv.InstanceProvider.CreateTags(ctx, uncachedInstanceID, map[string]string{"test-key": "test-value"})
751+
Expect(err).ToNot(HaveOccurred())
752+
Expect(awsEnv.EC2API.CreateTagsBehavior.CalledWithInput.Len()).To(Equal(1))
753+
})
754+
})
755+
})
668756
})

pkg/providers/instance/types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ type Instance struct {
4141
ImageID string
4242
Type ec2types.InstanceType
4343
Zone string
44+
ZoneID string
4445
CapacityType string
4546
SecurityGroupIDs []string
4647
SubnetID string
@@ -76,6 +77,7 @@ func NewInstance(ctx context.Context, instance ec2types.Instance) *Instance {
7677
ImageID: lo.FromPtr(instance.ImageId),
7778
Type: instance.InstanceType,
7879
Zone: lo.FromPtr(instance.Placement.AvailabilityZone),
80+
ZoneID: lo.FromPtr(instance.Placement.AvailabilityZoneId),
7981
// NOTE: Only set the capacity type to reserved and assign a reservation ID if the feature gate is enabled. It's
8082
// possible for these to be set if the instance launched into an open ODCR, but treating it as reserved would induce
8183
// drift.

pkg/test/environment.go

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,12 @@ type Environment struct {
6565
InstanceTypeStore *nodeoverlay.InstanceTypeStore
6666

6767
// API
68-
EC2API *fake.EC2API
69-
EKSAPI *fake.EKSAPI
70-
SSMAPI *fake.SSMAPI
71-
IAMAPI *fake.IAMAPI
72-
PricingAPI *fake.PricingAPI
68+
EC2API *fake.EC2API
69+
EKSAPI *fake.EKSAPI
70+
SSMAPI *fake.SSMAPI
71+
IAMAPI *fake.IAMAPI
72+
PricingAPI *fake.PricingAPI
73+
ARCZonalShiftAPI *fake.ARCZonalShiftAPI
7374

7475
// Cache
7576
AMICache *cache.Cache
@@ -110,6 +111,7 @@ type Environment struct {
110111
VersionProvider *version.DefaultProvider
111112
LaunchTemplateProvider *launchtemplate.DefaultProvider
112113
SSMProvider *ssmp.DefaultProvider
114+
ZonalShiftProvider *arczonalshift.DefaultProvider
113115
}
114116

115117
func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment {
@@ -122,12 +124,14 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment
122124
eksapi := fake.NewEKSAPI()
123125
ssmapi := fake.NewSSMAPI()
124126
iamapi := fake.NewIAMAPI()
127+
arczonalshiftapi := fake.NewARCZonalShiftAPI()
125128

126129
// cache
127130
amiCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)
128131
ec2Cache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)
129132
instanceTypeCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)
130-
instanceCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)
133+
// Instance cache entries never expire. See comment in pkg/operator/operator.go.
134+
instanceCache := cache.New(cache.NoExpiration, cache.NoExpiration)
131135
offeringCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)
132136
discoveredCapacityCache := cache.New(awscache.DiscoveredCapacityCacheTTL, awscache.DefaultCleanupInterval)
133137
unavailableOfferingsCache := awscache.NewUnavailableOfferings()
@@ -165,7 +169,7 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment
165169
amiResolver := amifamily.NewDefaultResolver(fake.DefaultRegion)
166170
instanceTypesResolver := instancetype.NewDefaultResolver(fake.DefaultRegion)
167171
capacityReservationProvider := capacityreservation.NewProvider(ec2api, clock, capacityReservationCache, capacityReservationAvailabilityCache)
168-
zonalshiftProvider := arczonalshift.NewNoopProvider()
172+
zonalshiftProvider := arczonalshift.NewProvider(arczonalshiftapi, clock, "")
169173
instanceTypesProvider := instancetype.NewDefaultProvider(instanceTypeCache, offeringCache, discoveredCapacityCache, ec2api, subnetProvider, pricingProvider, capacityReservationProvider, placementGroupProvider, unavailableOfferingsCache, instanceTypesResolver, zonalshiftProvider)
170174
// Ensure we're able to hydrate instance types before starting any reliant controllers.
171175
// Instance type updates are hydrated asynchronously after this by controllers.
@@ -200,6 +204,7 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment
200204
launchTemplateProvider,
201205
capacityReservationProvider,
202206
placementGroupProvider,
207+
zonalshiftProvider,
203208
instanceCache,
204209
)
205210

@@ -208,11 +213,12 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment
208213
EventRecorder: eventRecorder,
209214
InstanceTypeStore: store,
210215

211-
EC2API: ec2api,
212-
EKSAPI: eksapi,
213-
SSMAPI: ssmapi,
214-
IAMAPI: iamapi,
215-
PricingAPI: fakePricingAPI,
216+
EC2API: ec2api,
217+
EKSAPI: eksapi,
218+
SSMAPI: ssmapi,
219+
IAMAPI: iamapi,
220+
PricingAPI: fakePricingAPI,
221+
ARCZonalShiftAPI: arczonalshiftapi,
216222

217223
AMICache: amiCache,
218224
EC2Cache: ec2Cache,
@@ -252,6 +258,7 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment
252258
AMIResolver: amiResolver,
253259
VersionProvider: versionProvider,
254260
SSMProvider: ssmProvider,
261+
ZonalShiftProvider: zonalshiftProvider,
255262
}
256263
}
257264

@@ -262,6 +269,8 @@ func (env *Environment) Reset() {
262269
env.SSMAPI.Reset()
263270
env.IAMAPI.Reset()
264271
env.PricingAPI.Reset()
272+
env.ARCZonalShiftAPI.Reset()
273+
env.ZonalShiftProvider.Reset()
265274
env.PricingProvider.Reset()
266275
env.InstanceTypesProvider.Reset()
267276

0 commit comments

Comments
 (0)