
Commit e523238

snapshots and disruption integration

1 parent afe2a47 · commit e523238

File tree: 10 files changed (+156 -72 lines)


kwok/cloudprovider/cloudprovider.go (+2 -1)

@@ -23,6 +23,7 @@ import (
     "math/rand"
     "strings"
 
+    "github.com/awslabs/operatorpkg/option"
     "github.com/awslabs/operatorpkg/status"
     "github.com/docker/docker/pkg/namesgenerator"
     "github.com/samber/lo"
@@ -109,7 +110,7 @@ func (c CloudProvider) List(ctx context.Context) ([]*v1.NodeClaim, error) {
 }
 
 // Return the hard-coded instance types.
-func (c CloudProvider) GetInstanceTypes(ctx context.Context, nodePool *v1.NodePool) ([]*cloudprovider.InstanceType, error) {
+func (c CloudProvider) GetInstanceTypes(ctx context.Context, nodePool *v1.NodePool, _ ...option.Function[cloudprovider.GetInstanceTypeOptions]) ([]*cloudprovider.InstanceType, error) {
     return c.instanceTypes, nil
 }
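
Because the new parameter is variadic, every existing GetInstanceTypes call site keeps compiling unchanged; only callers that want a snapshot-consistent view of availability opt in. A minimal caller-side sketch of both styles (the listInstanceTypes helper and package name are illustrative, not part of this commit):

    package example

    import (
        "context"

        "k8s.io/apimachinery/pkg/util/uuid"

        v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
        "sigs.k8s.io/karpenter/pkg/cloudprovider"
    )

    // listInstanceTypes shows both call styles: the plain form used by callers that
    // don't need snapshot consistency, and the option form that pins all lookups in
    // one scheduling pass to a shared availability snapshot.
    func listInstanceTypes(ctx context.Context, cp cloudprovider.CloudProvider, np *v1.NodePool) error {
        // Plain call: identical to the pre-change signature, compiles unchanged.
        if _, err := cp.GetInstanceTypes(ctx, np); err != nil {
            return err
        }
        // Snapshot-pinned call: reuse the same UUID across all related calls.
        snapshotID := uuid.NewUUID()
        _, err := cp.GetInstanceTypes(ctx, np, cloudprovider.WithAvailabilitySnapshotUUID(snapshotID))
        return err
    }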

pkg/cloudprovider/fake/cloudprovider.go (+2 -1)

@@ -24,6 +24,7 @@ import (
     "sync"
     "time"
 
+    "github.com/awslabs/operatorpkg/option"
     "github.com/awslabs/operatorpkg/status"
     "github.com/samber/lo"
     corev1 "k8s.io/api/core/v1"
@@ -189,7 +190,7 @@ func (c *CloudProvider) List(_ context.Context) ([]*v1.NodeClaim, error) {
     }), nil
 }
 
-func (c *CloudProvider) GetInstanceTypes(_ context.Context, np *v1.NodePool) ([]*cloudprovider.InstanceType, error) {
+func (c *CloudProvider) GetInstanceTypes(_ context.Context, np *v1.NodePool, _ ...option.Function[cloudprovider.GetInstanceTypeOptions]) ([]*cloudprovider.InstanceType, error) {
     if np != nil {
         if err, ok := c.ErrorsForNodePool[np.Name]; ok {
             return nil, err

pkg/cloudprovider/metrics/cloudprovider.go (+2 -1)

@@ -20,6 +20,7 @@ import (
     "context"
 
     opmetrics "github.com/awslabs/operatorpkg/metrics"
+    "github.com/awslabs/operatorpkg/option"
     "github.com/prometheus/client_golang/prometheus"
     crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
 
@@ -133,7 +134,7 @@ func (d *decorator) List(ctx context.Context) ([]*v1.NodeClaim, error) {
     return nodeClaims, err
 }
 
-func (d *decorator) GetInstanceTypes(ctx context.Context, nodePool *v1.NodePool) ([]*cloudprovider.InstanceType, error) {
+func (d *decorator) GetInstanceTypes(ctx context.Context, nodePool *v1.NodePool, opts ...option.Function[cloudprovider.GetInstanceTypeOptions]) ([]*cloudprovider.InstanceType, error) {
     method := "GetInstanceTypes"
     defer metrics.Measure(MethodDuration, getLabelsMapForDuration(ctx, d, method))()
     instanceType, err := d.CloudProvider.GetInstanceTypes(ctx, nodePool)

pkg/cloudprovider/types.go (+15 -1)

@@ -25,9 +25,11 @@ import (
     "sync"
     "time"
 
+    "github.com/awslabs/operatorpkg/option"
     "github.com/awslabs/operatorpkg/status"
     "github.com/samber/lo"
     corev1 "k8s.io/api/core/v1"
+    "k8s.io/apimachinery/pkg/types"
     "k8s.io/apimachinery/pkg/util/sets"
 
     v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
@@ -67,7 +69,7 @@ type CloudProvider interface {
     // Availability of types or zone may vary by nodepool or over time. Regardless of
     // availability, the GetInstanceTypes method should always return all instance types,
     // even those with no offerings available.
-    GetInstanceTypes(context.Context, *v1.NodePool) ([]*InstanceType, error)
+    GetInstanceTypes(context.Context, *v1.NodePool, ...option.Function[GetInstanceTypeOptions]) ([]*InstanceType, error)
     // IsDrifted returns whether a NodeClaim has drifted from the provisioning requirements
     // it is tied to.
     IsDrifted(context.Context, *v1.NodeClaim) (DriftReason, error)
@@ -81,6 +83,18 @@ type CloudProvider interface {
     GetSupportedNodeClasses() []status.Object
 }
 
+type GetInstanceTypeOptions struct {
+    AvailabilitySnapshotUUID types.UID
+}
+
+// GetInstanceTypes calls made with the same snapshot ID should have a consistent view of offering availability. This
+// is crucial for offerings with capacity type "reserved" since cross-nodepool offerings may share availability.
+func WithAvailabilitySnapshotUUID(uuid types.UID) option.Function[GetInstanceTypeOptions] {
+    return func(opts *GetInstanceTypeOptions) {
+        opts.AvailabilitySnapshotUUID = uuid
+    }
+}
+
 // InstanceType describes the properties of a potential node (either concrete attributes of an instance of this type
 // or supported options in the case of arrays)
 type InstanceType struct {
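
On the provider side, the option.Function values are simply applied to a zero-valued GetInstanceTypeOptions before use. A minimal sketch of that resolution, assuming option.Function[T] is a func(*T) as the WithAvailabilitySnapshotUUID helper above suggests (operatorpkg may already ship an equivalent resolve helper; the one below is illustrative):

    package example

    import (
        "github.com/awslabs/operatorpkg/option"

        "sigs.k8s.io/karpenter/pkg/cloudprovider"
    )

    // resolveGetInstanceTypeOptions applies each option function to a fresh options
    // struct; callers that pass nothing simply get the zero value (no snapshot).
    func resolveGetInstanceTypeOptions(opts ...option.Function[cloudprovider.GetInstanceTypeOptions]) *cloudprovider.GetInstanceTypeOptions {
        resolved := &cloudprovider.GetInstanceTypeOptions{}
        for _, fn := range opts {
            fn(resolved)
        }
        return resolved
    }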

pkg/controllers/disruption/helpers.go (+23 -10)

@@ -33,7 +33,7 @@ import (
     disruptionevents "sigs.k8s.io/karpenter/pkg/controllers/disruption/events"
     "sigs.k8s.io/karpenter/pkg/controllers/disruption/orchestration"
     "sigs.k8s.io/karpenter/pkg/controllers/provisioning"
-    pscheduling "sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling"
+    "sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling"
     "sigs.k8s.io/karpenter/pkg/controllers/state"
     "sigs.k8s.io/karpenter/pkg/events"
     "sigs.k8s.io/karpenter/pkg/metrics"
@@ -48,7 +48,7 @@ var errCandidateDeleting = fmt.Errorf("candidate is deleting")
 //nolint:gocyclo
 func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster *state.Cluster, provisioner *provisioning.Provisioner,
     candidates ...*Candidate,
-) (pscheduling.Results, error) {
+) (scheduling.Results, error) {
     candidateNames := sets.NewString(lo.Map(candidates, func(t *Candidate, i int) string { return t.Name() })...)
     nodes := cluster.Nodes()
     deletingNodes := nodes.Deleting()
@@ -62,33 +62,45 @@ func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster *
     if _, ok := lo.Find(deletingNodes, func(n *state.StateNode) bool {
         return candidateNames.Has(n.Name())
     }); ok {
-        return pscheduling.Results{}, errCandidateDeleting
+        return scheduling.Results{}, errCandidateDeleting
     }
 
     // We get the pods that are on nodes that are deleting
     deletingNodePods, err := deletingNodes.ReschedulablePods(ctx, kubeClient)
     if err != nil {
-        return pscheduling.Results{}, fmt.Errorf("failed to get pods from deleting nodes, %w", err)
+        return scheduling.Results{}, fmt.Errorf("failed to get pods from deleting nodes, %w", err)
     }
     // start by getting all pending pods
     pods, err := provisioner.GetPendingPods(ctx)
     if err != nil {
-        return pscheduling.Results{}, fmt.Errorf("determining pending pods, %w", err)
+        return scheduling.Results{}, fmt.Errorf("determining pending pods, %w", err)
     }
     for _, n := range candidates {
         pods = append(pods, n.reschedulablePods...)
     }
     pods = append(pods, deletingNodePods...)
-    scheduler, err := provisioner.NewScheduler(log.IntoContext(ctx, operatorlogging.NopLogger), pods, stateNodes)
+    scheduler, err := provisioner.NewScheduler(
+        log.IntoContext(ctx, operatorlogging.NopLogger),
+        pods,
+        stateNodes,
+        // ReservedOfferingModeFallback is used for the following reasons:
+        // - For consolidation, we're only going to accept a decision if it lowers the cost of the cluster and if it only
+        //   requires a single additional nodeclaim. It doesn't matter in this scenario if we fall back.
+        // - For drift, fallback is required to ensure progress. Progress is only ensured with strict mode if multiple
+        //   scheduling loops are allowed to proceed, but we need to ensure all pods on the drifted node are scheduled
+        //   within a single iteration. This may result in non-ideal instance choices, but the alternative is deadlock.
+        // See issue TODO for more details.
+        scheduling.ReservedOfferingModeFallback,
+    )
     if err != nil {
-        return pscheduling.Results{}, fmt.Errorf("creating scheduler, %w", err)
+        return scheduling.Results{}, fmt.Errorf("creating scheduler, %w", err)
     }
 
     deletingNodePodKeys := lo.SliceToMap(deletingNodePods, func(p *corev1.Pod) (client.ObjectKey, interface{}) {
         return client.ObjectKeyFromObject(p), nil
     })
 
-    results := scheduler.Solve(log.IntoContext(ctx, operatorlogging.NopLogger), pods).TruncateInstanceTypes(pscheduling.MaxInstanceTypes)
+    results := scheduler.Solve(log.IntoContext(ctx, operatorlogging.NopLogger), pods).TruncateInstanceTypes(scheduling.MaxInstanceTypes)
     for _, n := range results.ExistingNodes {
         // We consider existing nodes for scheduling. When these nodes are unmanaged, their taint logic should
         // tell us if we can schedule to them or not; however, if these nodes are managed, we will still schedule to them
@@ -100,6 +112,7 @@ func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster *
         // If the pod is on a deleting node, we assume one of two things has already happened:
         // 1. The node was manually terminated, at which the provisioning controller has scheduled or is scheduling a node
         //    for the pod.
+        // TODO: clarify this point, not clear to me
         // 2. The node was chosen for a previous disruption command, we assume that the uninitialized node will come up
         //    for this command, and we assume it will be successful. If it is not successful, the node will become
        //    not terminating, and we will no longer need to consider these pods.
@@ -115,10 +128,10 @@ func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster *
 // UninitializedNodeError tracks a special pod error for disruption where pods schedule to a node
 // that hasn't been initialized yet, meaning that we can't be confident to make a disruption decision based off of it
 type UninitializedNodeError struct {
-    *pscheduling.ExistingNode
+    *scheduling.ExistingNode
 }
 
-func NewUninitializedNodeError(node *pscheduling.ExistingNode) *UninitializedNodeError {
+func NewUninitializedNodeError(node *scheduling.ExistingNode) *UninitializedNodeError {
     return &UninitializedNodeError{ExistingNode: node}
 }
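
The ReservedOfferingModeFallback and ReservedOfferingModeStrict values referenced here and in provisioner.go are presumably declared in the scheduler package, in one of the changed files not shown in this excerpt. A plausible shape for that declaration, offered only as an assumption:

    package scheduling

    // ReservedOfferingMode controls how the scheduler reacts when instance types
    // with compatible reserved offerings exist but none could be reserved for a
    // simulated nodeclaim.
    type ReservedOfferingMode int

    const (
        // Strict mode fails the pod's scheduling simulation rather than falling
        // back to on-demand / spot capacity; a later simulation may succeed once
        // reservations settle.
        ReservedOfferingModeStrict ReservedOfferingMode = iota
        // Fallback mode lets the simulation proceed with non-reserved offerings so
        // consolidation and drift can make progress within a single iteration.
        ReservedOfferingModeFallback
    )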

pkg/controllers/provisioning/provisioner.go (+12 -4)

@@ -32,6 +32,7 @@ import (
     corev1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/types"
     "k8s.io/apimachinery/pkg/util/sets"
+    "k8s.io/apimachinery/pkg/util/uuid"
     "k8s.io/client-go/util/workqueue"
     "k8s.io/klog/v2"
     "k8s.io/utils/clock"
@@ -212,7 +213,12 @@ func (p *Provisioner) consolidationWarnings(ctx context.Context, pods []*corev1.
 var ErrNodePoolsNotFound = errors.New("no nodepools found")
 
 //nolint:gocyclo
-func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stateNodes []*state.StateNode) (*scheduler.Scheduler, error) {
+func (p *Provisioner) NewScheduler(
+    ctx context.Context,
+    pods []*corev1.Pod,
+    stateNodes []*state.StateNode,
+    reservedOfferingMode scheduler.ReservedOfferingMode,
+) (*scheduler.Scheduler, error) {
     nodePools, err := nodepoolutils.ListManaged(ctx, p.kubeClient, p.cloudProvider)
     if err != nil {
         return nil, fmt.Errorf("listing nodepools, %w", err)
@@ -228,6 +234,8 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat
         return nil, ErrNodePoolsNotFound
     }
 
+    schedulerID := uuid.NewUUID()
+
     // nodeTemplates generated from NodePools are ordered by weight
     // since they are stored within a slice and scheduling
     // will always attempt to schedule on the first nodeTemplate
@@ -236,7 +244,7 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat
     instanceTypes := map[string][]*cloudprovider.InstanceType{}
     domains := map[string]sets.Set[string]{}
     for _, np := range nodePools {
-        its, err := p.cloudProvider.GetInstanceTypes(ctx, np)
+        its, err := p.cloudProvider.GetInstanceTypes(ctx, np, cloudprovider.WithAvailabilitySnapshotUUID(schedulerID))
         if err != nil {
             log.FromContext(ctx).WithValues("NodePool", klog.KRef("", np.Name)).Error(err, "skipping, unable to resolve instance types")
             continue
@@ -295,7 +303,7 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat
     if err != nil {
         return nil, fmt.Errorf("getting daemon pods, %w", err)
     }
-    return scheduler.NewScheduler(ctx, p.kubeClient, nodePools, p.cluster, stateNodes, topology, instanceTypes, daemonSetPods, p.recorder, p.clock), nil
+    return scheduler.NewScheduler(ctx, schedulerID, p.kubeClient, nodePools, p.cluster, stateNodes, topology, instanceTypes, daemonSetPods, p.recorder, p.clock, reservedOfferingMode), nil
 }
 
 func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) {
@@ -332,7 +340,7 @@ func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) {
     if len(pods) == 0 {
         return scheduler.Results{}, nil
     }
-    s, err := p.NewScheduler(ctx, pods, nodes.Active())
+    s, err := p.NewScheduler(ctx, pods, nodes.Active(), scheduler.ReservedOfferingModeStrict)
     if err != nil {
         if errors.Is(err, ErrNodePoolsNotFound) {
             log.FromContext(ctx).Info("no nodepools found")
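
With the scheduler's UUID now threaded through every GetInstanceTypes call in a scheduling pass, a cloud provider can memoize its availability lookups per snapshot so that all nodepools in that pass observe identical reserved-capacity counts. A rough provider-side sketch of that idea (the cache type and names are illustrative, not from this commit):

    package example

    import (
        "sync"

        "k8s.io/apimachinery/pkg/types"
    )

    // availability stands in for whatever per-offering capacity data a provider
    // tracks; the real structure lives in the provider implementation.
    type availability map[string]int

    // snapshotCache returns the same availability view for every call that shares a
    // snapshot UUID and computes a fresh view when no UUID is supplied. A real
    // implementation would also expire snapshots once a scheduling loop finishes.
    type snapshotCache struct {
        mu        sync.Mutex
        snapshots map[types.UID]availability
        compute   func() availability
    }

    func newSnapshotCache(compute func() availability) *snapshotCache {
        return &snapshotCache{snapshots: map[types.UID]availability{}, compute: compute}
    }

    func (c *snapshotCache) get(id types.UID) availability {
        if id == "" { // no snapshot requested: always compute a fresh view
            return c.compute()
        }
        c.mu.Lock()
        defer c.mu.Unlock()
        if cached, ok := c.snapshots[id]; ok {
            return cached
        }
        fresh := c.compute()
        c.snapshots[id] = fresh
        return fresh
    }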

pkg/controllers/provisioning/scheduling/nodeclaim.go (+24 -23)

@@ -42,12 +42,13 @@ import (
 type NodeClaim struct {
     NodeClaimTemplate
 
-    Pods              []*corev1.Pod
-    reservedOfferings map[string]cloudprovider.Offerings
-    topology          *Topology
-    hostPortUsage     *scheduling.HostPortUsage
-    daemonResources   corev1.ResourceList
-    hostname          string
+    Pods                 []*corev1.Pod
+    reservedOfferings    map[string]cloudprovider.Offerings
+    reservedOfferingMode ReservedOfferingMode
+    topology             *Topology
+    hostPortUsage        *scheduling.HostPortUsage
+    daemonResources      corev1.ResourceList
+    hostname             string
 }
 
 type NodePoolLimitsExceededError struct {
@@ -58,11 +59,6 @@ func (e NodePoolLimitsExceededError) Error() string {
     return fmt.Sprintf("all available instance types exceed limits for nodepool: %q", e.nodePool)
 }
 
-// type IncompatibleNodeClaimTemplateError struct {
-// 	nodePool string
-// 	daemonSetOverhead corev1.ResourceList
-// }
-
 // ReservedOfferingError indicates a NodeClaim couldn't be created or a pod couldn't be added to an existing NodeClaim
 // due to
 type ReservedOfferingError struct {
@@ -88,6 +84,7 @@ func NewNodeClaimForPod(
     remainingResources corev1.ResourceList,
     pod *corev1.Pod,
     podRequests corev1.ResourceList,
+    reservedOfferingMode ReservedOfferingMode,
 ) (*NodeClaim, error) {
     // Ensure we don't consider instance types which would exceed the limits of the NodePool
     instanceTypes := filterByRemainingResources(nodeClaimTemplate.InstanceTypeOptions, remainingResources)
@@ -200,6 +197,7 @@ func (n *NodeClaim) Add(pod *corev1.Pod, podRequests corev1.ResourceList) error
 // offerings in a map from instance type name to offerings. Additionally, a slice of offerings to release is returned.
 // This is based on the previously reserved offerings which are no longer compatible with the nodeclaim. These should
 // not be released until we're ready to persist the changes to the nodeclaim.
+// nolint:gocyclo
 func (n *NodeClaim) reserveOfferings(
     instanceTypes []*cloudprovider.InstanceType,
     nodeClaimRequirements scheduling.Requirements,
@@ -225,18 +223,21 @@ func (n *NodeClaim) reserveOfferings(
             reservedOfferings[it.Name] = reserved
         }
     }
-    // If an instance type with a compatible reserved offering exists, but we failed to make any reservations, we should
-    // fail. We include this check due to the pessimistic nature of instance reservation - we have to reserve each potential
-    // offering for every nodeclaim, since we don't know what we'll launch with. Without this check we would fall back to
-    // on-demand / spot even when there's sufficient reserved capacity. This check means we may fail to schedule the pod
-    // during this scheduling simulation, but with the possibility of success on subsequent simulations.
-    if len(compatibleReservedInstanceTypes) != 0 && len(reservedOfferings) == 0 {
-        return nil, nil, NewReservedOfferingError(fmt.Errorf("one or more instance types with compatible reserved offerings are available, but could not be reserved"))
-    }
-    // If the nodeclaim previously had compatible reserved offerings, but the additional requirements filtered those out,
-    // we should fail to add the pod to this nodeclaim.
-    if len(n.reservedOfferings) != 0 && len(reservedOfferings) == 0 {
-        return nil, nil, NewReservedOfferingError(fmt.Errorf("satisfying constraints would result "))
+    if n.reservedOfferingMode == ReservedOfferingModeStrict {
+        // If an instance type with a compatible reserved offering exists, but we failed to make any reservations, we should
+        // fail. We include this check due to the pessimistic nature of instance reservation - we have to reserve each potential
+        // offering for every nodeclaim, since we don't know what we'll launch with. Without this check we would fall back to
+        // on-demand / spot even when there's sufficient reserved capacity. This check means we may fail to schedule the pod
+        // during this scheduling simulation, but with the possibility of success on subsequent simulations.
+        // Note: while this can occur both on initial creation and on
+        if len(compatibleReservedInstanceTypes) != 0 && len(reservedOfferings) == 0 {
+            return nil, nil, NewReservedOfferingError(fmt.Errorf("one or more instance types with compatible reserved offerings are available, but could not be reserved"))
+        }
+        // If the nodeclaim previously had compatible reserved offerings, but the additional requirements filtered those out,
+        // we should fail to add the pod to this nodeclaim.
+        if len(n.reservedOfferings) != 0 && len(reservedOfferings) == 0 {
+            return nil, nil, NewReservedOfferingError(fmt.Errorf("satisfying constraints would result "))
+        }
     }
     // Ensure we release any offerings for instance types which are no longer compatible with nodeClaimRequirements
     for instanceName, offerings := range n.reservedOfferings {
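
The two strict-mode guards above reduce to a small predicate: fail only when nothing could be reserved even though reserved capacity was in play, either via compatible reserved instance types or via previously held reservations. A standalone restatement with simplified inputs (illustrative only, not code from this commit):

    package example

    // shouldFailStrict restates the two strict-mode guards from reserveOfferings:
    // fail when compatible reserved instance types exist but nothing could be
    // reserved, or when previously held reservations were all filtered out by the
    // new requirements.
    func shouldFailStrict(compatibleReservedTypes, newlyReserved, previouslyReserved int) bool {
        if newlyReserved != 0 {
            return false
        }
        return compatibleReservedTypes != 0 || previouslyReserved != 0
    }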
