Commit c07b539

remaining updates

1 parent 290a7dc commit c07b539

File tree: 12 files changed (+339, -106 lines)

pkg/cloudprovider/fake/cloudprovider.go (-1)

@@ -43,7 +43,6 @@ import (
 func init() {
 	v1.WellKnownLabels = v1.WellKnownLabels.Insert(v1alpha1.LabelReservationID)
 	cloudprovider.ReservationIDLabel = v1alpha1.LabelReservationID
-	cloudprovider.ReservedCapacityPriceFactor = 1.0 / 1_000_000.0
 }
 
 var _ cloudprovider.CloudProvider = (*CloudProvider)(nil)

pkg/cloudprovider/types.go (+11, -18)

@@ -38,15 +38,12 @@ import (
 var (
 	SpotRequirement = scheduling.NewRequirements(scheduling.NewRequirement(v1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, v1.CapacityTypeSpot))
 	OnDemandRequirement = scheduling.NewRequirements(scheduling.NewRequirement(v1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, v1.CapacityTypeOnDemand))
+	ReservedRequirement = scheduling.NewRequirements(scheduling.NewRequirement(v1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, v1.CapacityTypeReserved))
 
 	// ReservationIDLabel is a label injected into a reserved offering's requirements which is used to uniquely identify a
 	// reservation. For example, a reservation could be shared across multiple NodePools, and the value encoded in this
 	// requirement is used to inform the scheduler that a reservation for one should affect the other.
 	ReservationIDLabel string
-
-	// ReservedCapacityPriceFactor is a constant which should be applied when determining the cost of a reserved offering,
-	// if unavailable from `GetInstanceTypes`.
-	ReservedCapacityPriceFactor float64
 )
 
 type DriftReason string

@@ -311,21 +308,17 @@ func (ofs Offerings) MostExpensive() *Offering {
 	})
 }
 
-// WorstLaunchPrice gets the worst-case launch price from the offerings that are offered
-// on an instance type. If the instance type has a spot offering available, then it uses the spot offering
-// to get the launch price; else, it uses the on-demand launch price
+// WorstLaunchPrice gets the worst-case launch price from the offerings that are offered on an instance type. Only
+// offerings for the capacity type we will launch with are considered. The following precedence order is used to
+// determine which capacity type is used: reserved, spot, on-demand.
 func (ofs Offerings) WorstLaunchPrice(reqs scheduling.Requirements) float64 {
-	// We prefer to launch spot offerings, so we will get the worst price based on the node requirements
-	if reqs.Get(v1.CapacityTypeLabelKey).Has(v1.CapacityTypeSpot) {
-		spotOfferings := ofs.Compatible(reqs).Compatible(SpotRequirement)
-		if len(spotOfferings) > 0 {
-			return spotOfferings.MostExpensive().Price
-		}
-	}
-	if reqs.Get(v1.CapacityTypeLabelKey).Has(v1.CapacityTypeOnDemand) {
-		onDemandOfferings := ofs.Compatible(reqs).Compatible(OnDemandRequirement)
-		if len(onDemandOfferings) > 0 {
-			return onDemandOfferings.MostExpensive().Price
+	for _, ctReqs := range []scheduling.Requirements{
+		ReservedRequirement,
+		SpotRequirement,
+		OnDemandRequirement,
+	} {
+		if compatOfs := ofs.Compatible(reqs).Compatible(ctReqs); len(compatOfs) != 0 {
+			return compatOfs.MostExpensive().Price
 		}
 	}
 	return math.MaxFloat64
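
The precedence logic described in the new WorstLaunchPrice comment can be illustrated with a minimal, self-contained sketch. The offering type and worstLaunchPrice helper below are simplified stand-ins for illustration only, not the karpenter Offerings API: only the highest-precedence capacity type that has at least one offering is considered, and the most expensive price within that capacity type is returned.

package main

import (
	"fmt"
	"math"
)

// offering is a simplified stand-in for a cloud provider offering: just a
// capacity type and a price.
type offering struct {
	capacityType string
	price        float64
}

// worstLaunchPrice mirrors the precedence shown in the diff: reserved, then
// spot, then on-demand. The first capacity type with any offering wins, and
// its most expensive price is the worst-case launch price.
func worstLaunchPrice(offerings []offering) float64 {
	for _, ct := range []string{"reserved", "spot", "on-demand"} {
		worst, found := math.Inf(-1), false
		for _, o := range offerings {
			if o.capacityType == ct {
				found = true
				worst = math.Max(worst, o.price)
			}
		}
		if found {
			return worst
		}
	}
	return math.MaxFloat64
}

func main() {
	offerings := []offering{
		{"on-demand", 1.00},
		{"spot", 0.40},
		{"reserved", 0.000001}, // reserved takes precedence even though spot/on-demand exist
	}
	fmt.Println(worstLaunchPrice(offerings)) // 1e-06
}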

pkg/controllers/disruption/consolidation.go (+4, -17)

@@ -308,24 +308,11 @@ func (c *consolidation) computeSpotToSpotConsolidation(ctx context.Context, cand
 func getCandidatePrices(candidates []*Candidate) (float64, error) {
 	var price float64
 	for _, c := range candidates {
-		var compatibleOfferings cloudprovider.Offerings
-		reservedFallback := false
-		reqs := scheduling.NewLabelRequirements(c.StateNode.Labels())
-		for {
-			compatibleOfferings = c.instanceType.Offerings.Compatible(reqs)
-			if len(compatibleOfferings) != 0 {
-				break
-			}
-			if c.capacityType != v1.CapacityTypeReserved {
-				return 0.0, fmt.Errorf("unable to determine offering for %s/%s/%s", c.instanceType.Name, c.capacityType, c.zone)
-			}
-			// If there are no compatible offerings, but the capacity type for the candidate is reserved, we can fall-back to
-			// the on-demand offering to derive pricing.
-			reqs[v1.CapacityTypeLabelKey] = scheduling.NewRequirement(v1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, v1.CapacityTypeOnDemand)
-			delete(reqs, cloudprovider.ReservationIDLabel)
-			reservedFallback = true
+		compatibleOfferings := c.instanceType.Offerings.Compatible(scheduling.NewLabelRequirements(c.StateNode.Labels()))
+		if len(compatibleOfferings) == 0 {
+			return 0.0, fmt.Errorf("unable to determine offering for %s/%s/%s", c.instanceType.Name, c.capacityType, c.zone)
 		}
-		price += compatibleOfferings.Cheapest().Price * lo.Ternary(reservedFallback, cloudprovider.ReservedCapacityPriceFactor, 1.0)
+		price += compatibleOfferings.Cheapest().Price
 	}
 	return price, nil
 }
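
With the reserved-to-on-demand pricing fallback removed, candidate pricing reduces to summing the cheapest compatible offering per candidate. The sketch below is a simplified stand-in (the candidate type and candidatePrices helper are hypothetical, not the karpenter types) showing that shape under the assumption that reserved offerings now arrive already priced by the cloud provider:

package main

import "fmt"

// candidate is a stand-in for a disruption candidate: just the prices of the
// offerings compatible with the node's current labels.
type candidate struct {
	name             string
	compatiblePrices []float64
}

// candidatePrices mirrors the simplified getCandidatePrices: each candidate
// contributes its cheapest compatible offering, with no on-demand fallback or
// ReservedCapacityPriceFactor applied.
func candidatePrices(candidates []candidate) (float64, error) {
	var total float64
	for _, c := range candidates {
		if len(c.compatiblePrices) == 0 {
			return 0, fmt.Errorf("unable to determine offering for %s", c.name)
		}
		cheapest := c.compatiblePrices[0]
		for _, p := range c.compatiblePrices[1:] {
			if p < cheapest {
				cheapest = p
			}
		}
		total += cheapest
	}
	return total, nil
}

func main() {
	total, err := candidatePrices([]candidate{
		{name: "node-a", compatiblePrices: []float64{0.25, 0.10}},
		{name: "node-b", compatiblePrices: []float64{0.000001}}, // reserved offering already priced by the provider
	})
	fmt.Println(total, err) // 0.100001 <nil>
}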

pkg/controllers/disruption/consolidation_test.go (+139)

@@ -47,6 +47,7 @@ import (
 	"sigs.k8s.io/karpenter/pkg/scheduling"
 	"sigs.k8s.io/karpenter/pkg/test"
 	. "sigs.k8s.io/karpenter/pkg/test/expectations"
+	"sigs.k8s.io/karpenter/pkg/test/v1alpha1"
 )
 
 var _ = Describe("Consolidation", func() {

@@ -4379,4 +4380,142 @@ var _ = Describe("Consolidation", func() {
 			Expect(result.RequeueAfter).To(BeNumerically(">", 0))
 		})
 	})
+	Context("Reserved Capacity", func() {
+		var reservedNodeClaim *v1.NodeClaim
+		var reservedNode *corev1.Node
+		var mostExpensiveReservationID string
+
+		BeforeEach(func() {
+			mostExpensiveReservationID = fmt.Sprintf("r-%s", mostExpensiveInstance.Name)
+			mostExpensiveInstance.Requirements.Add(scheduling.NewRequirement(
+				cloudprovider.ReservationIDLabel,
+				corev1.NodeSelectorOpIn,
+				mostExpensiveReservationID,
+			))
+			mostExpensiveInstance.Requirements.Get(v1.CapacityTypeLabelKey).Insert(v1.CapacityTypeReserved)
+			mostExpensiveInstance.Offerings = append(mostExpensiveInstance.Offerings, &cloudprovider.Offering{
+				Price:               mostExpensiveOffering.Price / 1_000_000.0,
+				Available:           true,
+				ReservationCapacity: 10,
+				Requirements: scheduling.NewLabelRequirements(map[string]string{
+					v1.CapacityTypeLabelKey:     v1.CapacityTypeReserved,
+					corev1.LabelTopologyZone:    mostExpensiveOffering.Zone(),
+					v1alpha1.LabelReservationID: mostExpensiveReservationID,
+				}),
+			})
+			reservedNodeClaim, reservedNode = test.NodeClaimAndNode(v1.NodeClaim{
+				ObjectMeta: metav1.ObjectMeta{
+					Labels: map[string]string{
+						v1.NodePoolLabelKey:              nodePool.Name,
+						corev1.LabelInstanceTypeStable:   mostExpensiveInstance.Name,
+						v1.CapacityTypeLabelKey:          v1.CapacityTypeReserved,
+						corev1.LabelTopologyZone:         mostExpensiveOffering.Requirements.Get(corev1.LabelTopologyZone).Any(),
+						cloudprovider.ReservationIDLabel: mostExpensiveReservationID,
+					},
+				},
+			})
+			reservedNodeClaim.StatusConditions().SetTrue(v1.ConditionTypeConsolidatable)
+			ctx = options.ToContext(ctx, test.Options(test.OptionsFields{FeatureGates: test.FeatureGates{CapacityReservations: lo.ToPtr(true)}}))
+		})
+		DescribeTable(
+			"can replace node",
+			func(initialCapacityType string) {
+				nodeClaim = lo.Switch[string, *v1.NodeClaim](initialCapacityType).
+					Case(v1.CapacityTypeOnDemand, nodeClaim).
+					Case(v1.CapacityTypeSpot, spotNodeClaim).
+					Default(reservedNodeClaim)
+				node = lo.Switch[string, *corev1.Node](initialCapacityType).
+					Case(v1.CapacityTypeOnDemand, node).
+					Case(v1.CapacityTypeSpot, spotNode).
+					Default(reservedNode)
+
+				// If the capacity type is reserved, we will need a cheaper reserved instance to consolidate into
+				var leastExpensiveReservationID string
+				if initialCapacityType == v1.CapacityTypeReserved {
+					leastExpensiveReservationID = fmt.Sprintf("r-%s", leastExpensiveInstance.Name)
+					leastExpensiveInstance.Requirements.Add(scheduling.NewRequirement(
+						cloudprovider.ReservationIDLabel,
+						corev1.NodeSelectorOpIn,
+						leastExpensiveReservationID,
+					))
+					leastExpensiveInstance.Requirements.Get(v1.CapacityTypeLabelKey).Insert(v1.CapacityTypeReserved)
+					leastExpensiveInstance.Offerings = append(leastExpensiveInstance.Offerings, &cloudprovider.Offering{
+						Price:               leastExpensiveOffering.Price / 1_000_000.0,
+						Available:           true,
+						ReservationCapacity: 10,
+						Requirements: scheduling.NewLabelRequirements(map[string]string{
+							v1.CapacityTypeLabelKey:     v1.CapacityTypeReserved,
+							corev1.LabelTopologyZone:    leastExpensiveOffering.Zone(),
+							v1alpha1.LabelReservationID: leastExpensiveReservationID,
+						}),
+					})
+				}
+
+				// create our RS so we can link a pod to it
+				rs := test.ReplicaSet()
+				ExpectApplied(ctx, env.Client, rs)
+				Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(rs), rs)).To(Succeed())
+
+				pod := test.Pod(test.PodOptions{ObjectMeta: metav1.ObjectMeta{
+					Labels: labels,
+					OwnerReferences: []metav1.OwnerReference{
+						{
+							APIVersion:         "apps/v1",
+							Kind:               "ReplicaSet",
+							Name:               rs.Name,
+							UID:                rs.UID,
+							Controller:         lo.ToPtr(true),
+							BlockOwnerDeletion: lo.ToPtr(true),
+						},
+					},
+				}})
+				ExpectApplied(ctx, env.Client, rs, pod, node, nodeClaim, nodePool)
+
+				// bind pods to node
+				ExpectManualBinding(ctx, env.Client, pod, node)
+
+				// inform cluster state about nodes and nodeClaims
+				ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim})
+
+				fakeClock.Step(10 * time.Minute)
+
+				// consolidation won't delete the old nodeclaim until the new nodeclaim is ready
+				var wg sync.WaitGroup
+				ExpectToWait(fakeClock, &wg)
+				ExpectMakeNewNodeClaimsReady(ctx, env.Client, &wg, cluster, cloudProvider, 1)
+				ExpectSingletonReconciled(ctx, disruptionController)
+				wg.Wait()
+
+				// Process the item so that the nodes can be deleted.
+				ExpectSingletonReconciled(ctx, queue)
+
+				// Cascade any deletion of the nodeclaim to the node
+				ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaim)
+
+				// should create a new nodeclaim as there is a cheaper one that can hold the pod
+				nodeClaims := ExpectNodeClaims(ctx, env.Client)
+				nodes := ExpectNodes(ctx, env.Client)
+				Expect(nodeClaims).To(HaveLen(1))
+				Expect(nodes).To(HaveLen(1))
+
+				Expect(nodeClaims[0].Name).ToNot(Equal(nodeClaim.Name))
+				// If the original capacity type was OD or spot, we should be able to consolidate into the reserved offering of the
+				// same type.
+				Expect(scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaims[0].Spec.Requirements...).Has(corev1.LabelInstanceTypeStable)).To(BeTrue())
+				Expect(scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaims[0].Spec.Requirements...).Get(corev1.LabelInstanceTypeStable).Has(mostExpensiveInstance.Name)).To(Equal(initialCapacityType != v1.CapacityTypeReserved))
+				Expect(scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaims[0].Spec.Requirements...).Has(cloudprovider.ReservationIDLabel)).To(BeTrue())
+				Expect(scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaims[0].Spec.Requirements...).Get(cloudprovider.ReservationIDLabel).Any()).To(Equal(lo.Ternary(
+					initialCapacityType == v1.CapacityTypeReserved,
+					leastExpensiveReservationID,
+					mostExpensiveReservationID,
+				)))
+
+				// and delete the old one
+				ExpectNotFound(ctx, env.Client, nodeClaim, node)
+			},
+			Entry("on-demand", v1.CapacityTypeOnDemand),
+			Entry("spot", v1.CapacityTypeSpot),
+			Entry("reserved", v1.CapacityTypeReserved),
+		)
+	})
 })

pkg/controllers/nodeclaim/disruption/drift_test.go (+78)

@@ -17,6 +17,7 @@ limitations under the License.
 package disruption_test
 
 import (
+	"fmt"
 	"time"
 
 	"github.com/imdario/mergo"

@@ -26,10 +27,13 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
 	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
 	"sigs.k8s.io/karpenter/pkg/cloudprovider"
 	"sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/disruption"
 	"sigs.k8s.io/karpenter/pkg/controllers/nodepool/hash"
+	"sigs.k8s.io/karpenter/pkg/operator/options"
 	"sigs.k8s.io/karpenter/pkg/scheduling"
 	"sigs.k8s.io/karpenter/pkg/test"
 	. "sigs.k8s.io/karpenter/pkg/test/expectations"

@@ -521,4 +525,78 @@ var _ = Describe("Drift", func() {
 			Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrifted)).To(BeNil())
 		})
 	})
+	Context("Reserved Capacity", func() {
+		var reservedOffering *cloudprovider.Offering
+		BeforeEach(func() {
+			reservedOffering = &cloudprovider.Offering{
+				Available: true,
+				Requirements: scheduling.NewLabelRequirements(map[string]string{
+					v1.CapacityTypeLabelKey:          v1.CapacityTypeReserved,
+					corev1.LabelTopologyZone:         "test-zone-1a",
+					cloudprovider.ReservationIDLabel: fmt.Sprintf("r-%s", it.Name),
+				}),
+				Price:               it.Offerings[0].Price / 1_000_000.0,
+				ReservationCapacity: 10,
+			}
+			it.Offerings = append(it.Offerings, reservedOffering)
+			it.Requirements.Get(v1.CapacityTypeLabelKey).Insert(v1.CapacityTypeReserved)
+
+			nodePool.Spec.Template.Spec.Requirements = append(nodePool.Spec.Template.Spec.Requirements, v1.NodeSelectorRequirementWithMinValues{
+				NodeSelectorRequirement: corev1.NodeSelectorRequirement{
+					Key:      v1.CapacityTypeLabelKey,
+					Operator: corev1.NodeSelectorOpIn,
+					Values:   []string{v1.CapacityTypeReserved},
+				},
+			})
+
+			for _, o := range []client.Object{nodeClaim, node} {
+				o.SetLabels(lo.Assign(o.GetLabels(), map[string]string{
+					v1.CapacityTypeLabelKey:          v1.CapacityTypeReserved,
+					cloudprovider.ReservationIDLabel: reservedOffering.ReservationID(),
+				}))
+			}
+
+			ctx = options.ToContext(ctx, test.Options(test.OptionsFields{FeatureGates: test.FeatureGates{CapacityReservations: lo.ToPtr(true)}}))
+		})
+		// This is required to support cloudproviders dynamically updating the capacity type based on reservation expirations
+		It("should drift reserved nodeclaim if the capacity type label has been updated", func() {
+			cp.Drifted = ""
+			ExpectApplied(ctx, env.Client, nodePool, nodeClaim)
+			ExpectObjectReconciled(ctx, env.Client, nodeClaimDisruptionController, nodeClaim)
+			nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
+			Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrifted).IsTrue()).To(BeFalse())
+
+			nodeClaim.Labels[v1.CapacityTypeLabelKey] = v1.CapacityTypeOnDemand
+			ExpectApplied(ctx, env.Client, nodeClaim)
+			ExpectObjectReconciled(ctx, env.Client, nodeClaimDisruptionController, nodeClaim)
+			nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
+			Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrifted).IsTrue()).To(BeTrue())
+		})
+		It("should drift reserved nodeclaims if there are no reserved offerings available for the nodepool", func() {
+			cp.Drifted = ""
+			ExpectApplied(ctx, env.Client, nodePool, nodeClaim)
+			ExpectObjectReconciled(ctx, env.Client, nodeClaimDisruptionController, nodeClaim)
+			nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
+			Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrifted).IsTrue()).To(BeFalse())
+
+			it.Offerings = lo.Reject(it.Offerings, func(o *cloudprovider.Offering, _ int) bool {
+				return o.CapacityType() == v1.CapacityTypeReserved
+			})
+			ExpectObjectReconciled(ctx, env.Client, nodeClaimDisruptionController, nodeClaim)
+			nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
+			Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrifted).IsTrue()).To(BeTrue())
+		})
+		It("should drift reserved nodeclaims if an offering with the reservation ID is no longer available for the nodepool", func() {
+			cp.Drifted = ""
+			ExpectApplied(ctx, env.Client, nodePool, nodeClaim)
+			ExpectObjectReconciled(ctx, env.Client, nodeClaimDisruptionController, nodeClaim)
+			nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
+			Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrifted).IsTrue()).To(BeFalse())
+
+			reservedOffering.Requirements[cloudprovider.ReservationIDLabel] = scheduling.NewRequirement(cloudprovider.ReservationIDLabel, corev1.NodeSelectorOpIn, "test")
+			ExpectObjectReconciled(ctx, env.Client, nodeClaimDisruptionController, nodeClaim)
+			nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
+			Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrifted).IsTrue()).To(BeTrue())
+		})
+	})
 })
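
The invariant these drift tests exercise can be summarized in a small sketch. This is not the controller's actual implementation; the offering type and reservedDrifted helper below are hypothetical stand-ins for the condition the tests check: a node launched into reserved capacity is drifted once no reserved offering with its reservation ID remains available for its instance type.

package main

import "fmt"

// offering is a simplified stand-in for a cloud provider offering.
type offering struct {
	capacityType  string
	reservationID string
}

// reservedDrifted reports whether a reserved node should be considered drifted:
// true when no reserved offering with the node's reservation ID is still offered
// (e.g. the reservation expired or was removed from the NodePool's offerings).
func reservedDrifted(nodeReservationID string, offerings []offering) bool {
	for _, o := range offerings {
		if o.capacityType == "reserved" && o.reservationID == nodeReservationID {
			return false
		}
	}
	return true
}

func main() {
	offerings := []offering{{capacityType: "reserved", reservationID: "r-test"}}
	fmt.Println(reservedDrifted("r-test", offerings))    // false: reservation still offered
	fmt.Println(reservedDrifted("r-expired", offerings)) // true: no matching reserved offering
}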

pkg/controllers/provisioning/provisioner.go (+22, -5)

@@ -316,13 +316,30 @@ func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) {
 		}
 		return scheduler.Results{}, fmt.Errorf("creating scheduler, %w", err)
 	}
+
 	results := s.Solve(ctx, pods).TruncateInstanceTypes(scheduler.MaxInstanceTypes)
-	if len(results.ReservedOfferingErrors) != 0 {
-		log.FromContext(ctx).V(1).WithValues("Pods", pretty.Slice(lo.Map(lo.Keys(results.ReservedOfferingErrors), func(p *corev1.Pod, _ int) string { return klog.KRef(p.Namespace, p.Name).String() }), 5)).Info("deferring scheduling decision for provisionable pod(s) to future simulation due to limited reserved offering capacity")
-	}
-	scheduler.UnschedulablePodsCount.Set(float64(len(results.PodErrors)), map[string]string{scheduler.ControllerLabel: injection.GetControllerName(ctx)})
+	reservedOfferingErrors := results.ReservedOfferingErrors()
+	if len(reservedOfferingErrors) != 0 {
+		log.FromContext(ctx).V(1).WithValues(
+			"Pods", pretty.Slice(lo.Map(lo.Keys(reservedOfferingErrors), func(p *corev1.Pod, _ int) string {
+				return klog.KRef(p.Namespace, p.Name).String()
+			}), 5),
+		).Info("deferring scheduling decision for provisionable pod(s) to future simulation due to limited reserved offering capacity")
+	}
+	scheduler.UnschedulablePodsCount.Set(
+		// A reserved offering error doesn't indicate a pod is unschedulable, just that the scheduling decision was deferred.
+		float64(len(results.PodErrors)-len(reservedOfferingErrors)),
+		map[string]string{
+			scheduler.ControllerLabel: injection.GetControllerName(ctx),
+		},
+	)
 	if len(results.NewNodeClaims) > 0 {
-		log.FromContext(ctx).WithValues("Pods", pretty.Slice(lo.Map(pods, func(p *corev1.Pod, _ int) string { return klog.KRef(p.Namespace, p.Name).String() }), 5), "duration", time.Since(start)).Info("found provisionable pod(s)")
+		log.FromContext(ctx).WithValues(
+			"Pods", pretty.Slice(lo.Map(pods, func(p *corev1.Pod, _ int) string {
+				return klog.KRef(p.Namespace, p.Name).String()
+			}), 5),
+			"duration", time.Since(start),
+		).Info("found provisionable pod(s)")
 	}
 	// Mark in memory when these pods were marked as schedulable or when we made a decision on the pods
 	p.cluster.MarkPodSchedulingDecisions(results.PodErrors, pendingPods...)
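
The metric change above excludes reserved-offering errors from the unschedulable-pods count, since those pods are only deferred to a future scheduling simulation. A minimal sketch of that arithmetic, using hypothetical stand-in types (podError and unschedulableCount are illustrations, not the scheduler's types):

package main

import "fmt"

// podError is a stand-in for an entry in the scheduler's pod-error results.
// reservedOfferingError marks errors that defer the scheduling decision rather
// than marking the pod unschedulable.
type podError struct {
	pod                   string
	reservedOfferingError bool
}

// unschedulableCount mirrors the adjusted metric: total pod errors minus the
// reserved-offering errors, because those pods are expected to schedule in a
// later simulation once reserved capacity is resolved.
func unschedulableCount(errs []podError) int {
	reserved := 0
	for _, e := range errs {
		if e.reservedOfferingError {
			reserved++
		}
	}
	return len(errs) - reserved
}

func main() {
	errs := []podError{
		{pod: "default/a", reservedOfferingError: true},  // deferred, not unschedulable
		{pod: "default/b", reservedOfferingError: false}, // genuinely unschedulable
	}
	fmt.Println(unschedulableCount(errs)) // 1
}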
