Commit

Merge branch 'kubernetes-sigs:main' into main

DerekFrank authored Mar 6, 2025
2 parents b476e5e + af2b998 commit 0ebd3ed
Showing 24 changed files with 676 additions and 35 deletions.
6 changes: 6 additions & 0 deletions kwok/charts/crds/karpenter.sh_nodepools.yaml
@@ -498,6 +498,12 @@ spec:
- type
type: object
type: array
nodeClassObservedGeneration:
description: |-
NodeClassObservedGeneration represents the observed generation of the referenced NodeClass. If this does not match
the actual NodeClass Generation, the NodeRegistrationHealthy status condition on the NodePool will be reset
format: int64
type: integer
resources:
additionalProperties:
anyOf:
6 changes: 6 additions & 0 deletions pkg/apis/crds/karpenter.sh_nodepools.yaml
@@ -496,6 +496,12 @@ spec:
- type
type: object
type: array
nodeClassObservedGeneration:
description: |-
NodeClassObservedGeneration represents the observed generation of the referenced NodeClass. If this does not match
the actual NodeClass Generation, the NodeRegistrationHealthy status condition on the NodePool will be reset
format: int64
type: integer
resources:
additionalProperties:
anyOf:
6 changes: 6 additions & 0 deletions pkg/apis/v1/nodepool_status.go
@@ -27,13 +27,19 @@ const (
ConditionTypeValidationSucceeded = "ValidationSucceeded"
// ConditionTypeNodeClassReady = "NodeClassReady" condition indicates that underlying nodeClass was resolved and is reporting as Ready
ConditionTypeNodeClassReady = "NodeClassReady"
// ConditionTypeNodeRegistrationHealthy = "NodeRegistrationHealthy" condition indicates whether a misconfiguration exists that is preventing successful node launches/registrations and requires manual investigation
ConditionTypeNodeRegistrationHealthy = "NodeRegistrationHealthy"
)

// NodePoolStatus defines the observed state of NodePool
type NodePoolStatus struct {
// Resources is the list of resources that have been provisioned.
// +optional
Resources v1.ResourceList `json:"resources,omitempty"`
// NodeClassObservedGeneration represents the observed generation of the referenced NodeClass. If this does not match
// the actual NodeClass Generation, the NodeRegistrationHealthy status condition on the NodePool will be reset
// +optional
NodeClassObservedGeneration int64 `json:"nodeClassObservedGeneration,omitempty"`
// Conditions contains signals for health and readiness
// +optional
Conditions []status.Condition `json:"conditions,omitempty"`
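For context, here is a minimal sketch of the kind of check the new field enables: resetting NodeRegistrationHealthy when the referenced NodeClass changes. It is illustrative only; the registrationhealth controller this commit actually adds is not shown above, and nodeClass, SetUnknown, and the surrounding reconcile shape are assumptions.

// Sketch only: reset NodeRegistrationHealthy when the NodeClass generation moves on.
// nodeClass is assumed to be the resolved object referenced by the NodePool.
if nodePool.Status.NodeClassObservedGeneration != nodeClass.GetGeneration() {
	stored := nodePool.DeepCopy()
	nodePool.StatusConditions().SetUnknown(v1.ConditionTypeNodeRegistrationHealthy) // assumed helper; SetTrue/SetFalse are shown elsewhere in this diff
	nodePool.Status.NodeClassObservedGeneration = nodeClass.GetGeneration()
	if err := kubeClient.Status().Patch(ctx, nodePool, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
		return reconcile.Result{}, err
	}
}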
2 changes: 2 additions & 0 deletions pkg/controllers/controllers.go
@@ -50,6 +50,7 @@ import (
nodepoolcounter "sigs.k8s.io/karpenter/pkg/controllers/nodepool/counter"
nodepoolhash "sigs.k8s.io/karpenter/pkg/controllers/nodepool/hash"
nodepoolreadiness "sigs.k8s.io/karpenter/pkg/controllers/nodepool/readiness"
nodepoolregistrationhealth "sigs.k8s.io/karpenter/pkg/controllers/nodepool/registrationhealth"
nodepoolvalidation "sigs.k8s.io/karpenter/pkg/controllers/nodepool/validation"
"sigs.k8s.io/karpenter/pkg/controllers/provisioning"
"sigs.k8s.io/karpenter/pkg/controllers/state"
@@ -88,6 +89,7 @@ func NewControllers(
metricsnodepool.NewController(kubeClient, cloudProvider),
metricsnode.NewController(cluster),
nodepoolreadiness.NewController(kubeClient, cloudProvider),
nodepoolregistrationhealth.NewController(kubeClient, cloudProvider),
nodepoolcounter.NewController(kubeClient, cloudProvider, cluster),
nodepoolvalidation.NewController(kubeClient, cloudProvider),
podevents.NewController(clock, kubeClient, cloudProvider),
2 changes: 1 addition & 1 deletion pkg/controllers/disruption/controller.go
@@ -278,7 +278,7 @@ func (c *Controller) MarkDisrupted(ctx context.Context, m Method, candidates ...
return client.IgnoreNotFound(err)
}
stored := nodeClaim.DeepCopy()
nodeClaim.StatusConditions().SetTrueWithReason(v1.ConditionTypeDisruptionReason, v1.ConditionTypeDisruptionReason, string(m.Reason()))
nodeClaim.StatusConditions().SetTrueWithReason(v1.ConditionTypeDisruptionReason, string(m.Reason()), string(m.Reason()))
return client.IgnoreNotFound(c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFrom(stored)))
})...)
}
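For illustration, suppose m.Reason() returned a hypothetical "drifted". The corrected call records that value as the condition's Reason instead of repeating the condition type:

// Before: Reason = "DisruptionReason" (the condition type), Message = "drifted"
// After:  Reason = "drifted",                               Message = "drifted"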
4 changes: 2 additions & 2 deletions pkg/controllers/node/termination/terminator/events/events.go
@@ -26,12 +26,12 @@ import (
"sigs.k8s.io/karpenter/pkg/events"
)

func EvictPod(pod *corev1.Pod, message string) events.Event {
func EvictPod(pod *corev1.Pod, reason string) events.Event {
return events.Event{
InvolvedObject: pod,
Type: corev1.EventTypeNormal,
Reason: events.Evicted,
Message: "Evicted pod: " + message,
Message: "Evicted pod: " + reason,
DedupeValues: []string{pod.Name},
}
}
19 changes: 7 additions & 12 deletions pkg/controllers/node/termination/terminator/eviction.go
@@ -175,11 +175,6 @@ func (q *Queue) Reconcile(ctx context.Context) (reconcile.Result, error) {
// Evict returns true if successful eviction call, and false if there was an eviction-related error
func (q *Queue) Evict(ctx context.Context, key QueueKey) bool {
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("Pod", klog.KRef(key.Namespace, key.Name)))
evictionMessage, err := evictionReason(ctx, key, q.kubeClient)
if err != nil {
// XXX(cmcavoy): this should be unreachable, but we log it if it happens
log.FromContext(ctx).V(1).Error(err, "failed looking up pod eviction reason")
}
if err := q.kubeClient.SubResource("eviction").Create(ctx,
&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: key.Namespace, Name: key.Name}},
&policyv1.Eviction{
@@ -214,18 +209,18 @@ func (q *Queue) Evict(ctx context.Context, key QueueKey) bool {
return false
}
NodesEvictionRequestsTotal.Inc(map[string]string{CodeLabel: "200"})
q.recorder.Publish(terminatorevents.EvictPod(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: key.Name, Namespace: key.Namespace}}, evictionMessage))
q.recorder.Publish(terminatorevents.EvictPod(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: key.Name, Namespace: key.Namespace}}, evictionReason(ctx, key, q.kubeClient)))
return true
}

func evictionReason(ctx context.Context, key QueueKey, kubeClient client.Client) (string, error) {
func evictionReason(ctx context.Context, key QueueKey, kubeClient client.Client) string {
nodeClaim, err := node.NodeClaimForNode(ctx, kubeClient, &corev1.Node{Spec: corev1.NodeSpec{ProviderID: key.providerID}})
if err != nil {
return "", err
log.FromContext(ctx).V(1).Error(err, "node has no nodeclaim, failed looking up pod eviction reason")
return ""
}
terminationCondition := nodeClaim.StatusConditions().Get(v1.ConditionTypeDisruptionReason)
if terminationCondition.IsTrue() {
return terminationCondition.Message, nil
if cond := nodeClaim.StatusConditions().Get(v1.ConditionTypeDisruptionReason); cond.IsTrue() {
return cond.Reason
}
return "Forceful Termination", nil
return "Forceful Termination"
}
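Taken together with the MarkDisrupted and EvictPod changes above, the eviction event now surfaces the disruption method. A rough trace of the flow, reusing the hypothetical "drifted" reason (the exact strings are assumptions):

// MarkDisrupted:  sets DisruptionReason=True with Reason="drifted"
// evictionReason: returns cond.Reason               -> "drifted"
// EvictPod:       Message: "Evicted pod: " + reason -> "Evicted pod: drifted"
// If the NodeClaim lookup fails, the reason falls back to ""; if the condition is not True, to "Forceful Termination".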
20 changes: 19 additions & 1 deletion pkg/controllers/nodeclaim/lifecycle/controller.go
@@ -171,6 +171,24 @@ func (c *Controller) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (re

//nolint:gocyclo
func (c *Controller) finalize(ctx context.Context, nodeClaim *v1.NodeClaim) (reconcile.Result, error) {
// setting the deletion timestamp will bump the generation, so we need to
// perform a no-op for whatever the status condition is currently set to
// so that we bump the observed generation to the latest and prevent the nodeclaim
// root status from entering an `Unknown` state
stored := nodeClaim.DeepCopy()
for _, condition := range nodeClaim.Status.Conditions {
if nodeClaim.StatusConditions().IsDependentCondition(condition.Type) {
nodeClaim.StatusConditions().Set(condition)
}
}
if !equality.Semantic.DeepEqual(stored, nodeClaim) {
if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, err
}
}
if !controllerutil.ContainsFinalizer(nodeClaim, v1.TerminationFinalizer) {
return reconcile.Result{}, nil
}
@@ -226,7 +244,7 @@ func (c *Controller) finalize(ctx context.Context, nodeClaim *v1.NodeClaim) (rec
metrics.NodePoolLabel: nodeClaim.Labels[v1.NodePoolLabelKey],
})
}
stored := nodeClaim.DeepCopy() // The NodeClaim may have been modified in the EnsureTerminated function
stored = nodeClaim.DeepCopy() // The NodeClaim may have been modified in the EnsureTerminated function
controllerutil.RemoveFinalizer(nodeClaim, v1.TerminationFinalizer)
if !equality.Semantic.DeepEqual(stored, nodeClaim) {
// We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
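A brief restatement of why the no-op Set above matters, as I read the status machinery (the in-code comment implies this chain rather than spelling it out):

// 1. Setting deletionTimestamp bumps metadata.generation on the NodeClaim.
// 2. Each status condition records the generation observed when it was last Set.
// 3. Conditions whose observed generation lags metadata.generation read as stale,
//    which would flip the aggregated root status to Unknown.
// Re-Setting each dependent condition with its current values refreshes its observed
// generation without changing type, status, reason, or message.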
43 changes: 41 additions & 2 deletions pkg/controllers/nodeclaim/lifecycle/liveness.go
@@ -20,9 +20,14 @@ import (
"context"
"time"

"k8s.io/apimachinery/pkg/api/errors"

"k8s.io/apimachinery/pkg/types"

"sigs.k8s.io/controller-runtime/pkg/log"

"k8s.io/utils/clock"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
@@ -51,6 +56,12 @@ func (l *Liveness) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (reco
if ttl := registrationTTL - l.clock.Since(registered.LastTransitionTime.Time); ttl > 0 {
return reconcile.Result{RequeueAfter: ttl}, nil
}
if err := l.updateNodePoolRegistrationHealth(ctx, nodeClaim); client.IgnoreNotFound(err) != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, err
}
// Delete the NodeClaim if we believe the NodeClaim won't register since we haven't seen the node
if err := l.kubeClient.Delete(ctx, nodeClaim); err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
@@ -61,6 +72,34 @@ func (l *Liveness) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (reco
metrics.NodePoolLabel: nodeClaim.Labels[v1.NodePoolLabelKey],
metrics.CapacityTypeLabel: nodeClaim.Labels[v1.CapacityTypeLabelKey],
})

return reconcile.Result{}, nil
}

// updateNodePoolRegistrationHealth sets the NodeRegistrationHealthy=False
// on the NodePool if the nodeClaim fails to launch/register
func (l *Liveness) updateNodePoolRegistrationHealth(ctx context.Context, nodeClaim *v1.NodeClaim) error {
nodePoolName := nodeClaim.Labels[v1.NodePoolLabelKey]
if nodePoolName != "" {
nodePool := &v1.NodePool{}
if err := l.kubeClient.Get(ctx, types.NamespacedName{Name: nodePoolName}, nodePool); err != nil {
return err
}
if nodePool.StatusConditions().Get(v1.ConditionTypeNodeRegistrationHealthy).IsUnknown() {
stored := nodePool.DeepCopy()
// If the nodeClaim failed to register during the TTL set NodeRegistrationHealthy status condition on
// NodePool to False. If the launch failed get the launch failure reason and message from nodeClaim.
if launchCondition := nodeClaim.StatusConditions().Get(v1.ConditionTypeLaunched); launchCondition.IsTrue() {
nodePool.StatusConditions().SetFalse(v1.ConditionTypeNodeRegistrationHealthy, "RegistrationFailed", "Failed to register node")
} else {
nodePool.StatusConditions().SetFalse(v1.ConditionTypeNodeRegistrationHealthy, launchCondition.Reason, launchCondition.Message)
}
// We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
// can cause races due to the fact that it fully replaces the list on a change
// Here, we are updating the status condition list
if err := l.kubeClient.Status().Patch(ctx, nodePool, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); client.IgnoreNotFound(err) != nil {
return err
}
}
}
return nil
}
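Restating the branch inside updateNodePoolRegistrationHealth, since the in-code comment reads slightly out of order:

// Launched=True  -> the instance launched but never registered within the TTL
//                   -> NodeRegistrationHealthy=False, Reason="RegistrationFailed", Message="Failed to register node"
// Launched!=True -> the launch itself failed
//                   -> NodeRegistrationHealthy=False, propagating the Launched condition's Reason and Message
// Either way the condition is only written while it is still Unknown, so an existing True or False value is preserved.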
61 changes: 61 additions & 0 deletions pkg/controllers/nodeclaim/lifecycle/liveness_test.go
@@ -19,6 +19,9 @@ package lifecycle_test
import (
"time"

"github.com/awslabs/operatorpkg/status"

operatorpkg "github.com/awslabs/operatorpkg/test/expectations"
. "github.com/onsi/ginkgo/v2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
@@ -78,6 +81,12 @@ var _ = Describe("Liveness", func() {
ExpectFinalizersRemoved(ctx, env.Client, nodeClaim)
if isManagedNodeClaim {
ExpectNotFound(ctx, env.Client, nodeClaim)
operatorpkg.ExpectStatusConditions(ctx, env.Client, 1*time.Minute, nodePool, status.Condition{
Type: v1.ConditionTypeNodeRegistrationHealthy,
Status: metav1.ConditionFalse,
Reason: "RegistrationFailed",
Message: "Failed to register node",
})
} else {
ExpectExists(ctx, env.Client, nodeClaim)
}
@@ -138,6 +147,58 @@ var _ = Describe("Liveness", func() {
_ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)

// If the node hasn't registered in the registration timeframe, then we deprovision the nodeClaim
fakeClock.Step(time.Minute * 20)
_ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim)
operatorpkg.ExpectStatusConditions(ctx, env.Client, 1*time.Minute, nodePool, status.Condition{
Type: v1.ConditionTypeNodeRegistrationHealthy,
Status: metav1.ConditionFalse,
Reason: nodeClaim.StatusConditions().Get(v1.ConditionTypeLaunched).Reason,
Message: nodeClaim.StatusConditions().Get(v1.ConditionTypeLaunched).Message,
})
ExpectFinalizersRemoved(ctx, env.Client, nodeClaim)
ExpectNotFound(ctx, env.Client, nodeClaim)
})
It("should not update NodeRegistrationHealthy status condition if it is already set to True", func() {
nodePool.StatusConditions().SetTrue(v1.ConditionTypeNodeRegistrationHealthy)
nodeClaim := test.NodeClaim(v1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
v1.NodePoolLabelKey: nodePool.Name,
},
},
Spec: v1.NodeClaimSpec{
Resources: v1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourceMemory: resource.MustParse("50Mi"),
corev1.ResourcePods: resource.MustParse("5"),
fake.ResourceGPUVendorA: resource.MustParse("1"),
},
},
},
})
cloudProvider.AllowedCreateCalls = 0 // Don't allow Create() calls to succeed
ExpectApplied(ctx, env.Client, nodePool, nodeClaim)
_ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)

// If the node hasn't registered in the registration timeframe, then we deprovision the nodeClaim
fakeClock.Step(time.Minute * 20)
_ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim)

// NodeClaim registration failed, but we should not update the NodeRegistrationHealthy status condition if it is already True
operatorpkg.ExpectStatusConditions(ctx, env.Client, 1*time.Minute, nodePool, status.Condition{Type: v1.ConditionTypeNodeRegistrationHealthy, Status: metav1.ConditionTrue})
ExpectFinalizersRemoved(ctx, env.Client, nodeClaim)
ExpectNotFound(ctx, env.Client, nodeClaim)
})
It("should not block on updating NodeRegistrationHealthy status condition if nodeClaim is not owned by a nodePool", func() {
nodeClaim := test.NodeClaim()
cloudProvider.AllowedCreateCalls = 0 // Don't allow Create() calls to succeed
ExpectApplied(ctx, env.Client, nodeClaim)
_ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)

// If the node hasn't registered in the registration timeframe, then we deprovision the nodeClaim
fakeClock.Step(time.Minute * 20)
_ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim)
30 changes: 30 additions & 0 deletions pkg/controllers/nodeclaim/lifecycle/registration.go
@@ -20,6 +20,8 @@ import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/types"

"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
@@ -83,9 +85,37 @@ func (r *Registration) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (
metrics.NodesCreatedTotal.Inc(map[string]string{
metrics.NodePoolLabel: nodeClaim.Labels[v1.NodePoolLabelKey],
})
if err := r.updateNodePoolRegistrationHealth(ctx, nodeClaim); client.IgnoreNotFound(err) != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, err
}
return reconcile.Result{}, nil
}

// updateNodePoolRegistrationHealth sets the NodeRegistrationHealthy=True
// on the NodePool if the nodeClaim that registered is owned by a NodePool
func (r *Registration) updateNodePoolRegistrationHealth(ctx context.Context, nodeClaim *v1.NodeClaim) error {
nodePoolName := nodeClaim.Labels[v1.NodePoolLabelKey]
if nodePoolName != "" {
nodePool := &v1.NodePool{}
if err := r.kubeClient.Get(ctx, types.NamespacedName{Name: nodePoolName}, nodePool); err != nil {
return err
}
stored := nodePool.DeepCopy()
if nodePool.StatusConditions().SetTrue(v1.ConditionTypeNodeRegistrationHealthy) {
// We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
// can cause races due to the fact that it fully replaces the list on a change
// Here, we are updating the status condition list
if err := r.kubeClient.Status().Patch(ctx, nodePool, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); client.IgnoreNotFound(err) != nil {
return err
}
}
}
return nil
}

func (r *Registration) syncNode(ctx context.Context, nodeClaim *v1.NodeClaim, node *corev1.Node) error {
stored := node.DeepCopy()
controllerutil.AddFinalizer(node, v1.TerminationFinalizer)