diff --git a/pkg/controllers/nodeclaim/disruption/consolidation.go b/pkg/controllers/nodeclaim/disruption/consolidation.go
index 1e993129fd..e9e99dc5e2 100644
--- a/pkg/controllers/nodeclaim/disruption/consolidation.go
+++ b/pkg/controllers/nodeclaim/disruption/consolidation.go
@@ -60,6 +60,7 @@ func (c *Consolidation) Reconcile(ctx context.Context, nodePool *v1.NodePool, no
 	timeToCheck := lo.Ternary(!nodeClaim.Status.LastPodEventTime.IsZero(), nodeClaim.Status.LastPodEventTime.Time, initialized.LastTransitionTime.Time)
 
 	// Consider a node consolidatable by looking at the lastPodEvent status field on the nodeclaim.
+	// This time is now based on the PodScheduled condition's lastTransitionTime, or on when the pod was removed (terminal or terminating).
 	if c.clock.Since(timeToCheck) < lo.FromPtr(nodePool.Spec.Disruption.ConsolidateAfter.Duration) {
 		if hasConsolidatableCondition {
 			_ = nodeClaim.StatusConditions().Clear(v1.ConditionTypeConsolidatable)
diff --git a/pkg/controllers/nodeclaim/hydration/suite_test.go b/pkg/controllers/nodeclaim/hydration/suite_test.go
index 559eee4061..e49c7812c4 100644
--- a/pkg/controllers/nodeclaim/hydration/suite_test.go
+++ b/pkg/controllers/nodeclaim/hydration/suite_test.go
@@ -35,10 +35,12 @@ import (
 	. "sigs.k8s.io/karpenter/pkg/utils/testing"
 )
 
-var ctx context.Context
-var hydrationController *hydration.Controller
-var env *test.Environment
-var cloudProvider *fake.CloudProvider
+var (
+	ctx                 context.Context
+	hydrationController *hydration.Controller
+	env                 *test.Environment
+	cloudProvider       *fake.CloudProvider
+)
 
 func TestAPIs(t *testing.T) {
 	ctx = TestContextWithLogger(t)
diff --git a/pkg/controllers/nodeclaim/podevents/controller.go b/pkg/controllers/nodeclaim/podevents/controller.go
index 803195505d..289fce6bfb 100644
--- a/pkg/controllers/nodeclaim/podevents/controller.go
+++ b/pkg/controllers/nodeclaim/podevents/controller.go
@@ -32,18 +32,15 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/predicate"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
 
+	"github.com/samber/lo"
+
 	"sigs.k8s.io/karpenter/pkg/cloudprovider"
 	nodeutils "sigs.k8s.io/karpenter/pkg/utils/node"
 	nodeclaimutils "sigs.k8s.io/karpenter/pkg/utils/nodeclaim"
 	podutils "sigs.k8s.io/karpenter/pkg/utils/pod"
 )
 
-// dedupeTimeout is 10 seconds to reduce the number of writes to the APIServer, since pod scheduling and deletion events are very frequent.
-// The smaller this value is, the more writes Karpenter will make in a busy cluster. This timeout is intentionally smaller than the consolidation
-// 15 second validation period, so that we can ensure that we invalidate consolidation commands that are decided while we're de-duping pod events.
-const dedupeTimeout = 10 * time.Second
-
-// Podevents is a nodeclaim controller that deletes adds the lastPodEvent status onto the nodeclaim
+// Podevents is a nodeclaim controller that updates the lastPodEvent status based on the PodScheduled condition, or on the pod being removed (terminal or terminating)
 type Controller struct {
 	clock      clock.Clock
 	kubeClient client.Client
@@ -82,12 +79,28 @@ func (c *Controller) Reconcile(ctx context.Context, pod *corev1.Pod) (reconcile.
 	}
 	stored := nc.DeepCopy()
-	// If we've set the lastPodEvent before and it hasn't been before the timeout, don't do anything
-	if !nc.Status.LastPodEventTime.Time.IsZero() && c.clock.Since(nc.Status.LastPodEventTime.Time) < dedupeTimeout {
+
+	var eventTime time.Time
+
+	// If pod is being removed (terminal or terminating), use current time
+	if podutils.IsTerminal(pod) || podutils.IsTerminating(pod) {
+		eventTime = c.clock.Now()
+	} else {
+		// Otherwise check for PodScheduled condition
+		if cond, ok := lo.Find(pod.Status.Conditions, func(c corev1.PodCondition) bool {
+			return c.Type == corev1.PodScheduled && c.Status == corev1.ConditionTrue
+		}); ok {
+			eventTime = cond.LastTransitionTime.Time
+		}
+	}
+
+	// If we don't have a valid time, skip
+	if eventTime.IsZero() {
 		return reconcile.Result{}, nil
 	}
-	// otherwise, set the pod event time to now
-	nc.Status.LastPodEventTime.Time = c.clock.Now()
+
+	// Update the lastPodEvent time
+	nc.Status.LastPodEventTime.Time = eventTime
 	if !equality.Semantic.DeepEqual(stored, nc) {
 		if err = c.kubeClient.Status().Patch(ctx, nc, client.MergeFrom(stored)); err != nil {
 			return reconcile.Result{}, client.IgnoreNotFound(err)
@@ -101,18 +114,32 @@ func (c *Controller) Register(ctx context.Context, m manager.Manager) error {
 		Named("nodeclaim.podevents").
 		For(&corev1.Pod{}).
 		WithEventFilter(predicate.TypedFuncs[client.Object]{
-			// If a pod is bound to a node or goes terminal
 			UpdateFunc: func(e event.TypedUpdateEvent[client.Object]) bool {
 				oldPod := (e.ObjectOld).(*corev1.Pod)
 				newPod := (e.ObjectNew).(*corev1.Pod)
-				// if this is a newly bound pod
-				bound := oldPod.Spec.NodeName == "" && newPod.Spec.NodeName != ""
-				// if this is a newly terminal pod
-				terminal := (newPod.Spec.NodeName != "" && !podutils.IsTerminal(oldPod) && podutils.IsTerminal(newPod))
-				// if this is a newly terminating pod
-				terminating := (newPod.Spec.NodeName != "" && !podutils.IsTerminating(oldPod) && podutils.IsTerminating(newPod))
-				// return true if it was bound to a node, went terminal, or went terminating
-				return bound || terminal || terminating
+
+				// Check for pod scheduling changes
+				oldCond, oldOk := lo.Find(oldPod.Status.Conditions, func(c corev1.PodCondition) bool {
+					return c.Type == corev1.PodScheduled
+				})
+				newCond, newOk := lo.Find(newPod.Status.Conditions, func(c corev1.PodCondition) bool {
+					return c.Type == corev1.PodScheduled
+				})
+
+				// Trigger on PodScheduled condition changes
+				if (!oldOk && newOk) || (oldOk && newOk && (oldCond.Status != newCond.Status || !oldCond.LastTransitionTime.Equal(&newCond.LastTransitionTime))) {
+					return true
+				}
+
+				// Trigger on pod removal (terminal or terminating)
+				if !podutils.IsTerminal(oldPod) && podutils.IsTerminal(newPod) {
+					return true
+				}
+				if !podutils.IsTerminating(oldPod) && podutils.IsTerminating(newPod) {
+					return true
+				}
+
+				return false
 			},
 		}).
 		Complete(reconcile.AsReconciler(m.GetClient(), c))
diff --git a/pkg/controllers/nodeclaim/podevents/suite_test.go b/pkg/controllers/nodeclaim/podevents/suite_test.go
index 7ac206c4dd..5fb89303a3 100644
--- a/pkg/controllers/nodeclaim/podevents/suite_test.go
+++ b/pkg/controllers/nodeclaim/podevents/suite_test.go
@@ -38,11 +38,13 @@ import (
 	. "sigs.k8s.io/karpenter/pkg/utils/testing"
 )
 
-var ctx context.Context
-var podEventsController *podevents.Controller
-var env *test.Environment
-var fakeClock *clock.FakeClock
-var cp *fake.CloudProvider
+var (
+	ctx                 context.Context
+	podEventsController *podevents.Controller
+	env                 *test.Environment
+	fakeClock           *clock.FakeClock
+	cp                  *fake.CloudProvider
+)
 
 func TestAPIs(t *testing.T) {
 	ctx = TestContextWithLogger(t)
@@ -75,6 +77,7 @@ var _ = AfterEach(func() {
 	cp.Reset()
 	ExpectCleanedUp(ctx, env.Client)
 })
+
 var _ = Describe("PodEvents", func() {
 	var nodePool *v1.NodePool
 	var nodeClaim *v1.NodeClaim
@@ -98,13 +101,31 @@ var _ = Describe("PodEvents", func() {
 			NodeName: node.Name,
 		})
 	})
-	It("should set the nodeclaim lastPodEvent", func() {
-		ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node, pod)
-		timeToCheck := fakeClock.Now().Truncate(time.Second)
+	It("should set the nodeclaim lastPodEvent based on PodScheduled condition", func() {
+		scheduledTime := fakeClock.Now().Truncate(time.Second)
+		pod.Status.Conditions = []corev1.PodCondition{{
+			Type:               corev1.PodScheduled,
+			Status:             corev1.ConditionTrue,
+			LastTransitionTime: metav1.Time{Time: scheduledTime},
+		}}
+		ExpectApplied(ctx, env.Client, nodePool, node, nodeClaim, pod)
 		ExpectObjectReconciled(ctx, env.Client, podEventsController, pod)
 
 		nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
-		Expect(nodeClaim.Status.LastPodEventTime.Time).To(BeEquivalentTo(timeToCheck))
+		Expect(nodeClaim.Status.LastPodEventTime.Time).To(BeEquivalentTo(scheduledTime))
+
+		// Update the PodScheduled condition's lastTransitionTime
+		newScheduledTime := fakeClock.Now().Add(time.Minute).Truncate(time.Second)
+		pod.Status.Conditions = []corev1.PodCondition{{
+			Type:               corev1.PodScheduled,
+			Status:             corev1.ConditionTrue,
+			LastTransitionTime: metav1.Time{Time: newScheduledTime},
+		}}
+		ExpectApplied(ctx, env.Client, pod)
+		ExpectObjectReconciled(ctx, env.Client, podEventsController, pod)
+
+		nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
+		Expect(nodeClaim.Status.LastPodEventTime.Time).To(BeEquivalentTo(newScheduledTime))
 	})
 	It("should not set the nodeclaim lastPodEvent when the node does not exist", func() {
 		ExpectApplied(ctx, env.Client, nodePool, nodeClaim, pod)
@@ -117,36 +138,74 @@ var _ = Describe("PodEvents", func() {
 		ExpectApplied(ctx, env.Client, nodePool, node, pod)
 		ExpectObjectReconciled(ctx, env.Client, podEventsController, pod)
 	})
-	It("should set the nodeclaim lastPodEvent when it's been set before", func() {
-		nodeClaim.Status.LastPodEventTime.Time = fakeClock.Now().Add(-5 * time.Minute)
+	It("should set the nodeclaim lastPodEvent when pod becomes terminal", func() {
+		// First set up a regular pod
+		scheduledTime := fakeClock.Now().Truncate(time.Second)
+		pod.Status.Conditions = []corev1.PodCondition{{
+			Type:               corev1.PodScheduled,
+			Status:             corev1.ConditionTrue,
+			LastTransitionTime: metav1.Time{Time: scheduledTime},
+		}}
 		ExpectApplied(ctx, env.Client, nodePool, node, nodeClaim, pod)
-		timeToCheck := fakeClock.Now().Truncate(time.Second)
+		ExpectObjectReconciled(ctx, env.Client, podEventsController, pod)
+
+		// Make pod terminal now
+		timeToCheck := fakeClock.Now().Add(time.Minute).Truncate(time.Second)
+		fakeClock.SetTime(timeToCheck)
+		pod.Status.Phase = corev1.PodSucceeded // Setting pod as terminal directly
+		ExpectApplied(ctx, env.Client, pod)
 		ExpectObjectReconciled(ctx, env.Client, podEventsController, pod)
 
 		nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
 		Expect(nodeClaim.Status.LastPodEventTime.Time).To(BeEquivalentTo(timeToCheck))
 	})
-	It("should only set the nodeclaim lastPodEvent once within the dedupe timeframe", func() {
+	It("should set the nodeclaim lastPodEvent when pod becomes terminating", func() {
 		ExpectApplied(ctx, env.Client, nodePool, node, nodeClaim, pod)
-		timeToCheck := fakeClock.Now().Truncate(time.Second)
 		ExpectObjectReconciled(ctx, env.Client, podEventsController, pod)
-		// Expect that the lastPodEventTime is set
+
+		timeToCheck := fakeClock.Now().Truncate(time.Second)
+		fakeClock.SetTime(timeToCheck)
+		// Make pod terminating by deleting it
+		ExpectDeletionTimestampSet(ctx, env.Client, pod)
+		// Reconcile for the terminating pod
+		ExpectObjectReconciled(ctx, env.Client, podEventsController, pod)
 		nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
 		Expect(nodeClaim.Status.LastPodEventTime.Time).To(BeEquivalentTo(timeToCheck))
+	})
+	It("should not update lastPodEvent when pod has no PodScheduled condition", func() {
+		// Pod with no conditions
+		ExpectApplied(ctx, env.Client, nodePool, node, nodeClaim, pod)
+		ExpectObjectReconciled(ctx, env.Client, podEventsController, pod)
 
-		// step through half of the dedupe timeout, and re-reconcile, expecting the status to not change
-		fakeClock.Step(5 * time.Second)
+		nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
+		Expect(nodeClaim.Status.LastPodEventTime.Time.IsZero()).To(BeTrue())
+	})
+	It("should not update lastPodEvent when PodScheduled condition is not True", func() {
+		pod.Status.Conditions = []corev1.PodCondition{{
+			Type:               corev1.PodScheduled,
+			Status:             corev1.ConditionFalse,
+			LastTransitionTime: metav1.Time{Time: fakeClock.Now()},
+		}}
+		ExpectApplied(ctx, env.Client, nodePool, node, nodeClaim, pod)
 		ExpectObjectReconciled(ctx, env.Client, podEventsController, pod)
 
 		nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
-		Expect(nodeClaim.Status.LastPodEventTime.Time).To(BeEquivalentTo(timeToCheck))
+		Expect(nodeClaim.Status.LastPodEventTime.Time.IsZero()).To(BeTrue())
+	})
+	It("should set the nodeclaim lastPodEvent when pod is already in a terminal state", func() {
+		// Setup time
+		timeToCheck := fakeClock.Now().Truncate(time.Second)
+		fakeClock.SetTime(timeToCheck)
 
-		// step through rest of the dedupe timeout, and re-reconcile, expecting the status to change
-		fakeClock.Step(5 * time.Second)
+		// Set the pod to a terminal state directly so that podutils.IsTerminal() returns true
+		pod.Status.Phase = corev1.PodSucceeded
+
+		// Apply objects and reconcile
+		ExpectApplied(ctx, env.Client, nodePool, node, nodeClaim, pod)
 		ExpectObjectReconciled(ctx, env.Client, podEventsController, pod)
 
+		// Verify the last pod event time is set
 		nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
-		Expect(nodeClaim.Status.LastPodEventTime.Time).ToNot(BeEquivalentTo(timeToCheck))
+		Expect(nodeClaim.Status.LastPodEventTime.Time).To(BeEquivalentTo(timeToCheck))
 	})
 })