Commit 6c8bccd
remaining refactor
jmdeal committed Dec 4, 2024
1 parent 52a0907 commit 6c8bccd
Showing 17 changed files with 330 additions and 362 deletions.
6 changes: 3 additions & 3 deletions pkg/controllers/controllers.go
@@ -36,7 +36,7 @@ import (
metricspod "sigs.k8s.io/karpenter/pkg/controllers/metrics/pod"
"sigs.k8s.io/karpenter/pkg/controllers/node/health"
"sigs.k8s.io/karpenter/pkg/controllers/node/termination"
"sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator"
"sigs.k8s.io/karpenter/pkg/controllers/node/termination/eviction"
nodeclaimconsistency "sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/consistency"
nodeclaimdisruption "sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/disruption"
"sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/expiration"
@@ -64,7 +64,7 @@ func NewControllers(
) []controller.Controller {
cluster := state.NewCluster(clock, kubeClient, cloudProvider)
p := provisioning.NewProvisioner(kubeClient, recorder, cloudProvider, cluster, clock)
- evictionQueue := terminator.NewQueue(kubeClient, recorder)
+ evictionQueue := eviction.NewQueue(kubeClient, recorder)
disruptionQueue := orchestration.NewQueue(kubeClient, recorder, cluster, clock, p)

controllers := []controller.Controller{
@@ -79,7 +79,7 @@
informer.NewPodController(kubeClient, cluster),
informer.NewNodePoolController(kubeClient, cloudProvider, cluster),
informer.NewNodeClaimController(kubeClient, cloudProvider, cluster),
- termination.NewController(clock, kubeClient, cloudProvider, terminator.NewTerminator(clock, kubeClient, evictionQueue, recorder), recorder),
+ termination.NewController(clock, kubeClient, cloudProvider, recorder, evictionQueue),
metricspod.NewController(kubeClient, cluster),
metricsnodepool.NewController(kubeClient, cloudProvider),
metricsnode.NewController(cluster),
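The net effect of this file's change: NewControllers no longer builds a Terminator and threads it into the termination controller; it hands the eviction queue over directly. A minimal sketch of the resulting wiring — the package paths and constructor signatures come from the diff above, while the helper function, its name, and the package name are hypothetical:

```go
// Package wiring is an illustrative sketch, not a file from the commit.
package wiring

import (
	"k8s.io/utils/clock"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"sigs.k8s.io/karpenter/pkg/cloudprovider"
	"sigs.k8s.io/karpenter/pkg/controllers/node/termination"
	"sigs.k8s.io/karpenter/pkg/controllers/node/termination/eviction"
	"sigs.k8s.io/karpenter/pkg/events"
)

// newTerminationController shows the post-refactor call shape: the eviction
// queue is the only termination-specific dependency the caller constructs.
func newTerminationController(clk clock.Clock, kubeClient client.Client, cp cloudprovider.CloudProvider, recorder events.Recorder) *termination.Controller {
	q := eviction.NewQueue(kubeClient, recorder)
	return termination.NewController(clk, kubeClient, cp, recorder, q)
}
```

Construction of the Drain/VolumeDetachment/InstanceTermination subreconcilers now lives inside NewController (see controller.go below), so callers only supply shared dependencies.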
6 changes: 3 additions & 3 deletions pkg/controllers/node/health/suite_test.go
@@ -33,7 +33,7 @@ import (
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/cloudprovider/fake"
"sigs.k8s.io/karpenter/pkg/controllers/node/health"
"sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator"
"sigs.k8s.io/karpenter/pkg/controllers/node/termination/eviction"
"sigs.k8s.io/karpenter/pkg/metrics"
"sigs.k8s.io/karpenter/pkg/test"
. "sigs.k8s.io/karpenter/pkg/test/expectations"
Expand All @@ -47,7 +47,7 @@ var env *test.Environment
var fakeClock *clock.FakeClock
var cloudProvider *fake.CloudProvider
var recorder *test.EventRecorder
- var queue *terminator.Queue
+ var queue *eviction.Queue

func TestAPIs(t *testing.T) {
ctx = TestContextWithLogger(t)
@@ -65,7 +65,7 @@ var _ = BeforeSuite(func() {
cloudProvider = fake.NewCloudProvider()
cloudProvider = fake.NewCloudProvider()
recorder = test.NewEventRecorder()
- queue = terminator.NewTestingQueue(env.Client, recorder)
+ queue = eviction.NewTestingQueue(env.Client, recorder)
healthController = health.NewController(env.Client, cloudProvider, fakeClock)
})

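For other test suites that consumed terminator.NewTestingQueue, the migration is mechanical — only the package changes. A sketch, assuming the same suite scaffolding as above (the helper and package names are hypothetical; eviction.NewTestingQueue comes from the diff):

```go
// Package suitesketch is illustrative, not a file from the commit.
package suitesketch

import (
	"sigs.k8s.io/controller-runtime/pkg/client"

	"sigs.k8s.io/karpenter/pkg/controllers/node/termination/eviction"
	"sigs.k8s.io/karpenter/pkg/test"
)

var queue *eviction.Queue

// newQueueForTests replaces the old terminator.NewTestingQueue(env.Client, recorder) call.
func newQueueForTests(c client.Client, recorder *test.EventRecorder) {
	queue = eviction.NewTestingQueue(c, recorder)
}
```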
112 changes: 71 additions & 41 deletions pkg/controllers/node/termination/controller.go
@@ -21,6 +21,7 @@ import (
"fmt"
"time"

"github.com/samber/lo"
"golang.org/x/time/rate"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
@@ -41,7 +42,7 @@

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator"
"sigs.k8s.io/karpenter/pkg/controllers/node/termination/eviction"
"sigs.k8s.io/karpenter/pkg/events"
"sigs.k8s.io/karpenter/pkg/metrics"
"sigs.k8s.io/karpenter/pkg/operator/injection"
@@ -53,53 +54,60 @@ type Controller struct {
clock clock.Clock
kubeClient client.Client
cloudProvider cloudprovider.CloudProvider
- terminator *terminator.Terminator
- reconcilers []terminationReconciler
+ reconcilers []reconciler
}

// TODO (jmdeal@): Split subreconcilers into individual controllers
- type terminationReconciler interface {
+ type reconciler interface {
Reconcile(context.Context, *corev1.Node, *v1.NodeClaim) (reconcile.Result, error)
}

// NewController constructs a controller instance
- func NewController(clk clock.Clock, kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, terminator *terminator.Terminator, recorder events.Recorder) *Controller {
+ func NewController(clk clock.Clock, kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, recorder events.Recorder, evictionQueue *eviction.Queue) *Controller {
return &Controller{
clock: clk,
kubeClient: kubeClient,
cloudProvider: cloudProvider,
- terminator: terminator,
- reconcilers: []terminationReconciler{
- &DrainReconciler{kubeClient, cloudProvider, recorder, terminator},
- &VolumeDetachmentReconciler{kubeClient, clk, recorder},
- &InstanceTerminationReconciler{kubeClient, cloudProvider, clk},
+ reconcilers: []reconciler{
+ &DrainReconciler{clk, kubeClient, cloudProvider, recorder, evictionQueue},
+ &VolumeDetachmentReconciler{clk, kubeClient, recorder},
+ &InstanceTerminationReconciler{clk, kubeClient, cloudProvider},
},
}
}

+ func (c *Controller) Register(_ context.Context, m manager.Manager) error {
+ return controllerruntime.NewControllerManagedBy(m).
+ Named("node.termination").
+ For(&corev1.Node{}, builder.WithPredicates(nodeutils.IsManagedPredicateFuncs(c.cloudProvider))).
+ Watches(&v1.NodeClaim{}, nodeutils.NodeClaimEventHandler(c.kubeClient, c.cloudProvider)).
+ WithOptions(
+ controller.Options{
+ RateLimiter: workqueue.NewTypedMaxOfRateLimiter[reconcile.Request](
+ workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](100*time.Millisecond, 10*time.Second),
+ // 10 qps, 100 bucket size
+ &workqueue.TypedBucketRateLimiter[reconcile.Request]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
+ ),
+ MaxConcurrentReconciles: 100,
+ },
+ ).
+ Complete(reconcile.AsReconciler(m.GetClient(), c))
+ }
+
- // nolint:gocyclo
func (c *Controller) Reconcile(ctx context.Context, n *corev1.Node) (reconcile.Result, error) {
ctx = injection.WithControllerName(ctx, "node.termination")
- ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("Node", klog.KRef(n.Namespace, n.Name)))
- if !nodeutils.IsManaged(n, c.cloudProvider) {
- return reconcile.Result{}, nil
- }

if n.GetDeletionTimestamp().IsZero() {
return reconcile.Result{}, nil
}
if !controllerutil.ContainsFinalizer(n, v1.TerminationFinalizer) {
return reconcile.Result{}, nil
}

- if err := c.terminator.Taint(ctx, n, v1.DisruptedNoScheduleTaint); err != nil {
- if errors.IsNotFound(err) {
- return reconcile.Result{}, nil
- }
- if errors.IsConflict(err) {
- return reconcile.Result{Requeue: true}, nil
- }
- return reconcile.Result{}, fmt.Errorf("tainting node with %s, %w", pretty.Taint(v1.DisruptedNoScheduleTaint), err)
+ if !nodeutils.IsManaged(n, c.cloudProvider) {
+ return reconcile.Result{}, nil
}

nc, err := nodeutils.NodeClaimForNode(ctx, c.kubeClient, n)
if err != nil {
if nodeutils.IsDuplicateNodeClaimError(err) || nodeutils.IsNodeClaimNotFoundError(err) {
@@ -115,16 +123,55 @@ func (c *Controller) Reconcile(ctx context.Context, n *corev1.Node) (reconcile.Result, error) {
}
}

+ if err := c.prepareNode(ctx, n); err != nil {
+ if errors.IsNotFound(err) {
+ return reconcile.Result{}, nil
+ }
+ if errors.IsConflict(err) {
+ return reconcile.Result{Requeue: true}, nil
+ }
+ return reconcile.Result{}, fmt.Errorf("tainting node with %s, %w", pretty.Taint(v1.DisruptedNoScheduleTaint), err)
+ }

for _, r := range c.reconcilers {
res, err := r.Reconcile(ctx, n, nc)
if res.Requeue || res.RequeueAfter != 0 || err != nil {
return res, err
}
}

return reconcile.Result{}, nil
}

+ // prepareNode ensures that the node is ready to begin the drain / termination process. This includes ensuring that it
+ // is tainted appropriately and annotated to be ignored by external load balancers.
+ func (c *Controller) prepareNode(ctx context.Context, n *corev1.Node) error {
+ stored := n.DeepCopy()
+
+ // Add the karpenter.sh/disrupted:NoSchedule taint to ensure no additional pods schedule to the Node during the
+ // drain process.
+ if !lo.ContainsBy(n.Spec.Taints, func(t corev1.Taint) bool {
+ return t.MatchTaint(&v1.DisruptedNoScheduleTaint)
+ }) {
+ n.Spec.Taints = append(n.Spec.Taints, v1.DisruptedNoScheduleTaint)
+ }
+ // Adding this label to the node ensures that the node is removed from the load-balancer target group while it is
+ // draining and before it is terminated. This prevents 500s coming prior to health check when the load balancer
+ // controller hasn't yet determined that the node and underlying connections are gone.
+ // https://github.com/aws/aws-node-termination-handler/issues/316
+ // https://github.com/aws/karpenter/pull/2518
+ n.Labels = lo.Assign(n.Labels, map[string]string{
+ corev1.LabelNodeExcludeBalancers: "karpenter",
+ })
+
+ if equality.Semantic.DeepEqual(n, stored) {
+ return nil
+ }
+ // We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch can cause races due
+ // to the fact that it fully replaces the list on a change. Here, we are updating the taint list.
+ return c.kubeClient.Patch(ctx, n, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{}))
+ }

// removeFinalizer removes Karpenter's termination finalizer from the given Node, updating the termination metrics in the process.
func removeFinalizer(ctx context.Context, kubeClient client.Client, n *corev1.Node) error {
stored := n.DeepCopy()
controllerutil.RemoveFinalizer(n, v1.TerminationFinalizer)
@@ -153,20 +200,3 @@ func removeFinalizer(ctx context.Context, kubeClient client.Client, n *corev1.Node) error {
}
return nil
}

- func (c *Controller) Register(_ context.Context, m manager.Manager) error {
- return controllerruntime.NewControllerManagedBy(m).
- Named("node.termination").
- For(&corev1.Node{}, builder.WithPredicates(nodeutils.IsManagedPredicateFuncs(c.cloudProvider))).
- WithOptions(
- controller.Options{
- RateLimiter: workqueue.NewTypedMaxOfRateLimiter[reconcile.Request](
- workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](100*time.Millisecond, 10*time.Second),
- // 10 qps, 100 bucket size
- &workqueue.TypedBucketRateLimiter[reconcile.Request]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
- ),
- MaxConcurrentReconciles: 100,
- },
- ).
- Complete(reconcile.AsReconciler(m.GetClient(), c))
- }
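The comments in prepareNode describe a common controller pattern: mutate a deep copy's taints and labels, compare against the stored object, and patch with optimistic locking so concurrent writers to the taint list conflict rather than silently overwrite one another. A self-contained sketch of that pattern, assuming standard client-go/controller-runtime types (the function and package names are hypothetical):

```go
// Package patchsketch is illustrative, not a file from the commit.
package patchsketch

import (
	"context"

	"github.com/samber/lo"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/equality"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

func prepare(ctx context.Context, c client.Client, n *corev1.Node, taint corev1.Taint) error {
	stored := n.DeepCopy()
	// Append the taint only if an equivalent one isn't already present.
	if !lo.ContainsBy(n.Spec.Taints, func(t corev1.Taint) bool { return t.MatchTaint(&taint) }) {
		n.Spec.Taints = append(n.Spec.Taints, taint)
	}
	// Exclude the node from external load-balancer target groups while it drains.
	n.Labels = lo.Assign(n.Labels, map[string]string{corev1.LabelNodeExcludeBalancers: "karpenter"})
	if equality.Semantic.DeepEqual(n, stored) {
		return nil // nothing changed, nothing to patch
	}
	// The optimistic lock embeds resourceVersion in the merge patch, so a
	// concurrent update to the taint list fails with a conflict instead of a
	// lost write.
	return c.Patch(ctx, n, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{}))
}
```

A conflict from this patch surfaces as errors.IsConflict, which the Reconcile loop above handles by requeueing — the two halves of the pattern fit together.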
[Diffs for the remaining 14 changed files are not shown.]
