feat(nodeclaim): use PodScheduled's status for lastPodEventTime #2054

Open · wants to merge 1 commit into main
1 change: 1 addition & 0 deletions pkg/controllers/nodeclaim/disruption/consolidation.go
@@ -60,6 +60,7 @@ func (c *Consolidation) Reconcile(ctx context.Context, nodePool *v1.NodePool, no
timeToCheck := lo.Ternary(!nodeClaim.Status.LastPodEventTime.IsZero(), nodeClaim.Status.LastPodEventTime.Time, initialized.LastTransitionTime.Time)

// Consider a node consolidatable by looking at the lastPodEvent status field on the nodeclaim.
// This time is based on the PodScheduled condition's lastTransitionTime, or on the current time
// when the pod is being removed (terminal or terminating).
if c.clock.Since(timeToCheck) < lo.FromPtr(nodePool.Spec.Disruption.ConsolidateAfter.Duration) {
if hasConsolidatableCondition {
_ = nodeClaim.StatusConditions().Clear(v1.ConditionTypeConsolidatable)
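For orientation, here is a minimal, self-contained sketch of the timing decision this hunk feeds into. The variable names and durations are illustrative stand-ins, not Karpenter's actual types; only the lo.Ternary fallback pattern mirrors the diff above.

```go
package main

import (
	"fmt"
	"time"

	"github.com/samber/lo"
)

func main() {
	// Illustrative inputs: when the node initialized, when the last pod
	// event was recorded, and the NodePool's consolidateAfter window.
	initializedAt := time.Now().Add(-30 * time.Minute)
	lastPodEvent := time.Now().Add(-2 * time.Minute) // zero value if never set
	consolidateAfter := 5 * time.Minute

	// Prefer lastPodEvent when it has been set; otherwise fall back to the
	// initialization time, mirroring the lo.Ternary call in the diff.
	timeToCheck := lo.Ternary(!lastPodEvent.IsZero(), lastPodEvent, initializedAt)

	if time.Since(timeToCheck) < consolidateAfter {
		fmt.Println("not consolidatable yet: recent pod activity")
	} else {
		fmt.Println("consolidatable")
	}
}
```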
10 changes: 6 additions & 4 deletions pkg/controllers/nodeclaim/hydration/suite_test.go
@@ -35,10 +35,12 @@ import (
. "sigs.k8s.io/karpenter/pkg/utils/testing"
)

var ctx context.Context
var hydrationController *hydration.Controller
var env *test.Environment
var cloudProvider *fake.CloudProvider
var (
ctx context.Context
hydrationController *hydration.Controller
env *test.Environment
cloudProvider *fake.CloudProvider
)

func TestAPIs(t *testing.T) {
ctx = TestContextWithLogger(t)
65 changes: 46 additions & 19 deletions pkg/controllers/nodeclaim/podevents/controller.go
@@ -32,18 +32,15 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/samber/lo"

"sigs.k8s.io/karpenter/pkg/cloudprovider"
nodeutils "sigs.k8s.io/karpenter/pkg/utils/node"
nodeclaimutils "sigs.k8s.io/karpenter/pkg/utils/nodeclaim"
podutils "sigs.k8s.io/karpenter/pkg/utils/pod"
)

// dedupeTimeout is 10 seconds to reduce the number of writes to the APIServer, since pod scheduling and deletion events are very frequent.
// The smaller this value is, the more writes Karpenter will make in a busy cluster. This timeout is intentionally smaller than the consolidation
// 15 second validation period, so that we can ensure that we invalidate consolidation commands that are decided while we're de-duping pod events.
const dedupeTimeout = 10 * time.Second

// Podevents is a nodeclaim controller that deletes adds the lastPodEvent status onto the nodeclaim
// Podevents is a nodeclaim controller that updates the lastPodEvent status based on the PodScheduled condition
type Controller struct {
clock clock.Clock
kubeClient client.Client
@@ -82,12 +79,28 @@ func (c *Controller) Reconcile(ctx context.Context, pod *corev1.Pod) (reconcile.
}

stored := nc.DeepCopy()
// If we've set the lastPodEvent before and it hasn't been before the timeout, don't do anything
if !nc.Status.LastPodEventTime.Time.IsZero() && c.clock.Since(nc.Status.LastPodEventTime.Time) < dedupeTimeout {

var eventTime time.Time

// If pod is being removed (terminal or terminating), use current time
if podutils.IsTerminal(pod) || podutils.IsTerminating(pod) {
eventTime = c.clock.Now()
} else {
// Otherwise check for PodScheduled condition
if cond, ok := lo.Find(pod.Status.Conditions, func(c corev1.PodCondition) bool {
return c.Type == corev1.PodScheduled && c.Status == corev1.ConditionTrue
}); ok {
eventTime = cond.LastTransitionTime.Time
}
}

// If we don't have a valid time, skip
if eventTime.IsZero() {
return reconcile.Result{}, nil
}
// otherwise, set the pod event time to now
nc.Status.LastPodEventTime.Time = c.clock.Now()

// Update the lastPodEvent time
nc.Status.LastPodEventTime.Time = eventTime
if !equality.Semantic.DeepEqual(stored, nc) {
if err = c.kubeClient.Status().Patch(ctx, nc, client.MergeFrom(stored)); err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
@@ -101,18 +114,32 @@ func (c *Controller) Register(ctx context.Context, m manager.Manager) error {
Named("nodeclaim.podevents").
For(&corev1.Pod{}).
WithEventFilter(predicate.TypedFuncs[client.Object]{
// If a pod is bound to a node or goes terminal
UpdateFunc: func(e event.TypedUpdateEvent[client.Object]) bool {
oldPod := (e.ObjectOld).(*corev1.Pod)
newPod := (e.ObjectNew).(*corev1.Pod)
// if this is a newly bound pod
bound := oldPod.Spec.NodeName == "" && newPod.Spec.NodeName != ""
// if this is a newly terminal pod
terminal := (newPod.Spec.NodeName != "" && !podutils.IsTerminal(oldPod) && podutils.IsTerminal(newPod))
// if this is a newly terminating pod
terminating := (newPod.Spec.NodeName != "" && !podutils.IsTerminating(oldPod) && podutils.IsTerminating(newPod))
// return true if it was bound to a node, went terminal, or went terminating
return bound || terminal || terminating

// Check for pod scheduling changes
oldCond, oldOk := lo.Find(oldPod.Status.Conditions, func(c corev1.PodCondition) bool {
return c.Type == corev1.PodScheduled
})
newCond, newOk := lo.Find(newPod.Status.Conditions, func(c corev1.PodCondition) bool {
return c.Type == corev1.PodScheduled
})

// Trigger on PodScheduled condition changes
if (!oldOk && newOk) || (oldOk && newOk && (oldCond.Status != newCond.Status || !oldCond.LastTransitionTime.Equal(&newCond.LastTransitionTime))) {
return true
}

// Trigger on pod removal (terminal or terminating)
if !podutils.IsTerminal(oldPod) && podutils.IsTerminal(newPod) {
return true
}
if !podutils.IsTerminating(oldPod) && podutils.IsTerminating(newPod) {
return true
}

return false
},
}).
Complete(reconcile.AsReconciler(m.GetClient(), c))
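To make the new selection logic easier to follow outside the controller, here is a standalone sketch of the PodScheduled lookup and event-time choice. scheduledTime is a hypothetical helper standing in for the inline lo.Find in Reconcile; only the corev1 and lo calls are taken from the diff.

```go
package main

import (
	"fmt"
	"time"

	"github.com/samber/lo"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// scheduledTime returns the lastTransitionTime of a True PodScheduled
// condition, mirroring the lo.Find in Reconcile above.
func scheduledTime(pod *corev1.Pod) (time.Time, bool) {
	cond, ok := lo.Find(pod.Status.Conditions, func(c corev1.PodCondition) bool {
		return c.Type == corev1.PodScheduled && c.Status == corev1.ConditionTrue
	})
	if !ok {
		return time.Time{}, false
	}
	return cond.LastTransitionTime.Time, true
}

func main() {
	pod := &corev1.Pod{Status: corev1.PodStatus{Conditions: []corev1.PodCondition{{
		Type:               corev1.PodScheduled,
		Status:             corev1.ConditionTrue,
		LastTransitionTime: metav1.Now(),
	}}}}

	// A pod being removed (terminal or terminating) would use the current
	// time instead; for a live scheduled pod, the condition's transition
	// time wins.
	if t, ok := scheduledTime(pod); ok {
		fmt.Println("lastPodEventTime would be", t)
	}
}
```

The same condition lookup shapes the UpdateFunc predicate above, so the controller only reconciles when the PodScheduled status or its transition time actually changes, or when a pod newly becomes terminal or terminating.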
101 changes: 80 additions & 21 deletions pkg/controllers/nodeclaim/podevents/suite_test.go
@@ -38,11 +38,13 @@ import (
. "sigs.k8s.io/karpenter/pkg/utils/testing"
)

var ctx context.Context
var podEventsController *podevents.Controller
var env *test.Environment
var fakeClock *clock.FakeClock
var cp *fake.CloudProvider
var (
ctx context.Context
podEventsController *podevents.Controller
env *test.Environment
fakeClock *clock.FakeClock
cp *fake.CloudProvider
)

func TestAPIs(t *testing.T) {
ctx = TestContextWithLogger(t)
@@ -75,6 +77,7 @@ var _ = AfterEach(func() {
cp.Reset()
ExpectCleanedUp(ctx, env.Client)
})

var _ = Describe("PodEvents", func() {
var nodePool *v1.NodePool
var nodeClaim *v1.NodeClaim
@@ -98,13 +101,31 @@ var _ = Describe("PodEvents", func() {
NodeName: node.Name,
})
})
It("should set the nodeclaim lastPodEvent", func() {
ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node, pod)
timeToCheck := fakeClock.Now().Truncate(time.Second)
It("should set the nodeclaim lastPodEvent based on PodScheduled condition", func() {
scheduledTime := fakeClock.Now().Truncate(time.Second)
pod.Status.Conditions = []corev1.PodCondition{{
Type: corev1.PodScheduled,
Status: corev1.ConditionTrue,
LastTransitionTime: metav1.Time{Time: scheduledTime},
}}
ExpectApplied(ctx, env.Client, nodePool, node, nodeClaim, pod)
ExpectObjectReconciled(ctx, env.Client, podEventsController, pod)

nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.Status.LastPodEventTime.Time).To(BeEquivalentTo(timeToCheck))
Expect(nodeClaim.Status.LastPodEventTime.Time).To(BeEquivalentTo(scheduledTime))

// Update the PodScheduled condition's lastTransitionTime
newScheduledTime := fakeClock.Now().Add(time.Minute).Truncate(time.Second)
pod.Status.Conditions = []corev1.PodCondition{{
Type: corev1.PodScheduled,
Status: corev1.ConditionTrue,
LastTransitionTime: metav1.Time{Time: newScheduledTime},
}}
ExpectApplied(ctx, env.Client, pod)
ExpectObjectReconciled(ctx, env.Client, podEventsController, pod)

nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.Status.LastPodEventTime.Time).To(BeEquivalentTo(newScheduledTime))
})
It("should not set the nodeclaim lastPodEvent when the node does not exist", func() {
ExpectApplied(ctx, env.Client, nodePool, nodeClaim, pod)
@@ -117,36 +138,74 @@ var _ = Describe("PodEvents", func() {
ExpectApplied(ctx, env.Client, nodePool, node, pod)
ExpectObjectReconciled(ctx, env.Client, podEventsController, pod)
})
It("should set the nodeclaim lastPodEvent when it's been set before", func() {
nodeClaim.Status.LastPodEventTime.Time = fakeClock.Now().Add(-5 * time.Minute)
It("should set the nodeclaim lastPodEvent when pod becomes terminal", func() {
// First set up a regular pod
scheduledTime := fakeClock.Now().Truncate(time.Second)
pod.Status.Conditions = []corev1.PodCondition{{
Type: corev1.PodScheduled,
Status: corev1.ConditionTrue,
LastTransitionTime: metav1.Time{Time: scheduledTime},
}}
ExpectApplied(ctx, env.Client, nodePool, node, nodeClaim, pod)
timeToCheck := fakeClock.Now().Truncate(time.Second)
ExpectObjectReconciled(ctx, env.Client, podEventsController, pod)

// Make pod terminal now
timeToCheck := fakeClock.Now().Add(time.Minute).Truncate(time.Second)
fakeClock.SetTime(timeToCheck)
pod.Status.Phase = corev1.PodSucceeded // Setting pod as terminal directly
ExpectApplied(ctx, env.Client, pod)
ExpectObjectReconciled(ctx, env.Client, podEventsController, pod)

nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.Status.LastPodEventTime.Time).To(BeEquivalentTo(timeToCheck))
})
It("should only set the nodeclaim lastPodEvent once within the dedupe timeframe", func() {
It("should set the nodeclaim lastPodEvent when pod becomes terminating", func() {
ExpectApplied(ctx, env.Client, nodePool, node, nodeClaim, pod)
timeToCheck := fakeClock.Now().Truncate(time.Second)
ExpectObjectReconciled(ctx, env.Client, podEventsController, pod)

// Expect that the lastPodEventTime is set
timeToCheck := fakeClock.Now().Truncate(time.Second)
fakeClock.SetTime(timeToCheck)
// Make pod terminating by deleting it
ExpectDeletionTimestampSet(ctx, env.Client, pod)
// Reconcile for the terminating pod
ExpectObjectReconciled(ctx, env.Client, podEventsController, pod)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.Status.LastPodEventTime.Time).To(BeEquivalentTo(timeToCheck))
})
It("should not update lastPodEvent when pod has no PodScheduled condition", func() {
// Pod with no conditions
ExpectApplied(ctx, env.Client, nodePool, node, nodeClaim, pod)
ExpectObjectReconciled(ctx, env.Client, podEventsController, pod)

// step through half of the dedupe timeout, and re-reconcile, expecting the status to not change
fakeClock.Step(5 * time.Second)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.Status.LastPodEventTime.Time.IsZero()).To(BeTrue())
})
It("should not update lastPodEvent when PodScheduled condition is not True", func() {
pod.Status.Conditions = []corev1.PodCondition{{
Type: corev1.PodScheduled,
Status: corev1.ConditionFalse,
LastTransitionTime: metav1.Time{Time: fakeClock.Now()},
}}
ExpectApplied(ctx, env.Client, nodePool, node, nodeClaim, pod)
ExpectObjectReconciled(ctx, env.Client, podEventsController, pod)

nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.Status.LastPodEventTime.Time).To(BeEquivalentTo(timeToCheck))
Expect(nodeClaim.Status.LastPodEventTime.Time.IsZero()).To(BeTrue())
})
It("should set the nodeclaim lastPodEvent when pod is already in a terminal state", func() {
// Setup time
timeToCheck := fakeClock.Now().Truncate(time.Second)
fakeClock.SetTime(timeToCheck)

// step through rest of the dedupe timeout, and re-reconcile, expecting the status to change
fakeClock.Step(5 * time.Second)
// Set the pod to a terminal state directly so that podutils.IsTerminal() returns true
pod.Status.Phase = corev1.PodSucceeded

// Apply objects and reconcile
ExpectApplied(ctx, env.Client, nodePool, node, nodeClaim, pod)
ExpectObjectReconciled(ctx, env.Client, podEventsController, pod)

// Verify the last pod event time is set
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.Status.LastPodEventTime.Time).ToNot(BeEquivalentTo(timeToCheck))
Expect(nodeClaim.Status.LastPodEventTime.Time).To(BeEquivalentTo(timeToCheck))
})
})