Skip to content

Commit b67556e

Browse files
committed
feat: reserved capacity
1 parent ff18b57 commit b67556e

24 files changed

+737
-183
lines changed

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module sigs.k8s.io/karpenter
22

3-
go 1.23.6
3+
go 1.24.0
44

55
require (
66
github.com/Pallinder/go-randomdata v1.2.0

kwok/cloudprovider/cloudprovider.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"math/rand"
2424
"strings"
2525

26+
"github.com/awslabs/operatorpkg/option"
2627
"github.com/awslabs/operatorpkg/status"
2728
"github.com/docker/docker/pkg/namesgenerator"
2829
"github.com/samber/lo"
@@ -109,7 +110,7 @@ func (c CloudProvider) List(ctx context.Context) ([]*v1.NodeClaim, error) {
109110
}
110111

111112
// Return the hard-coded instance types.
112-
func (c CloudProvider) GetInstanceTypes(ctx context.Context, nodePool *v1.NodePool) ([]*cloudprovider.InstanceType, error) {
113+
func (c CloudProvider) GetInstanceTypes(ctx context.Context, nodePool *v1.NodePool, _ ...option.Function[cloudprovider.GetInstanceTypeOptions]) ([]*cloudprovider.InstanceType, error) {
113114
return c.instanceTypes, nil
114115
}
115116

kwok/cloudprovider/helpers.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -186,11 +186,11 @@ func newInstanceType(options InstanceTypeOptions) *cloudprovider.InstanceType {
186186
Requirements: requirements,
187187
Offerings: lo.Map(options.Offerings, func(off KWOKOffering, _ int) cloudprovider.Offering {
188188
return cloudprovider.Offering{
189+
ReservationManager: off.Offering.ReservationManager,
189190
Requirements: scheduling.NewRequirements(lo.Map(off.Requirements, func(req corev1.NodeSelectorRequirement, _ int) *scheduling.Requirement {
190191
return scheduling.NewRequirement(req.Key, req.Operator, req.Values...)
191192
})...),
192-
Price: off.Offering.Price,
193-
Available: off.Offering.Available,
193+
Price: off.Offering.Price,
194194
}
195195
}),
196196
Capacity: options.Resources,

kwok/tools/gen_instance_types.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,8 @@ func constructGenericInstanceTypes() []kwok.InstanceTypeOptions {
9898
corev1.NodeSelectorRequirement{Key: corev1.LabelTopologyZone, Operator: corev1.NodeSelectorOpIn, Values: []string{zone}},
9999
},
100100
Offering: cloudprovider.Offering{
101-
Price: lo.Ternary(ct == v1.CapacityTypeSpot, price*.7, price),
102101
Available: true,
102+
Price: lo.Ternary(ct == v1.CapacityTypeSpot, price*.7, price),
103103
},
104104
})
105105
}

pkg/apis/v1/labels.go

+1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ const (
3333
ArchitectureArm64 = "arm64"
3434
CapacityTypeSpot = "spot"
3535
CapacityTypeOnDemand = "on-demand"
36+
CapacityTypeReserved = "reserved"
3637
)
3738

3839
// Karpenter specific domains and labels

pkg/cloudprovider/fake/cloudprovider.go

+44-13
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"sync"
2525
"time"
2626

27+
"github.com/awslabs/operatorpkg/option"
2728
"github.com/awslabs/operatorpkg/status"
2829
"github.com/samber/lo"
2930
corev1 "k8s.io/api/core/v1"
@@ -40,6 +41,10 @@ import (
4041
"sigs.k8s.io/karpenter/pkg/utils/resources"
4142
)
4243

44+
func init() {
45+
v1.WellKnownLabels = v1.WellKnownLabels.Insert(v1alpha1.LabelReservationID)
46+
}
47+
4348
var _ cloudprovider.CloudProvider = (*CloudProvider)(nil)
4449

4550
type CloudProvider struct {
@@ -61,14 +66,17 @@ type CloudProvider struct {
6166
Drifted cloudprovider.DriftReason
6267
NodeClassGroupVersionKind []schema.GroupVersionKind
6368
RepairPolicy []cloudprovider.RepairPolicy
69+
70+
ReservationManagerProvider *ReservationManagerProvider
6471
}
6572

6673
func NewCloudProvider() *CloudProvider {
6774
return &CloudProvider{
68-
AllowedCreateCalls: math.MaxInt,
69-
CreatedNodeClaims: map[string]*v1.NodeClaim{},
70-
InstanceTypesForNodePool: map[string][]*cloudprovider.InstanceType{},
71-
ErrorsForNodePool: map[string]error{},
75+
AllowedCreateCalls: math.MaxInt,
76+
CreatedNodeClaims: map[string]*v1.NodeClaim{},
77+
InstanceTypesForNodePool: map[string][]*cloudprovider.InstanceType{},
78+
ErrorsForNodePool: map[string]error{},
79+
ReservationManagerProvider: NewReservationManagerProvider(),
7280
}
7381
}
7482

@@ -102,6 +110,7 @@ func (c *CloudProvider) Reset() {
102110
TolerationDuration: 30 * time.Minute,
103111
},
104112
}
113+
c.ReservationManagerProvider.Reset()
105114
}
106115

107116
func (c *CloudProvider) Create(ctx context.Context, nodeClaim *v1.NodeClaim) (*v1.NodeClaim, error) {
@@ -139,14 +148,19 @@ func (c *CloudProvider) Create(ctx context.Context, nodeClaim *v1.NodeClaim) (*v
139148
labels[key] = requirement.Values()[0]
140149
}
141150
}
142-
// Find Offering
143-
for _, o := range instanceType.Offerings.Available() {
144-
if reqs.IsCompatible(o.Requirements, scheduling.AllowUndefinedWellKnownLabels) {
145-
labels[corev1.LabelTopologyZone] = o.Requirements.Get(corev1.LabelTopologyZone).Any()
146-
labels[v1.CapacityTypeLabelKey] = o.Requirements.Get(v1.CapacityTypeLabelKey).Any()
147-
break
151+
// Find offering, prioritizing reserved instances
152+
offering := func() cloudprovider.Offering {
153+
offerings := instanceType.Offerings.Available().Compatible(reqs)
154+
lo.Must0(len(offerings) != 0, "created nodeclaim with no available offerings")
155+
if reservedOfferings := offerings.WithCapacityType(v1.CapacityTypeReserved); len(reservedOfferings) != 0 {
156+
c.ReservationManagerProvider.create(reservedOfferings[0].Requirements.Get(v1alpha1.LabelReservationID).Any())
157+
return reservedOfferings[0]
148158
}
149-
}
159+
return offerings[0]
160+
}()
161+
labels[corev1.LabelTopologyZone] = offering.Requirements.Get(corev1.LabelTopologyZone).Any()
162+
labels[v1.CapacityTypeLabelKey] = offering.Requirements.Get(v1.CapacityTypeLabelKey).Any()
163+
150164
created := &v1.NodeClaim{
151165
ObjectMeta: metav1.ObjectMeta{
152166
Name: nodeClaim.Name,
@@ -189,7 +203,8 @@ func (c *CloudProvider) List(_ context.Context) ([]*v1.NodeClaim, error) {
189203
}), nil
190204
}
191205

192-
func (c *CloudProvider) GetInstanceTypes(_ context.Context, np *v1.NodePool) ([]*cloudprovider.InstanceType, error) {
206+
// Note: this fake implementation does **not** support availability snapshots. The burden of testing snapshot support should be on the cloudprovider implementation.
207+
func (c *CloudProvider) GetInstanceTypes(_ context.Context, np *v1.NodePool, opts ...option.Function[cloudprovider.GetInstanceTypeOptions]) ([]*cloudprovider.InstanceType, error) {
193208
if np != nil {
194209
if err, ok := c.ErrorsForNodePool[np.Name]; ok {
195210
return nil, err
@@ -200,7 +215,23 @@ func (c *CloudProvider) GetInstanceTypes(_ context.Context, np *v1.NodePool) ([]
200215
}
201216
}
202217
if c.InstanceTypes != nil {
203-
return c.InstanceTypes, nil
218+
return lo.Map(c.InstanceTypes, func(it *cloudprovider.InstanceType, _ int) *cloudprovider.InstanceType {
219+
for i := range it.Offerings {
220+
if it.Offerings[i].Requirements.Get(v1.CapacityTypeLabelKey).Any() != v1.CapacityTypeReserved {
221+
continue
222+
}
223+
lo.Must0(
224+
it.Offerings[i].Requirements.Has(v1alpha1.LabelReservationID),
225+
"reserved offering for instance type %s must define requirement for label %s",
226+
it.Name,
227+
v1alpha1.LabelReservationID,
228+
)
229+
reservationID := it.Offerings[i].Requirements.Get(v1alpha1.LabelReservationID).Any()
230+
it.Offerings[i].ReservationManager = c.ReservationManagerProvider.ReservationManager(reservationID, opts...)
231+
it.Offerings[i].Available = c.ReservationManagerProvider.Capacity(reservationID) > 0
232+
}
233+
return it
234+
}), nil
204235
}
205236
return []*cloudprovider.InstanceType{
206237
NewInstanceType(InstanceTypeOptions{

pkg/cloudprovider/fake/instancetype.go

+42-22
Original file line numberDiff line numberDiff line change
@@ -65,26 +65,46 @@ func NewInstanceTypeWithCustomRequirement(options InstanceTypeOptions, customReq
6565
}
6666
if len(options.Offerings) == 0 {
6767
options.Offerings = []cloudprovider.Offering{
68-
{Requirements: scheduling.NewLabelRequirements(map[string]string{
69-
v1.CapacityTypeLabelKey: "spot",
70-
corev1.LabelTopologyZone: "test-zone-1",
71-
}), Price: PriceFromResources(options.Resources), Available: true},
72-
{Requirements: scheduling.NewLabelRequirements(map[string]string{
73-
v1.CapacityTypeLabelKey: "spot",
74-
corev1.LabelTopologyZone: "test-zone-2",
75-
}), Price: PriceFromResources(options.Resources), Available: true},
76-
{Requirements: scheduling.NewLabelRequirements(map[string]string{
77-
v1.CapacityTypeLabelKey: "on-demand",
78-
corev1.LabelTopologyZone: "test-zone-1",
79-
}), Price: PriceFromResources(options.Resources), Available: true},
80-
{Requirements: scheduling.NewLabelRequirements(map[string]string{
81-
v1.CapacityTypeLabelKey: "on-demand",
82-
corev1.LabelTopologyZone: "test-zone-2",
83-
}), Price: PriceFromResources(options.Resources), Available: true},
84-
{Requirements: scheduling.NewLabelRequirements(map[string]string{
85-
v1.CapacityTypeLabelKey: "on-demand",
86-
corev1.LabelTopologyZone: "test-zone-3",
87-
}), Price: PriceFromResources(options.Resources), Available: true},
68+
{
69+
Available: true,
70+
Requirements: scheduling.NewLabelRequirements(map[string]string{
71+
v1.CapacityTypeLabelKey: "spot",
72+
corev1.LabelTopologyZone: "test-zone-1",
73+
}),
74+
Price: PriceFromResources(options.Resources),
75+
},
76+
{
77+
Available: true,
78+
Requirements: scheduling.NewLabelRequirements(map[string]string{
79+
v1.CapacityTypeLabelKey: "spot",
80+
corev1.LabelTopologyZone: "test-zone-2",
81+
}),
82+
Price: PriceFromResources(options.Resources),
83+
},
84+
{
85+
Available: true,
86+
Requirements: scheduling.NewLabelRequirements(map[string]string{
87+
v1.CapacityTypeLabelKey: "on-demand",
88+
corev1.LabelTopologyZone: "test-zone-1",
89+
}),
90+
Price: PriceFromResources(options.Resources),
91+
},
92+
{
93+
Available: true,
94+
Requirements: scheduling.NewLabelRequirements(map[string]string{
95+
v1.CapacityTypeLabelKey: "on-demand",
96+
corev1.LabelTopologyZone: "test-zone-2",
97+
}),
98+
Price: PriceFromResources(options.Resources),
99+
},
100+
{
101+
Available: true,
102+
Requirements: scheduling.NewLabelRequirements(map[string]string{
103+
v1.CapacityTypeLabelKey: "on-demand",
104+
corev1.LabelTopologyZone: "test-zone-3",
105+
}),
106+
Price: PriceFromResources(options.Resources),
107+
},
88108
}
89109
}
90110
if len(options.Architecture) == 0 {
@@ -153,12 +173,12 @@ func InstanceTypesAssorted() []*cloudprovider.InstanceType {
153173
price := PriceFromResources(opts.Resources)
154174
opts.Offerings = []cloudprovider.Offering{
155175
{
176+
Available: true,
156177
Requirements: scheduling.NewLabelRequirements(map[string]string{
157178
v1.CapacityTypeLabelKey: ct,
158179
corev1.LabelTopologyZone: zone,
159180
}),
160-
Price: price,
161-
Available: true,
181+
Price: price,
162182
},
163183
}
164184
instanceTypes = append(instanceTypes, NewInstanceType(opts))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package fake
2+
3+
import (
4+
"maps"
5+
6+
"github.com/awslabs/operatorpkg/option"
7+
"github.com/samber/lo"
8+
"k8s.io/apimachinery/pkg/types"
9+
"k8s.io/apimachinery/pkg/util/sets"
10+
"sigs.k8s.io/karpenter/pkg/cloudprovider"
11+
)
12+
13+
type ReservationManagerProvider struct {
14+
snapshots map[types.UID]*snapshot
15+
capacity map[string]int // map[offering name]total capacity
16+
}
17+
18+
type snapshot struct {
19+
reservations map[string]sets.Set[string] // map[reservation id]set[offering name]
20+
capacity map[string]int
21+
}
22+
23+
func NewReservationManagerProvider() *ReservationManagerProvider {
24+
return &ReservationManagerProvider{
25+
snapshots: map[types.UID]*snapshot{},
26+
capacity: map[string]int{},
27+
}
28+
}
29+
30+
// SetCapacity sets the total number of instances available for a given reservationID. This value will be decremented
31+
// internally each time an instance is launched for the given reservationID.
32+
func (p *ReservationManagerProvider) SetCapacity(reservationID string, capacity int) {
33+
p.capacity[reservationID] = capacity
34+
}
35+
36+
// Capacity returns the total number of instances
37+
func (p *ReservationManagerProvider) Capacity(reservationID string) int {
38+
return p.capacity[reservationID]
39+
}
40+
41+
// create decrements the availability for the given reservationID by one.
42+
func (p *ReservationManagerProvider) create(reservationID string) {
43+
lo.Must0(p.capacity[reservationID] > 0, "created an instance with an offering with no availability")
44+
p.capacity[reservationID] -= 1
45+
if p.capacity[reservationID] == 0 {
46+
47+
}
48+
}
49+
50+
// getSnapshot returns an existing snapshot, if one exists for the given UUID, or creates a new one
51+
func (p *ReservationManagerProvider) getSnapshot(uuid *types.UID) *snapshot {
52+
if uuid != nil {
53+
if snapshot, ok := p.snapshots[*uuid]; ok {
54+
return snapshot
55+
}
56+
}
57+
snapshot := &snapshot{
58+
reservations: map[string]sets.Set[string]{},
59+
capacity: map[string]int{},
60+
}
61+
maps.Copy(snapshot.capacity, p.capacity)
62+
if uuid != nil {
63+
p.snapshots[*uuid] = snapshot
64+
}
65+
return snapshot
66+
}
67+
68+
func (p *ReservationManagerProvider) Reset() {
69+
*p = *NewReservationManagerProvider()
70+
}
71+
72+
func (p *ReservationManagerProvider) ReservationManager(reservationID string, opts ...option.Function[cloudprovider.GetInstanceTypeOptions]) cloudprovider.ReservationManager {
73+
return snapshotAdapter{
74+
snapshot: p.getSnapshot(option.Resolve(opts...).AvailabilitySnapshotUUID),
75+
reservationID: reservationID,
76+
}
77+
}
78+
79+
type snapshotAdapter struct {
80+
*snapshot
81+
reservationID string
82+
}
83+
84+
func (a snapshotAdapter) Reserve(reservationID string) bool {
85+
if reservations, ok := a.reservations[reservationID]; ok && reservations.Has(a.reservationID) {
86+
return true
87+
}
88+
if a.capacity[a.reservationID] > 0 {
89+
reservations, ok := a.reservations[reservationID]
90+
if !ok {
91+
reservations = sets.New[string]()
92+
a.reservations[reservationID] = reservations
93+
}
94+
reservations.Insert(a.reservationID)
95+
a.capacity[a.reservationID] -= 1
96+
return true
97+
}
98+
return false
99+
}
100+
101+
func (a snapshotAdapter) Release(reservationID string) {
102+
if reservations, ok := a.reservations[reservationID]; ok && reservations.Has(a.reservationID) {
103+
reservations.Delete(a.reservationID)
104+
}
105+
}

pkg/cloudprovider/metrics/cloudprovider.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121

2222
opmetrics "github.com/awslabs/operatorpkg/metrics"
23+
"github.com/awslabs/operatorpkg/option"
2324
"github.com/prometheus/client_golang/prometheus"
2425
crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
2526

@@ -133,7 +134,7 @@ func (d *decorator) List(ctx context.Context) ([]*v1.NodeClaim, error) {
133134
return nodeClaims, err
134135
}
135136

136-
func (d *decorator) GetInstanceTypes(ctx context.Context, nodePool *v1.NodePool) ([]*cloudprovider.InstanceType, error) {
137+
func (d *decorator) GetInstanceTypes(ctx context.Context, nodePool *v1.NodePool, opts ...option.Function[cloudprovider.GetInstanceTypeOptions]) ([]*cloudprovider.InstanceType, error) {
137138
method := "GetInstanceTypes"
138139
defer metrics.Measure(MethodDuration, getLabelsMapForDuration(ctx, d, method))()
139140
instanceType, err := d.CloudProvider.GetInstanceTypes(ctx, nodePool)

0 commit comments

Comments
 (0)