Skip to content

feat: Added UnavailableOfferingsTTL exposed as the flag & env variable in the karpenter #8013

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion hack/docs/instancetypes_gen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ below are the resources available with some assumptions and after the instance o
cfg.Region,
),
nil,
awscache.NewUnavailableOfferings(),
awscache.NewUnavailableOfferings(ctx),
instancetype.NewDefaultResolver(
region,
),
Expand Down
2 changes: 1 addition & 1 deletion hack/tools/launchtemplate_counter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func main() {
cfg.Region,
),
nil,
awscache.NewUnavailableOfferings(),
awscache.NewUnavailableOfferings(ctx),
instancetype.NewDefaultResolver(
region,
),
Expand Down
5 changes: 2 additions & 3 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,14 @@ const (
// AWS APIs, which can have a serious impact on performance and scalability.
// DO NOT CHANGE THIS VALUE WITHOUT DUE CONSIDERATION
DefaultTTL = time.Minute
// UnavailableOfferingsTTL is the time before offerings that were marked as unavailable
// are removed from the cache and are available for launch again
UnavailableOfferingsTTL = 3 * time.Minute
// CapacityReservationAvailabilityTTL is the time we will persist cached capacity availability. Nominally, this is
// updated every minute, but we want to persist the data longer in the event of an EC2 API outage. 24 hours was the
// compromise made for API outage resiliency and garbage collecting entries for orphaned reservations.
CapacityReservationAvailabilityTTL = 24 * time.Hour
// InstanceTypesZonesAndOfferingsTTL is the time before we refresh instance types, zones, and offerings at EC2
InstanceTypesZonesAndOfferingsTTL = 5 * time.Minute
// InstanceTypesAndZonesTTL is the time before we refresh instance types and zones at EC2
InstanceTypesAndZonesTTL = 5 * time.Minute
// InstanceProfileTTL is the time before we refresh checking instance profile existence at IAM
InstanceProfileTTL = 15 * time.Minute
// AvailableIPAddressTTL is time to drop AvailableIPAddress data if it is not updated within the TTL
Expand Down
6 changes: 5 additions & 1 deletion pkg/cache/unavailableofferings.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
"github.com/aws/karpenter-provider-aws/pkg/operator/options"
"github.com/samber/lo"

"github.com/patrickmn/go-cache"
Expand All @@ -37,7 +39,8 @@ type UnavailableOfferings struct {
SeqNum uint64
}

func NewUnavailableOfferings() *UnavailableOfferings {
func NewUnavailableOfferings(ctx context.Context) *UnavailableOfferings {
UnavailableOfferingsTTL := time.Duration(int64(options.FromContext(ctx).UnavailableOfferingsTTL)) * time.Second
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general I'd prefer to pass in only the TTL as an argument rather than the whole ctx object, just to keep the function footprint small. Additionally, why store the option as an int, only to cast it to an int64 and then convert it to a time.Duration?

uo := &UnavailableOfferings{
offeringCache: cache.New(UnavailableOfferingsTTL, UnavailableOfferingsCleanupInterval),
capacityTypeCache: cache.New(UnavailableOfferingsTTL, UnavailableOfferingsCleanupInterval),
Expand All @@ -61,6 +64,7 @@ func (u *UnavailableOfferings) IsUnavailable(instanceType ec2types.InstanceType,

// MarkUnavailable communicates recently observed temporary capacity shortages in the provided offerings
func (u *UnavailableOfferings) MarkUnavailable(ctx context.Context, unavailableReason string, instanceType ec2types.InstanceType, zone, capacityType string) {
UnavailableOfferingsTTL := time.Duration(int64(options.FromContext(ctx).UnavailableOfferingsTTL)) * time.Second
// even if the key is already in the cache, we still need to call Set to extend the cached entry's TTL
log.FromContext(ctx).WithValues(
"reason", unavailableReason,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func benchmarkNotificationController(b *testing.B, messageCount int) {

// Load all the fundamental components before setting up the controllers
recorder := coretest.NewEventRecorder()
unavailableOfferingsCache = awscache.NewUnavailableOfferings()
unavailableOfferingsCache = awscache.NewUnavailableOfferings(ctx)

// Set-up the controllers
interruptionController := interruption.NewController(env.Client, fakeClock, recorder, providers.sqsProvider, unavailableOfferingsCache)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/interruption/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ var _ = BeforeSuite(func() {
env = coretest.NewEnvironment(coretest.WithCRDs(apis.CRDs...), coretest.WithCRDs(v1alpha1.CRDs...), coretest.WithFieldIndexers(test.NodeInstanceIDFieldIndexer(ctx), test.NodeClaimInstanceIDFieldIndexer(ctx)))
awsEnv = test.NewEnvironment(ctx, env)
fakeClock = &clock.FakeClock{}
unavailableOfferingsCache = awscache.NewUnavailableOfferings()
unavailableOfferingsCache = awscache.NewUnavailableOfferings(ctx)
sqsapi = &fake.SQSAPI{}
sqsProvider = lo.Must(sqs.NewDefaultProvider(sqsapi, fmt.Sprintf("https://sqs.%s.amazonaws.com/%s/test-cluster", fake.DefaultRegion, fake.DefaultAccount)))
cloudProvider := cloudprovider.New(awsEnv.InstanceTypesProvider, awsEnv.InstanceProvider, events.NewRecorder(&record.FakeRecorder{}),
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont
} else {
log.FromContext(ctx).WithValues("kube-dns-ip", kubeDNSIP).V(1).Info("discovered kube dns")
}
unavailableOfferingsCache := awscache.NewUnavailableOfferings()
unavailableOfferingsCache := awscache.NewUnavailableOfferings(ctx)
ssmCache := cache.New(awscache.SSMCacheTTL, awscache.DefaultCleanupInterval)
validationCache := cache.New(awscache.ValidationTTL, awscache.DefaultCleanupInterval)

Expand Down
2 changes: 2 additions & 0 deletions pkg/operator/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Options struct {
VMMemoryOverheadPercent float64
InterruptionQueue string
ReservedENIs int
UnavailableOfferingsTTL int
}

func (o *Options) AddFlags(fs *coreoptions.FlagSet) {
Expand All @@ -53,6 +54,7 @@ func (o *Options) AddFlags(fs *coreoptions.FlagSet) {
fs.Float64Var(&o.VMMemoryOverheadPercent, "vm-memory-overhead-percent", utils.WithDefaultFloat64("VM_MEMORY_OVERHEAD_PERCENT", 0.075), "The VM memory overhead as a percent that will be subtracted from the total memory for all instance types when cached information is unavailable.")
fs.StringVar(&o.InterruptionQueue, "interruption-queue", env.WithDefaultString("INTERRUPTION_QUEUE", ""), "Interruption queue is the name of the SQS queue used for processing interruption events from EC2. Interruption handling is disabled if not specified. Enabling interruption handling may require additional permissions on the controller service account. Additional permissions are outlined in the docs.")
fs.IntVar(&o.ReservedENIs, "reserved-enis", env.WithDefaultInt("RESERVED_ENIS", 0), "Reserved ENIs are not included in the calculations for max-pods or kube-reserved. This is most often used in the VPC CNI custom networking setup https://docs.aws.amazon.com/eks/latest/userguide/cni-custom-network.html.")
fs.IntVar(&o.UnavailableOfferingsTTL, "unavailable-offerings-ttl", env.WithDefaultInt("UNAVAILABLE_OFFERINGS_TTL", 180), "The Unavailable offerings TTL is the time before offerings that were marked as unavailable are removed from the cache and are available for launch again.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a particular reason to switch this TTL to seconds over minutes? Either way, the description should include a '... in minutes ...' or an '... in seconds ...' to indicate to the user the correct unit.

}

func (o *Options) Parse(fs *coreoptions.FlagSet, args ...string) error {
Expand Down
8 changes: 8 additions & 0 deletions pkg/operator/options/options_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func (o Options) Validate() error {
o.validateEndpoint(),
o.validateVMMemoryOverheadPercent(),
o.validateReservedENIs(),
o.validateUnavailableOfferingsTTL(),
o.validateRequiredFields(),
)
}
Expand Down Expand Up @@ -57,6 +58,13 @@ func (o Options) validateReservedENIs() error {
return nil
}

// validateUnavailableOfferingsTTL checks that the unavailable-offerings-ttl
// option, which is interpreted as a number of seconds, is strictly positive.
//
// Negative values are meaningless for a TTL. Zero is rejected as well: the
// value is used as the expiration of the go-cache caches that track
// unavailable offerings, and go-cache documents that an expiration of less
// than one means entries never expire. A zero TTL would therefore keep
// offerings marked unavailable indefinitely instead of releasing them.
//
// Returns a non-nil error describing the offending value when it is not
// positive, and nil otherwise.
func (o Options) validateUnavailableOfferingsTTL() error {
	if o.UnavailableOfferingsTTL <= 0 {
		return fmt.Errorf("unavailable-offerings-ttl must be greater than 0, got %d", o.UnavailableOfferingsTTL)
	}
	return nil
}

func (o Options) validateRequiredFields() error {
if o.ClusterName == "" {
return fmt.Errorf("missing field, cluster-name")
Expand Down
11 changes: 10 additions & 1 deletion pkg/operator/options/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ var _ = Describe("Options", func() {
"--isolated-vpc",
"--vm-memory-overhead-percent", "0.1",
"--interruption-queue", "env-cluster",
"--reserved-enis", "10")
"--reserved-enis", "10",
"--unavailable-offerings-ttl", "30")
Expect(err).ToNot(HaveOccurred())
expectOptionsEqual(opts, test.Options(test.OptionsFields{
ClusterCABundle: lo.ToPtr("env-bundle"),
Expand All @@ -72,6 +73,7 @@ var _ = Describe("Options", func() {
VMMemoryOverheadPercent: lo.ToPtr[float64](0.1),
InterruptionQueue: lo.ToPtr("env-cluster"),
ReservedENIs: lo.ToPtr(10),
UnavailableOfferingsTTL: lo.ToPtr(30),
}))
})
It("should correctly fallback to env vars when CLI flags aren't set", func() {
Expand All @@ -82,6 +84,7 @@ var _ = Describe("Options", func() {
os.Setenv("VM_MEMORY_OVERHEAD_PERCENT", "0.1")
os.Setenv("INTERRUPTION_QUEUE", "env-cluster")
os.Setenv("RESERVED_ENIS", "10")
os.Setenv("UNAVAILABLE_OFFERINGS_TTL", "30")

// Add flags after we set the environment variables so that the parsing logic correctly refers
// to the new environment variable values
Expand All @@ -96,6 +99,7 @@ var _ = Describe("Options", func() {
VMMemoryOverheadPercent: lo.ToPtr[float64](0.1),
InterruptionQueue: lo.ToPtr("env-cluster"),
ReservedENIs: lo.ToPtr(10),
UnavailableOfferingsTTL: lo.ToPtr(30),
}))
})

Expand All @@ -119,6 +123,10 @@ var _ = Describe("Options", func() {
err := opts.Parse(fs, "--cluster-name", "test-cluster", "--reserved-enis", "-1")
Expect(err).To(HaveOccurred())
})
It("should fail when unavailableOfferingsTTL is negative", func() {
err := opts.Parse(fs, "--cluster-name", "test-cluster", "--unavailable-offerings-ttl", "-1")
Expect(err).To(HaveOccurred())
})
})
})

Expand All @@ -131,4 +139,5 @@ func expectOptionsEqual(optsA *options.Options, optsB *options.Options) {
Expect(optsA.VMMemoryOverheadPercent).To(Equal(optsB.VMMemoryOverheadPercent))
Expect(optsA.InterruptionQueue).To(Equal(optsB.InterruptionQueue))
Expect(optsA.ReservedENIs).To(Equal(optsB.ReservedENIs))
Expect(optsA.UnavailableOfferingsTTL).To(Equal(optsB.UnavailableOfferingsTTL))
}
2 changes: 1 addition & 1 deletion pkg/test/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment
instanceTypeCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)
offeringCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)
discoveredCapacityCache := cache.New(awscache.DiscoveredCapacityCacheTTL, awscache.DefaultCleanupInterval)
unavailableOfferingsCache := awscache.NewUnavailableOfferings()
unavailableOfferingsCache := awscache.NewUnavailableOfferings(ctx)
launchTemplateCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)
subnetCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)
availableIPAdressCache := cache.New(awscache.AvailableIPAddressTTL, awscache.DefaultCleanupInterval)
Expand Down