
Commit 290a7dc

review, consolidation updates, and testing
1 parent 94377e1 commit 290a7dc

File tree

10 files changed: +501 −126 lines changed

pkg/cloudprovider/fake/cloudprovider.go

+26-15
@@ -43,6 +43,7 @@ import (
 func init() {
     v1.WellKnownLabels = v1.WellKnownLabels.Insert(v1alpha1.LabelReservationID)
     cloudprovider.ReservationIDLabel = v1alpha1.LabelReservationID
+    cloudprovider.ReservedCapacityPriceFactor = 1.0 / 1_000_000.0
 }
 
 var _ cloudprovider.CloudProvider = (*CloudProvider)(nil)
@@ -109,6 +110,7 @@ func (c *CloudProvider) Reset() {
     }
 }
 
+//nolint:gocyclo
 func (c *CloudProvider) Create(ctx context.Context, nodeClaim *v1.NodeClaim) (*v1.NodeClaim, error) {
     c.mu.Lock()
     defer c.mu.Unlock()
@@ -126,9 +128,16 @@ func (c *CloudProvider) Create(ctx context.Context, nodeClaim *v1.NodeClaim) (*v
     reqs := scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaim.Spec.Requirements...)
     np := &v1.NodePool{ObjectMeta: metav1.ObjectMeta{Name: nodeClaim.Labels[v1.NodePoolLabelKey]}}
     instanceTypes := lo.Filter(lo.Must(c.GetInstanceTypes(ctx, np)), func(i *cloudprovider.InstanceType, _ int) bool {
-        return reqs.IsCompatible(i.Requirements, scheduling.AllowUndefinedWellKnownLabels) &&
-            i.Offerings.Available().HasCompatible(reqs) &&
-            resources.Fits(nodeClaim.Spec.Resources.Requests, i.Allocatable())
+        if !reqs.IsCompatible(i.Requirements, scheduling.AllowUndefinedWellKnownLabels) {
+            return false
+        }
+        if !i.Offerings.Available().HasCompatible(reqs) {
+            return false
+        }
+        if !resources.Fits(nodeClaim.Spec.Resources.Requests, i.Allocatable()) {
+            return false
+        }
+        return true
     })
     // Order instance types so that we get the cheapest instance types of the available offerings
     sort.Slice(instanceTypes, func(i, j int) bool {
@@ -145,20 +154,22 @@ func (c *CloudProvider) Create(ctx context.Context, nodeClaim *v1.NodeClaim) (*v
         }
     }
     // Find offering, prioritizing reserved instances
-    offering := func() *cloudprovider.Offering {
-        offerings := instanceType.Offerings.Available().Compatible(reqs)
-        lo.Must0(len(offerings) != 0, "created nodeclaim with no available offerings")
-        for _, o := range offerings {
-            if o.CapacityType() == v1.CapacityTypeReserved {
-                o.ReservationCapacity -= 1
-                if o.ReservationCapacity == 0 {
-                    o.Available = false
-                }
-                return o
+    var offering *cloudprovider.Offering
+    offerings := instanceType.Offerings.Available().Compatible(reqs)
+    lo.Must0(len(offerings) != 0, "created nodeclaim with no available offerings")
+    for _, o := range offerings {
+        if o.CapacityType() == v1.CapacityTypeReserved {
+            o.ReservationCapacity -= 1
+            if o.ReservationCapacity == 0 {
+                o.Available = false
             }
+            offering = o
+            break
         }
-        return offerings[0]
-    }()
+    }
+    if offering == nil {
+        offering = offerings[0]
+    }
     // Propagate labels dictated by offering requirements - e.g. zone, capacity-type, and reservation-id
     for _, req := range offering.Requirements {
         labels[req.Key] = req.Any()
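
The offering-selection change above rewrites an immediately-invoked closure into a plain loop: prefer a reserved offering (consuming one unit of its remaining capacity and marking it unavailable at zero), otherwise fall back to the first compatible offering. A minimal, self-contained sketch of that pattern, using simplified stand-in types rather than the real cloudprovider structs:

package main

import "fmt"

// offering is a simplified stand-in for cloudprovider.Offering.
type offering struct {
    capacityType        string
    price               float64
    reservationCapacity int
    available           bool
}

// pickOffering prefers a reserved offering, decrementing its remaining capacity
// and marking it unavailable when exhausted; otherwise it returns the first offering.
func pickOffering(offerings []*offering) *offering {
    var picked *offering
    for _, o := range offerings {
        if o.capacityType == "reserved" {
            o.reservationCapacity--
            if o.reservationCapacity == 0 {
                o.available = false
            }
            picked = o
            break
        }
    }
    if picked == nil {
        picked = offerings[0]
    }
    return picked
}

func main() {
    offerings := []*offering{
        {capacityType: "on-demand", price: 1.0, available: true},
        {capacityType: "reserved", price: 0.000001, reservationCapacity: 1, available: true},
    }
    o := pickOffering(offerings)
    fmt.Println(o.capacityType, o.available) // prints: reserved false (capacity exhausted)
}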

pkg/cloudprovider/types.go

+8-14
@@ -43,6 +43,10 @@ var (
     // reservation. For example, a reservation could be shared across multiple NodePools, and the value encoded in this
     // requirement is used to inform the scheduler that a reservation for one should affect the other.
     ReservationIDLabel string
+
+    // ReservedCapacityPriceFactor is a constant which should be applied when determining the cost of a reserved offering,
+    // if unavailable from `GetInstanceTypes`.
+    ReservedCapacityPriceFactor float64
 )
 
 type DriftReason string
@@ -244,20 +248,6 @@ func (i InstanceTypeOverhead) Total() corev1.ResourceList {
     return resources.Merge(i.KubeReserved, i.SystemReserved, i.EvictionThreshold)
 }
 
-// ReservationManager is used to track the availability of a reserved offering over the course of a scheduling
-// simulation. Reserved offerings may have a limited number of available instances associated with them,
-// This is exposed as an interface for cloudprovider's to implement to give flexibility when dealing with separate
-// offerings with associated availablility.
-type ReservationManager interface {
-    // Reserve takes a unique identifier for a reservation, and returns a boolean indicating if the reservation was
-    // successful. Reserve should be idempotent, i.e. multiple calls with the same reservation ID should only count for a
-    // single reservation.
-    Reserve(string) bool
-    // Release takes a unique identifier for a reservation, and should discard any matching reservations. If no
-    // reservations exist for the given id, release should be a no-op.
-    Release(string)
-}
-
 // An Offering describes where an InstanceType is available to be used, with the expectation that its properties
 // may be tightly coupled (e.g. the availability of an instance type in some zone is scoped to a capacity type) and
 // these properties are captured with labels in Requirements.
@@ -273,6 +263,10 @@ func (o *Offering) CapacityType() string {
     return o.Requirements.Get(v1.CapacityTypeLabelKey).Any()
 }
 
+func (o *Offering) Zone() string {
+    return o.Requirements.Get(corev1.LabelTopologyZone).Any()
+}
+
 func (o *Offering) ReservationID() string {
     return o.Requirements.Get(ReservationIDLabel).Any()
 }
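
The new Zone() accessor follows the same pattern as the existing CapacityType() and ReservationID() helpers: every property of an offering is read back from its label requirements rather than stored as a separate field. A rough sketch of that accessor pattern, with a plain map standing in for scheduling.Requirements (the reservation-ID label name below is illustrative, since the real one is set by the cloud provider):

package main

import "fmt"

// requirements is a simplified stand-in for scheduling.Requirements: one value per label key.
type requirements map[string]string

// offering mirrors the accessor style on cloudprovider.Offering: zone, capacity type,
// and reservation ID are all derived from label requirements.
type offering struct {
    requirements requirements
}

func (o offering) CapacityType() string { return o.requirements["karpenter.sh/capacity-type"] }
func (o offering) Zone() string         { return o.requirements["topology.kubernetes.io/zone"] }

// ReservationID reads a provider-defined label; "example.com/reservation-id" is a placeholder.
func (o offering) ReservationID() string { return o.requirements["example.com/reservation-id"] }

func main() {
    o := offering{requirements: requirements{
        "karpenter.sh/capacity-type":  "reserved",
        "topology.kubernetes.io/zone": "us-west-2a",
        "example.com/reservation-id":  "cr-0123456789abcdef0",
    }}
    fmt.Println(o.CapacityType(), o.Zone(), o.ReservationID())
}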

pkg/controllers/disruption/consolidation.go

+19-5
@@ -211,7 +211,8 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
     // We are consolidating a node from OD -> [OD,Spot] but have filtered the instance types by cost based on the
     // assumption, that the spot variant will launch. We also need to add a requirement to the node to ensure that if
     // spot capacity is insufficient we don't replace the node with a more expensive on-demand node. Instead the launch
-    // should fail and we'll just leave the node alone.
+    // should fail and we'll just leave the node alone. We don't need to do the same for reserved since the requirements
+    // are injected on by the scheduler.
     ctReq := results.NewNodeClaims[0].Requirements.Get(v1.CapacityTypeLabelKey)
     if ctReq.Has(v1.CapacityTypeSpot) && ctReq.Has(v1.CapacityTypeOnDemand) {
         results.NewNodeClaims[0].Requirements.Add(scheduling.NewRequirement(v1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, v1.CapacityTypeSpot))
@@ -307,11 +308,24 @@ func (c *consolidation) computeSpotToSpotConsolidation(ctx context.Context, cand
 func getCandidatePrices(candidates []*Candidate) (float64, error) {
     var price float64
     for _, c := range candidates {
-        compatibleOfferings := c.instanceType.Offerings.Compatible(scheduling.NewLabelRequirements(c.StateNode.Labels()))
-        if len(compatibleOfferings) == 0 {
-            return 0.0, fmt.Errorf("unable to determine offering for %s/%s/%s", c.instanceType.Name, c.capacityType, c.zone)
+        var compatibleOfferings cloudprovider.Offerings
+        reservedFallback := false
+        reqs := scheduling.NewLabelRequirements(c.StateNode.Labels())
+        for {
+            compatibleOfferings = c.instanceType.Offerings.Compatible(reqs)
+            if len(compatibleOfferings) != 0 {
+                break
+            }
+            if c.capacityType != v1.CapacityTypeReserved {
+                return 0.0, fmt.Errorf("unable to determine offering for %s/%s/%s", c.instanceType.Name, c.capacityType, c.zone)
+            }
+            // If there are no compatible offerings, but the capacity type for the candidate is reserved, we can fall-back to
+            // the on-demand offering to derive pricing.
+            reqs[v1.CapacityTypeLabelKey] = scheduling.NewRequirement(v1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, v1.CapacityTypeOnDemand)
+            delete(reqs, cloudprovider.ReservationIDLabel)
+            reservedFallback = true
         }
-        price += compatibleOfferings.Cheapest().Price
+        price += compatibleOfferings.Cheapest().Price * lo.Ternary(reservedFallback, cloudprovider.ReservedCapacityPriceFactor, 1.0)
     }
     return price, nil
 }
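
With this change, a reserved candidate whose reservation offering can no longer be resolved falls back to the on-demand offering for pricing, scaled down by ReservedCapacityPriceFactor so the reserved node still compares as far cheaper than any potential replacement. A quick sketch of the arithmetic, assuming an illustrative on-demand price and the 1/1,000,000 factor the fake cloud provider wires up above:

package main

import "fmt"

func main() {
    // Assumed values: a $0.40/hr on-demand price and the factor set in the fake provider's init().
    const onDemandPrice = 0.40
    const reservedCapacityPriceFactor = 1.0 / 1_000_000.0

    // When a reserved candidate's own offering cannot be found, its price is derived from the
    // on-demand offering and scaled down by the factor before being added to the candidate total.
    reservedFallbackPrice := onDemandPrice * reservedCapacityPriceFactor
    fmt.Printf("on-demand: $%.2f/hr, reserved fallback: $%.8f/hr\n", onDemandPrice, reservedFallbackPrice)
}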

pkg/controllers/disruption/types.go

+8-5
@@ -157,19 +157,22 @@ func (c Command) String() string {
     }
     odNodeClaims := 0
     spotNodeClaims := 0
+    reservedNodeClaims := 0
     for _, nodeClaim := range c.replacements {
         ct := nodeClaim.Requirements.Get(v1.CapacityTypeLabelKey)
-        if ct.Has(v1.CapacityTypeOnDemand) {
+        switch {
+        case ct.Has(v1.CapacityTypeOnDemand):
             odNodeClaims++
-        }
-        if ct.Has(v1.CapacityTypeSpot) {
+        case ct.Has(v1.CapacityTypeSpot):
             spotNodeClaims++
+        case ct.Has(v1.CapacityTypeReserved):
+            reservedNodeClaims++
         }
     }
     // Print list of instance types for the first replacements.
     if len(c.replacements) > 1 {
-        fmt.Fprintf(&buf, " and replacing with %d spot and %d on-demand, from types %s",
-            spotNodeClaims, odNodeClaims,
+        fmt.Fprintf(&buf, " and replacing with %d spot, %d on-demand, and %d reserved, from types %s",
+            spotNodeClaims, odNodeClaims, reservedNodeClaims,
             scheduling.InstanceTypeList(c.replacements[0].InstanceTypeOptions))
         return buf.String()
     }
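
A toy reproduction of the updated replacement summary, showing the switch-based counting and the new message format. Note the real code switches on a requirement that may allow several capacity types, so only the first matching case counts; the values below are made up for illustration:

package main

import (
    "fmt"
    "strings"
)

func main() {
    // Capacity types of hypothetical replacement nodeclaims.
    replacements := []string{"spot", "spot", "on-demand", "reserved"}

    var odNodeClaims, spotNodeClaims, reservedNodeClaims int
    for _, ct := range replacements {
        switch ct {
        case "on-demand":
            odNodeClaims++
        case "spot":
            spotNodeClaims++
        case "reserved":
            reservedNodeClaims++
        }
    }

    var buf strings.Builder
    fmt.Fprintf(&buf, " and replacing with %d spot, %d on-demand, and %d reserved, from types %s",
        spotNodeClaims, odNodeClaims, reservedNodeClaims, "<instance-type-list>")
    fmt.Println(buf.String())
}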

pkg/controllers/provisioning/provisioner.go

+2-2
@@ -304,7 +304,7 @@ func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) {
         return scheduler.Results{}, nil
     }
     log.FromContext(ctx).V(1).WithValues("pending-pods", len(pendingPods), "deleting-pods", len(deletingNodePods)).Info("computing scheduling decision for provisionable pod(s)")
-    s, err := p.NewScheduler(ctx, pods, nodes.Active(), scheduler.DisableReservedFallback)
+    s, err := p.NewScheduler(ctx, pods, nodes.Active(), scheduler.DisableReservedCapacityFallback)
     if err != nil {
         if errors.Is(err, ErrNodePoolsNotFound) {
             log.FromContext(ctx).Info("no nodepools found")
@@ -318,7 +318,7 @@ func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) {
     }
     results := s.Solve(ctx, pods).TruncateInstanceTypes(scheduler.MaxInstanceTypes)
     if len(results.ReservedOfferingErrors) != 0 {
-        log.FromContext(ctx).V(1).WithValues("Pods", pretty.Slice(lo.Map(pods, func(p *corev1.Pod, _ int) string { return klog.KRef(p.Namespace, p.Name).String() }), 5)).Info("deferring scheduling decision for provisionable pod(s) to future simulation due to limited reserved offering capacity")
+        log.FromContext(ctx).V(1).WithValues("Pods", pretty.Slice(lo.Map(lo.Keys(results.ReservedOfferingErrors), func(p *corev1.Pod, _ int) string { return klog.KRef(p.Namespace, p.Name).String() }), 5)).Info("deferring scheduling decision for provisionable pod(s) to future simulation due to limited reserved offering capacity")
     }
     scheduler.UnschedulablePodsCount.Set(float64(len(results.PodErrors)), map[string]string{scheduler.ControllerLabel: injection.GetControllerName(ctx)})
     if len(results.NewNodeClaims) > 0 {
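
The log fix above derives the pod list from the keys of results.ReservedOfferingErrors rather than from every pending pod, so only the pods whose scheduling was actually deferred are reported. A minimal sketch of that key-extraction step, with a plain struct standing in for the klog reference and a plain map standing in for the error map:

package main

import "fmt"

// podRef is a stand-in for the namespace/name reference produced by klog.KRef.
type podRef struct{ Namespace, Name string }

func (p podRef) String() string { return p.Namespace + "/" + p.Name }

func main() {
    // Stand-in for results.ReservedOfferingErrors: keyed by the pods whose scheduling
    // was deferred due to limited reserved offering capacity.
    reservedOfferingErrors := map[podRef]error{
        {Namespace: "default", Name: "workload-a"}: fmt.Errorf("could not be reserved"),
        {Namespace: "default", Name: "workload-b"}: fmt.Errorf("could not be reserved"),
    }

    // Only the keys of the error map are logged, not every pending pod.
    refs := make([]string, 0, len(reservedOfferingErrors))
    for p := range reservedOfferingErrors {
        refs = append(refs, p.String())
    }
    fmt.Println("deferring scheduling decision for pods:", refs)
}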

pkg/controllers/provisioning/scheduling/nodeclaim.go

+31-28
@@ -136,7 +136,7 @@ func (n *NodeClaim) Add(pod *corev1.Pod, podData *PodData) error {
         return err
     }
 
-    reservedOfferings, offeringsToRelease, err := n.reserveOfferings(remaining, nodeClaimRequirements)
+    reservedOfferings, err := n.reserveOfferings(remaining, nodeClaimRequirements)
     if err != nil {
         return err
     }
@@ -148,13 +148,29 @@ func (n *NodeClaim) Add(pod *corev1.Pod, podData *PodData) error {
     n.Requirements = nodeClaimRequirements
     n.topology.Record(pod, n.NodeClaim.Spec.Taints, nodeClaimRequirements, scheduling.AllowUndefinedWellKnownLabels)
     n.hostPortUsage.Add(pod, hostPorts)
+    n.releaseReservedOfferings(n.reservedOfferings, reservedOfferings)
     n.reservedOfferings = reservedOfferings
-    for _, o := range offeringsToRelease {
-        n.reservationManager.Release(n.hostname, o)
-    }
     return nil
 }
 
+// releaseReservedOfferings releases all offerings which are present in the current reserved offerings, but are not
+// present in the updated reserved offerings.
+func (n *NodeClaim) releaseReservedOfferings(current, updated map[string]cloudprovider.Offerings) {
+    updatedIDs := sets.New[string]()
+    for _, ofs := range updated {
+        for _, o := range ofs {
+            updatedIDs.Insert(o.ReservationID())
+        }
+    }
+    for _, ofs := range current {
+        for _, o := range ofs {
+            if !updatedIDs.Has(o.ReservationID()) {
+                n.reservationManager.Release(n.hostname, o)
+            }
+        }
+    }
+}
+
 // reserveOfferings handles the reservation of `karpenter.sh/capacity-type: reserved` offerings, returning the reserved
 // offerings in a map from instance type name to offerings. Additionally, a slice of offerings to release is returned.
 // This is based on the previously reserved offerings which are no longer compatible with the nodeclaim. These should
@@ -163,19 +179,17 @@ func (n *NodeClaim) Add(pod *corev1.Pod, podData *PodData) error {
 func (n *NodeClaim) reserveOfferings(
     instanceTypes []*cloudprovider.InstanceType,
     nodeClaimRequirements scheduling.Requirements,
-) (reservedOfferings map[string]cloudprovider.Offerings, offeringsToRelease []*cloudprovider.Offering, err error) {
-    compatibleReservedInstanceTypes := sets.New[string]()
-    reservedOfferings = map[string]cloudprovider.Offerings{}
+) (map[string]cloudprovider.Offerings, error) {
+    hasCompatibleOffering := false
+    reservedOfferings := map[string]cloudprovider.Offerings{}
     for _, it := range instanceTypes {
-        hasCompatibleOffering := false
         for _, o := range it.Offerings {
            if o.CapacityType() != v1.CapacityTypeReserved || !o.Available {
                continue
            }
            // Track every incompatible reserved offering for release. Since releasing a reservation is a no-op when there is no
            // reservation for the given host, there's no need to check that a reservation actually exists for the offering.
            if !nodeClaimRequirements.IsCompatible(o.Requirements, scheduling.AllowUndefinedWellKnownLabels) {
-                offeringsToRelease = append(offeringsToRelease, o)
                continue
            }
            hasCompatibleOffering = true
@@ -186,35 +200,24 @@ func (n *NodeClaim) reserveOfferings(
                reservedOfferings[it.Name] = append(reservedOfferings[it.Name], o)
            }
        }
-        if hasCompatibleOffering {
-            compatibleReservedInstanceTypes.Insert(it.Name)
-        }
    }
 
    if n.reservedOfferingMode == ReservedOfferingModeStrict {
        // If an instance type with a compatible reserved offering exists, but we failed to make any reservations, we should
-        // fail. We include this check due to the pessimistic nature of instance reservation - we have to reserve each potential
-        // offering for every nodeclaim, since we don't know what we'll launch with. Without this check we would fall back to
-        // on-demand / spot even when there's sufficient reserved capacity. This check means we may fail to schedule the pod
-        // during this scheduling simulation, but with the possibility of success on subsequent simulations.
-        // Note: while this can occur both on initial creation and on
-        if len(compatibleReservedInstanceTypes) != 0 && len(reservedOfferings) == 0 {
-            return nil, nil, NewReservedOfferingError(fmt.Errorf("one or more instance types with compatible reserved offerings are available, but could not be reserved"))
+        // fail. This could occur when all of the capacity for compatible instances has been reserved by previously created
+        // nodeclaims. Since we reserve offering pessimistically, i.e. we will reserve any offering that the instance could
+        // be launched with, we should fall back and attempt to schedule this pod in a subsequent scheduling simulation once
+        // reservation capacity is available again.
+        if hasCompatibleOffering && len(reservedOfferings) == 0 {
+            return nil, NewReservedOfferingError(fmt.Errorf("one or more instance types with compatible reserved offerings are available, but could not be reserved"))
        }
        // If the nodeclaim previously had compatible reserved offerings, but the additional requirements filtered those out,
        // we should fail to add the pod to this nodeclaim.
        if len(n.reservedOfferings) != 0 && len(reservedOfferings) == 0 {
-            return nil, nil, NewReservedOfferingError(fmt.Errorf("satisfying updated nodeclaim constraints would remove all compatible reserved offering options"))
-        }
-    }
-    // Ensure we release any offerings for instance types which are no longer compatible with nodeClaimRequirements
-    for instanceName, offerings := range n.reservedOfferings {
-        if compatibleReservedInstanceTypes.Has(instanceName) {
-            continue
+            return nil, NewReservedOfferingError(fmt.Errorf("satisfying updated nodeclaim constraints would remove all compatible reserved offering options"))
        }
-        offeringsToRelease = append(offeringsToRelease, offerings...)
    }
-    return reservedOfferings, offeringsToRelease, nil
+    return reservedOfferings, nil
 }
 
 func (n *NodeClaim) Destroy() {
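
The new releaseReservedOfferings helper replaces the returned offeringsToRelease slice with a diff of the previous and updated reservation sets: anything reserved before but absent after the update gets released. A rough sketch of that set-difference logic, with plain string sets standing in for sets.Set[string] and a callback standing in for the reservation manager's Release:

package main

import "fmt"

// releaseStale releases every reservation ID present in current but absent from updated,
// mirroring the shape of (*NodeClaim).releaseReservedOfferings with simplified types.
func releaseStale(current, updated map[string][]string, release func(reservationID string)) {
    // Collect the IDs still reserved after the update.
    updatedIDs := map[string]struct{}{}
    for _, ids := range updated {
        for _, id := range ids {
            updatedIDs[id] = struct{}{}
        }
    }
    // Release anything that was previously reserved but no longer is.
    for _, ids := range current {
        for _, id := range ids {
            if _, ok := updatedIDs[id]; !ok {
                release(id)
            }
        }
    }
}

func main() {
    current := map[string][]string{"m5.large": {"cr-1", "cr-2"}}
    updated := map[string][]string{"m5.large": {"cr-2"}}
    releaseStale(current, updated, func(id string) {
        fmt.Println("released", id) // prints: released cr-1
    })
}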

0 commit comments