-
Notifications
You must be signed in to change notification settings - Fork 1.1k
perf: clear instance type cache after ICE #7517
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -17,6 +17,9 @@ package offering | |||||
import ( | ||||||
"context" | ||||||
"fmt" | ||||||
"strings" | ||||||
"sync" | ||||||
"time" | ||||||
|
||||||
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types" | ||||||
"github.com/mitchellh/hashstructure/v2" | ||||||
|
@@ -44,6 +47,11 @@ type DefaultProvider struct { | |||||
capacityReservationProvider capacityreservation.Provider | ||||||
unavailableOfferings *awscache.UnavailableOfferings | ||||||
cache *cache.Cache | ||||||
|
||||||
muLastUnavailableOfferingsSeqNum sync.Mutex | ||||||
|
||||||
// lastUnavailableOfferingsSeqNum is the most recently seen seq num of the unavailable offerings cache, used to track changes | ||||||
lastUnavailableOfferingsSeqNum uint64 | ||||||
} | ||||||
|
||||||
func NewDefaultProvider( | ||||||
|
@@ -66,6 +74,15 @@ func (p *DefaultProvider) InjectOfferings( | |||||
nodeClass *v1.EC2NodeClass, | ||||||
allZones sets.Set[string], | ||||||
) []*cloudprovider.InstanceType { | ||||||
|
||||||
// If unavailable offerings have changed, the availability of all cached on-demand & spot offerings must be updated | ||||||
p.muLastUnavailableOfferingsSeqNum.Lock() | ||||||
if seqNum := p.unavailableOfferings.SeqNum; p.lastUnavailableOfferingsSeqNum < seqNum { | ||||||
p.updateOfferingAvailability() | ||||||
p.lastUnavailableOfferingsSeqNum = seqNum | ||||||
} | ||||||
p.muLastUnavailableOfferingsSeqNum.Unlock() | ||||||
|
||||||
subnetZones := lo.SliceToMap(nodeClass.Status.Subnets, func(s v1.Subnet) (string, string) { | ||||||
return s.Zone, s.ZoneID | ||||||
}) | ||||||
|
@@ -218,10 +235,39 @@ func (p *DefaultProvider) cacheKeyFromInstanceType(it *cloudprovider.InstanceTyp | |||||
&hashstructure.HashOptions{SlicesAsSets: true}, | ||||||
) | ||||||
return fmt.Sprintf( | ||||||
"%s-%016x-%016x-%d", | ||||||
"%s-%016x-%016x", | ||||||
it.Name, | ||||||
zonesHash, | ||||||
capacityTypesHash, | ||||||
p.unavailableOfferings.SeqNum, | ||||||
) | ||||||
} | ||||||
|
||||||
func (p *DefaultProvider) updateOfferingAvailability() { | ||||||
for k, v := range p.cache.Items() { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of changing the values in the cache, why not do the manipulation before we add the offerings in the cache? When we have a change in the SeqNum, why not clear the entries from the cache There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The previous solution would clear the entire cache whenever the SeqNum changed as all keys were invalidated. |
||||||
var updatedOfferings []*cloudprovider.Offering | ||||||
// Extract instance type name from cache key | ||||||
itName := strings.Split(k, "-")[0] | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a better way to do this? This is a bit fragile, but perhaps this is as good as it gets with stringly-typed cache keys There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should be able to derive from the instance type resolved from the cache, no? From There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||||||
for _, offering := range v.Object.([]*cloudprovider.Offering) { | ||||||
capacityType := offering.CapacityType() | ||||||
// unavailableOfferings only affects on-demand & spot offerings | ||||||
if capacityType == karpv1.CapacityTypeOnDemand || capacityType == karpv1.CapacityTypeSpot { | ||||||
zone := offering.Zone() | ||||||
isUnavailable := p.unavailableOfferings.IsUnavailable(ec2types.InstanceType(itName), zone, capacityType) | ||||||
hasPrice := offering.Price > 0.0 | ||||||
// A new offering is created to ensure that the previous offering is not modified while still in use | ||||||
updatedOfferings = append(updatedOfferings, &cloudprovider.Offering{ | ||||||
Requirements: offering.Requirements, | ||||||
Price: offering.Price, | ||||||
Available: !isUnavailable && hasPrice, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is missing the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we dropped these instance types, this would induced drift. Increasing disruption rate for users is a pretty big draw back: https://github.com/kubernetes-sigs/karpenter/blob/main/pkg/controllers/nodeclaim/disruption/drift.go#L114 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It might be better to include itZones (and itName) in the cache record then, since that would also solve the issue with extracting the instance type name from the cache key. |
||||||
}) | ||||||
} else if capacityType == karpv1.CapacityTypeReserved { | ||||||
// Since the previous offering has not been modified, it can be reused | ||||||
updatedOfferings = append(updatedOfferings, offering) | ||||||
} else { | ||||||
panic(fmt.Sprintf("invalid capacity type %q in requirements for instance type %q", capacityType, itName)) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why are we adding a panic here? Karpenter should be able to continue to work with other nodepools if one nodepool is misconfigured There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The panic is originally from karpenter-provider-aws/pkg/providers/instancetype/offering/provider.go Lines 169 to 170 in eaa3f20
The duplicated functionality here could be extracted into a separate "get price by name, zone and capacity type" utility function |
||||||
} | ||||||
} | ||||||
// The previous cache expiration time is retained | ||||||
p.cache.Set(k, updatedOfferings, time.Duration(v.Expiration)) | ||||||
} | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Accessing
unavailableOfferings.SeqNum
non-atomically seems incorrect to me, but this is how it's done in the instancetype provider as well.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What are you thinking we should do instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would make
UnavailableOfferings.SeqNum
private and add a getter that usesatomic.LoadUint64
, and/or replace it with theatomic.Uint64
type which enforces atomic access.