From 5392f91d33fbfbe09d1ad46662c99365487623cf Mon Sep 17 00:00:00 2001 From: Jason Deal Date: Wed, 4 Dec 2024 13:41:27 -0800 Subject: [PATCH] remaining refactor --- pkg/controllers/controllers.go | 6 +- pkg/controllers/node/health/suite_test.go | 6 +- .../node/termination/controller.go | 112 +++++++---- pkg/controllers/node/termination/drain.go | 129 +++++++++++-- .../{terminator/events => }/events.go | 27 +-- .../node/termination/eviction/events.go | 43 +++++ .../{terminator => eviction}/eviction.go | 25 +-- .../{terminator => eviction}/metrics.go | 2 +- .../{terminator => eviction}/suite_test.go | 63 ++----- .../node/termination/instancetermination.go | 2 +- .../node/termination/suite_test.go | 10 +- .../node/termination/terminator/terminator.go | 177 ------------------ .../node/termination/volumedetachment.go | 5 +- .../nodeclaim/lifecycle/controller.go | 5 +- pkg/controllers/nodeclaim/lifecycle/events.go | 10 + pkg/events/suite_test.go | 45 ++--- pkg/utils/node/node.go | 21 +++ 17 files changed, 328 insertions(+), 360 deletions(-) rename pkg/controllers/node/termination/{terminator/events => }/events.go (74%) create mode 100644 pkg/controllers/node/termination/eviction/events.go rename pkg/controllers/node/termination/{terminator => eviction}/eviction.go (89%) rename pkg/controllers/node/termination/{terminator => eviction}/metrics.go (98%) rename pkg/controllers/node/termination/{terminator => eviction}/suite_test.go (61%) delete mode 100644 pkg/controllers/node/termination/terminator/terminator.go diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go index f7caafc463..d69cfece8b 100644 --- a/pkg/controllers/controllers.go +++ b/pkg/controllers/controllers.go @@ -36,7 +36,7 @@ import ( metricspod "sigs.k8s.io/karpenter/pkg/controllers/metrics/pod" "sigs.k8s.io/karpenter/pkg/controllers/node/health" "sigs.k8s.io/karpenter/pkg/controllers/node/termination" - "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator" + "sigs.k8s.io/karpenter/pkg/controllers/node/termination/eviction" nodeclaimconsistency "sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/consistency" nodeclaimdisruption "sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/disruption" "sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/expiration" @@ -64,7 +64,7 @@ func NewControllers( ) []controller.Controller { cluster := state.NewCluster(clock, kubeClient, cloudProvider) p := provisioning.NewProvisioner(kubeClient, recorder, cloudProvider, cluster, clock) - evictionQueue := terminator.NewQueue(kubeClient, recorder) + evictionQueue := eviction.NewQueue(kubeClient, recorder) disruptionQueue := orchestration.NewQueue(kubeClient, recorder, cluster, clock, p) controllers := []controller.Controller{ @@ -79,7 +79,7 @@ func NewControllers( informer.NewPodController(kubeClient, cluster), informer.NewNodePoolController(kubeClient, cloudProvider, cluster), informer.NewNodeClaimController(kubeClient, cloudProvider, cluster), - termination.NewController(clock, kubeClient, cloudProvider, terminator.NewTerminator(clock, kubeClient, evictionQueue, recorder), recorder), + termination.NewController(clock, kubeClient, cloudProvider, recorder, evictionQueue), metricspod.NewController(kubeClient, cluster), metricsnodepool.NewController(kubeClient, cloudProvider), metricsnode.NewController(cluster), diff --git a/pkg/controllers/node/health/suite_test.go b/pkg/controllers/node/health/suite_test.go index ee8d46b01d..b2bebb6cf7 100644 --- a/pkg/controllers/node/health/suite_test.go +++ 
b/pkg/controllers/node/health/suite_test.go @@ -33,7 +33,7 @@ import ( "sigs.k8s.io/karpenter/pkg/cloudprovider" "sigs.k8s.io/karpenter/pkg/cloudprovider/fake" "sigs.k8s.io/karpenter/pkg/controllers/node/health" - "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator" + "sigs.k8s.io/karpenter/pkg/controllers/node/termination/eviction" "sigs.k8s.io/karpenter/pkg/metrics" "sigs.k8s.io/karpenter/pkg/test" . "sigs.k8s.io/karpenter/pkg/test/expectations" @@ -47,7 +47,7 @@ var env *test.Environment var fakeClock *clock.FakeClock var cloudProvider *fake.CloudProvider var recorder *test.EventRecorder -var queue *terminator.Queue +var queue *eviction.Queue func TestAPIs(t *testing.T) { ctx = TestContextWithLogger(t) @@ -65,7 +65,7 @@ var _ = BeforeSuite(func() { cloudProvider = fake.NewCloudProvider() cloudProvider = fake.NewCloudProvider() recorder = test.NewEventRecorder() - queue = terminator.NewTestingQueue(env.Client, recorder) + queue = eviction.NewTestingQueue(env.Client, recorder) healthController = health.NewController(env.Client, cloudProvider, fakeClock) }) diff --git a/pkg/controllers/node/termination/controller.go b/pkg/controllers/node/termination/controller.go index fe28053f4f..3880eddd7c 100644 --- a/pkg/controllers/node/termination/controller.go +++ b/pkg/controllers/node/termination/controller.go @@ -21,6 +21,7 @@ import ( "fmt" "time" + "github.com/samber/lo" "golang.org/x/time/rate" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" @@ -41,7 +42,7 @@ import ( v1 "sigs.k8s.io/karpenter/pkg/apis/v1" "sigs.k8s.io/karpenter/pkg/cloudprovider" - "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator" + "sigs.k8s.io/karpenter/pkg/controllers/node/termination/eviction" "sigs.k8s.io/karpenter/pkg/events" "sigs.k8s.io/karpenter/pkg/metrics" "sigs.k8s.io/karpenter/pkg/operator/injection" @@ -53,53 +54,60 @@ type Controller struct { clock clock.Clock kubeClient client.Client cloudProvider cloudprovider.CloudProvider - terminator *terminator.Terminator - reconcilers []terminationReconciler + reconcilers []reconciler } -// TODO (jmdeal@): Split subreconcilers into individual controllers -type terminationReconciler interface { +type reconciler interface { Reconcile(context.Context, *corev1.Node, *v1.NodeClaim) (reconcile.Result, error) } // NewController constructs a controller instance -func NewController(clk clock.Clock, kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, terminator *terminator.Terminator, recorder events.Recorder) *Controller { +func NewController(clk clock.Clock, kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, recorder events.Recorder, evictionQueue *eviction.Queue) *Controller { return &Controller{ clock: clk, kubeClient: kubeClient, cloudProvider: cloudProvider, - terminator: terminator, - reconcilers: []terminationReconciler{ - &DrainReconciler{kubeClient, cloudProvider, recorder, terminator}, - &VolumeDetachmentReconciler{kubeClient, clk, recorder}, - &InstanceTerminationReconciler{kubeClient, cloudProvider, clk}, + reconcilers: []reconciler{ + &DrainReconciler{clk, kubeClient, cloudProvider, recorder, evictionQueue}, + &VolumeDetachmentReconciler{clk, kubeClient, recorder}, + &InstanceTerminationReconciler{clk, kubeClient, cloudProvider}, }, } } +func (c *Controller) Register(_ context.Context, m manager.Manager) error { + return controllerruntime.NewControllerManagedBy(m). + Named("node.termination"). 
+ For(&corev1.Node{}, builder.WithPredicates(nodeutils.IsManagedPredicateFuncs(c.cloudProvider))). + Watches(&v1.NodeClaim{}, nodeutils.NodeClaimEventHandler(c.kubeClient, c.cloudProvider)). + WithOptions( + controller.Options{ + RateLimiter: workqueue.NewTypedMaxOfRateLimiter[reconcile.Request]( + workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](100*time.Millisecond, 10*time.Second), + // 10 qps, 100 bucket size + &workqueue.TypedBucketRateLimiter[reconcile.Request]{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, + ), + MaxConcurrentReconciles: 100, + }, + ). + Complete(reconcile.AsReconciler(m.GetClient(), c)) +} + // nolint:gocyclo func (c *Controller) Reconcile(ctx context.Context, n *corev1.Node) (reconcile.Result, error) { ctx = injection.WithControllerName(ctx, "node.termination") ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("Node", klog.KRef(n.Namespace, n.Name))) - if !nodeutils.IsManaged(n, c.cloudProvider) { - return reconcile.Result{}, nil - } + if n.GetDeletionTimestamp().IsZero() { return reconcile.Result{}, nil } if !controllerutil.ContainsFinalizer(n, v1.TerminationFinalizer) { return reconcile.Result{}, nil } - - if err := c.terminator.Taint(ctx, n, v1.DisruptedNoScheduleTaint); err != nil { - if errors.IsNotFound(err) { - return reconcile.Result{}, nil - } - if errors.IsConflict(err) { - return reconcile.Result{Requeue: true}, nil - } - return reconcile.Result{}, fmt.Errorf("tainting node with %s, %w", pretty.Taint(v1.DisruptedNoScheduleTaint), err) + if !nodeutils.IsManaged(n, c.cloudProvider) { + return reconcile.Result{}, nil } + nc, err := nodeutils.NodeClaimForNode(ctx, c.kubeClient, n) if err != nil { if nodeutils.IsDuplicateNodeClaimError(err) || nodeutils.IsNodeClaimNotFoundError(err) { @@ -115,16 +123,55 @@ func (c *Controller) Reconcile(ctx context.Context, n *corev1.Node) (reconcile.R } } + if err := c.prepareNode(ctx, n); err != nil { + if errors.IsNotFound(err) { + return reconcile.Result{}, nil + } + if errors.IsConflict(err) { + return reconcile.Result{Requeue: true}, nil + } + return reconcile.Result{}, fmt.Errorf("tainting node with %s, %w", pretty.Taint(v1.DisruptedNoScheduleTaint), err) + } + for _, r := range c.reconcilers { res, err := r.Reconcile(ctx, n, nc) if res.Requeue || res.RequeueAfter != 0 || err != nil { return res, err } } - return reconcile.Result{}, nil } +// prepareNode ensures that the node is ready to begin the drain / termination process. This includes ensuring that it +// is tainted appropriately and annotated to be ignored by external load balancers. +func (c *Controller) prepareNode(ctx context.Context, n *corev1.Node) error { + stored := n.DeepCopy() + + // Add the karpenter.sh/disrupted:NoSchedule taint to ensure no additional pods schedule to the Node during the + // drain process. + if !lo.ContainsBy(n.Spec.Taints, func(t corev1.Taint) bool { + return t.MatchTaint(&v1.DisruptedNoScheduleTaint) + }) { + n.Spec.Taints = append(n.Spec.Taints, v1.DisruptedNoScheduleTaint) + } + // Adding this label to the node ensures that the node is removed from the load-balancer target group while it is + // draining and before it is terminated. This prevents 500s coming prior to health check when the load balancer + // controller hasn't yet determined that the node and underlying connections are gone. 
+ // https://github.com/aws/aws-node-termination-handler/issues/316 + // https://github.com/aws/karpenter/pull/2518 + n.Labels = lo.Assign(n.Labels, map[string]string{ + corev1.LabelNodeExcludeBalancers: "karpenter", + }) + + if equality.Semantic.DeepEqual(n, stored) { + return nil + } + // We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch can cause races due + // to the fact that it fully replaces the list on a change. Here, we are updating the taint list. + return c.kubeClient.Patch(ctx, n, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})) +} + +// removeFinalizer removes Karpenter's termination finalizer from the given Node, updating the termination metrics in the process. func removeFinalizer(ctx context.Context, kubeClient client.Client, n *corev1.Node) error { stored := n.DeepCopy() controllerutil.RemoveFinalizer(n, v1.TerminationFinalizer) @@ -153,20 +200,3 @@ func removeFinalizer(ctx context.Context, kubeClient client.Client, n *corev1.No } return nil } - -func (c *Controller) Register(_ context.Context, m manager.Manager) error { - return controllerruntime.NewControllerManagedBy(m). - Named("node.termination"). - For(&corev1.Node{}, builder.WithPredicates(nodeutils.IsManagedPredicateFuncs(c.cloudProvider))). - WithOptions( - controller.Options{ - RateLimiter: workqueue.NewTypedMaxOfRateLimiter[reconcile.Request]( - workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](100*time.Millisecond, 10*time.Second), - // 10 qps, 100 bucket size - &workqueue.TypedBucketRateLimiter[reconcile.Request]{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, - ), - MaxConcurrentReconciles: 100, - }, - ). - Complete(reconcile.AsReconciler(m.GetClient(), c)) -} diff --git a/pkg/controllers/node/termination/drain.go b/pkg/controllers/node/termination/drain.go index e18c13f7fc..d95c7a3183 100644 --- a/pkg/controllers/node/termination/drain.go +++ b/pkg/controllers/node/termination/drain.go @@ -18,30 +18,50 @@ package termination import ( "context" + "errors" "fmt" "time" + "github.com/samber/lo" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/utils/clock" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" v1 "sigs.k8s.io/karpenter/pkg/apis/v1" "sigs.k8s.io/karpenter/pkg/cloudprovider" - "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator" - terminatorevents "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator/events" + "sigs.k8s.io/karpenter/pkg/controllers/node/termination/eviction" "sigs.k8s.io/karpenter/pkg/events" "sigs.k8s.io/karpenter/pkg/metrics" nodeutils "sigs.k8s.io/karpenter/pkg/utils/node" "sigs.k8s.io/karpenter/pkg/utils/nodeclaim" + podutils "sigs.k8s.io/karpenter/pkg/utils/pod" ) +type nodeDrainError struct { + error +} + +func newNodeDrainError(err error) *nodeDrainError { + return &nodeDrainError{error: err} +} + +func isNodeDrainError(err error) bool { + if err == nil { + return false + } + var nodeDrainErr *nodeDrainError + return errors.As(err, &nodeDrainErr) +} + type DrainReconciler struct { + clock clock.Clock kubeClient client.Client cloudProvider cloudprovider.CloudProvider recorder events.Recorder - terminator *terminator.Terminator + evictionQueue *eviction.Queue } // nolint:gocyclo @@ -56,11 +76,11 @@ func (d *DrainReconciler) Reconcile(ctx context.Context, n *corev1.Node, nc *v1. 
return reconcile.Result{}, nil } if tgpExpirationTime != nil { - d.recorder.Publish(terminatorevents.NodeTerminationGracePeriodExpiring(n, *tgpExpirationTime)) + d.recorder.Publish(NodeTerminationGracePeriodExpiringEvent(n, *tgpExpirationTime)) } - if err := d.terminator.Drain(ctx, n, tgpExpirationTime); err != nil { - if !terminator.IsNodeDrainError(err) { + if err := d.drain(ctx, n, tgpExpirationTime); err != nil { + if !isNodeDrainError(err) { return reconcile.Result{}, fmt.Errorf("draining node, %w", err) } // If the underlying NodeClaim no longer exists, we want to delete to avoid trying to gracefully draining @@ -79,26 +99,26 @@ func (d *DrainReconciler) Reconcile(ctx context.Context, n *corev1.Node, nc *v1. stored := nc.DeepCopy() if nc.StatusConditions().SetFalse(v1.ConditionTypeDrained, "Draining", "Draining") { if err := d.kubeClient.Status().Patch(ctx, nc, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil { - if errors.IsNotFound(err) { + if apierrors.IsNotFound(err) { return reconcile.Result{}, nil } - if errors.IsConflict(err) { + if apierrors.IsConflict(err) { return reconcile.Result{Requeue: true}, nil } return reconcile.Result{}, err } } - d.recorder.Publish(terminatorevents.NodeFailedToDrain(n, err)) + d.recorder.Publish(NodeDrainFailedEvent(n, err)) return reconcile.Result{RequeueAfter: 5 * time.Second}, nil } stored := nc.DeepCopy() _ = nc.StatusConditions().SetTrue(v1.ConditionTypeDrained) if err := d.kubeClient.Status().Patch(ctx, nc, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); client.IgnoreNotFound(err) != nil { - if errors.IsNotFound(err) { + if apierrors.IsNotFound(err) { return reconcile.Result{}, nil } - if errors.IsConflict(err) { + if apierrors.IsConflict(err) { return reconcile.Result{Requeue: true}, nil } return reconcile.Result{}, err @@ -108,3 +128,88 @@ func (d *DrainReconciler) Reconcile(ctx context.Context, n *corev1.Node, nc *v1. }) return reconcile.Result{}, nil } + +// Drain evicts pods from the node and returns true when all pods are evicted +// https://kubernetes.io/docs/concepts/architecture/nodes/#graceful-node-shutdown +func (d *DrainReconciler) drain(ctx context.Context, node *corev1.Node, nodeGracePeriodExpirationTime *time.Time) error { + pods, err := nodeutils.GetPods(ctx, d.kubeClient, node) + if err != nil { + return fmt.Errorf("listing pods on node, %w", err) + } + podsToDelete := lo.Filter(pods, func(p *corev1.Pod, _ int) bool { + return podutils.IsWaitingEviction(p, d.clock) && !podutils.IsTerminating(p) + }) + if err := d.DeleteExpiringPods(ctx, podsToDelete, nodeGracePeriodExpirationTime); err != nil { + return fmt.Errorf("deleting expiring pods, %w", err) + } + // Monitor pods in pod groups that either haven't been evicted or are actively evicting + podGroups := d.groupPodsByPriority(lo.Filter(pods, func(p *corev1.Pod, _ int) bool { return podutils.IsWaitingEviction(p, d.clock) })) + for _, group := range podGroups { + if len(group) > 0 { + // Only add pods to the eviction queue that haven't been evicted yet + d.evictionQueue.Add(lo.Filter(group, func(p *corev1.Pod, _ int) bool { return podutils.IsEvictable(p) })...) + return newNodeDrainError(fmt.Errorf("%d pods are waiting to be evicted", lo.SumBy(podGroups, func(pods []*corev1.Pod) int { return len(pods) }))) + } + } + return nil +} + +func (d *DrainReconciler) groupPodsByPriority(pods []*corev1.Pod) [][]*corev1.Pod { + // 1. 
Prioritize noncritical pods, non-daemon pods https://kubernetes.io/docs/concepts/architecture/nodes/#graceful-node-shutdown
+	var nonCriticalNonDaemon, nonCriticalDaemon, criticalNonDaemon, criticalDaemon []*corev1.Pod
+	for _, pod := range pods {
+		if pod.Spec.PriorityClassName == "system-cluster-critical" || pod.Spec.PriorityClassName == "system-node-critical" {
+			if podutils.IsOwnedByDaemonSet(pod) {
+				criticalDaemon = append(criticalDaemon, pod)
+			} else {
+				criticalNonDaemon = append(criticalNonDaemon, pod)
+			}
+		} else {
+			if podutils.IsOwnedByDaemonSet(pod) {
+				nonCriticalDaemon = append(nonCriticalDaemon, pod)
+			} else {
+				nonCriticalNonDaemon = append(nonCriticalNonDaemon, pod)
+			}
+		}
+	}
+	return [][]*corev1.Pod{nonCriticalNonDaemon, nonCriticalDaemon, criticalNonDaemon, criticalDaemon}
+}
+
+func (d *DrainReconciler) DeleteExpiringPods(ctx context.Context, pods []*corev1.Pod, nodeGracePeriodTerminationTime *time.Time) error {
+	for _, pod := range pods {
+		// check if the node has an expiration time and the pod needs to be deleted
+		deleteTime := d.podDeleteTimeWithGracePeriod(nodeGracePeriodTerminationTime, pod)
+		if deleteTime != nil && time.Now().After(*deleteTime) {
+			// delete pod proactively to give as much of its terminationGracePeriodSeconds as possible for deletion
+			// ensure that we clamp the maximum pod terminationGracePeriodSeconds to the node's remaining expiration time in the delete command
+			gracePeriodSeconds := lo.ToPtr(int64(time.Until(*nodeGracePeriodTerminationTime).Seconds()))
+			d.recorder.Publish(PodDeletedEvent(pod, gracePeriodSeconds, nodeGracePeriodTerminationTime))
+			opts := &client.DeleteOptions{
+				GracePeriodSeconds: gracePeriodSeconds,
+			}
+			if err := d.kubeClient.Delete(ctx, pod, opts); err != nil && !apierrors.IsNotFound(err) { // ignore 404, not a problem
+				return fmt.Errorf("deleting pod, %w", err) // otherwise, bubble up the error
+			}
+			log.FromContext(ctx).WithValues(
+				"namespace", pod.Namespace,
+				"name", pod.Name,
+				"pod.terminationGracePeriodSeconds", *pod.Spec.TerminationGracePeriodSeconds,
+				"delete.gracePeriodSeconds", *gracePeriodSeconds,
+				"nodeclaim.terminationTime", *nodeGracePeriodTerminationTime,
+			).V(1).Info("deleting pod")
+		}
+	}
+	return nil
+}
+
+// if a pod should be deleted to give it the full terminationGracePeriodSeconds of time before the node will shut down, return the time the pod should be deleted
+func (*DrainReconciler) podDeleteTimeWithGracePeriod(nodeGracePeriodExpirationTime *time.Time, pod *corev1.Pod) *time.Time {
+	if nodeGracePeriodExpirationTime == nil || pod.Spec.TerminationGracePeriodSeconds == nil { // k8s defaults to 30s, so we should never see a nil TerminationGracePeriodSeconds
+		return nil
+	}
+
+	// calculate the time the pod should be deleted to allow its full grace period for termination, equal to its terminationGracePeriodSeconds before the node's expiration time
+	// e.g.: if a node will be force terminated in 30m, but the current pod has a grace period of 45m, we return a time of 15m ago
+	deleteTime := nodeGracePeriodExpirationTime.Add(time.Duration(*pod.Spec.TerminationGracePeriodSeconds) * time.Second * -1)
+	return &deleteTime
+}
diff --git a/pkg/controllers/node/termination/terminator/events/events.go b/pkg/controllers/node/termination/events.go
similarity index 74%
rename from pkg/controllers/node/termination/terminator/events/events.go
rename to pkg/controllers/node/termination/events.go
index 3c39e3cc1e..91ffbc26d5 100644
--- a/pkg/controllers/node/termination/terminator/events/events.go
+++ b/pkg/controllers/node/termination/events.go
@@ -14,41 +14,31 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package events
+package termination
 
 import (
 	"fmt"
 	"time"
 
 	corev1 "k8s.io/api/core/v1"
 	storagev1 "k8s.io/api/storage/v1"
 
 	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
 	"sigs.k8s.io/karpenter/pkg/events"
 )
 
-func EvictPod(pod *corev1.Pod) events.Event {
-	return events.Event{
-		InvolvedObject: pod,
-		Type: corev1.EventTypeNormal,
-		Reason: "Evicted",
-		Message: "Evicted pod",
-		DedupeValues: []string{pod.Name},
-	}
-}
-
-func DisruptPodDelete(pod *corev1.Pod, gracePeriodSeconds *int64, nodeGracePeriodTerminationTime *time.Time) events.Event {
+func PodDeletedEvent(pod *corev1.Pod, gracePeriodSeconds *int64, nodeGracePeriodTerminationTime *time.Time) events.Event {
 	return events.Event{
 		InvolvedObject: pod,
 		Type: corev1.EventTypeNormal,
 		Reason: "Disrupted",
 		Message: fmt.Sprintf("Deleting the pod to accommodate the terminationTime %v of the node. The pod was granted %v seconds of grace-period of its %v terminationGracePeriodSeconds. This bypasses the PDB of the pod and the do-not-disrupt annotation.", *nodeGracePeriodTerminationTime, *gracePeriodSeconds, pod.Spec.TerminationGracePeriodSeconds),
-		DedupeValues: []string{pod.Name},
+		DedupeValues: []string{pod.Namespace, pod.Name},
 	}
 }
 
-func NodeFailedToDrain(node *corev1.Node, err error) events.Event {
+func NodeDrainFailedEvent(node *corev1.Node, err error) events.Event {
 	return events.Event{
 		InvolvedObject: node,
 		Type: corev1.EventTypeWarning,
@@ -58,17 +48,17 @@
 	}
 }
 
-func NodeAwaitingVolumeDetachment(node *corev1.Node, volumeAttachments ...*storagev1.VolumeAttachment) events.Event {
+func NodeAwaitingVolumeDetachmentEvent(node *corev1.Node, volumeAttachments ...*storagev1.VolumeAttachment) events.Event {
 	return events.Event{
 		InvolvedObject: node,
-		Type: corev1.EventTypeWarning,
+		Type: corev1.EventTypeNormal,
 		Reason: "AwaitingVolumeDetachment",
 		Message: fmt.Sprintf("Awaiting deletion of %d VolumeAttachments bound to node", len(volumeAttachments)),
 		DedupeValues: []string{node.Name},
 	}
 }
 
-func NodeTerminationGracePeriodExpiring(node *corev1.Node, t time.Time) events.Event {
+func NodeTerminationGracePeriodExpiringEvent(node *corev1.Node, t time.Time) events.Event {
 	return events.Event{
 		InvolvedObject: node,
 		Type: corev1.EventTypeWarning,
@@ -78,7 +68,7 @@
 	}
 }
 
-func NodeClaimTerminationGracePeriodExpiring(nodeClaim *v1.NodeClaim, terminationTime string) events.Event {
+func NodeClaimTerminationGracePeriodExpiringEvent(nodeClaim *v1.NodeClaim, terminationTime string) events.Event {
 	return events.Event{
 		InvolvedObject: nodeClaim,
 		Type: corev1.EventTypeWarning,
diff --git a/pkg/controllers/node/termination/eviction/events.go b/pkg/controllers/node/termination/eviction/events.go
new file mode 100644
index 0000000000..ff53cb0924
--- /dev/null
+++ b/pkg/controllers/node/termination/eviction/events.go
@@ -0,0 +1,43 @@
+/*
+Copyright The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eviction + +import ( + corev1 "k8s.io/api/core/v1" + + "sigs.k8s.io/karpenter/pkg/events" +) + +func PodEvictedEvent(pod *corev1.Pod) events.Event { + return events.Event{ + InvolvedObject: pod, + Type: corev1.EventTypeNormal, + Reason: "Evicted", + Message: "Evicted pod", + DedupeValues: []string{pod.Namespace, pod.Name}, + } +} + +func PodEvictionFailedEvent(pod *corev1.Pod, message string) events.Event { + return events.Event{ + InvolvedObject: pod, + Type: corev1.EventTypeWarning, + Reason: "FailedEviction", + Message: message, + DedupeValues: []string{pod.Namespace, pod.Name}, + } +} diff --git a/pkg/controllers/node/termination/terminator/eviction.go b/pkg/controllers/node/termination/eviction/eviction.go similarity index 89% rename from pkg/controllers/node/termination/terminator/eviction.go rename to pkg/controllers/node/termination/eviction/eviction.go index 4401837067..1682b67a74 100644 --- a/pkg/controllers/node/termination/terminator/eviction.go +++ b/pkg/controllers/node/termination/eviction/eviction.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package terminator +package eviction import ( "context" @@ -40,7 +40,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" - terminatorevents "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator/events" "sigs.k8s.io/karpenter/pkg/events" "sigs.k8s.io/karpenter/pkg/operator/injection" ) @@ -50,22 +49,6 @@ const ( evictionQueueMaxDelay = 10 * time.Second ) -type NodeDrainError struct { - error -} - -func NewNodeDrainError(err error) *NodeDrainError { - return &NodeDrainError{error: err} -} - -func IsNodeDrainError(err error) bool { - if err == nil { - return false - } - var nodeDrainErr *NodeDrainError - return errors.As(err, &nodeDrainErr) -} - type QueueKey struct { types.NamespacedName UID types.UID @@ -195,16 +178,16 @@ func (q *Queue) Evict(ctx context.Context, key QueueKey) bool { return true } if apierrors.IsTooManyRequests(err) { // 429 - PDB violation - q.recorder.Publish(terminatorevents.NodeFailedToDrain(&corev1.Node{ObjectMeta: metav1.ObjectMeta{ + q.recorder.Publish(PodEvictionFailedEvent(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{ Name: key.Name, Namespace: key.Namespace, - }}, fmt.Errorf("evicting pod %s/%s violates a PDB", key.Namespace, key.Name))) + }}, fmt.Sprintf("evicting pod %s/%s violated a PDB", key.Namespace, key.Name))) return false } log.FromContext(ctx).Error(err, "failed evicting pod") return false } NodesEvictionRequestsTotal.Inc(map[string]string{CodeLabel: "200"}) - q.recorder.Publish(terminatorevents.EvictPod(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: key.Name, Namespace: key.Namespace}})) + q.recorder.Publish(PodEvictedEvent(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: key.Name, Namespace: key.Namespace}})) return true } diff --git a/pkg/controllers/node/termination/terminator/metrics.go b/pkg/controllers/node/termination/eviction/metrics.go similarity index 98% rename from pkg/controllers/node/termination/terminator/metrics.go rename to 
pkg/controllers/node/termination/eviction/metrics.go index c2c2fb381b..aecc80c115 100644 --- a/pkg/controllers/node/termination/terminator/metrics.go +++ b/pkg/controllers/node/termination/eviction/metrics.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package terminator +package eviction import ( opmetrics "github.com/awslabs/operatorpkg/metrics" diff --git a/pkg/controllers/node/termination/terminator/suite_test.go b/pkg/controllers/node/termination/eviction/suite_test.go similarity index 61% rename from pkg/controllers/node/termination/terminator/suite_test.go rename to pkg/controllers/node/termination/eviction/suite_test.go index 2b0e3a72d4..da867f37d3 100644 --- a/pkg/controllers/node/termination/terminator/suite_test.go +++ b/pkg/controllers/node/termination/eviction/suite_test.go @@ -14,13 +14,12 @@ See the License for the specific language governing permissions and limitations under the License. */ -package terminator_test +package eviction_test import ( "context" "sync" "testing" - "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -30,11 +29,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/uuid" - clock "k8s.io/utils/clock/testing" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/karpenter/pkg/apis" - "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator" + "sigs.k8s.io/karpenter/pkg/controllers/node/termination/eviction" "sigs.k8s.io/karpenter/pkg/operator/options" "sigs.k8s.io/karpenter/pkg/test" . "sigs.k8s.io/karpenter/pkg/test/expectations" @@ -45,11 +43,9 @@ import ( var ctx context.Context var env *test.Environment var recorder *test.EventRecorder -var queue *terminator.Queue +var queue *eviction.Queue var pdb *policyv1.PodDisruptionBudget var pod *corev1.Pod -var fakeClock *clock.FakeClock -var terminatorInstance *terminator.Terminator func TestAPIs(t *testing.T) { ctx = TestContextWithLogger(t) @@ -61,8 +57,7 @@ var _ = BeforeSuite(func() { env = test.NewEnvironment(test.WithCRDs(apis.CRDs...), test.WithCRDs(v1alpha1.CRDs...)) ctx = options.ToContext(ctx, test.Options()) recorder = test.NewEventRecorder() - queue = terminator.NewTestingQueue(env.Client, recorder) - terminatorInstance = terminator.NewTerminator(fakeClock, env.Client, queue, recorder) + queue = eviction.NewTestingQueue(env.Client, recorder) }) var _ = AfterSuite(func() { @@ -72,7 +67,7 @@ var _ = AfterSuite(func() { var _ = BeforeEach(func() { recorder.Reset() // Reset the events that we captured during the run // Shut down the queue and restart it to ensure no races - *queue = lo.FromPtr(terminator.NewTestingQueue(env.Client, recorder)) + *queue = lo.FromPtr(eviction.NewTestingQueue(env.Client, recorder)) }) var _ = AfterEach(func() { @@ -92,24 +87,24 @@ var _ = Describe("Eviction/Queue", func() { Labels: testLabels, }, }) - terminator.NodesEvictionRequestsTotal.Reset() + eviction.NodesEvictionRequestsTotal.Reset() }) Context("Eviction API", func() { It("should succeed with no event when the pod is not found", func() { - Expect(queue.Evict(ctx, terminator.NewQueueKey(pod))).To(BeTrue()) + Expect(queue.Evict(ctx, eviction.NewQueueKey(pod))).To(BeTrue()) Expect(recorder.Events()).To(HaveLen(0)) }) It("should succeed with no event when the pod UID conflicts", func() { ExpectApplied(ctx, env.Client, pod) - Expect(queue.Evict(ctx, terminator.QueueKey{NamespacedName: client.ObjectKeyFromObject(pod), UID: 
uuid.NewUUID()})).To(BeTrue()) - ExpectMetricCounterValue(terminator.NodesEvictionRequestsTotal, 1, map[string]string{terminator.CodeLabel: "409"}) + Expect(queue.Evict(ctx, eviction.QueueKey{NamespacedName: client.ObjectKeyFromObject(pod), UID: uuid.NewUUID()})).To(BeTrue()) + ExpectMetricCounterValue(eviction.NodesEvictionRequestsTotal, 1, map[string]string{eviction.CodeLabel: "409"}) Expect(recorder.Events()).To(HaveLen(0)) }) It("should succeed with an evicted event when there are no PDBs", func() { ExpectApplied(ctx, env.Client, pod) - Expect(queue.Evict(ctx, terminator.NewQueueKey(pod))).To(BeTrue()) - ExpectMetricCounterValue(terminator.NodesEvictionRequestsTotal, 1, map[string]string{terminator.CodeLabel: "200"}) + Expect(queue.Evict(ctx, eviction.NewQueueKey(pod))).To(BeTrue()) + ExpectMetricCounterValue(eviction.NodesEvictionRequestsTotal, 1, map[string]string{eviction.CodeLabel: "200"}) Expect(recorder.Calls("Evicted")).To(Equal(1)) }) It("should succeed with no event when there are PDBs that allow an eviction", func() { @@ -118,12 +113,12 @@ var _ = Describe("Eviction/Queue", func() { MaxUnavailable: &intstr.IntOrString{IntVal: 1}, }) ExpectApplied(ctx, env.Client, pod) - Expect(queue.Evict(ctx, terminator.NewQueueKey(pod))).To(BeTrue()) + Expect(queue.Evict(ctx, eviction.NewQueueKey(pod))).To(BeTrue()) Expect(recorder.Calls("Evicted")).To(Equal(1)) }) It("should return a NodeDrainError event when a PDB is blocking", func() { ExpectApplied(ctx, env.Client, pdb, pod) - Expect(queue.Evict(ctx, terminator.NewQueueKey(pod))).To(BeFalse()) + Expect(queue.Evict(ctx, eviction.NewQueueKey(pod))).To(BeFalse()) Expect(recorder.Calls("FailedDraining")).To(Equal(1)) }) It("should fail when two PDBs refer to the same pod", func() { @@ -132,8 +127,8 @@ var _ = Describe("Eviction/Queue", func() { MaxUnavailable: &intstr.IntOrString{IntVal: 0}, }) ExpectApplied(ctx, env.Client, pdb, pdb2, pod) - Expect(queue.Evict(ctx, terminator.NewQueueKey(pod))).To(BeFalse()) - ExpectMetricCounterValue(terminator.NodesEvictionRequestsTotal, 1, map[string]string{terminator.CodeLabel: "500"}) + Expect(queue.Evict(ctx, eviction.NewQueueKey(pod))).To(BeFalse()) + ExpectMetricCounterValue(eviction.NodesEvictionRequestsTotal, 1, map[string]string{eviction.CodeLabel: "500"}) }) It("should ensure that calling Evict() is valid while making Add() calls", func() { cancelCtx, cancel := context.WithCancel(ctx) @@ -162,32 +157,4 @@ var _ = Describe("Eviction/Queue", func() { } }) }) - - Context("Pod Deletion API", func() { - It("should not delete a pod with no nodeTerminationTime", func() { - ExpectApplied(ctx, env.Client, pod) - - Expect(terminatorInstance.DeleteExpiringPods(ctx, []*corev1.Pod{pod}, nil)).To(Succeed()) - ExpectExists(ctx, env.Client, pod) - Expect(recorder.Calls("Disrupted")).To(Equal(0)) - }) - It("should not delete a pod with terminationGracePeriodSeconds still remaining before nodeTerminationTime", func() { - pod.Spec.TerminationGracePeriodSeconds = lo.ToPtr[int64](60) - ExpectApplied(ctx, env.Client, pod) - - nodeTerminationTime := time.Now().Add(time.Minute * 5) - Expect(terminatorInstance.DeleteExpiringPods(ctx, []*corev1.Pod{pod}, &nodeTerminationTime)).To(Succeed()) - ExpectExists(ctx, env.Client, pod) - Expect(recorder.Calls("Disrupted")).To(Equal(0)) - }) - It("should delete a pod with less than terminationGracePeriodSeconds remaining before nodeTerminationTime", func() { - pod.Spec.TerminationGracePeriodSeconds = lo.ToPtr[int64](120) - ExpectApplied(ctx, env.Client, pod) - - 
nodeTerminationTime := time.Now().Add(time.Minute * 1) - Expect(terminatorInstance.DeleteExpiringPods(ctx, []*corev1.Pod{pod}, &nodeTerminationTime)).To(Succeed()) - ExpectNotFound(ctx, env.Client, pod) - Expect(recorder.Calls("Disrupted")).To(Equal(1)) - }) - }) }) diff --git a/pkg/controllers/node/termination/instancetermination.go b/pkg/controllers/node/termination/instancetermination.go index 2e97c4edd0..5ea7086e43 100644 --- a/pkg/controllers/node/termination/instancetermination.go +++ b/pkg/controllers/node/termination/instancetermination.go @@ -35,9 +35,9 @@ import ( ) type InstanceTerminationReconciler struct { + clk clock.Clock kubeClient client.Client cloudProvider cloudprovider.CloudProvider - clk clock.Clock } func (i *InstanceTerminationReconciler) Reconcile(ctx context.Context, n *corev1.Node, nc *v1.NodeClaim) (reconcile.Result, error) { diff --git a/pkg/controllers/node/termination/suite_test.go b/pkg/controllers/node/termination/suite_test.go index b193deba72..deb6a5c629 100644 --- a/pkg/controllers/node/termination/suite_test.go +++ b/pkg/controllers/node/termination/suite_test.go @@ -34,7 +34,7 @@ import ( v1 "sigs.k8s.io/karpenter/pkg/apis/v1" "sigs.k8s.io/karpenter/pkg/cloudprovider/fake" "sigs.k8s.io/karpenter/pkg/controllers/node/termination" - "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator" + "sigs.k8s.io/karpenter/pkg/controllers/node/termination/eviction" "sigs.k8s.io/karpenter/pkg/metrics" "sigs.k8s.io/karpenter/pkg/test" . "sigs.k8s.io/karpenter/pkg/test/expectations" @@ -49,7 +49,7 @@ var defaultOwnerRefs = []metav1.OwnerReference{{Kind: "ReplicaSet", APIVersion: var fakeClock *clock.FakeClock var cloudProvider *fake.CloudProvider var recorder *test.EventRecorder -var queue *terminator.Queue +var queue *eviction.Queue func TestAPIs(t *testing.T) { ctx = TestContextWithLogger(t) @@ -67,8 +67,8 @@ var _ = BeforeSuite(func() { cloudProvider = fake.NewCloudProvider() recorder = test.NewEventRecorder() - queue = terminator.NewTestingQueue(env.Client, recorder) - terminationController = termination.NewController(fakeClock, env.Client, cloudProvider, terminator.NewTerminator(fakeClock, env.Client, queue, recorder), recorder) + queue = eviction.NewTestingQueue(env.Client, recorder) + terminationController = termination.NewController(fakeClock, env.Client, cloudProvider, recorder, queue) }) var _ = AfterSuite(func() { @@ -83,7 +83,7 @@ var _ = Describe("Termination", func() { BeforeEach(func() { fakeClock.SetTime(time.Now()) cloudProvider.Reset() - *queue = lo.FromPtr(terminator.NewTestingQueue(env.Client, recorder)) + *queue = lo.FromPtr(eviction.NewTestingQueue(env.Client, recorder)) nodePool = test.NodePool() nodeClaim, node = test.NodeClaimAndNode(v1.NodeClaim{ObjectMeta: metav1.ObjectMeta{Finalizers: []string{v1.TerminationFinalizer}}}) diff --git a/pkg/controllers/node/termination/terminator/terminator.go b/pkg/controllers/node/termination/terminator/terminator.go deleted file mode 100644 index 82940aa8ef..0000000000 --- a/pkg/controllers/node/termination/terminator/terminator.go +++ /dev/null @@ -1,177 +0,0 @@ -/* -Copyright The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package terminator - -import ( - "context" - "fmt" - "time" - - "github.com/samber/lo" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/equality" - apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/utils/clock" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/log" - - terminatorevents "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator/events" - "sigs.k8s.io/karpenter/pkg/events" - nodeutils "sigs.k8s.io/karpenter/pkg/utils/node" - podutil "sigs.k8s.io/karpenter/pkg/utils/pod" -) - -type Terminator struct { - clock clock.Clock - kubeClient client.Client - evictionQueue *Queue - recorder events.Recorder -} - -func NewTerminator(clk clock.Clock, kubeClient client.Client, eq *Queue, recorder events.Recorder) *Terminator { - return &Terminator{ - clock: clk, - kubeClient: kubeClient, - evictionQueue: eq, - recorder: recorder, - } -} - -// Taint idempotently adds a given taint to a node with a NodeClaim -func (t *Terminator) Taint(ctx context.Context, node *corev1.Node, taint corev1.Taint) error { - stored := node.DeepCopy() - // If the node already has the correct taint (key and effect), do nothing. - if _, ok := lo.Find(node.Spec.Taints, func(t corev1.Taint) bool { - return t.MatchTaint(&taint) - }); !ok { - // Otherwise, if the taint key exists (but with a different effect), remove it. - node.Spec.Taints = lo.Reject(node.Spec.Taints, func(t corev1.Taint, _ int) bool { - return t.Key == taint.Key - }) - node.Spec.Taints = append(node.Spec.Taints, taint) - } - // Adding this label to the node ensures that the node is removed from the load-balancer target group - // while it is draining and before it is terminated. 
This prevents 500s coming prior to health check - // when the load balancer controller hasn't yet determined that the node and underlying connections are gone - // https://github.com/aws/aws-node-termination-handler/issues/316 - // https://github.com/aws/karpenter/pull/2518 - node.Labels = lo.Assign(node.Labels, map[string]string{ - corev1.LabelNodeExcludeBalancers: "karpenter", - }) - if !equality.Semantic.DeepEqual(node, stored) { - // We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch - // can cause races due to the fact that it fully replaces the list on a change - // Here, we are updating the taint list - if err := t.kubeClient.Patch(ctx, node, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil { - return err - } - taintValues := []any{ - "taint.Key", taint.Key, - "taint.Value", taint.Value, - } - if len(string(taint.Effect)) > 0 { - taintValues = append(taintValues, "taint.Effect", taint.Effect) - } - log.FromContext(ctx).WithValues(taintValues...).Info("tainted node") - } - return nil -} - -// Drain evicts pods from the node and returns true when all pods are evicted -// https://kubernetes.io/docs/concepts/architecture/nodes/#graceful-node-shutdown -func (t *Terminator) Drain(ctx context.Context, node *corev1.Node, nodeGracePeriodExpirationTime *time.Time) error { - pods, err := nodeutils.GetPods(ctx, t.kubeClient, node) - if err != nil { - return fmt.Errorf("listing pods on node, %w", err) - } - podsToDelete := lo.Filter(pods, func(p *corev1.Pod, _ int) bool { - return podutil.IsWaitingEviction(p, t.clock) && !podutil.IsTerminating(p) - }) - if err := t.DeleteExpiringPods(ctx, podsToDelete, nodeGracePeriodExpirationTime); err != nil { - return fmt.Errorf("deleting expiring pods, %w", err) - } - // Monitor pods in pod groups that either haven't been evicted or are actively evicting - podGroups := t.groupPodsByPriority(lo.Filter(pods, func(p *corev1.Pod, _ int) bool { return podutil.IsWaitingEviction(p, t.clock) })) - for _, group := range podGroups { - if len(group) > 0 { - // Only add pods to the eviction queue that haven't been evicted yet - t.evictionQueue.Add(lo.Filter(group, func(p *corev1.Pod, _ int) bool { return podutil.IsEvictable(p) })...) - return NewNodeDrainError(fmt.Errorf("%d pods are waiting to be evicted", lo.SumBy(podGroups, func(pods []*corev1.Pod) int { return len(pods) }))) - } - } - return nil -} - -func (t *Terminator) groupPodsByPriority(pods []*corev1.Pod) [][]*corev1.Pod { - // 1. 
Prioritize noncritical pods, non-daemon pods https://kubernetes.io/docs/concepts/architecture/nodes/#graceful-node-shutdown - var nonCriticalNonDaemon, nonCriticalDaemon, criticalNonDaemon, criticalDaemon []*corev1.Pod - for _, pod := range pods { - if pod.Spec.PriorityClassName == "system-cluster-critical" || pod.Spec.PriorityClassName == "system-node-critical" { - if podutil.IsOwnedByDaemonSet(pod) { - criticalDaemon = append(criticalDaemon, pod) - } else { - criticalNonDaemon = append(criticalNonDaemon, pod) - } - } else { - if podutil.IsOwnedByDaemonSet(pod) { - nonCriticalDaemon = append(nonCriticalDaemon, pod) - } else { - nonCriticalNonDaemon = append(nonCriticalNonDaemon, pod) - } - } - } - return [][]*corev1.Pod{nonCriticalNonDaemon, nonCriticalDaemon, criticalNonDaemon, criticalDaemon} -} - -func (t *Terminator) DeleteExpiringPods(ctx context.Context, pods []*corev1.Pod, nodeGracePeriodTerminationTime *time.Time) error { - for _, pod := range pods { - // check if the node has an expiration time and the pod needs to be deleted - deleteTime := t.podDeleteTimeWithGracePeriod(nodeGracePeriodTerminationTime, pod) - if deleteTime != nil && time.Now().After(*deleteTime) { - // delete pod proactively to give as much of its terminationGracePeriodSeconds as possible for deletion - // ensure that we clamp the maximum pod terminationGracePeriodSeconds to the node's remaining expiration time in the delete command - gracePeriodSeconds := lo.ToPtr(int64(time.Until(*nodeGracePeriodTerminationTime).Seconds())) - t.recorder.Publish(terminatorevents.DisruptPodDelete(pod, gracePeriodSeconds, nodeGracePeriodTerminationTime)) - opts := &client.DeleteOptions{ - GracePeriodSeconds: gracePeriodSeconds, - } - if err := t.kubeClient.Delete(ctx, pod, opts); err != nil && !apierrors.IsNotFound(err) { // ignore 404, not a problem - return fmt.Errorf("deleting pod, %w", err) // otherwise, bubble up the error - } - log.FromContext(ctx).WithValues( - "namespace", pod.Namespace, - "name", pod.Name, - "pod.terminationGracePeriodSeconds", *pod.Spec.TerminationGracePeriodSeconds, - "delete.gracePeriodSeconds", *gracePeriodSeconds, - "nodeclaim.terminationTime", *nodeGracePeriodTerminationTime, - ).V(1).Info("deleting pod") - } - } - return nil -} - -// if a pod should be deleted to give it the full terminationGracePeriodSeconds of time before the node will shut down, return the time the pod should be deleted -func (t *Terminator) podDeleteTimeWithGracePeriod(nodeGracePeriodExpirationTime *time.Time, pod *corev1.Pod) *time.Time { - if nodeGracePeriodExpirationTime == nil || pod.Spec.TerminationGracePeriodSeconds == nil { // k8s defaults to 30s, so we should never see a nil TerminationGracePeriodSeconds - return nil - } - - // calculate the time the pod should be deleted to allow it's full grace period for termination, equal to its terminationGracePeriodSeconds before the node's expiration time - // eg: if a node will be force terminated in 30m, but the current pod has a grace period of 45m, we return a time of 15m ago - deleteTime := nodeGracePeriodExpirationTime.Add(time.Duration(*pod.Spec.TerminationGracePeriodSeconds) * time.Second * -1) - return &deleteTime -} diff --git a/pkg/controllers/node/termination/volumedetachment.go b/pkg/controllers/node/termination/volumedetachment.go index 109d80d1a3..9b445ee02a 100644 --- a/pkg/controllers/node/termination/volumedetachment.go +++ b/pkg/controllers/node/termination/volumedetachment.go @@ -30,7 +30,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" v1 
"sigs.k8s.io/karpenter/pkg/apis/v1" - terminatorevents "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator/events" "sigs.k8s.io/karpenter/pkg/events" storagev1 "k8s.io/api/storage/v1" @@ -42,8 +41,8 @@ import ( ) type VolumeDetachmentReconciler struct { - kubeClient client.Client clk clock.Clock + kubeClient client.Client recorder events.Recorder } @@ -78,7 +77,7 @@ func (v *VolumeDetachmentReconciler) Reconcile(ctx context.Context, n *corev1.No } return reconcile.Result{}, err } - v.recorder.Publish(terminatorevents.NodeAwaitingVolumeDetachment(n, blockingVAs...)) + v.recorder.Publish(NodeAwaitingVolumeDetachmentEvent(n, blockingVAs...)) return reconcile.Result{RequeueAfter: 5 * time.Second}, nil } diff --git a/pkg/controllers/nodeclaim/lifecycle/controller.go b/pkg/controllers/nodeclaim/lifecycle/controller.go index 89401dfdae..dd9aea7bb9 100644 --- a/pkg/controllers/nodeclaim/lifecycle/controller.go +++ b/pkg/controllers/nodeclaim/lifecycle/controller.go @@ -40,8 +40,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" - terminatorevents "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator/events" - v1 "sigs.k8s.io/karpenter/pkg/apis/v1" "sigs.k8s.io/karpenter/pkg/cloudprovider" "sigs.k8s.io/karpenter/pkg/events" @@ -282,7 +280,6 @@ func (c *Controller) annotateTerminationGracePeriodTerminationTime(ctx context.C return client.IgnoreNotFound(err) } log.FromContext(ctx).WithValues(v1.NodeClaimTerminationTimestampAnnotationKey, terminationTime).Info("annotated nodeclaim") - c.recorder.Publish(terminatorevents.NodeClaimTerminationGracePeriodExpiring(nodeClaim, terminationTime)) - + c.recorder.Publish(NodeClaimTerminationGracePeriodExpiringEvent(nodeClaim, terminationTime)) return nil } diff --git a/pkg/controllers/nodeclaim/lifecycle/events.go b/pkg/controllers/nodeclaim/lifecycle/events.go index 2697b1cdc0..4480057471 100644 --- a/pkg/controllers/nodeclaim/lifecycle/events.go +++ b/pkg/controllers/nodeclaim/lifecycle/events.go @@ -44,3 +44,13 @@ func NodeClassNotReadyEvent(nodeClaim *v1.NodeClaim, err error) events.Event { DedupeValues: []string{string(nodeClaim.UID)}, } } + +func NodeClaimTerminationGracePeriodExpiringEvent(nodeClaim *v1.NodeClaim, terminationTime string) events.Event { + return events.Event{ + InvolvedObject: nodeClaim, + Type: corev1.EventTypeWarning, + Reason: "TerminationGracePeriodExpiring", + Message: fmt.Sprintf("All pods will be deleted by %s", terminationTime), + DedupeValues: []string{nodeClaim.Name}, + } +} diff --git a/pkg/events/suite_test.go b/pkg/events/suite_test.go index fec8d1462a..2c53df92ef 100644 --- a/pkg/events/suite_test.go +++ b/pkg/events/suite_test.go @@ -30,8 +30,9 @@ import ( "k8s.io/client-go/util/flowcontrol" v1 "sigs.k8s.io/karpenter/pkg/apis/v1" - terminatorevents "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator/events" - schedulingevents "sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling" + "sigs.k8s.io/karpenter/pkg/controllers/node/termination" + "sigs.k8s.io/karpenter/pkg/controllers/node/termination/eviction" + "sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling" "sigs.k8s.io/karpenter/pkg/events" "sigs.k8s.io/karpenter/pkg/test" ) @@ -78,26 +79,26 @@ func TestRecorder(t *testing.T) { var _ = BeforeEach(func() { internalRecorder = NewInternalRecorder() eventRecorder = events.NewRecorder(internalRecorder) - schedulingevents.PodNominationRateLimiter = flowcontrol.NewTokenBucketRateLimiter(5, 10) + 
scheduling.PodNominationRateLimiter = flowcontrol.NewTokenBucketRateLimiter(5, 10) }) var _ = Describe("Event Creation", func() { It("should create a NominatePod event", func() { - eventRecorder.Publish(schedulingevents.NominatePodEvent(PodWithUID(), NodeWithUID(), NodeClaimWithUID())) - Expect(internalRecorder.Calls(schedulingevents.NominatePodEvent(PodWithUID(), NodeWithUID(), NodeClaimWithUID()).Reason)).To(Equal(1)) + eventRecorder.Publish(scheduling.NominatePodEvent(PodWithUID(), NodeWithUID(), NodeClaimWithUID())) + Expect(internalRecorder.Calls(scheduling.NominatePodEvent(PodWithUID(), NodeWithUID(), NodeClaimWithUID()).Reason)).To(Equal(1)) }) It("should create a EvictPod event", func() { - eventRecorder.Publish(terminatorevents.EvictPod(PodWithUID())) - Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID()).Reason)).To(Equal(1)) + eventRecorder.Publish(eviction.PodEvictedEvent(PodWithUID())) + Expect(internalRecorder.Calls(eviction.PodEvictedEvent(PodWithUID()).Reason)).To(Equal(1)) }) It("should create a PodFailedToSchedule event", func() { - eventRecorder.Publish(schedulingevents.PodFailedToScheduleEvent(PodWithUID(), fmt.Errorf(""))) - Expect(internalRecorder.Calls(schedulingevents.PodFailedToScheduleEvent(PodWithUID(), fmt.Errorf("")).Reason)).To(Equal(1)) + eventRecorder.Publish(scheduling.PodFailedToScheduleEvent(PodWithUID(), fmt.Errorf(""))) + Expect(internalRecorder.Calls(scheduling.PodFailedToScheduleEvent(PodWithUID(), fmt.Errorf("")).Reason)).To(Equal(1)) }) It("should create a NodeFailedToDrain event", func() { - eventRecorder.Publish(terminatorevents.NodeFailedToDrain(NodeWithUID(), fmt.Errorf(""))) - Expect(internalRecorder.Calls(terminatorevents.NodeFailedToDrain(NodeWithUID(), fmt.Errorf("")).Reason)).To(Equal(1)) + eventRecorder.Publish(termination.NodeDrainFailedEvent(NodeWithUID(), fmt.Errorf(""))) + Expect(internalRecorder.Calls(termination.NodeDrainFailedEvent(NodeWithUID(), fmt.Errorf("")).Reason)).To(Equal(1)) }) }) @@ -105,49 +106,49 @@ var _ = Describe("Dedupe", func() { It("should only create a single event when many events are created quickly", func() { pod := PodWithUID() for i := 0; i < 100; i++ { - eventRecorder.Publish(terminatorevents.EvictPod(pod)) + eventRecorder.Publish(eviction.PodEvictedEvent(pod)) } - Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID()).Reason)).To(Equal(1)) + Expect(internalRecorder.Calls(eviction.PodEvictedEvent(PodWithUID()).Reason)).To(Equal(1)) }) It("should allow the dedupe timeout to be overridden", func() { pod := PodWithUID() - evt := terminatorevents.EvictPod(pod) + evt := eviction.PodEvictedEvent(pod) evt.DedupeTimeout = time.Second * 2 // Generate a set of events within the dedupe timeout for i := 0; i < 10; i++ { eventRecorder.Publish(evt) } - Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID()).Reason)).To(Equal(1)) + Expect(internalRecorder.Calls(eviction.PodEvictedEvent(PodWithUID()).Reason)).To(Equal(1)) // Wait until after the overridden dedupe timeout time.Sleep(time.Second * 3) eventRecorder.Publish(evt) - Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID()).Reason)).To(Equal(2)) + Expect(internalRecorder.Calls(eviction.PodEvictedEvent(PodWithUID()).Reason)).To(Equal(2)) }) It("should allow events with different entities to be created", func() { for i := 0; i < 100; i++ { - eventRecorder.Publish(terminatorevents.EvictPod(PodWithUID())) + eventRecorder.Publish(eviction.PodEvictedEvent(PodWithUID())) } - 
Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID()).Reason)).To(Equal(100)) + Expect(internalRecorder.Calls(eviction.PodEvictedEvent(PodWithUID()).Reason)).To(Equal(100)) }) }) var _ = Describe("Rate Limiting", func() { It("should only create max-burst when many events are created quickly", func() { for i := 0; i < 100; i++ { - eventRecorder.Publish(schedulingevents.NominatePodEvent(PodWithUID(), NodeWithUID(), NodeClaimWithUID())) + eventRecorder.Publish(scheduling.NominatePodEvent(PodWithUID(), NodeWithUID(), NodeClaimWithUID())) } - Expect(internalRecorder.Calls(schedulingevents.NominatePodEvent(PodWithUID(), NodeWithUID(), NodeClaimWithUID()).Reason)).To(Equal(10)) + Expect(internalRecorder.Calls(scheduling.NominatePodEvent(PodWithUID(), NodeWithUID(), NodeClaimWithUID()).Reason)).To(Equal(10)) }) It("should allow many events over time due to smoothed rate limiting", func() { for i := 0; i < 3; i++ { for j := 0; j < 5; j++ { - eventRecorder.Publish(schedulingevents.NominatePodEvent(PodWithUID(), NodeWithUID(), NodeClaimWithUID())) + eventRecorder.Publish(scheduling.NominatePodEvent(PodWithUID(), NodeWithUID(), NodeClaimWithUID())) } time.Sleep(time.Second) } - Expect(internalRecorder.Calls(schedulingevents.NominatePodEvent(PodWithUID(), NodeWithUID(), NodeClaimWithUID()).Reason)).To(Equal(15)) + Expect(internalRecorder.Calls(scheduling.NominatePodEvent(PodWithUID(), NodeWithUID(), NodeClaimWithUID()).Reason)).To(Equal(15)) }) }) diff --git a/pkg/utils/node/node.go b/pkg/utils/node/node.go index 0587c2fe1c..0e33dd5149 100644 --- a/pkg/utils/node/node.go +++ b/pkg/utils/node/node.go @@ -27,7 +27,9 @@ import ( corev1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" v1 "sigs.k8s.io/karpenter/pkg/apis/v1" "sigs.k8s.io/karpenter/pkg/cloudprovider" @@ -178,3 +180,22 @@ func IsManagedPredicateFuncs(cp cloudprovider.CloudProvider) predicate.Funcs { return IsManaged(o.(*corev1.Node), cp) }) } + +func NodeClaimEventHandler(c client.Client, cp cloudprovider.CloudProvider) handler.EventHandler { + return handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, o client.Object) []reconcile.Request { + nc := o.(*v1.NodeClaim) + if nc.Status.ProviderID == "" { + return nil + } + if !nodeclaimutils.IsManaged(nc, cp) { + return nil + } + nodes := &corev1.NodeList{} + if err := c.List(ctx, nodes, client.MatchingFields{"spec.providerID": nc.Status.ProviderID}); err != nil { + return nil + } + return lo.Map(nodes.Items, func(n corev1.Node, _ int) reconcile.Request { + return reconcile.Request{NamespacedName: client.ObjectKeyFromObject(&n)} + }) + }) +}