
Commit 4c25410

final final review feedback
1 parent 55b41ce commit 4c25410

9 files changed (+118 −65)

designs/capacity-reservations.md (+3 −3)

@@ -22,13 +22,13 @@ Karpenter doesn't currently support reasoning about this capacity type. Karpente
 3. Karpenter should add logic to its scheduler to reason about this availability as an `int` -- ensuring that the scheduler never schedules more offerings of an instance type for a capacity type than are available
 4. Karpenter should extend its CloudProvider [InstanceType](https://github.com/kubernetes-sigs/karpenter/blob/35d6197e38e64cd6abfef71a082aee80e38d09fd/pkg/cloudprovider/types.go#L75) struct to allow offerings to represent availability of an offering as an `int` rather than a `bool` -- allowing Cloud Providers to represent the constrained capacity of `reserved`
 5. Karpenter should consolidate between `on-demand` and/or `spot` instance types to `reserved` when the capacity type is available
-6. Karpenter should introduce a feature flag `FEATURE_FLAG=CapacityReservations` to gate this new feature in `ALPHA` when it's introduced
+6. Karpenter should introduce a feature flag `FEATURE_FLAG=ReservedCapacity` to gate this new feature in `ALPHA` when it's introduced

 ### `karpenter.sh/capacity-type` API

 _Note: Some excerpts taken from [`aws/karpenter-provider-aws` RFC](https://github.com/aws/karpenter-provider-aws/blob/main/designs/odcr.md#nodepool-api)._

-This RFC proposes the addition of a new `karpenter.sh/capacity-type` label value, called `reserved`. A cluster admin could then select to support only launching reserved node capacity and falling back between reserved capacity to on-demand (or even spot) capacity respectively.
+This RFC proposes the addition of a new `karpenter.sh/capacity-type` label value, called `reserved`. A cluster admin could then select to support only launching reserved node capacity and falling back between reserved capacity to on-demand (or even spot) capacity respectively.

 _Note: This option requires any applications (pods) that are using node selection on `karpenter.sh/capacity-type: "on-demand"` to expand their selection to include `reserved` or to update it to perform a `NotIn` node affinity on `karpenter.sh/capacity-type: spot`_

@@ -140,4 +140,4 @@ In practice, this means that if a user has two capacity reservation offerings av

 ## Appendix

-1. AWS Cloud Provider's RFC for On-Demand Capacity Reservations: https://github.com/aws/karpenter-provider-aws/blob/main/designs/odcr.md
+1. AWS Cloud Provider's RFC for On-Demand Capacity Reservations: https://github.com/aws/karpenter-provider-aws/blob/main/designs/odcr.md
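
As background for items 3 and 4 in the design excerpt above, the tests in this commit build a `cloudprovider.Offering` whose reserved availability is an integer (`ReservationCapacity`) alongside a `reserved` capacity-type requirement. Below is a minimal sketch of constructing such an offering. The field names come from the test code in this diff; the package import paths, the helper name, and the use of `cloudprovider.ReservationIDLabel` as the requirement key are assumptions for illustration (the tests use the fake provider's `v1alpha1.LabelReservationID` for that key).

package sketch

import (
	corev1 "k8s.io/api/core/v1"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
	"sigs.k8s.io/karpenter/pkg/cloudprovider"
	"sigs.k8s.io/karpenter/pkg/scheduling"
)

// addReservedOffering is a hypothetical helper that appends a reserved offering
// to an instance type, modeling availability as an int per design items 3-4.
func addReservedOffering(it *cloudprovider.InstanceType, zone, reservationID string, price float64, capacity int) {
	it.Offerings = append(it.Offerings, &cloudprovider.Offering{
		Price:               price,    // typically discounted relative to the on-demand offering
		Available:           true,
		ReservationCapacity: capacity, // remaining instances in the reservation, not a bool
		Requirements: scheduling.NewLabelRequirements(map[string]string{
			v1.CapacityTypeLabelKey:          v1.CapacityTypeReserved,
			corev1.LabelTopologyZone:         zone,
			cloudprovider.ReservationIDLabel: reservationID, // provider-specific label key
		}),
	})
}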

pkg/controllers/disruption/consolidation.go (+9 −1)

@@ -308,8 +308,16 @@ func (c *consolidation) computeSpotToSpotConsolidation(ctx context.Context, cand
 func getCandidatePrices(candidates []*Candidate) (float64, error) {
     var price float64
     for _, c := range candidates {
-        compatibleOfferings := c.instanceType.Offerings.Compatible(scheduling.NewLabelRequirements(c.StateNode.Labels()))
+        reqs := scheduling.NewLabelRequirements(c.StateNode.Labels())
+        compatibleOfferings := c.instanceType.Offerings.Compatible(reqs)
         if len(compatibleOfferings) == 0 {
+            // It's expected that offerings may no longer exist for capacity reservations once a NodeClass stops selecting on
+            // them (or they are no longer considered for some other reason by the cloudprovider). By definition though,
+            // reserved capacity is free. By modeling it as free, consolidation won't be able to succeed, but the node should be
+            // disrupted via drift regardless.
+            if reqs.Get(v1.CapacityTypeLabelKey).Has(v1.CapacityTypeReserved) {
+                return 0.0, nil
+            }
             return 0.0, fmt.Errorf("unable to determine offering for %s/%s/%s", c.instanceType.Name, c.capacityType, c.zone)
         }
         price += compatibleOfferings.Cheapest().Price
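
To make the effect of the new reserved-capacity branch concrete: consolidation only replaces candidates when a replacement offering is strictly cheaper than what the candidates currently cost, so returning 0.0 for a reserved candidate whose offering has disappeared means no replacement can ever undercut it. The standalone sketch below illustrates that comparison only; it is not Karpenter's consolidation code, and the function name is hypothetical.

package main

import "fmt"

// shouldConsolidate is an illustrative stand-in for the cost check consolidation
// performs: a replacement is only worthwhile if it is strictly cheaper than what
// the candidates currently cost.
func shouldConsolidate(candidatePrice, replacementPrice float64) bool {
	return replacementPrice < candidatePrice
}

func main() {
	// A reserved candidate whose offering no longer exists is modeled as free (0.0),
	// so no replacement price can ever be lower and consolidation is effectively
	// blocked for it; drift is expected to disrupt the node instead.
	fmt.Println(shouldConsolidate(0.0, 0.000001)) // false
	fmt.Println(shouldConsolidate(0.52, 0.26))    // true
}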

pkg/controllers/disruption/consolidation_test.go (+88 −44)

@@ -4415,40 +4415,90 @@ var _ = Describe("Consolidation", func() {
             },
         })
         reservedNodeClaim.StatusConditions().SetTrue(v1.ConditionTypeConsolidatable)
-        ctx = options.ToContext(ctx, test.Options(test.OptionsFields{FeatureGates: test.FeatureGates{CapacityReservations: lo.ToPtr(true)}}))
+        ctx = options.ToContext(ctx, test.Options(test.OptionsFields{FeatureGates: test.FeatureGates{ReservedCapacity: lo.ToPtr(true)}}))
+    })
+    It("can consolidate from one reserved offering to another", func() {
+        leastExpensiveReservationID := fmt.Sprintf("r-%s", leastExpensiveInstance.Name)
+        leastExpensiveInstance.Requirements.Add(scheduling.NewRequirement(
+            cloudprovider.ReservationIDLabel,
+            corev1.NodeSelectorOpIn,
+            leastExpensiveReservationID,
+        ))
+        leastExpensiveInstance.Requirements.Get(v1.CapacityTypeLabelKey).Insert(v1.CapacityTypeReserved)
+        leastExpensiveInstance.Offerings = append(leastExpensiveInstance.Offerings, &cloudprovider.Offering{
+            Price:               leastExpensiveOffering.Price / 1_000_000.0,
+            Available:           true,
+            ReservationCapacity: 10,
+            Requirements: scheduling.NewLabelRequirements(map[string]string{
+                v1.CapacityTypeLabelKey:     v1.CapacityTypeReserved,
+                corev1.LabelTopologyZone:    leastExpensiveOffering.Zone(),
+                v1alpha1.LabelReservationID: leastExpensiveReservationID,
+            }),
+        })
+
+        // create our RS so we can link a pod to it
+        rs := test.ReplicaSet()
+        ExpectApplied(ctx, env.Client, rs)
+        Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(rs), rs)).To(Succeed())
+
+        pod := test.Pod(test.PodOptions{ObjectMeta: metav1.ObjectMeta{
+            Labels: labels,
+            OwnerReferences: []metav1.OwnerReference{
+                {
+                    APIVersion:         "apps/v1",
+                    Kind:               "ReplicaSet",
+                    Name:               rs.Name,
+                    UID:                rs.UID,
+                    Controller:         lo.ToPtr(true),
+                    BlockOwnerDeletion: lo.ToPtr(true),
+                },
+            },
+        }})
+        ExpectApplied(ctx, env.Client, rs, pod, reservedNode, reservedNodeClaim, nodePool)
+
+        // bind pods to node
+        ExpectManualBinding(ctx, env.Client, pod, reservedNode)
+
+        // inform cluster state about nodes and nodeClaims
+        ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{reservedNode}, []*v1.NodeClaim{reservedNodeClaim})
+
+        fakeClock.Step(10 * time.Minute)
+
+        // consolidation won't delete the old nodeclaim until the new nodeclaim is ready
+        var wg sync.WaitGroup
+        ExpectToWait(fakeClock, &wg)
+        ExpectMakeNewNodeClaimsReady(ctx, env.Client, &wg, cluster, cloudProvider, 1)
+        ExpectSingletonReconciled(ctx, disruptionController)
+        wg.Wait()
+
+        // Process the item so that the nodes can be deleted.
+        ExpectSingletonReconciled(ctx, queue)
+
+        // Cascade any deletion of the nodeclaim to the node
+        ExpectNodeClaimsCascadeDeletion(ctx, env.Client, reservedNodeClaim)
+
+        // should create a new nodeclaim as there is a cheaper one that can hold the pod
+        nodeClaims := ExpectNodeClaims(ctx, env.Client)
+        nodes := ExpectNodes(ctx, env.Client)
+        Expect(nodeClaims).To(HaveLen(1))
+        Expect(nodes).To(HaveLen(1))
+
+        Expect(nodeClaims[0].Name).ToNot(Equal(reservedNodeClaim.Name))
+
+        // We should have consolidated into the same instance type, just into reserved.
+        Expect(nodes[0].Labels).To(HaveKeyWithValue(corev1.LabelInstanceTypeStable, leastExpensiveInstance.Name))
+        Expect(nodes[0].Labels).To(HaveKeyWithValue(v1.CapacityTypeLabelKey, v1.CapacityTypeReserved))
+        Expect(nodes[0].Labels).To(HaveKeyWithValue(cloudprovider.ReservationIDLabel, leastExpensiveReservationID))
+
+        // and delete the old one
+        ExpectNotFound(ctx, env.Client, reservedNodeClaim, reservedNode)
     })
     DescribeTable(
-        "can replace node",
+        "can consolidate into reserved capacity for the same instance pool",
         func(initialCapacityType string) {
-            nodeClaim = lo.Switch[string, *v1.NodeClaim](initialCapacityType).
-                Case(v1.CapacityTypeOnDemand, nodeClaim).
-                Case(v1.CapacityTypeSpot, spotNodeClaim).
-                Default(reservedNodeClaim)
-            node = lo.Switch[string, *corev1.Node](initialCapacityType).
-                Case(v1.CapacityTypeOnDemand, node).
-                Case(v1.CapacityTypeSpot, spotNode).
-                Default(reservedNode)
-
-            // If the capacity type is reserved, we will need a cheaper reserved instance to consolidat into
-            var leastExpensiveReservationID string
-            if initialCapacityType == v1.CapacityTypeReserved {
-                leastExpensiveReservationID = fmt.Sprintf("r-%s", leastExpensiveInstance.Name)
-                leastExpensiveInstance.Requirements.Add(scheduling.NewRequirement(
-                    cloudprovider.ReservationIDLabel,
-                    corev1.NodeSelectorOpIn,
-                    leastExpensiveReservationID,
-                ))
-                leastExpensiveInstance.Requirements.Get(v1.CapacityTypeLabelKey).Insert(v1.CapacityTypeReserved)
-                leastExpensiveInstance.Offerings = append(leastExpensiveInstance.Offerings, &cloudprovider.Offering{
-                    Price:               leastExpensiveOffering.Price / 1_000_000.0,
-                    Available:           true,
-                    ReservationCapacity: 10,
-                    Requirements: scheduling.NewLabelRequirements(map[string]string{
-                        v1.CapacityTypeLabelKey:     v1.CapacityTypeReserved,
-                        corev1.LabelTopologyZone:    leastExpensiveOffering.Zone(),
-                        v1alpha1.LabelReservationID: leastExpensiveReservationID,
-                    }),
-                })
+            if initialCapacityType == v1.CapacityTypeSpot {
+                nodeClaim = spotNodeClaim
+                node = spotNode
             }

             // create our RS so we can link a pod to it
@@ -4499,23 +4549,17 @@
             Expect(nodes).To(HaveLen(1))

             Expect(nodeClaims[0].Name).ToNot(Equal(nodeClaim.Name))
-            // If the original capacity type was OD or spot, we should be able to consolidate into the reserved offering of the
-            // same type.
-            Expect(scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaims[0].Spec.Requirements...).Has(corev1.LabelInstanceTypeStable)).To(BeTrue())
-            Expect(scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaims[0].Spec.Requirements...).Get(corev1.LabelInstanceTypeStable).Has(mostExpensiveInstance.Name)).To(Equal(initialCapacityType != v1.CapacityTypeReserved))
-            Expect(scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaims[0].Spec.Requirements...).Has(cloudprovider.ReservationIDLabel)).To(BeTrue())
-            Expect(scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaims[0].Spec.Requirements...).Get(cloudprovider.ReservationIDLabel).Any()).To(Equal(lo.Ternary(
-                initialCapacityType == v1.CapacityTypeReserved,
-                leastExpensiveReservationID,
-                mostExpensiveReservationID,
-            )))
+
+            // We should have consolidated into the same instance type, just into reserved.
+            Expect(nodes[0].Labels).To(HaveKeyWithValue(corev1.LabelInstanceTypeStable, mostExpensiveInstance.Name))
+            Expect(nodes[0].Labels).To(HaveKeyWithValue(v1.CapacityTypeLabelKey, v1.CapacityTypeReserved))
+            Expect(nodes[0].Labels).To(HaveKeyWithValue(cloudprovider.ReservationIDLabel, mostExpensiveReservationID))

             // and delete the old one
             ExpectNotFound(ctx, env.Client, nodeClaim, node)
         },
-        Entry("on-demand", v1.CapacityTypeOnDemand),
-        Entry("spot", v1.CapacityTypeSpot),
-        Entry("reserved", v1.CapacityTypeReserved),
+        Entry("from on-demand", v1.CapacityTypeOnDemand),
+        Entry("from spot", v1.CapacityTypeSpot),
     )
 })
 })

pkg/controllers/nodeclaim/disruption/drift_test.go (+1 −1)

@@ -556,7 +556,7 @@ var _ = Describe("Drift", func() {
            }))
        }

-       ctx = options.ToContext(ctx, test.Options(test.OptionsFields{FeatureGates: test.FeatureGates{CapacityReservations: lo.ToPtr(true)}}))
+       ctx = options.ToContext(ctx, test.Options(test.OptionsFields{FeatureGates: test.FeatureGates{ReservedCapacity: lo.ToPtr(true)}}))
    })
    // This is required to support cloudproviders dynamically updating the capacity type based on reservation expirations
    It("should drift reserved nodeclaim if the capacity type label has been updated", func() {

pkg/controllers/provisioning/scheduling/nodeclaim.go (+1 −1)

@@ -188,7 +188,7 @@ func (n *NodeClaim) reserveOfferings(
    instanceTypes []*cloudprovider.InstanceType,
    nodeClaimRequirements scheduling.Requirements,
 ) (cloudprovider.Offerings, error) {
-   if !opts.FromContext(ctx).FeatureGates.CapacityReservations {
+   if !opts.FromContext(ctx).FeatureGates.ReservedCapacity {
        return nil, nil
    }

pkg/controllers/provisioning/scheduling/suite_test.go (+5 −4)

@@ -3819,7 +3819,7 @@ var _ = Context("Scheduling", func() {
                Price: fake.PriceFromResources(it.Capacity) / 100_000.0,
            })
        }
-       ctx = options.ToContext(ctx, test.Options(test.OptionsFields{FeatureGates: test.FeatureGates{CapacityReservations: lo.ToPtr(true)}}))
+       ctx = options.ToContext(ctx, test.Options(test.OptionsFields{FeatureGates: test.FeatureGates{ReservedCapacity: lo.ToPtr(true)}}))
    })
    It("shouldn't fallback to on-demand or spot when compatible reserved offerings are available", func() {
        // With the pessimistic nature of scheduling reservations, we'll only be able to provision one instance per loop if a
@@ -3864,6 +3864,8 @@
            return bindings.Get(p) == nil
        })

+       // Finally, we schedule the final pod. Since both capacity reservations are now exhausted and their offerings are
+       // marked as unavailable, we will fall back to either OD or spot.
        bindings = ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pods...)
        Expect(len(bindings)).To(Equal(1))
        node = lo.Values(bindings)[0].Node
@@ -3987,9 +3989,8 @@
            })
        })

-       // Even though the pods schedule to separate NodePools, those NodePools share a capacity reservation for the
-       // selected instance type. Karpenter should successfully provision a reserved instance for one pod, but fail
-       // to provision anything for the second since it won't fallback to OD or spot.
+       // Since each pod can only schedule to one of the NodePools, and each NodePool has a distinct capacity reservation,
+       // we should be able to schedule both pods simultaneously despite them selecting on the same instance pool.
        bindings := lo.Values(ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pods...))
        Expect(len(bindings)).To(Equal(2))
        for _, binding := range bindings {
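
The comments in these tests lean on the reservation accounting the scheduler performs during a provisioning loop: each reserved offering carries a finite count, each NodeClaim that reserves it consumes from that count, and an exhausted offering is treated as unavailable so later pods fall back to on-demand or spot. The following is a standalone sketch of that bookkeeping only; all names are hypothetical, and the real logic lives in `reserveOfferings` and the scheduler rather than in this form.

package main

import "fmt"

// reservationTracker is a hypothetical illustration of per-reservation capacity
// accounting during a single scheduling loop.
type reservationTracker struct {
	remaining map[string]int // reservation ID -> instances still available
}

// tryReserve consumes one unit of capacity if any remains; once a reservation is
// exhausted its offering is effectively unavailable and callers fall back to
// on-demand or spot.
func (t *reservationTracker) tryReserve(reservationID string) bool {
	if t.remaining[reservationID] <= 0 {
		return false
	}
	t.remaining[reservationID]--
	return true
}

func main() {
	t := &reservationTracker{remaining: map[string]int{"r-example": 1}}
	fmt.Println(t.tryReserve("r-example")) // true: the first pod lands on the reservation
	fmt.Println(t.tryReserve("r-example")) // false: exhausted, fall back to OD/spot
}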

pkg/operator/options/options.go (+4 −4)

@@ -41,9 +41,9 @@ type optionsKey struct{}
 type FeatureGates struct {
    inputStr string

-   SpotToSpotConsolidation bool
    NodeRepair              bool
-   CapacityReservations    bool
+   ReservedCapacity        bool
+   SpotToSpotConsolidation bool
 }

 // Options contains all CLI flags / env vars for karpenter-core. It adheres to the options.Injectable interface.
@@ -99,7 +99,7 @@ func (o *Options) AddFlags(fs *FlagSet) {
    fs.StringVar(&o.LogErrorOutputPaths, "log-error-output-paths", env.WithDefaultString("LOG_ERROR_OUTPUT_PATHS", "stderr"), "Optional comma separated paths for logging error output")
    fs.DurationVar(&o.BatchMaxDuration, "batch-max-duration", env.WithDefaultDuration("BATCH_MAX_DURATION", 10*time.Second), "The maximum length of a batch window. The longer this is, the more pods we can consider for provisioning at one time which usually results in fewer but larger nodes.")
    fs.DurationVar(&o.BatchIdleDuration, "batch-idle-duration", env.WithDefaultDuration("BATCH_IDLE_DURATION", time.Second), "The maximum amount of time with no new pending pods that if exceeded ends the current batching window. If pods arrive faster than this time, the batching window will be extended up to the maxDuration. If they arrive slower, the pods will be batched separately.")
-   fs.StringVar(&o.FeatureGates.inputStr, "feature-gates", env.WithDefaultString("FEATURE_GATES", "CapacityReservations=false,NodeRepair=false,SpotToSpotConsolidation=false"), "Optional features can be enabled / disabled using feature gates. Current options are: CapacityReservations, NodeRepair, and SpotToSpotConsolidation")
+   fs.StringVar(&o.FeatureGates.inputStr, "feature-gates", env.WithDefaultString("FEATURE_GATES", "NodeRepair=false,ReservedCapacity=false,SpotToSpotConsolidation=false"), "Optional features can be enabled / disabled using feature gates. Current options are: NodeRepair, ReservedCapacity, and SpotToSpotConsolidation")
 }

 func (o *Options) Parse(fs *FlagSet, args ...string) error {
@@ -141,7 +141,7 @@ func ParseFeatureGates(gateStr string) (FeatureGates, error) {
        gates.SpotToSpotConsolidation = val
    }
    if val, ok := gateMap["CapacityReservations"]; ok {
-       gates.CapacityReservations = val
+       gates.ReservedCapacity = val
    }

    return gates, nil
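
The new default `FEATURE_GATES` string above is a comma-separated list of `Name=bool` pairs. As a rough standalone sketch of how such a string can be parsed into per-gate booleans (illustrative only: `ParseFeatureGates` in this package is the actual implementation, and the helper below is hypothetical):

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseGates is a simplified stand-in for ParseFeatureGates: it turns a
// "Name=bool,Name=bool" string into a map of gate values.
func parseGates(gateStr string) (map[string]bool, error) {
	gates := map[string]bool{}
	if gateStr == "" {
		return gates, nil
	}
	for _, pair := range strings.Split(gateStr, ",") {
		name, raw, ok := strings.Cut(pair, "=")
		if !ok {
			return nil, fmt.Errorf("invalid feature gate %q", pair)
		}
		val, err := strconv.ParseBool(raw)
		if err != nil {
			return nil, fmt.Errorf("invalid value for feature gate %q: %w", name, err)
		}
		gates[name] = val
	}
	return gates, nil
}

func main() {
	gates, _ := parseGates("NodeRepair=false,ReservedCapacity=true,SpotToSpotConsolidation=false")
	fmt.Println(gates["ReservedCapacity"]) // true
}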

pkg/operator/options/suite_test.go (+5 −5)

@@ -111,7 +111,7 @@ var _ = Describe("Options", func() {
                BatchMaxDuration:  lo.ToPtr(10 * time.Second),
                BatchIdleDuration: lo.ToPtr(time.Second),
                FeatureGates: test.FeatureGates{
-                   CapacityReservations:    lo.ToPtr(false),
+                   ReservedCapacity:        lo.ToPtr(false),
                    NodeRepair:              lo.ToPtr(false),
                    SpotToSpotConsolidation: lo.ToPtr(false),
                },
@@ -158,7 +158,7 @@ var _ = Describe("Options", func() {
                BatchMaxDuration:  lo.ToPtr(5 * time.Second),
                BatchIdleDuration: lo.ToPtr(5 * time.Second),
                FeatureGates: test.FeatureGates{
-                   CapacityReservations:    lo.ToPtr(true),
+                   ReservedCapacity:        lo.ToPtr(true),
                    NodeRepair:              lo.ToPtr(true),
                    SpotToSpotConsolidation: lo.ToPtr(true),
                },
@@ -205,7 +205,7 @@ var _ = Describe("Options", func() {
                BatchMaxDuration:  lo.ToPtr(5 * time.Second),
                BatchIdleDuration: lo.ToPtr(5 * time.Second),
                FeatureGates: test.FeatureGates{
-                   CapacityReservations:    lo.ToPtr(true),
+                   ReservedCapacity:        lo.ToPtr(true),
                    NodeRepair:              lo.ToPtr(true),
                    SpotToSpotConsolidation: lo.ToPtr(true),
                },
@@ -252,7 +252,7 @@ var _ = Describe("Options", func() {
                BatchMaxDuration:  lo.ToPtr(5 * time.Second),
                BatchIdleDuration: lo.ToPtr(5 * time.Second),
                FeatureGates: test.FeatureGates{
-                   CapacityReservations:    lo.ToPtr(true),
+                   ReservedCapacity:        lo.ToPtr(true),
                    NodeRepair:              lo.ToPtr(true),
                    SpotToSpotConsolidation: lo.ToPtr(true),
                },
@@ -308,7 +308,7 @@ func expectOptionsMatch(optsA, optsB *options.Options) {
    Expect(optsA.LogErrorOutputPaths).To(Equal(optsB.LogErrorOutputPaths))
    Expect(optsA.BatchMaxDuration).To(Equal(optsB.BatchMaxDuration))
    Expect(optsA.BatchIdleDuration).To(Equal(optsB.BatchIdleDuration))
-   Expect(optsA.FeatureGates.CapacityReservations).To(Equal(optsB.FeatureGates.CapacityReservations))
+   Expect(optsA.FeatureGates.ReservedCapacity).To(Equal(optsB.FeatureGates.ReservedCapacity))
    Expect(optsA.FeatureGates.NodeRepair).To(Equal(optsB.FeatureGates.NodeRepair))
    Expect(optsA.FeatureGates.SpotToSpotConsolidation).To(Equal(optsB.FeatureGates.SpotToSpotConsolidation))
 }

pkg/test/options.go (+2 −2)

@@ -47,8 +47,8 @@ type OptionsFields struct {
 }

 type FeatureGates struct {
-   CapacityReservations    *bool
    NodeRepair              *bool
+   ReservedCapacity        *bool
    SpotToSpotConsolidation *bool
 }

@@ -75,8 +75,8 @@ func Options(overrides ...OptionsFields) *options.Options {
        BatchMaxDuration:  lo.FromPtrOr(opts.BatchMaxDuration, 10*time.Second),
        BatchIdleDuration: lo.FromPtrOr(opts.BatchIdleDuration, time.Second),
        FeatureGates: options.FeatureGates{
-           CapacityReservations:    lo.FromPtrOr(opts.FeatureGates.CapacityReservations, false),
            NodeRepair:              lo.FromPtrOr(opts.FeatureGates.NodeRepair, false),
+           ReservedCapacity:        lo.FromPtrOr(opts.FeatureGates.ReservedCapacity, false),
            SpotToSpotConsolidation: lo.FromPtrOr(opts.FeatureGates.SpotToSpotConsolidation, false),
        },
    }
