fix: don't provision unnecessary capacity for pods which can't move to a new node #2033

Open · wants to merge 2 commits into base: main
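At a high level, this change threads the terminator's eviction queue into the provisioner so that scheduling can skip pods whose eviction is currently blocked by a PodDisruptionBudget, instead of provisioning replacement capacity they cannot move to yet. The sketch below is a minimal illustration of that idea, not the PR's implementation: the helper name filterBlockedPods is hypothetical, while terminator.Queue.EvictionError and the 429 check mirror the diff that follows.

package sketch

import (
	"github.com/samber/lo"
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"

	"sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator"
)

// filterBlockedPods drops pods whose last eviction attempt failed with a
// 429 (TooManyRequests), i.e. a PDB is blocking the eviction, so provisioning
// replacement capacity for them right now would be wasted.
// (Illustrative helper; not part of the PR.)
func filterBlockedPods(pods []*corev1.Pod, q *terminator.Queue) []*corev1.Pod {
	return lo.Filter(pods, func(pod *corev1.Pod, _ int) bool {
		return !apierrors.IsTooManyRequests(q.EvictionError(pod))
	})
}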
2 changes: 1 addition & 1 deletion pkg/controllers/controllers.go
@@ -67,8 +67,8 @@ func NewControllers(
cloudProvider cloudprovider.CloudProvider,
cluster *state.Cluster,
) []controller.Controller {
p := provisioning.NewProvisioner(kubeClient, recorder, cloudProvider, cluster, clock)
evictionQueue := terminator.NewQueue(kubeClient, recorder)
p := provisioning.NewProvisioner(kubeClient, recorder, cloudProvider, cluster, evictionQueue, clock)
disruptionQueue := orchestration.NewQueue(kubeClient, recorder, cluster, clock, p)

controllers := []controller.Controller{
8 changes: 4 additions & 4 deletions pkg/controllers/disruption/drift_test.go
@@ -553,8 +553,7 @@ var _ = Describe("Drift", func() {
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
ExpectExists(ctx, env.Client, nodeClaim)
})
It("should drift nodes that have pods with the karpenter.sh/do-not-disrupt annotation when the NodePool's TerminationGracePeriod is not nil", func() {
nodeClaim.Spec.TerminationGracePeriod = &metav1.Duration{Duration: time.Second * 300}
It("should ignore nodes that have pods with the karpenter.sh/do-not-disrupt annotation when the NodePool's TerminationGracePeriod is not nil", func() {
pod := test.Pod(test.PodOptions{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
@@ -570,8 +569,8 @@ var _ = Describe("Drift", func() {

ExpectSingletonReconciled(ctx, disruptionController)

// Expect to create a replacement but not delete the old nodeclaim
Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(2)) // new nodeclaim is created for drift
// Pods with `karpenter.sh/do-not-disrupt` can't be evicted and hence can't be rescheduled on a new node.
// Expect no new nodeclaims to be created.
Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(1))
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
ExpectExists(ctx, env.Client, nodeClaim)
})
3 changes: 2 additions & 1 deletion pkg/controllers/disruption/orchestration/suite_test.go
@@ -41,6 +41,7 @@ import (
"sigs.k8s.io/karpenter/pkg/cloudprovider/fake"
disruptionevents "sigs.k8s.io/karpenter/pkg/controllers/disruption/events"
"sigs.k8s.io/karpenter/pkg/controllers/disruption/orchestration"
"sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator"
"sigs.k8s.io/karpenter/pkg/controllers/provisioning"
"sigs.k8s.io/karpenter/pkg/controllers/state"
"sigs.k8s.io/karpenter/pkg/controllers/state/informer"
@@ -85,7 +86,7 @@ var _ = BeforeSuite(func() {
nodeStateController = informer.NewNodeController(env.Client, cluster)
nodeClaimStateController = informer.NewNodeClaimController(env.Client, cloudProvider, cluster)
recorder = test.NewEventRecorder()
prov = provisioning.NewProvisioner(env.Client, recorder, cloudProvider, cluster, fakeClock)
prov = provisioning.NewProvisioner(env.Client, recorder, cloudProvider, cluster, terminator.NewTestingQueue(env.Client, recorder), fakeClock)
queue = NewTestingQueue(env.Client, recorder, cluster, fakeClock, prov)
})

3 changes: 2 additions & 1 deletion pkg/controllers/disruption/suite_test.go
@@ -49,6 +49,7 @@ import (
"sigs.k8s.io/karpenter/pkg/cloudprovider/fake"
"sigs.k8s.io/karpenter/pkg/controllers/disruption"
"sigs.k8s.io/karpenter/pkg/controllers/disruption/orchestration"
"sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator"
"sigs.k8s.io/karpenter/pkg/controllers/provisioning"
"sigs.k8s.io/karpenter/pkg/controllers/state"
"sigs.k8s.io/karpenter/pkg/controllers/state/informer"
@@ -96,7 +97,7 @@ var _ = BeforeSuite(func() {
nodeStateController = informer.NewNodeController(env.Client, cluster)
nodeClaimStateController = informer.NewNodeClaimController(env.Client, cloudProvider, cluster)
recorder = test.NewEventRecorder()
prov = provisioning.NewProvisioner(env.Client, recorder, cloudProvider, cluster, fakeClock)
prov = provisioning.NewProvisioner(env.Client, recorder, cloudProvider, cluster, terminator.NewTestingQueue(env.Client, recorder), fakeClock)
queue = NewTestingQueue(env.Client, recorder, cluster, fakeClock, prov)
disruptionController = disruption.NewController(fakeClock, env.Client, prov, cloudProvider, recorder, cluster, queue)
})
67 changes: 48 additions & 19 deletions pkg/controllers/node/termination/terminator/eviction.go
@@ -85,8 +85,9 @@ func NewQueueKey(pod *corev1.Pod, providerID string) QueueKey {
type Queue struct {
workqueue.TypedRateLimitingInterface[QueueKey]

mu sync.Mutex
set sets.Set[QueueKey]
mu sync.Mutex
set sets.Set[QueueKey]
evictionError map[types.UID]error

kubeClient client.Client
recorder events.Recorder
@@ -99,16 +100,18 @@ func NewQueue(kubeClient client.Client, recorder events.Recorder) *Queue {
workqueue.TypedRateLimitingQueueConfig[QueueKey]{
Name: "eviction.workqueue",
}),
set: sets.New[QueueKey](),
kubeClient: kubeClient,
recorder: recorder,
set: sets.New[QueueKey](),
evictionError: map[types.UID]error{},
kubeClient: kubeClient,
recorder: recorder,
}
}

func NewTestingQueue(kubeClient client.Client, recorder events.Recorder) *Queue {
return &Queue{
TypedRateLimitingInterface: &controllertest.TypedQueue[QueueKey]{TypedInterface: workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[QueueKey]{Name: "eviction.workqueue"})},
set: sets.New[QueueKey](),
evictionError: map[types.UID]error{},
kubeClient: kubeClient,
recorder: recorder,
}
@@ -135,6 +138,35 @@ func (q *Queue) Add(node *corev1.Node, pods ...*corev1.Pod) {
}
}

// Delete removes the key from the Queue and clears any eviction error tracked for it
func (q *Queue) Delete(key QueueKey) {
q.mu.Lock()
defer q.mu.Unlock()

q.TypedRateLimitingInterface.Forget(key)
q.set.Delete(key)
delete(q.evictionError, key.UID)
}

// Requeue adds the key back to the Queue with rate limiting, recording the eviction error if one occurred
func (q *Queue) Requeue(key QueueKey, err error) {
q.mu.Lock()
defer q.mu.Unlock()

if err != nil {
q.evictionError[key.UID] = err
}
q.TypedRateLimitingInterface.AddRateLimited(key)
}

// EvictionError returns the most recently recorded eviction error for the given pod, if any.
func (q *Queue) EvictionError(pod *corev1.Pod) error {
q.mu.Lock()
defer q.mu.Unlock()

return q.evictionError[pod.UID]
}

func (q *Queue) Has(node *corev1.Node, pod *corev1.Pod) bool {
q.mu.Lock()
defer q.mu.Unlock()
@@ -159,21 +191,18 @@ func (q *Queue) Reconcile(ctx context.Context) (reconcile.Result, error) {
defer q.TypedRateLimitingInterface.Done(item)

// Evict the pod
if q.Evict(ctx, item) {
q.TypedRateLimitingInterface.Forget(item)
q.mu.Lock()
q.set.Delete(item)
q.mu.Unlock()
if evicted, evictionError := q.Evict(ctx, item); evicted {
q.Delete(item)
return reconcile.Result{RequeueAfter: singleton.RequeueImmediately}, nil
} else {
// Requeue pod if eviction failed
q.Requeue(item, evictionError)
return reconcile.Result{RequeueAfter: singleton.RequeueImmediately}, nil
}

// Requeue pod if eviction failed
q.TypedRateLimitingInterface.AddRateLimited(item)
return reconcile.Result{RequeueAfter: singleton.RequeueImmediately}, nil
}

// Evict returns true if the eviction call succeeded; otherwise it returns false along with the eviction-related error
func (q *Queue) Evict(ctx context.Context, key QueueKey) bool {
func (q *Queue) Evict(ctx context.Context, key QueueKey) (bool, error) {
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("Pod", klog.KRef(key.Namespace, key.Name)))
evictionMessage, err := evictionReason(ctx, key, q.kubeClient)
if err != nil {
@@ -201,21 +230,21 @@ func (q *Queue) Evict(ctx context.Context, key QueueKey) bool {
// https://github.com/kubernetes/kubernetes/blob/ad19beaa83363de89a7772f4d5af393b85ce5e61/pkg/registry/core/pod/storage/eviction.go#L160
// 409 - The pod exists, but it is not the same pod that we initiated the eviction on
// https://github.com/kubernetes/kubernetes/blob/ad19beaa83363de89a7772f4d5af393b85ce5e61/pkg/registry/core/pod/storage/eviction.go#L318
return true
return true, nil
}
if apierrors.IsTooManyRequests(err) { // 429 - PDB violation
q.recorder.Publish(terminatorevents.NodeFailedToDrain(&corev1.Node{ObjectMeta: metav1.ObjectMeta{
Name: key.Name,
Namespace: key.Namespace,
}}, fmt.Errorf("evicting pod %s/%s violates a PDB", key.Namespace, key.Name)))
return false
return false, err
}
log.FromContext(ctx).Error(err, "failed evicting pod")
return false
return false, err
}
NodesEvictionRequestsTotal.Inc(map[string]string{CodeLabel: "200"})
q.recorder.Publish(terminatorevents.EvictPod(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: key.Name, Namespace: key.Namespace}}, evictionMessage))
return true
return true, nil
}

func evictionReason(ctx context.Context, key QueueKey, kubeClient client.Client) (string, error) {
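For reference, here is a hedged usage sketch of the queue's new error bookkeeping; the pod, node, env.Client, and recorder values are assumed from the test setup used elsewhere in this PR rather than defined here.

// Requeue records the error from a failed eviction...
queue := terminator.NewTestingQueue(env.Client, recorder)
key := terminator.NewQueueKey(pod, node.Spec.ProviderID)
queue.Requeue(key, apierrors.NewTooManyRequestsError("PDB"))

// ...which callers such as the provisioner can inspect later by pod UID.
if apierrors.IsTooManyRequests(queue.EvictionError(pod)) {
	// skip provisioning replacement capacity for this pod
}

// Once the pod is finally evicted, Delete forgets the key and clears the error.
queue.Delete(key)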
8 changes: 6 additions & 2 deletions pkg/controllers/node/termination/terminator/suite_test.go
@@ -126,7 +126,9 @@ var _ = Describe("Eviction/Queue", func() {
})
It("should return a NodeDrainError event when a PDB is blocking", func() {
ExpectApplied(ctx, env.Client, pdb, pod)
Expect(queue.Evict(ctx, terminator.NewQueueKey(pod, node.Spec.ProviderID))).To(BeFalse())
evicted, err := queue.Evict(ctx, terminator.NewQueueKey(pod, node.Spec.ProviderID))
Expect(evicted).To(BeFalse())
Expect(err).To(Not(BeNil()))
Expect(recorder.Calls(events.FailedDraining)).To(Equal(1))
})
It("should fail when two PDBs refer to the same pod", func() {
@@ -135,7 +137,9 @@
MaxUnavailable: &intstr.IntOrString{IntVal: 0},
})
ExpectApplied(ctx, env.Client, pdb, pdb2, pod)
Expect(queue.Evict(ctx, terminator.NewQueueKey(pod, node.Spec.ProviderID))).To(BeFalse())
evicted, err := queue.Evict(ctx, terminator.NewQueueKey(pod, node.Spec.ProviderID))
Expect(evicted).To(BeFalse())
Expect(err).To(Not(BeNil()))
ExpectMetricCounterValue(terminator.NodesEvictionRequestsTotal, 1, map[string]string{terminator.CodeLabel: "500"})
})
It("should ensure that calling Evict() is valid while making Add() calls", func() {
12 changes: 11 additions & 1 deletion pkg/controllers/provisioning/provisioner.go
@@ -30,6 +30,7 @@ import (
"go.uber.org/multierr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
@@ -42,6 +43,7 @@ import (

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator"
scheduler "sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling"
"sigs.k8s.io/karpenter/pkg/controllers/state"
"sigs.k8s.io/karpenter/pkg/events"
@@ -76,21 +78,23 @@ type Provisioner struct {
batcher *Batcher[types.UID]
volumeTopology *scheduler.VolumeTopology
cluster *state.Cluster
evictionQueue *terminator.Queue
recorder events.Recorder
cm *pretty.ChangeMonitor
clock clock.Clock
}

func NewProvisioner(kubeClient client.Client, recorder events.Recorder,
cloudProvider cloudprovider.CloudProvider, cluster *state.Cluster,
clock clock.Clock,
evictionQueue *terminator.Queue, clock clock.Clock,
) *Provisioner {
p := &Provisioner{
batcher: NewBatcher[types.UID](clock),
cloudProvider: cloudProvider,
kubeClient: kubeClient,
volumeTopology: scheduler.NewVolumeTopology(kubeClient),
cluster: cluster,
evictionQueue: evictionQueue,
recorder: recorder,
cm: pretty.NewChangeMonitor(),
clock: clock,
@@ -293,6 +297,12 @@ func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) {
if err != nil {
return scheduler.Results{}, err
}
// Don't provision capacity for pods whose eviction is blocked by a PDB violation.
// Since Karpenter can't know when these pods will eventually be evicted, spinning up
// capacity for them before they can actually be rescheduled is wasteful.
deletingNodePods = lo.Filter(deletingNodePods, func(pod *corev1.Pod, _ int) bool {
return !apierrors.IsTooManyRequests(p.evictionQueue.EvictionError(pod))
})
pods := append(pendingPods, deletingNodePods...)
// nothing to schedule, so just return success
if len(pods) == 0 {
43 changes: 42 additions & 1 deletion pkg/controllers/provisioning/scheduling/suite_test.go
@@ -34,8 +34,10 @@ import (
corev1 "k8s.io/api/core/v1"
nodev1 "k8s.io/api/node/v1"
storagev1 "k8s.io/api/storage/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
cloudproviderapi "k8s.io/cloud-provider/api"
@@ -47,6 +49,7 @@ import (
v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/cloudprovider/fake"
"sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator"
"sigs.k8s.io/karpenter/pkg/controllers/provisioning"
"sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling"
"sigs.k8s.io/karpenter/pkg/controllers/state"
@@ -71,6 +74,7 @@ var nodeStateController *informer.NodeController
var nodeClaimStateController *informer.NodeClaimController
var podStateController *informer.PodController
var podController *provisioning.PodController
var evictionQueue *terminator.Queue

const csiProvider = "fake.csi.provider"
const isDefaultStorageClassAnnotation = "storageclass.kubernetes.io/is-default-class"
@@ -93,7 +97,8 @@ var _ = BeforeSuite(func() {
nodeStateController = informer.NewNodeController(env.Client, cluster)
nodeClaimStateController = informer.NewNodeClaimController(env.Client, cloudProvider, cluster)
podStateController = informer.NewPodController(env.Client, cluster)
prov = provisioning.NewProvisioner(env.Client, events.NewRecorder(&record.FakeRecorder{}), cloudProvider, cluster, fakeClock)
evictionQueue = terminator.NewTestingQueue(env.Client, test.NewEventRecorder())
prov = provisioning.NewProvisioner(env.Client, events.NewRecorder(&record.FakeRecorder{}), cloudProvider, cluster, evictionQueue, fakeClock)
podController = provisioning.NewPodController(env.Client, prov, cluster)
})

@@ -3647,6 +3652,42 @@ var _ = Context("Scheduling", func() {
Expect(n.Labels[corev1.LabelInstanceTypeStable]).To(Equal("small-instance-type"))
}
})
It("should not reschedule pods from a deleting node when pods are failing to evict due to PDB", func() {
ExpectApplied(ctx, env.Client, nodePool)
podLabels := map[string]string{"test": "value"}
pod := test.UnschedulablePod(
test.PodOptions{
ObjectMeta: metav1.ObjectMeta{
Labels: podLabels,
},
ResourceRequirements: corev1.ResourceRequirements{
Requests: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceMemory: resource.MustParse("100M"),
},
}})
pdb := test.PodDisruptionBudget(test.PDBOptions{
Labels: podLabels,
MaxUnavailable: lo.ToPtr(intstr.FromInt(0)),
})
ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pod)
ExpectApplied(ctx, env.Client, pdb)
node := ExpectScheduled(ctx, env.Client, pod)
Expect(node.Labels[corev1.LabelInstanceTypeStable]).To(Equal("small-instance-type"))

// Mark for deletion so that we consider all pods on this node for reschedulability
cluster.MarkForDeletion(node.Spec.ProviderID)

// Add the pod to the eviction queue as a failed eviction
evictionQueue.Add(node, pod)
evictionQueue.Requeue(terminator.NewQueueKey(pod, node.Spec.ProviderID), apierrors.NewTooManyRequestsError("PDB"))

// Trigger a provisioning loop and expect that we don't create more nodes
ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov)

// We shouldn't create an additional node here because this pod's eviction is blocked by a PDB
nodes := ExpectNodes(ctx, env.Client)
Expect(nodes).To(HaveLen(1))
})
})

Describe("Metrics", func() {
3 changes: 2 additions & 1 deletion pkg/controllers/provisioning/suite_test.go
@@ -23,6 +23,7 @@ import (
"testing"
"time"

"sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator"
pscheduling "sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling"

. "github.com/onsi/ginkgo/v2"
@@ -79,7 +80,7 @@ var _ = BeforeSuite(func() {
fakeClock = clock.NewFakeClock(time.Now())
cluster = state.NewCluster(fakeClock, env.Client, cloudProvider)
nodeController = informer.NewNodeController(env.Client, cluster)
prov = provisioning.NewProvisioner(env.Client, events.NewRecorder(&record.FakeRecorder{}), cloudProvider, cluster, fakeClock)
prov = provisioning.NewProvisioner(env.Client, events.NewRecorder(&record.FakeRecorder{}), cloudProvider, cluster, terminator.NewTestingQueue(env.Client, test.NewEventRecorder()), fakeClock)
daemonsetController = informer.NewDaemonSetController(env.Client, cluster)
instanceTypes, _ := cloudProvider.GetInstanceTypes(ctx, nil)
instanceTypeMap = map[string]*cloudprovider.InstanceType{}
4 changes: 3 additions & 1 deletion pkg/utils/pod/scheduling.go
@@ -39,13 +39,15 @@ func IsActive(pod *corev1.Pod) bool {
// - Is an active pod (isn't terminal or actively terminating) OR Is owned by a StatefulSet and Is Terminating
// - Isn't owned by a DaemonSet
// - Isn't a mirror pod (https://kubernetes.io/docs/tasks/configure-pod-container/static-pod/)
// - Does not have the "karpenter.sh/do-not-disrupt=true" annotation (https://karpenter.sh/docs/concepts/disruption/#pod-level-controls)
func IsReschedulable(pod *corev1.Pod) bool {
// StatefulSet pods can be handled differently here because we know that StatefulSet pods MUST
// get deleted before new pods are re-created. This means that we can model terminating pods for StatefulSets
// differently for higher availability by considering terminating pods for scheduling
return (IsActive(pod) || (IsOwnedByStatefulSet(pod) && IsTerminating(pod))) &&
!IsOwnedByDaemonSet(pod) &&
!IsOwnedByNode(pod)
!IsOwnedByNode(pod) &&
!HasDoNotDisrupt(pod)
}

// IsEvictable checks if a pod is evictable by Karpenter by ensuring that the pod:
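For context, the HasDoNotDisrupt check added to IsReschedulable above boils down to an annotation lookup. The following is a rough sketch, not the verbatim upstream helper, and it assumes the do-not-disrupt annotation key constant exposed by the v1 API package.

package pod

import (
	corev1 "k8s.io/api/core/v1"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// HasDoNotDisrupt reports whether the pod has opted out of disruption via the
// karpenter.sh/do-not-disrupt=true annotation.
func HasDoNotDisrupt(pod *corev1.Pod) bool {
	return pod.Annotations[v1.DoNotDisruptAnnotationKey] == "true"
}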