Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 70 additions & 95 deletions operator/internal/controller/event_handler.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
*
Expand All @@ -20,135 +20,110 @@ package controller

import (
"context"
"time"

"github.com/NVIDIA/nodewright/operator/api/v1alpha1"
"github.com/NVIDIA/nodewright/operator/internal/dal"
"github.com/go-logr/logr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

type eventHandler struct {
// globalReconcileKey is the one sentinel request onto which every relevant
// Skyhook and Node event is collapsed. Because the heavy reconcile path does
// not look at the request identity (it loads the whole world), funnelling all
// such events to a single key lets the workqueue deduplicate a burst into one
// pass; together with MaxConcurrentReconciles: 1 this also means at most one
// heavy reconcile runs at any time.
//
// Only Name is set, and it is an arbitrary sentinel (see globalReconcileName):
// it does not refer to a namespace or to any real object.
var globalReconcileKey = reconcile.Request{
	NamespacedName: types.NamespacedName{
		Name: globalReconcileName,
	},
}

// globalDelayHandler enqueues globalReconcileKey after a fixed delay on any
// watched event that is relevant to at least one Skyhook. A Skyhook event is
// always relevant; a Node event is relevant only if some Skyhook's node
// selector matches it — without that filter every kubelet heartbeat across the
// cluster would wake the heavy reconcile.
//
// We enqueue via AddAfter rather than a plain EnqueueRequestsFromMapFunc because
// the controller's priority queue only applies a delay on AddAfter/AddRateLimited:
// a plain Add lands the item in the ready tree immediately, leaving no window for
// the write storm a single pass produces across many Nodes to coalesce into one
// follow-up pass. The delay is that coalescing window.
//
// Events on any other object type are treated as not relevant and are dropped.
type globalDelayHandler struct {
// logger reports errors hit while deciding whether an event is relevant.
logger logr.Logger
// dal is used to list the Skyhooks whose node selectors are matched against
// a Node's labels.
dal dal.DAL
// delay is the duration passed to queue.AddAfter when enqueueing
// globalReconcileKey, i.e. the coalescing window described above.
delay time.Duration
}

// force compiler to check that we implement the interface
var _ handler.EventHandler = &eventHandler{}

// the EventHandler interface
func (e *eventHandler) Create(ctx context.Context, event event.TypedCreateEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {

matches, err := e.genericHandler(ctx, event.Object)
if err != nil {
e.logger.Error(err, "error handling create event",
"namespace", event.Object.GetNamespace(),
"name", event.Object.GetName(),
"kind", event.Object.GetObjectKind())
}
// Compile-time assertion that *globalDelayHandler satisfies handler.EventHandler.
var _ handler.EventHandler = (*globalDelayHandler)(nil)

// not sure if actually we want to many or one
for _, match := range matches {
queue.Add(reconcile.Request{NamespacedName: match})
}
// Create handles a create event by handing the created object to enqueue,
// which schedules the delayed global reconcile if the object is relevant.
func (h *globalDelayHandler) Create(ctx context.Context, evt event.TypedCreateEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
	created := evt.Object
	h.enqueue(ctx, created, queue)
}

func (e *eventHandler) Update(ctx context.Context, event event.TypedUpdateEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {

// ignoring the old for now, might need to do some comparing to decided
// if we want to do something, but for now starting simple

matches, err := e.genericHandler(ctx, event.ObjectNew)
if err != nil {
e.logger.Error(err, "error handling update event",
"namespace", event.ObjectNew.GetNamespace(),
"name", event.ObjectNew.GetName(),
"kind", event.ObjectNew.GetObjectKind())
}

// not sure if actually we want to many or one
for _, match := range matches {
queue.Add(reconcile.Request{NamespacedName: match})
}
// Update handles an update event by handing the new object to enqueue.
//
// The old object is deliberately not inspected: relevance is decided from the
// current state only.
// NOTE(review): this means a Node whose new labels no longer match any Skyhook
// selector is treated as irrelevant even if its old labels matched — confirm
// that is intended.
func (h *globalDelayHandler) Update(ctx context.Context, evt event.TypedUpdateEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
	current := evt.ObjectNew
	h.enqueue(ctx, current, queue)
}

func (e *eventHandler) Delete(ctx context.Context, event event.TypedDeleteEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
matches, err := e.genericHandler(ctx, event.Object)
if err != nil {
e.logger.Error(err, "error handling delete event",
"namespace", event.Object.GetNamespace(),
"name", event.Object.GetName(),
"kind", event.Object.GetObjectKind())
}

// not sure if actually we want to many or one
for _, match := range matches {
queue.Add(reconcile.Request{NamespacedName: match})
}
// Delete handles a delete event by handing the deleted object to enqueue,
// which schedules the delayed global reconcile if the object is relevant.
func (h *globalDelayHandler) Delete(ctx context.Context, evt event.TypedDeleteEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
	deleted := evt.Object
	h.enqueue(ctx, deleted, queue)
}

// Generic: not sure what Generic is, so just logging for now
func (e *eventHandler) Generic(ctx context.Context, event event.TypedGenericEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
// Generic handles a generic event the same way as the other event kinds: the
// event's object is handed to enqueue, which schedules the delayed global
// reconcile if the object is relevant.
func (h *globalDelayHandler) Generic(ctx context.Context, evt event.TypedGenericEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
	subject := evt.Object
	h.enqueue(ctx, subject, queue)
}

matches, err := e.genericHandler(ctx, event.Object)
func (h *globalDelayHandler) enqueue(ctx context.Context, object client.Object, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
relevant, err := h.relevant(ctx, object)
if err != nil {
e.logger.Error(err, "error handling generic event",
"namespace", event.Object.GetNamespace(),
"name", event.Object.GetName(),
"kind", event.Object.GetObjectKind())
// On error we skip rather than enqueue: a spurious global reconcile is
// cheap, but more importantly the next event (or the periodic resync)
// will retry, and we don't want a transient list error to amplify into
// a reconcile per event.
h.logger.Error(err, "error deciding event relevance for global reconcile",
"namespace", object.GetNamespace(),
"name", object.GetName(),
"kind", object.GetObjectKind())
return
}

// not sure if actually we want to many or one
for _, match := range matches {
queue.Add(reconcile.Request{NamespacedName: match})
if relevant {
queue.AddAfter(globalReconcileKey, h.delay)
}
}

// genericHandler should be able to handle most of the logic for all the different event types
func (e *eventHandler) genericHandler(ctx context.Context, object client.Object) ([]types.NamespacedName, error) {

list, err := e.dal.GetSkyhooks(ctx)
if err != nil {

return nil, err
}
if list == nil {
return nil, nil
}

var matches []types.NamespacedName
// kind := "unknown"
// relevant reports whether an event on object should wake the heavy reconcile.
func (h *globalDelayHandler) relevant(ctx context.Context, object client.Object) (bool, error) {
switch obj := object.(type) {
case *corev1.Pod:
// kind = "pod"
node, err := e.dal.GetNode(ctx, obj.Spec.NodeName)
case *v1alpha1.Skyhook:
return true, nil
case *corev1.Node:
list, err := h.dal.GetSkyhooks(ctx)
if err != nil {
return nil, err
return false, err
}
matches = matchSelectors(list, node.Labels)
case *corev1.Node:
// kind = "node"
matches = matchSelectors(list, obj.Labels)
if list == nil {
return false, nil
}
return len(matchSelectors(list, obj.Labels)) > 0, nil
default:
return false, nil
}

// e.logger.Info("Event Handler",
// "event_type", event_type,
// "match_labels", matches,
// "namespace", object.GetNamespace(),
// "name", object.GetName(),
// "kind", kind)

return matches, nil
}

// matchSelectors returns the Skyhooks whose node selector matches the given
// labels.
func matchSelectors(crs *v1alpha1.SkyhookList, lbs map[string]string) []types.NamespacedName {

ret := make([]types.NamespacedName, 0)
Expand All @@ -168,7 +143,7 @@ func matchSelectors(crs *v1alpha1.SkyhookList, lbs map[string]string) []types.Na
if match {
ret = append(ret, types.NamespacedName{
Name: cr.Name,
Namespace: cr.Namespace, // guessing always empty string
Namespace: cr.Namespace,
})
}
}
Expand Down
Loading
Loading