[WIP] refactor: split termination controller #1837

Open · wants to merge 5 commits into base: main
10 changes: 10 additions & 0 deletions pkg/apis/v1/labels.go
@@ -50,11 +50,14 @@ const (
NodePoolHashAnnotationKey = apis.Group + "/nodepool-hash"
NodePoolHashVersionAnnotationKey = apis.Group + "/nodepool-hash-version"
NodeClaimTerminationTimestampAnnotationKey = apis.Group + "/nodeclaim-termination-timestamp"
HydrationAnnotationKey = apis.Group + "/hydrated-by"
)

// Karpenter specific finalizers
const (
TerminationFinalizer = apis.Group + "/termination"
DrainFinalizer = apis.Group + "/drain-protection"
VolumeFinalizer = apis.Group + "/volume-protection"
)

var (
@@ -102,6 +105,13 @@ var (
v1.LabelInstanceType: v1.LabelInstanceTypeStable,
v1.LabelFailureDomainBetaRegion: v1.LabelTopologyRegion,
}

// HydratedFinalizers contains the finalizers which must be applied to a resource by the hydration controller. If
// the resource is not hydrated, Karpenter cannot depend on the lack of one of these finalizers as a signal.
HydratedFinalizers = sets.New(
DrainFinalizer,
VolumeFinalizer,
)
)

// IsRestrictedLabel returns an error if the label is restricted.
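The comment on HydratedFinalizers above is the key invariant behind this refactor: until a Node has been hydrated, a missing drain or volume finalizer may simply never have been added, so its absence proves nothing. A minimal consumer-side sketch of that rule follows; the helper name and signature are hypothetical and not part of this PR.

package example

import (
    corev1 "k8s.io/api/core/v1"
    "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

    v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// finalizerReleased reports whether a termination finalizer has genuinely been
// removed from the Node. For finalizers in HydratedFinalizers, absence is only
// a trustworthy signal once the Node has been hydrated.
func finalizerReleased(node *corev1.Node, finalizer string, hydrated bool) bool {
    if v1.HydratedFinalizers.Has(finalizer) && !hydrated {
        return false
    }
    return !controllerutil.ContainsFinalizer(node, finalizer)
}
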
2 changes: 2 additions & 0 deletions pkg/apis/v1/nodeclaim_status.go
@@ -28,6 +28,8 @@ const (
ConditionTypeInitialized = "Initialized"
ConditionTypeConsolidatable = "Consolidatable"
ConditionTypeDrifted = "Drifted"
ConditionTypeDrained = "Drained"
ConditionTypeVolumesDetached = "VolumesDetached"
ConditionTypeInstanceTerminating = "InstanceTerminating"
ConditionTypeConsistentStateFound = "ConsistentStateFound"
)
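The two new condition types give the split termination flow a durable, per-phase record on the NodeClaim status. As a rough illustration (assumed, not taken from this PR), the final instance-termination step could gate on both conditions being true:

package example

import v1 "sigs.k8s.io/karpenter/pkg/apis/v1"

// readyForInstanceTermination is a sketch only; the real gating logic lives in
// the new termination sub-controllers wired up in controllers.go below.
func readyForInstanceTermination(nc *v1.NodeClaim) bool {
    return nc.StatusConditions().Get(v1.ConditionTypeDrained).IsTrue() &&
        nc.StatusConditions().Get(v1.ConditionTypeVolumesDetached).IsTrue()
}
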
12 changes: 8 additions & 4 deletions pkg/controllers/controllers.go
@@ -38,8 +38,10 @@ import (
metricspod "sigs.k8s.io/karpenter/pkg/controllers/metrics/pod"
"sigs.k8s.io/karpenter/pkg/controllers/node/health"
nodehydration "sigs.k8s.io/karpenter/pkg/controllers/node/hydration"
"sigs.k8s.io/karpenter/pkg/controllers/node/termination"
"sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator"
"sigs.k8s.io/karpenter/pkg/controllers/node/termination/drain"
"sigs.k8s.io/karpenter/pkg/controllers/node/termination/eviction"
"sigs.k8s.io/karpenter/pkg/controllers/node/termination/instancetermination"
"sigs.k8s.io/karpenter/pkg/controllers/node/termination/volumedetachment"
nodeclaimconsistency "sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/consistency"
nodeclaimdisruption "sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/disruption"
"sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/expiration"
@@ -68,7 +70,7 @@ func NewControllers(
) []controller.Controller {
cluster := state.NewCluster(clock, kubeClient, cloudProvider)
p := provisioning.NewProvisioner(kubeClient, recorder, cloudProvider, cluster, clock)
evictionQueue := terminator.NewQueue(kubeClient, recorder)
evictionQueue := eviction.NewQueue(kubeClient, recorder)
disruptionQueue := orchestration.NewQueue(kubeClient, recorder, cluster, clock, p)

controllers := []controller.Controller{
@@ -83,7 +85,9 @@
informer.NewPodController(kubeClient, cluster),
informer.NewNodePoolController(kubeClient, cloudProvider, cluster),
informer.NewNodeClaimController(kubeClient, cloudProvider, cluster),
termination.NewController(clock, kubeClient, cloudProvider, terminator.NewTerminator(clock, kubeClient, evictionQueue, recorder), recorder),
drain.NewController(clock, kubeClient, cloudProvider, recorder, evictionQueue),
volumedetachment.NewController(clock, kubeClient, cloudProvider, recorder),
instancetermination.NewController(clock, kubeClient, cloudProvider),
metricspod.NewController(kubeClient, cluster),
metricsnodepool.NewController(kubeClient, cloudProvider),
metricsnode.NewController(cluster),
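The monolithic termination controller is replaced here by three independently registered controllers: drain (backed by the renamed eviction queue), volumedetachment, and instancetermination. The hand-off between phases is not shown in this diff; the sketch below assumes each controller records its NodeClaim status condition and then releases its Node finalizer so the next phase can proceed. The helper name and exact sequencing are illustrative only.

package example

import (
    "context"

    corev1 "k8s.io/api/core/v1"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

    v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// completeDrainPhase records that draining finished and releases the drain
// protection finalizer so volume detachment can take over.
func completeDrainPhase(ctx context.Context, kubeClient client.Client, node *corev1.Node, nc *v1.NodeClaim) error {
    // Persist the condition first so downstream controllers can rely on it
    // even if the finalizer removal below has to be retried.
    nc.StatusConditions().SetTrue(v1.ConditionTypeDrained)
    if err := kubeClient.Status().Update(ctx, nc); err != nil {
        return err
    }
    stored := node.DeepCopy()
    if controllerutil.RemoveFinalizer(node, v1.DrainFinalizer) {
        return kubeClient.Patch(ctx, node, client.StrategicMergeFrom(stored))
    }
    return nil
}
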
6 changes: 3 additions & 3 deletions pkg/controllers/node/health/suite_test.go
@@ -33,7 +33,7 @@ import (
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/cloudprovider/fake"
"sigs.k8s.io/karpenter/pkg/controllers/node/health"
"sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator"
"sigs.k8s.io/karpenter/pkg/controllers/node/termination/eviction"
"sigs.k8s.io/karpenter/pkg/metrics"
"sigs.k8s.io/karpenter/pkg/test"
. "sigs.k8s.io/karpenter/pkg/test/expectations"
@@ -48,7 +48,7 @@ var env *test.Environment
var fakeClock *clock.FakeClock
var cloudProvider *fake.CloudProvider
var recorder *test.EventRecorder
var queue *terminator.Queue
var queue *eviction.Queue

func TestAPIs(t *testing.T) {
ctx = TestContextWithLogger(t)
@@ -66,7 +66,7 @@ var _ = BeforeSuite(func() {
cloudProvider = fake.NewCloudProvider()
cloudProvider = fake.NewCloudProvider()
recorder = test.NewEventRecorder()
queue = terminator.NewTestingQueue(env.Client, recorder)
queue = eviction.NewTestingQueue(env.Client, recorder)
healthController = health.NewController(env.Client, cloudProvider, fakeClock, recorder)
})

55 changes: 39 additions & 16 deletions pkg/controllers/node/hydration/controller.go
@@ -26,6 +26,7 @@ import (
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -35,6 +36,7 @@ import (

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/operator"
"sigs.k8s.io/karpenter/pkg/operator/injection"
nodeutils "sigs.k8s.io/karpenter/pkg/utils/node"
nodeclaimutils "sigs.k8s.io/karpenter/pkg/utils/nodeclaim"
@@ -54,10 +56,33 @@ func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudPr
}
}

func (c *Controller) Name() string {
return "node.hydration"
}

func (c *Controller) Register(_ context.Context, m manager.Manager) error {
return controllerruntime.NewControllerManagedBy(m).
Named(c.Name()).
For(&corev1.Node{}).
Watches(&v1.NodeClaim{}, nodeutils.NodeClaimEventHandler(c.kubeClient, c.cloudProvider)).
WithOptions(controller.Options{
RateLimiter: reasonable.RateLimiter(),
MaxConcurrentReconciles: 1000,
}).
Complete(reconcile.AsReconciler(m.GetClient(), c))
}

func (c *Controller) Reconcile(ctx context.Context, n *corev1.Node) (reconcile.Result, error) {
ctx = injection.WithControllerName(ctx, c.Name())
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("Node", klog.KRef(n.Namespace, n.Name)))

if nodeutils.IsHydrated(n) {
return reconcile.Result{}, nil
}
n.Annotations = lo.Assign(n.Annotations, map[string]string{
v1.HydrationAnnotationKey: operator.Version,
})

nc, err := nodeutils.NodeClaimForNode(ctx, c.kubeClient, n)
if err != nil {
if nodeutils.IsDuplicateNodeClaimError(err) || nodeutils.IsNodeClaimNotFoundError(err) {
@@ -70,29 +95,27 @@ func (c *Controller) Reconcile(ctx context.Context, n *corev1.Node) (reconcile.R
}

stored := n.DeepCopy()
n.Labels = lo.Assign(n.Labels, map[string]string{
v1.NodeClassLabelKey(nc.Spec.NodeClassRef.GroupKind()): nc.Spec.NodeClassRef.Name,
})
c.hydrateNodeClassLabel(n, nc)
c.hydrateFinalizers(n, nc)
if !equality.Semantic.DeepEqual(stored, n) {
if err := c.kubeClient.Patch(ctx, n, client.MergeFrom(stored)); err != nil {
if err := c.kubeClient.Patch(ctx, n, client.StrategicMergeFrom(stored)); err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}
}
return reconcile.Result{}, nil
}

func (c *Controller) Name() string {
return "node.hydration"
func (c *Controller) hydrateNodeClassLabel(n *corev1.Node, nc *v1.NodeClaim) {
n.Labels = lo.Assign(n.Labels, map[string]string{
v1.NodeClassLabelKey(nc.Spec.NodeClassRef.GroupKind()): nc.Spec.NodeClassRef.Name,
})
}

func (c *Controller) Register(_ context.Context, m manager.Manager) error {
return controllerruntime.NewControllerManagedBy(m).
Named(c.Name()).
For(&corev1.Node{}).
Watches(&v1.NodeClaim{}, nodeutils.NodeClaimEventHandler(c.kubeClient)).
WithOptions(controller.Options{
RateLimiter: reasonable.RateLimiter(),
MaxConcurrentReconciles: 1000,
}).
Complete(reconcile.AsReconciler(m.GetClient(), c))
func (c *Controller) hydrateFinalizers(n *corev1.Node, nc *v1.NodeClaim) {
if !nc.StatusConditions().Get(v1.ConditionTypeDrained).IsTrue() {
controllerutil.AddFinalizer(n, v1.DrainFinalizer)
}
if !nc.StatusConditions().Get(v1.ConditionTypeVolumesDetached).IsTrue() {
controllerutil.AddFinalizer(n, v1.VolumeFinalizer)
}
}
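nodeutils.IsHydrated is called in Reconcile above but is not part of this diff. Judging from how the annotation is written here, and from the test below that marks a Node as already hydrated by setting the annotation to operator.Version, a plausible implementation simply compares the hydrated-by annotation with the running operator version; treat this as an assumption rather than the PR's actual helper.

package example

import (
    corev1 "k8s.io/api/core/v1"

    v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
    "sigs.k8s.io/karpenter/pkg/operator"
)

// IsHydrated reports whether this Karpenter version has already hydrated the
// Node, i.e. the hydrated-by annotation matches the current operator version.
func IsHydrated(n *corev1.Node) bool {
    return n.Annotations[v1.HydrationAnnotationKey] == operator.Version
}
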
83 changes: 58 additions & 25 deletions pkg/controllers/node/hydration/suite_test.go
@@ -24,10 +24,15 @@ import (
. "github.com/onsi/gomega"
"github.com/samber/lo"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

"sigs.k8s.io/karpenter/pkg/apis"
v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider/fake"
"sigs.k8s.io/karpenter/pkg/controllers/node/hydration"
"sigs.k8s.io/karpenter/pkg/operator"
"sigs.k8s.io/karpenter/pkg/operator/options"
"sigs.k8s.io/karpenter/pkg/test"
. "sigs.k8s.io/karpenter/pkg/test/expectations"
@@ -64,36 +69,64 @@ var _ = AfterEach(func() {
})

var _ = Describe("Hydration", func() {
var nodeClaim *v1.NodeClaim
var node *corev1.Node

BeforeEach(func() {
nodeClaim, node = test.NodeClaimAndNode(v1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{v1.HydrationAnnotationKey: "not-hydrated"},
},
})
})

It("should hydrate the NodeClass label", func() {
delete(node.Labels, v1.NodeClassLabelKey(nodeClaim.Spec.NodeClassRef.GroupKind()))
ExpectApplied(ctx, env.Client, nodeClaim, node)
ExpectObjectReconciled(ctx, env.Client, hydrationController, node)
node = ExpectExists(ctx, env.Client, node)
value := node.Labels[v1.NodeClassLabelKey(nodeClaim.Spec.NodeClassRef.GroupKind())]
Expect(value).To(Equal(nodeClaim.Spec.NodeClassRef.Name))
})
DescribeTable(
"Hydration",
func(isNodeClaimManaged bool) {
nodeClassRef := lo.Ternary(isNodeClaimManaged, &v1.NodeClassReference{
Group: "karpenter.test.sh",
Kind: "TestNodeClass",
Name: "default",
}, &v1.NodeClassReference{
Group: "karpenter.test.sh",
Kind: "UnmanagedNodeClass",
Name: "default",
})
nodeClaim, node := test.NodeClaimAndNode(v1.NodeClaim{
Spec: v1.NodeClaimSpec{
NodeClassRef: nodeClassRef,
},
})
delete(node.Labels, v1.NodeClassLabelKey(nodeClassRef.GroupKind()))
"Finalizers",
func(nodeClaimConditions []string, expectedFinalizers []string) {
for _, cond := range nodeClaimConditions {
nodeClaim.StatusConditions().SetTrue(cond)
}
ExpectApplied(ctx, env.Client, nodeClaim, node)
ExpectObjectReconciled(ctx, env.Client, hydrationController, node)

// The missing NodeClass label should have been propagated to the Node
node = ExpectExists(ctx, env.Client, node)
value, ok := node.Labels[v1.NodeClassLabelKey(nodeClassRef.GroupKind())]
Expect(ok).To(Equal(isNodeClaimManaged))
if isNodeClaimManaged {
Expect(value).To(Equal(nodeClassRef.Name))
Expect(len(node.Finalizers)).To(Equal(len(expectedFinalizers)))
for _, finalizer := range expectedFinalizers {
Expect(controllerutil.ContainsFinalizer(node, finalizer)).To(BeTrue())
}
},
Entry("should hydrate missing metadata onto the Node", true),
Entry("should ignore Nodes which aren't managed by this Karpenter instance", false),
Entry("should hydrate all finalizers when none of the requisite status conditions are true", nil, []string{v1.DrainFinalizer, v1.VolumeFinalizer}),
Entry("should hydrate the volume finalizer when only the drain status condition is true", []string{v1.ConditionTypeDrained}, []string{v1.VolumeFinalizer}),
Entry("should hydrate the drain finalizer when only the volume status condition is true", []string{v1.ConditionTypeVolumesDetached}, []string{v1.VolumeFinalizer}),
Entry("shouldn't hydrate finalizers when all requisite conditions are true", []string{v1.ConditionTypeDrained, v1.ConditionTypeVolumesDetached}, nil),
)
It("shouldn't hydrate nodes which have already been hydrated", func() {
node.Annotations[v1.HydrationAnnotationKey] = operator.Version
delete(node.Labels, v1.NodeClassLabelKey(nodeClaim.Spec.NodeClassRef.GroupKind()))
ExpectApplied(ctx, env.Client, nodeClaim, node)
ExpectObjectReconciled(ctx, env.Client, hydrationController, node)
node = ExpectExists(ctx, env.Client, node)
Expect(lo.Keys(node.Labels)).ToNot(ContainElement(v1.NodeClassLabelKey(nodeClaim.Spec.NodeClassRef.GroupKind())))
Expect(len(node.Finalizers)).To(Equal(0))
})
It("shouldn't hydrate nodes which are not managed by this instance of Karpenter", func() {
nodeClaim.Spec.NodeClassRef = &v1.NodeClassReference{
Group: "karpenter.test.sh",
Kind: "UnmanagedNodeClass",
Name: "default",
}
delete(node.Labels, v1.NodeClassLabelKey(nodeClaim.Spec.NodeClassRef.GroupKind()))
ExpectApplied(ctx, env.Client, nodeClaim, node)
ExpectObjectReconciled(ctx, env.Client, hydrationController, node)
node = ExpectExists(ctx, env.Client, node)
Expect(lo.Keys(node.Labels)).ToNot(ContainElement(v1.NodeClassLabelKey(nodeClaim.Spec.NodeClassRef.GroupKind())))
Expect(len(node.Finalizers)).To(Equal(0))
})
})