Commit: finalizer hydration

jmdeal committed Dec 9, 2024
1 parent 0f76cdc commit 6df1a3c
Showing 9 changed files with 367 additions and 189 deletions.
10 changes: 10 additions & 0 deletions pkg/apis/v1/labels.go
@@ -50,11 +50,14 @@ const (
NodePoolHashAnnotationKey = apis.Group + "/nodepool-hash"
NodePoolHashVersionAnnotationKey = apis.Group + "/nodepool-hash-version"
NodeClaimTerminationTimestampAnnotationKey = apis.Group + "/nodeclaim-termination-timestamp"
HydrationAnnotationKey = apis.Group + "/hydrated-by"
)

// Karpenter specific finalizers
const (
TerminationFinalizer = apis.Group + "/termination"
DrainFinalizer = apis.Group + "/drain-protection"
VolumeFinalizer = apis.Group + "/volume-protection"
)

var (
@@ -102,6 +105,13 @@ var (
v1.LabelInstanceType: v1.LabelInstanceTypeStable,
v1.LabelFailureDomainBetaRegion: v1.LabelTopologyRegion,
}

// HydratedFinalizers contains the finalizers which must be applied to a resource by the hydration controller. If
// the resource has not been hydrated, Karpenter cannot rely on the absence of one of these finalizers as a signal.
HydratedFinalizers = sets.New(
DrainFinalizer,
VolumeFinalizer,
)
)
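To make the comment on HydratedFinalizers concrete: before trusting a missing finalizer, a consumer would need a guard roughly like the one below. This helper is a hypothetical sketch, not part of the commit, and assumes the usual corev1 and controllerutil imports.

// Hypothetical sketch: a missing drain finalizer only means "nothing left to
// drain" once the Node carries the hydration annotation.
func canAssumeDrained(node *corev1.Node) bool {
	if _, hydrated := node.Annotations[HydrationAnnotationKey]; !hydrated {
		return false // not hydrated yet, so the finalizer may simply never have been added
	}
	return !controllerutil.ContainsFinalizer(node, DrainFinalizer)
}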

// IsRestrictedLabel returns an error if the label is restricted.
55 changes: 39 additions & 16 deletions pkg/controllers/node/hydration/controller.go
@@ -26,6 +26,7 @@ import (
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -35,6 +36,7 @@ import (

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/operator"
"sigs.k8s.io/karpenter/pkg/operator/injection"
nodeutils "sigs.k8s.io/karpenter/pkg/utils/node"
nodeclaimutils "sigs.k8s.io/karpenter/pkg/utils/nodeclaim"
@@ -54,10 +56,33 @@ func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudPr
}
}

func (c *Controller) Name() string {
return "node.hydration"
}

func (c *Controller) Register(_ context.Context, m manager.Manager) error {
return controllerruntime.NewControllerManagedBy(m).
Named(c.Name()).
For(&corev1.Node{}).
Watches(&v1.NodeClaim{}, nodeutils.NodeClaimEventHandler(c.kubeClient)).
WithOptions(controller.Options{
RateLimiter: reasonable.RateLimiter(),
MaxConcurrentReconciles: 1000,
}).
Complete(reconcile.AsReconciler(m.GetClient(), c))
}
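The Watches clause above relies on nodeutils.NodeClaimEventHandler, which is not part of this diff, to requeue the Node backing a NodeClaim. A rough, hypothetical equivalent is sketched below; it assumes the mapping goes through the node name recorded in the NodeClaim status and the usual handler, types, and reconcile imports.

// Hypothetical stand-in for nodeutils.NodeClaimEventHandler: map a NodeClaim
// event to a reconcile request for the Node it is bound to.
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, o client.Object) []reconcile.Request {
	nc, ok := o.(*v1.NodeClaim)
	if !ok || nc.Status.NodeName == "" {
		return nil
	}
	return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: nc.Status.NodeName}}}
})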

func (c *Controller) Reconcile(ctx context.Context, n *corev1.Node) (reconcile.Result, error) {
ctx = injection.WithControllerName(ctx, c.Name())
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("Node", klog.KRef(n.Namespace, n.Name)))

if nodeutils.IsHydrated(n) {
return reconcile.Result{}, nil
}
n.Annotations = lo.Assign(n.Annotations, map[string]string{
v1.HydrationAnnotationKey: operator.Version,
})

nc, err := nodeutils.NodeClaimForNode(ctx, c.kubeClient, n)
if err != nil {
if nodeutils.IsDuplicateNodeClaimError(err) || nodeutils.IsNodeClaimNotFoundError(err) {
@@ -70,29 +95,27 @@ func (c *Controller) Reconcile(ctx context.Context, n *corev1.Node) (reconcile.R
}

stored := n.DeepCopy()
-n.Labels = lo.Assign(n.Labels, map[string]string{
-v1.NodeClassLabelKey(nc.Spec.NodeClassRef.GroupKind()): nc.Spec.NodeClassRef.Name,
-})
c.hydrateNodeClassLabel(n, nc)
c.hydrateFinalizers(n, nc)
if !equality.Semantic.DeepEqual(stored, n) {
-if err := c.kubeClient.Patch(ctx, n, client.MergeFrom(stored)); err != nil {
if err := c.kubeClient.Patch(ctx, n, client.StrategicMergeFrom(stored)); err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}
}
return reconcile.Result{}, nil
}

-func (c *Controller) Name() string {
-return "node.hydration"
-}
func (c *Controller) hydrateNodeClassLabel(n *corev1.Node, nc *v1.NodeClaim) {
n.Labels = lo.Assign(n.Labels, map[string]string{
v1.NodeClassLabelKey(nc.Spec.NodeClassRef.GroupKind()): nc.Spec.NodeClassRef.Name,
})
}

-func (c *Controller) Register(_ context.Context, m manager.Manager) error {
-return controllerruntime.NewControllerManagedBy(m).
-Named(c.Name()).
-For(&corev1.Node{}).
-Watches(&v1.NodeClaim{}, nodeutils.NodeClaimEventHandler(c.kubeClient)).
-WithOptions(controller.Options{
-RateLimiter: reasonable.RateLimiter(),
-MaxConcurrentReconciles: 1000,
-}).
-Complete(reconcile.AsReconciler(m.GetClient(), c))
-}
func (c *Controller) hydrateFinalizers(n *corev1.Node, nc *v1.NodeClaim) {
if !nc.StatusConditions().Get(v1.ConditionTypeDrained).IsTrue() {
controllerutil.AddFinalizer(n, v1.DrainFinalizer)
}
if !nc.StatusConditions().Get(v1.ConditionTypeVolumesDetached).IsTrue() {
controllerutil.AddFinalizer(n, v1.VolumeFinalizer)
}
}
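nodeutils.IsHydrated is called in Reconcile above but is not included in this diff. Judging from the annotation written here and the inline version check in the NodeClaim controller below, it presumably reduces to a comparison like the following sketch (an assumption, not the actual helper):

// Assumed shape of nodeutils.IsHydrated: a Node counts as hydrated once its
// hydrated-by annotation matches the running Karpenter version.
func IsHydrated(node *corev1.Node) bool {
	return node.Annotations[v1.HydrationAnnotationKey] == operator.Version
}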
89 changes: 61 additions & 28 deletions pkg/controllers/node/hydration/suite_test.go
@@ -24,10 +24,13 @@ import (
. "github.com/onsi/gomega"
"github.com/samber/lo"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/karpenter/pkg/apis"
v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider/fake"
"sigs.k8s.io/karpenter/pkg/controllers/node/hydration"
"sigs.k8s.io/karpenter/pkg/operator"
"sigs.k8s.io/karpenter/pkg/operator/options"
"sigs.k8s.io/karpenter/pkg/test"
. "sigs.k8s.io/karpenter/pkg/test/expectations"
@@ -64,36 +67,66 @@ var _ = AfterEach(func() {
})

var _ = Describe("Hydration", func() {
It("should hydrate the NodeClass label", func() {
nc, n := test.NodeClaimAndNode()
delete(n.Labels, v1.NodeClassLabelKey(nc.Spec.NodeClassRef.GroupKind()))
ExpectApplied(ctx, env.Client, nc, n)
ExpectObjectReconciled(ctx, env.Client, hydrationController, n)
n = ExpectExists(ctx, env.Client, n)
value := n.Labels[v1.NodeClassLabelKey(nc.Spec.NodeClassRef.GroupKind())]
Expect(value).To(Equal(nc.Spec.NodeClassRef.Name))
})
DescribeTable(
"Hydration",
func(isNodeClaimManaged bool) {
nodeClassRef := lo.Ternary(isNodeClaimManaged, &v1.NodeClassReference{
Group: "karpenter.test.sh",
Kind: "TestNodeClass",
Name: "default",
}, &v1.NodeClassReference{
Group: "karpenter.test.sh",
Kind: "UnmanagedNodeClass",
Name: "default",
})
nodeClaim, node := test.NodeClaimAndNode(v1.NodeClaim{
Spec: v1.NodeClaimSpec{
NodeClassRef: nodeClassRef,
},
})
delete(node.Labels, v1.NodeClassLabelKey(nodeClassRef.GroupKind()))
ExpectApplied(ctx, env.Client, nodeClaim, node)
ExpectObjectReconciled(ctx, env.Client, hydrationController, node)

// The missing NodeClass label should have been propagated to the Node
node = ExpectExists(ctx, env.Client, node)
value, ok := node.Labels[v1.NodeClassLabelKey(nodeClassRef.GroupKind())]
Expect(ok).To(Equal(isNodeClaimManaged))
if isNodeClaimManaged {
Expect(value).To(Equal(nodeClassRef.Name))
"Finalizers",
func(nodeClaimConditions []string, expectedFinailzers []string) {
nc, n := test.NodeClaimAndNode()
for _, cond := range nodeClaimConditions {
nc.StatusConditions().SetTrue(cond)
}
ExpectApplied(ctx, env.Client, nc, n)
ExpectObjectReconciled(ctx, env.Client, hydrationController, n)
n = ExpectExists(ctx, env.Client, n)
Expect(len(n.Finalizers)).To(Equal(len(expectedFinailzers)))
for _, finalizer := range expectedFinailzers {
Expect(controllerutil.ContainsFinalizer(n, finalizer))
}
},
Entry("should hydrate missing metadata onto the Node", true),
Entry("should ignore Nodes which aren't managed by this Karpenter instance", false),
Entry("should hydrate all finalizers when none of the requisite status conditions are true", nil, []string{v1.DrainFinalizer, v1.VolumeFinalizer}),
Entry("should hydrate the volume finalizer when only the drain status condition is true", []string{v1.ConditionTypeDrained}, []string{v1.VolumeFinalizer}),
Entry("should hydrate the drain finalizer when only the volume status condition is true", []string{v1.ConditionTypeVolumesDetached}, []string{v1.VolumeFinalizer}),
Entry("shouldn't hydrate finalizers when all requisite conditions are true", []string{v1.ConditionTypeDrained, v1.ConditionTypeVolumesDetached}, nil),
)

It("shouldn't hydrate nodes which have already been hydrated", func() {
nc, n := test.NodeClaimAndNode(v1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
v1.HydrationAnnotationKey: operator.Version,
},
},
})
delete(n.Labels, v1.NodeClassLabelKey(nc.Spec.NodeClassRef.GroupKind()))
ExpectApplied(ctx, env.Client, nc, n)
ExpectObjectReconciled(ctx, env.Client, hydrationController, n)
n = ExpectExists(ctx, env.Client, n)
Expect(lo.Keys(n.Labels)).ToNot(ContainElement(v1.NodeClassLabelKey(nc.Spec.NodeClassRef.GroupKind())))
Expect(len(n.Finalizers)).To(Equal(0))
})
It("shouldn't hydrate nodes which are not managed by this instance of Karpenter", func() {
nc, n := test.NodeClaimAndNode(v1.NodeClaim{
Spec: v1.NodeClaimSpec{
NodeClassRef: &v1.NodeClassReference{
Group: "karpenter.test.sh",
Kind: "UnmanagedNodeClass",
Name: "default",
},
},
})
delete(n.Labels, v1.NodeClassLabelKey(nc.Spec.NodeClassRef.GroupKind()))
ExpectApplied(ctx, env.Client, nc, n)
ExpectObjectReconciled(ctx, env.Client, hydrationController, n)
n = ExpectExists(ctx, env.Client, n)
Expect(lo.Keys(n.Labels)).ToNot(ContainElement(v1.NodeClassLabelKey(nc.Spec.NodeClassRef.GroupKind())))
Expect(len(n.Finalizers)).To(Equal(0))
})
})
47 changes: 30 additions & 17 deletions pkg/controllers/nodeclaim/hydration/controller.go
@@ -34,6 +34,7 @@ import (

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/operator"
"sigs.k8s.io/karpenter/pkg/operator/injection"
nodeclaimutils "sigs.k8s.io/karpenter/pkg/utils/nodeclaim"
)
@@ -52,36 +53,48 @@ func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudPr
}
}

func (c *Controller) Name() string {
return "nodeclaim.hydration"
}

func (c *Controller) Register(_ context.Context, m manager.Manager) error {
return controllerruntime.NewControllerManagedBy(m).
Named(c.Name()).
For(&v1.NodeClaim{}, builder.WithPredicates(nodeclaimutils.IsManagedPredicateFuncs(c.cloudProvider))).
WithOptions(controller.Options{
RateLimiter: reasonable.RateLimiter(),
MaxConcurrentReconciles: 1000,
}).
Complete(reconcile.AsReconciler(m.GetClient(), c))
}

func (c *Controller) Reconcile(ctx context.Context, nc *v1.NodeClaim) (reconcile.Result, error) {
ctx = injection.WithControllerName(ctx, c.Name())
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("NodeClaim", klog.KRef(nc.Namespace, nc.Name)))

if !nodeclaimutils.IsManaged(nc, c.cloudProvider) {
return reconcile.Result{}, nil
}

-stored := nc.DeepCopy()
-nc.Labels = lo.Assign(nc.Labels, map[string]string{
-v1.NodeClassLabelKey(nc.Spec.NodeClassRef.GroupKind()): nc.Spec.NodeClassRef.Name,
-})
if version := nc.Annotations[v1.HydrationAnnotationKey]; version == operator.Version {
return reconcile.Result{}, nil
}
nc.Annotations = lo.Assign(nc.Annotations, map[string]string{
v1.HydrationAnnotationKey: operator.Version,
})

stored := nc.DeepCopy()
c.hydrateNodeClassLabel(nc)
if !equality.Semantic.DeepEqual(stored, nc) {
-if err := c.kubeClient.Patch(ctx, nc, client.MergeFrom(stored)); err != nil {
if err := c.kubeClient.Patch(ctx, nc, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}
}
return reconcile.Result{}, nil
}

-func (c *Controller) Name() string {
-return "nodeclaim.hydration"
-}
-
-func (c *Controller) Register(_ context.Context, m manager.Manager) error {
-return controllerruntime.NewControllerManagedBy(m).
-Named(c.Name()).
-For(&v1.NodeClaim{}, builder.WithPredicates(nodeclaimutils.IsManagedPredicateFuncs(c.cloudProvider))).
-WithOptions(controller.Options{
-RateLimiter: reasonable.RateLimiter(),
-MaxConcurrentReconciles: 1000,
-}).
-Complete(reconcile.AsReconciler(m.GetClient(), c))
-}
func (c *Controller) hydrateNodeClassLabel(nc *v1.NodeClaim) {
nc.Labels = lo.Assign(nc.Labels, map[string]string{
v1.NodeClassLabelKey(nc.Spec.NodeClassRef.GroupKind()): nc.Spec.NodeClassRef.Name,
})
}
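The patch above switches from client.MergeFrom to client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{}), which embeds the NodeClaim's resourceVersion in the patch so a concurrent write fails with a conflict rather than being silently overwritten. A sketch of how a caller could react to that conflict follows; the requeue policy is an assumption, not taken from this commit, and apierrors is k8s.io/apimachinery/pkg/api/errors.

// Sketch: with the optimistic lock, a concurrent update surfaces as a 409
// Conflict from the API server; requeue and retry against fresh state.
if err := c.kubeClient.Patch(ctx, nc, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
	if apierrors.IsConflict(err) {
		return reconcile.Result{Requeue: true}, nil
	}
	return reconcile.Result{}, client.IgnoreNotFound(err)
}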
69 changes: 39 additions & 30 deletions pkg/controllers/nodeclaim/hydration/suite_test.go
@@ -24,10 +24,12 @@ import (
. "github.com/onsi/gomega"
"github.com/samber/lo"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/karpenter/pkg/apis"
v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider/fake"
"sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/hydration"
"sigs.k8s.io/karpenter/pkg/operator"
"sigs.k8s.io/karpenter/pkg/operator/options"
"sigs.k8s.io/karpenter/pkg/test"
. "sigs.k8s.io/karpenter/pkg/test/expectations"
@@ -64,35 +66,42 @@ var _ = AfterEach(func() {
})

var _ = Describe("Hydration", func() {
-DescribeTable(
-"Hydration",
-func(isNodeClaimManaged bool) {
-nodeClassRef := lo.Ternary(isNodeClaimManaged, &v1.NodeClassReference{
-Group: "karpenter.test.sh",
-Kind: "TestNodeClass",
-Name: "default",
-}, &v1.NodeClassReference{
-Group: "karpenter.test.sh",
-Kind: "UnmanagedNodeClass",
-Name: "default",
-})
-nodeClaim, _ := test.NodeClaimAndNode(v1.NodeClaim{
-Spec: v1.NodeClaimSpec{
-NodeClassRef: nodeClassRef,
-},
-})
-delete(nodeClaim.Labels, v1.NodeClassLabelKey(nodeClassRef.GroupKind()))
-ExpectApplied(ctx, env.Client, nodeClaim)
-ExpectObjectReconciled(ctx, env.Client, hydrationController, nodeClaim)
-
-nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
-value, ok := nodeClaim.Labels[v1.NodeClassLabelKey(nodeClassRef.GroupKind())]
-Expect(ok).To(Equal(isNodeClaimManaged))
-if isNodeClaimManaged {
-Expect(value).To(Equal(nodeClassRef.Name))
-}
-},
-Entry("should hydrate missing metadata onto the NodeClaim", true),
-Entry("should ignore NodeClaims which aren't managed by this Karpenter instance", false),
-)
It("should hydrate the NodeClass label", func() {
nc, _ := test.NodeClaimAndNode()
delete(nc.Labels, v1.NodeClassLabelKey(nc.Spec.NodeClassRef.GroupKind()))
ExpectApplied(ctx, env.Client, nc)
ExpectObjectReconciled(ctx, env.Client, hydrationController, nc)
nc = ExpectExists(ctx, env.Client, nc)
Expect(nc.Labels[v1.NodeClassLabelKey(nc.Spec.NodeClassRef.GroupKind())]).To(Equal(nc.Spec.NodeClassRef.Name))
})
It("shouldn't hydrate NodeClaims which have already been hydrated", func() {
nc, _ := test.NodeClaimAndNode(v1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
v1.HydrationAnnotationKey: operator.Version,
},
},
})
delete(nc.Labels, v1.NodeClassLabelKey(nc.Spec.NodeClassRef.GroupKind()))
ExpectApplied(ctx, env.Client, nc)
ExpectObjectReconciled(ctx, env.Client, hydrationController, nc)
nc = ExpectExists(ctx, env.Client, nc)
Expect(lo.Keys(nc.Labels)).ToNot(ContainElement(v1.NodeClassLabelKey(nc.Spec.NodeClassRef.GroupKind())))
})
It("shouldn't hydrate NodeClaims which aren't managed by this instance of Karpenter", func() {
nc, _ := test.NodeClaimAndNode(v1.NodeClaim{
Spec: v1.NodeClaimSpec{
NodeClassRef: &v1.NodeClassReference{
Group: "karpenter.test.sh",
Kind: "UnmanagedNodeClass",
Name: "default",
},
},
})
delete(nc.Labels, v1.NodeClassLabelKey(nc.Spec.NodeClassRef.GroupKind()))
ExpectApplied(ctx, env.Client, nc)
ExpectObjectReconciled(ctx, env.Client, hydrationController, nc)
nc = ExpectExists(ctx, env.Client, nc)
Expect(lo.Keys(nc.Labels)).ToNot(ContainElement(v1.NodeClassLabelKey(nc.Spec.NodeClassRef.GroupKind())))
})
})