From f7e2b8072f83a9c3e8ada6a7ed81c52cedae1c99 Mon Sep 17 00:00:00 2001 From: Jason Deal Date: Sun, 24 Nov 2024 07:56:28 -0800 Subject: [PATCH] feat: drain and volume attachment observability --- pkg/apis/v1/nodeclaim_status.go | 2 + .../node/termination/controller.go | 228 ++++-------------- pkg/controllers/node/termination/drain.go | 110 +++++++++ .../node/termination/instancetermination.go | 71 ++++++ .../node/termination/suite_test.go | 34 +-- .../termination/terminator/events/events.go | 16 +- .../node/termination/volumedetachment.go | 138 +++++++++++ pkg/utils/nodeclaim/nodeclaim.go | 25 ++ 8 files changed, 409 insertions(+), 215 deletions(-) create mode 100644 pkg/controllers/node/termination/drain.go create mode 100644 pkg/controllers/node/termination/instancetermination.go create mode 100644 pkg/controllers/node/termination/volumedetachment.go diff --git a/pkg/apis/v1/nodeclaim_status.go b/pkg/apis/v1/nodeclaim_status.go index aca25a4e77..ec0bf2f102 100644 --- a/pkg/apis/v1/nodeclaim_status.go +++ b/pkg/apis/v1/nodeclaim_status.go @@ -28,6 +28,8 @@ const ( ConditionTypeInitialized = "Initialized" ConditionTypeConsolidatable = "Consolidatable" ConditionTypeDrifted = "Drifted" + ConditionTypeDrained = "Drained" + ConditionTypeVolumesDetached = "VolumesDetached" ConditionTypeInstanceTerminating = "InstanceTerminating" ConditionTypeConsistentStateFound = "ConsistentStateFound" ) diff --git a/pkg/controllers/node/termination/controller.go b/pkg/controllers/node/termination/controller.go index 6249d1b27e..8e36c3b44a 100644 --- a/pkg/controllers/node/termination/controller.go +++ b/pkg/controllers/node/termination/controller.go @@ -21,14 +21,12 @@ import ( "fmt" "time" - "github.com/samber/lo" "golang.org/x/time/rate" corev1 "k8s.io/api/core/v1" - storagev1 "k8s.io/api/storage/v1" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" "k8s.io/utils/clock" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -43,198 +41,85 @@ import ( v1 "sigs.k8s.io/karpenter/pkg/apis/v1" "sigs.k8s.io/karpenter/pkg/cloudprovider" "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator" - terminatorevents "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator/events" "sigs.k8s.io/karpenter/pkg/events" "sigs.k8s.io/karpenter/pkg/metrics" "sigs.k8s.io/karpenter/pkg/operator/injection" nodeutils "sigs.k8s.io/karpenter/pkg/utils/node" - "sigs.k8s.io/karpenter/pkg/utils/pod" - "sigs.k8s.io/karpenter/pkg/utils/termination" - volumeutil "sigs.k8s.io/karpenter/pkg/utils/volume" ) // Controller for the resource type Controller struct { - clock clock.Clock - kubeClient client.Client - cloudProvider cloudprovider.CloudProvider - terminator *terminator.Terminator - recorder events.Recorder + clock clock.Clock + kubeClient client.Client + terminator *terminator.Terminator + reconcilers []terminationReconciler +} + +// TODO (jmdeal@): Split subreconcilers into individual controllers +type terminationReconciler interface { + Reconcile(context.Context, *corev1.Node, *v1.NodeClaim) (reconcile.Result, error) } // NewController constructs a controller instance func NewController(clk clock.Clock, kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, terminator *terminator.Terminator, recorder events.Recorder) *Controller { return &Controller{ - clock: clk, - kubeClient: kubeClient, - cloudProvider: cloudProvider, - terminator: terminator, - recorder: recorder, + clock: clk, + kubeClient: kubeClient, + terminator: terminator, + reconcilers: []terminationReconciler{ + &DrainReconciler{kubeClient, cloudProvider, recorder, terminator}, + &VolumeDetachmentReconciler{kubeClient, clk, recorder}, + &InstanceTerminationReconciler{kubeClient, cloudProvider, clk}, + }, } } +// nolint:gocyclo func (c *Controller) Reconcile(ctx context.Context, n *corev1.Node) (reconcile.Result, error) { ctx = injection.WithControllerName(ctx, "node.termination") - - if !n.GetDeletionTimestamp().IsZero() { - return c.finalize(ctx, n) - } - return reconcile.Result{}, nil -} - -//nolint:gocyclo -func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile.Result, error) { - if !controllerutil.ContainsFinalizer(node, v1.TerminationFinalizer) { + ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("Node", klog.KRef(n.Namespace, n.Name))) + if n.GetDeletionTimestamp().IsZero() { return reconcile.Result{}, nil } - nodeClaims, err := nodeutils.GetNodeClaims(ctx, node, c.kubeClient) - if err != nil { - return reconcile.Result{}, fmt.Errorf("listing nodeclaims, %w", err) - } - - if err = c.deleteAllNodeClaims(ctx, nodeClaims...); err != nil { - return reconcile.Result{}, fmt.Errorf("deleting nodeclaims, %w", err) - } - - nodeTerminationTime, err := c.nodeTerminationTime(node, nodeClaims...) - if err != nil { - return reconcile.Result{}, err + if !controllerutil.ContainsFinalizer(n, v1.TerminationFinalizer) { + return reconcile.Result{}, nil } - if err = c.terminator.Taint(ctx, node, v1.DisruptedNoScheduleTaint); err != nil { + if err := c.terminator.Taint(ctx, n, v1.DisruptedNoScheduleTaint); err != nil { + if errors.IsNotFound(err) { + return reconcile.Result{}, nil + } if errors.IsConflict(err) { return reconcile.Result{Requeue: true}, nil } - return reconcile.Result{}, client.IgnoreNotFound(fmt.Errorf("tainting node with %s, %w", pretty.Taint(v1.DisruptedNoScheduleTaint), err)) + return reconcile.Result{}, fmt.Errorf("tainting node with %s, %w", pretty.Taint(v1.DisruptedNoScheduleTaint), err) } - if err = c.terminator.Drain(ctx, node, nodeTerminationTime); err != nil { - if !terminator.IsNodeDrainError(err) { - return reconcile.Result{}, fmt.Errorf("draining node, %w", err) - } - c.recorder.Publish(terminatorevents.NodeFailedToDrain(node, err)) - // If the underlying NodeClaim no longer exists, we want to delete to avoid trying to gracefully draining - // on nodes that are no longer alive. We do a check on the Ready condition of the node since, even - // though the CloudProvider says the instance is not around, we know that the kubelet process is still running - // if the Node Ready condition is true - // Similar logic to: https://github.com/kubernetes/kubernetes/blob/3a75a8c8d9e6a1ebd98d8572132e675d4980f184/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller.go#L144 - if nodeutils.GetCondition(node, corev1.NodeReady).Status != corev1.ConditionTrue { - if _, err = c.cloudProvider.Get(ctx, node.Spec.ProviderID); err != nil { - if cloudprovider.IsNodeClaimNotFoundError(err) { - return reconcile.Result{}, c.removeFinalizer(ctx, node) - } - return reconcile.Result{}, fmt.Errorf("getting nodeclaim, %w", err) - } - } - - return reconcile.Result{RequeueAfter: 1 * time.Second}, nil - } - NodesDrainedTotal.Inc(map[string]string{ - metrics.NodePoolLabel: node.Labels[v1.NodePoolLabelKey], - }) - // In order for Pods associated with PersistentVolumes to smoothly migrate from the terminating Node, we wait - // for VolumeAttachments of drain-able Pods to be cleaned up before terminating Node and removing its finalizer. - // However, if TerminationGracePeriod is configured for Node, and we are past that period, we will skip waiting. - if nodeTerminationTime == nil || c.clock.Now().Before(*nodeTerminationTime) { - areVolumesDetached, err := c.ensureVolumesDetached(ctx, node) - if err != nil { - return reconcile.Result{}, fmt.Errorf("ensuring no volume attachments, %w", err) - } - if !areVolumesDetached { - return reconcile.Result{RequeueAfter: 1 * time.Second}, nil - } - } - nodeClaims, err = nodeutils.GetNodeClaims(ctx, node, c.kubeClient) + nc, err := nodeutils.NodeClaimForNode(ctx, c.kubeClient, n) if err != nil { - return reconcile.Result{}, fmt.Errorf("deleting nodeclaims, %w", err) - } - for _, nodeClaim := range nodeClaims { - isInstanceTerminated, err := termination.EnsureTerminated(ctx, c.kubeClient, nodeClaim, c.cloudProvider) - if err != nil { - // 404 = the nodeClaim no longer exists - if errors.IsNotFound(err) { - continue - } - // 409 - The nodeClaim exists, but its status has already been modified - if errors.IsConflict(err) { - return reconcile.Result{Requeue: true}, nil - } - return reconcile.Result{}, fmt.Errorf("ensuring instance termination, %w", err) + if nodeutils.IsDuplicateNodeClaimError(err) || nodeutils.IsNodeClaimNotFoundError(err) { + log.FromContext(ctx).Error(err, "failed to terminate node") + return reconcile.Result{}, nil } - if !isInstanceTerminated { - return reconcile.Result{RequeueAfter: 5 * time.Second}, nil - } - } - if err := c.removeFinalizer(ctx, node); err != nil { return reconcile.Result{}, err } - return reconcile.Result{}, nil -} - -func (c *Controller) deleteAllNodeClaims(ctx context.Context, nodeClaims ...*v1.NodeClaim) error { - for _, nodeClaim := range nodeClaims { - // If we still get the NodeClaim, but it's already marked as terminating, we don't need to call Delete again - if nodeClaim.DeletionTimestamp.IsZero() { - if err := c.kubeClient.Delete(ctx, nodeClaim); err != nil { - return client.IgnoreNotFound(err) - } + ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("NodeClaim", klog.KRef(nc.Namespace, nc.Name))) + if nc.DeletionTimestamp.IsZero() { + if err := c.kubeClient.Delete(ctx, nc); err != nil { + return reconcile.Result{}, client.IgnoreNotFound(err) } } - return nil -} - -func (c *Controller) ensureVolumesDetached(ctx context.Context, node *corev1.Node) (volumesDetached bool, err error) { - volumeAttachments, err := nodeutils.GetVolumeAttachments(ctx, c.kubeClient, node) - if err != nil { - return false, err - } - // Filter out VolumeAttachments associated with not drain-able Pods - filteredVolumeAttachments, err := filterVolumeAttachments(ctx, c.kubeClient, node, volumeAttachments, c.clock) - if err != nil { - return false, err - } - return len(filteredVolumeAttachments) == 0, nil -} -// filterVolumeAttachments filters out storagev1.VolumeAttachments that should not block the termination -// of the passed corev1.Node -func filterVolumeAttachments(ctx context.Context, kubeClient client.Client, node *corev1.Node, volumeAttachments []*storagev1.VolumeAttachment, clk clock.Clock) ([]*storagev1.VolumeAttachment, error) { - // No need to filter empty VolumeAttachments list - if len(volumeAttachments) == 0 { - return volumeAttachments, nil - } - // Create list of non-drain-able Pods associated with Node - pods, err := nodeutils.GetPods(ctx, kubeClient, node) - if err != nil { - return nil, err - } - unDrainablePods := lo.Reject(pods, func(p *corev1.Pod, _ int) bool { - return pod.IsDrainable(p, clk) - }) - // Filter out VolumeAttachments associated with non-drain-able Pods - // Match on Pod -> PersistentVolumeClaim -> PersistentVolume Name <- VolumeAttachment - shouldFilterOutVolume := sets.New[string]() - for _, p := range unDrainablePods { - for _, v := range p.Spec.Volumes { - pvc, err := volumeutil.GetPersistentVolumeClaim(ctx, kubeClient, p, v) - if errors.IsNotFound(err) { - continue - } - if err != nil { - return nil, err - } - if pvc != nil { - shouldFilterOutVolume.Insert(pvc.Spec.VolumeName) - } + for _, r := range c.reconcilers { + res, err := r.Reconcile(ctx, n, nc) + if res.Requeue || res.RequeueAfter != 0 || err != nil { + return res, err } } - filteredVolumeAttachments := lo.Reject(volumeAttachments, func(v *storagev1.VolumeAttachment, _ int) bool { - pvName := v.Spec.Source.PersistentVolumeName - return pvName == nil || shouldFilterOutVolume.Has(*pvName) - }) - return filteredVolumeAttachments, nil + + return reconcile.Result{}, nil } -func (c *Controller) removeFinalizer(ctx context.Context, n *corev1.Node) error { +func removeFinalizer(ctx context.Context, kubeClient client.Client, n *corev1.Node) error { stored := n.DeepCopy() controllerutil.RemoveFinalizer(n, v1.TerminationFinalizer) if !equality.Semantic.DeepEqual(stored, n) { @@ -242,44 +127,27 @@ func (c *Controller) removeFinalizer(ctx context.Context, n *corev1.Node) error // a strategic merge patch represents the finalizer list as a keyed "set" so removing // an item from the list doesn't replace the full list // https://github.com/kubernetes/kubernetes/issues/111643#issuecomment-2016489732 - if err := c.kubeClient.Patch(ctx, n, client.StrategicMergeFrom(stored)); err != nil { - return client.IgnoreNotFound(fmt.Errorf("removing finalizer, %w", err)) + if err := kubeClient.Patch(ctx, n, client.StrategicMergeFrom(stored)); err != nil { + if errors.IsNotFound(err) { + return nil + } + return fmt.Errorf("removing finalizer, %w", err) } - metrics.NodesTerminatedTotal.Inc(map[string]string{ metrics.NodePoolLabel: n.Labels[v1.NodePoolLabelKey], }) - // We use stored.DeletionTimestamp since the api-server may give back a node after the patch without a deletionTimestamp DurationSeconds.Observe(time.Since(stored.DeletionTimestamp.Time).Seconds(), map[string]string{ metrics.NodePoolLabel: n.Labels[v1.NodePoolLabelKey], }) - NodeLifetimeDurationSeconds.Observe(time.Since(n.CreationTimestamp.Time).Seconds(), map[string]string{ metrics.NodePoolLabel: n.Labels[v1.NodePoolLabelKey], }) - log.FromContext(ctx).Info("deleted node") } return nil } -func (c *Controller) nodeTerminationTime(node *corev1.Node, nodeClaims ...*v1.NodeClaim) (*time.Time, error) { - if len(nodeClaims) == 0 { - return nil, nil - } - expirationTimeString, exists := nodeClaims[0].ObjectMeta.Annotations[v1.NodeClaimTerminationTimestampAnnotationKey] - if !exists { - return nil, nil - } - c.recorder.Publish(terminatorevents.NodeTerminationGracePeriodExpiring(node, expirationTimeString)) - expirationTime, err := time.Parse(time.RFC3339, expirationTimeString) - if err != nil { - return nil, fmt.Errorf("parsing %s annotation, %w", v1.NodeClaimTerminationTimestampAnnotationKey, err) - } - return &expirationTime, nil -} - func (c *Controller) Register(_ context.Context, m manager.Manager) error { return controllerruntime.NewControllerManagedBy(m). Named("node.termination"). diff --git a/pkg/controllers/node/termination/drain.go b/pkg/controllers/node/termination/drain.go new file mode 100644 index 0000000000..e18c13f7fc --- /dev/null +++ b/pkg/controllers/node/termination/drain.go @@ -0,0 +1,110 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package termination + +import ( + "context" + "fmt" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + v1 "sigs.k8s.io/karpenter/pkg/apis/v1" + "sigs.k8s.io/karpenter/pkg/cloudprovider" + "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator" + terminatorevents "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator/events" + "sigs.k8s.io/karpenter/pkg/events" + "sigs.k8s.io/karpenter/pkg/metrics" + nodeutils "sigs.k8s.io/karpenter/pkg/utils/node" + "sigs.k8s.io/karpenter/pkg/utils/nodeclaim" +) + +type DrainReconciler struct { + kubeClient client.Client + cloudProvider cloudprovider.CloudProvider + recorder events.Recorder + terminator *terminator.Terminator +} + +// nolint:gocyclo +func (d *DrainReconciler) Reconcile(ctx context.Context, n *corev1.Node, nc *v1.NodeClaim) (reconcile.Result, error) { + if nc.StatusConditions().IsTrue(v1.ConditionTypeDrained) { + return reconcile.Result{}, nil + } + + tgpExpirationTime, err := nodeclaim.TerminationGracePeriodExpirationTime(nc) + if err != nil { + log.FromContext(ctx).Error(err, "failed to terminate node") + return reconcile.Result{}, nil + } + if tgpExpirationTime != nil { + d.recorder.Publish(terminatorevents.NodeTerminationGracePeriodExpiring(n, *tgpExpirationTime)) + } + + if err := d.terminator.Drain(ctx, n, tgpExpirationTime); err != nil { + if !terminator.IsNodeDrainError(err) { + return reconcile.Result{}, fmt.Errorf("draining node, %w", err) + } + // If the underlying NodeClaim no longer exists, we want to delete to avoid trying to gracefully draining + // on nodes that are no longer alive. We do a check on the Ready condition of the node since, even + // though the CloudProvider says the instance is not around, we know that the kubelet process is still running + // if the Node Ready condition is true + // Similar logic to: https://github.com/kubernetes/kubernetes/blob/3a75a8c8d9e6a1ebd98d8572132e675d4980f184/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller.go#L144 + if nodeutils.GetCondition(n, corev1.NodeReady).Status != corev1.ConditionTrue { + if _, err = d.cloudProvider.Get(ctx, n.Spec.ProviderID); err != nil { + if cloudprovider.IsNodeClaimNotFoundError(err) { + return reconcile.Result{}, removeFinalizer(ctx, d.kubeClient, n) + } + return reconcile.Result{}, fmt.Errorf("getting nodeclaim, %w", err) + } + } + stored := nc.DeepCopy() + if nc.StatusConditions().SetFalse(v1.ConditionTypeDrained, "Draining", "Draining") { + if err := d.kubeClient.Status().Patch(ctx, nc, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil { + if errors.IsNotFound(err) { + return reconcile.Result{}, nil + } + if errors.IsConflict(err) { + return reconcile.Result{Requeue: true}, nil + } + return reconcile.Result{}, err + } + } + d.recorder.Publish(terminatorevents.NodeFailedToDrain(n, err)) + return reconcile.Result{RequeueAfter: 5 * time.Second}, nil + } + + stored := nc.DeepCopy() + _ = nc.StatusConditions().SetTrue(v1.ConditionTypeDrained) + if err := d.kubeClient.Status().Patch(ctx, nc, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); client.IgnoreNotFound(err) != nil { + if errors.IsNotFound(err) { + return reconcile.Result{}, nil + } + if errors.IsConflict(err) { + return reconcile.Result{Requeue: true}, nil + } + return reconcile.Result{}, err + } + NodesDrainedTotal.Inc(map[string]string{ + metrics.NodePoolLabel: n.Labels[v1.NodePoolLabelKey], + }) + return reconcile.Result{}, nil +} diff --git a/pkg/controllers/node/termination/instancetermination.go b/pkg/controllers/node/termination/instancetermination.go new file mode 100644 index 0000000000..267f194690 --- /dev/null +++ b/pkg/controllers/node/termination/instancetermination.go @@ -0,0 +1,71 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package termination + +import ( + "context" + "fmt" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/utils/clock" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + v1 "sigs.k8s.io/karpenter/pkg/apis/v1" + "sigs.k8s.io/karpenter/pkg/cloudprovider" + nodeclaimutils "sigs.k8s.io/karpenter/pkg/utils/nodeclaim" + "sigs.k8s.io/karpenter/pkg/utils/termination" +) + +type InstanceTerminationReconciler struct { + kubeClient client.Client + cloudProvider cloudprovider.CloudProvider + clk clock.Clock +} + +func (i *InstanceTerminationReconciler) Reconcile(ctx context.Context, n *corev1.Node, nc *v1.NodeClaim) (reconcile.Result, error) { + elapsed, err := nodeclaimutils.HasTerminationGracePeriodElapsed(i.clk, nc) + if err != nil { + log.FromContext(ctx).Error(err, "failed to terminate node") + return reconcile.Result{}, nil + } + if !nc.StatusConditions().Get(v1.ConditionTypeVolumesDetached).IsTrue() && !elapsed { + return reconcile.Result{}, nil + } + isInstanceTerminated, err := termination.EnsureTerminated(ctx, i.kubeClient, nc, i.cloudProvider) + if err != nil { + // 404 = the nodeClaim no longer exists + if errors.IsNotFound(err) { + return reconcile.Result{}, nil + } + // 409 - The nodeClaim exists, but its status has already been modified + if errors.IsConflict(err) { + return reconcile.Result{Requeue: true}, nil + } + return reconcile.Result{}, fmt.Errorf("ensuring instance termination, %w", err) + } + if !isInstanceTerminated { + return reconcile.Result{RequeueAfter: 5 * time.Second}, nil + } + if err := removeFinalizer(ctx, i.kubeClient, n); err != nil { + return reconcile.Result{}, err + } + return reconcile.Result{}, nil +} diff --git a/pkg/controllers/node/termination/suite_test.go b/pkg/controllers/node/termination/suite_test.go index d7727c48d9..823819c08e 100644 --- a/pkg/controllers/node/termination/suite_test.go +++ b/pkg/controllers/node/termination/suite_test.go @@ -18,7 +18,6 @@ package termination_test import ( "context" - "sync" "testing" "time" @@ -125,37 +124,6 @@ var _ = Describe("Termination", func() { ExpectFinalizersRemoved(ctx, env.Client, nodeClaim) ExpectNotFound(ctx, env.Client, node, nodeClaim) }) - It("should not race if deleting nodes in parallel", func() { - const numNodes = 10 - var nodes []*corev1.Node - for i := 0; i < numNodes; i++ { - node = test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{ - Finalizers: []string{v1.TerminationFinalizer}, - }, - }) - ExpectApplied(ctx, env.Client, node, nodeClaim) - Expect(env.Client.Delete(ctx, node)).To(Succeed()) - node = ExpectNodeExists(ctx, env.Client, node.Name) - nodes = append(nodes, node) - } - - // Reconcile twice, once to set the NodeClaim to terminating, another to check the instance termination status (and delete the node). - for range 2 { - var wg sync.WaitGroup - // this is enough to trip the race detector - for i := 0; i < numNodes; i++ { - wg.Add(1) - go func(node *corev1.Node) { - defer GinkgoRecover() - defer wg.Done() - ExpectObjectReconciled(ctx, env.Client, terminationController, node) - }(nodes[i]) - } - wg.Wait() - } - ExpectNotFound(ctx, env.Client, lo.Map(nodes, func(n *corev1.Node, _ int) client.Object { return n })...) - }) It("should exclude nodes from load balancers when terminating", func() { labels := map[string]string{"foo": "bar"} pod := test.Pod(test.PodOptions{ @@ -869,7 +837,7 @@ var _ = Describe("Termination", func() { // Don't let any pod evict MinAvailable: &minAvailable, }) - ExpectApplied(ctx, env.Client, pdb, node) + ExpectApplied(ctx, env.Client, pdb, node, nodeClaim) pods := test.Pods(5, test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{ OwnerReferences: defaultOwnerRefs, Labels: labelSelector, diff --git a/pkg/controllers/node/termination/terminator/events/events.go b/pkg/controllers/node/termination/terminator/events/events.go index d626173d56..3c39e3cc1e 100644 --- a/pkg/controllers/node/termination/terminator/events/events.go +++ b/pkg/controllers/node/termination/terminator/events/events.go @@ -22,6 +22,8 @@ import ( corev1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" + v1 "sigs.k8s.io/karpenter/pkg/apis/v1" "sigs.k8s.io/karpenter/pkg/events" ) @@ -56,12 +58,22 @@ func NodeFailedToDrain(node *corev1.Node, err error) events.Event { } } -func NodeTerminationGracePeriodExpiring(node *corev1.Node, terminationTime string) events.Event { +func NodeAwaitingVolumeDetachment(node *corev1.Node, volumeAttachments ...*storagev1.VolumeAttachment) events.Event { + return events.Event{ + InvolvedObject: node, + Type: corev1.EventTypeWarning, + Reason: "AwaitingVolumeDetachment", + Message: fmt.Sprintf("Awaiting deletion of %d VolumeAttachments bound to node", len(volumeAttachments)), + DedupeValues: []string{node.Name}, + } +} + +func NodeTerminationGracePeriodExpiring(node *corev1.Node, t time.Time) events.Event { return events.Event{ InvolvedObject: node, Type: corev1.EventTypeWarning, Reason: "TerminationGracePeriodExpiring", - Message: fmt.Sprintf("All pods will be deleted by %s", terminationTime), + Message: fmt.Sprintf("All pods will be deleted by %s", t.Format(time.RFC3339)), DedupeValues: []string{node.Name}, } } diff --git a/pkg/controllers/node/termination/volumedetachment.go b/pkg/controllers/node/termination/volumedetachment.go new file mode 100644 index 0000000000..109d80d1a3 --- /dev/null +++ b/pkg/controllers/node/termination/volumedetachment.go @@ -0,0 +1,138 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package termination + +import ( + "context" + "time" + + "github.com/samber/lo" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/utils/clock" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + v1 "sigs.k8s.io/karpenter/pkg/apis/v1" + terminatorevents "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator/events" + "sigs.k8s.io/karpenter/pkg/events" + + storagev1 "k8s.io/api/storage/v1" + + nodeutils "sigs.k8s.io/karpenter/pkg/utils/node" + "sigs.k8s.io/karpenter/pkg/utils/nodeclaim" + podutils "sigs.k8s.io/karpenter/pkg/utils/pod" + volumeutil "sigs.k8s.io/karpenter/pkg/utils/volume" +) + +type VolumeDetachmentReconciler struct { + kubeClient client.Client + clk clock.Clock + recorder events.Recorder +} + +// nolint:gocyclo +func (v *VolumeDetachmentReconciler) Reconcile(ctx context.Context, n *corev1.Node, nc *v1.NodeClaim) (reconcile.Result, error) { + if !nc.StatusConditions().Get(v1.ConditionTypeDrained).IsTrue() { + return reconcile.Result{}, nil + } + if nc.StatusConditions().Get(v1.ConditionTypeVolumesDetached).IsTrue() { + return reconcile.Result{}, nil + } + if elapsed, err := nodeclaim.HasTerminationGracePeriodElapsed(v.clk, nc); err != nil { + log.FromContext(ctx).Error(err, "failed to terminate node") + return reconcile.Result{}, nil + } else if elapsed { + return reconcile.Result{}, nil + } + + blockingVAs, err := v.blockingVolumeAttachments(ctx, n) + if err != nil { + return reconcile.Result{}, err + } + if len(blockingVAs) != 0 { + stored := nc.DeepCopy() + _ = nc.StatusConditions().SetFalse(v1.ConditionTypeVolumesDetached, "AwaitingVolumeDetachment", "AwaitingVolumeDetachment") + if err := v.kubeClient.Status().Patch(ctx, nc, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil { + if errors.IsNotFound(err) { + return reconcile.Result{}, nil + } + if errors.IsConflict(err) { + return reconcile.Result{Requeue: true}, nil + } + return reconcile.Result{}, err + } + v.recorder.Publish(terminatorevents.NodeAwaitingVolumeDetachment(n, blockingVAs...)) + return reconcile.Result{RequeueAfter: 5 * time.Second}, nil + } + + stored := nc.DeepCopy() + _ = nc.StatusConditions().SetTrue(v1.ConditionTypeVolumesDetached) + if err := v.kubeClient.Status().Patch(ctx, nc, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil { + if errors.IsNotFound(err) { + return reconcile.Result{}, nil + } + if errors.IsConflict(err) { + return reconcile.Result{Requeue: true}, nil + } + return reconcile.Result{}, err + } + return reconcile.Result{}, nil +} + +func (v *VolumeDetachmentReconciler) blockingVolumeAttachments(ctx context.Context, n *corev1.Node) ([]*storagev1.VolumeAttachment, error) { + vas, err := nodeutils.GetVolumeAttachments(ctx, v.kubeClient, n) + if err != nil { + return nil, err + } + if len(vas) == 0 { + return nil, nil + } + + pods, err := nodeutils.GetPods(ctx, v.kubeClient, n) + if err != nil { + return nil, err + } + pods = lo.Reject(pods, func(p *corev1.Pod, _ int) bool { + return podutils.IsDrainable(p, v.clk) + }) + + // Determine the VolumeAttachments associated with non-drainable pods. We consider these non-blocking since they + // will never be detached without intervention (since the pods aren't drained). + nonBlockingVolumes := sets.New[string]() + for _, p := range pods { + for _, vol := range p.Spec.Volumes { + pvc, err := volumeutil.GetPersistentVolumeClaim(ctx, v.kubeClient, p, vol) + if errors.IsNotFound(err) { + continue + } + if err != nil { + return nil, err + } + if pvc != nil { + nonBlockingVolumes.Insert(pvc.Spec.VolumeName) + } + } + } + blockingVAs := lo.Reject(vas, func(v *storagev1.VolumeAttachment, _ int) bool { + pvName := v.Spec.Source.PersistentVolumeName + return pvName == nil || nonBlockingVolumes.Has(*pvName) + }) + return blockingVAs, nil +} diff --git a/pkg/utils/nodeclaim/nodeclaim.go b/pkg/utils/nodeclaim/nodeclaim.go index 02d326df7b..c1b1ec3c0a 100644 --- a/pkg/utils/nodeclaim/nodeclaim.go +++ b/pkg/utils/nodeclaim/nodeclaim.go @@ -20,12 +20,14 @@ import ( "context" "errors" "fmt" + "time" "github.com/awslabs/operatorpkg/object" "github.com/samber/lo" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/clock" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -200,3 +202,26 @@ func UpdateNodeOwnerReferences(nodeClaim *v1.NodeClaim, node *corev1.Node) *core }) return node } + +func TerminationGracePeriodExpirationTime(nc *v1.NodeClaim) (*time.Time, error) { + expirationTimeString, exists := nc.ObjectMeta.Annotations[v1.NodeClaimTerminationTimestampAnnotationKey] + if !exists { + return nil, nil + } + expirationTime, err := time.Parse(time.RFC3339, expirationTimeString) + if err != nil { + return nil, fmt.Errorf("parsing %s annotation, %w", v1.NodeClaimTerminationTimestampAnnotationKey, err) + } + return &expirationTime, nil +} + +func HasTerminationGracePeriodElapsed(clk clock.Clock, nc *v1.NodeClaim) (bool, error) { + expirationTime, err := TerminationGracePeriodExpirationTime(nc) + if err != nil { + return false, err + } + if expirationTime == nil { + return false, nil + } + return clk.Now().After(*expirationTime), nil +}