-
Notifications
You must be signed in to change notification settings - Fork 12
Expand file tree
/
Copy pathevent_handler.go
More file actions
152 lines (132 loc) · 5.35 KB
/
event_handler.go
File metadata and controls
152 lines (132 loc) · 5.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
/*
* SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package controller
import (
	"context"
	"fmt"
	"time"

	"github.com/NVIDIA/nodewright/operator/api/v1alpha1"
	"github.com/NVIDIA/nodewright/operator/internal/dal"
	"github.com/go-logr/logr"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/util/workqueue"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
// globalReconcileKey is the one reconcile.Request that every relevant Skyhook
// or Node event is mapped to. The heavy reconcile path does not look at which
// request it was handed (it loads the entire world on each pass), so funnelling
// all such events into a single key lets the workqueue fold a burst of them
// into one pass. Together with MaxConcurrentReconciles: 1 it also ensures at
// most one heavy reconcile is in flight at a time. Name carries an arbitrary
// sentinel value (see globalReconcileName); it does not refer to a namespace
// or to any real object.
var globalReconcileKey = reconcile.Request{NamespacedName: types.NamespacedName{
	Name: globalReconcileName,
}}
// globalDelayHandler enqueues globalReconcileKey after a fixed delay on any
// watched event that is relevant to at least one Skyhook. A Skyhook event is
// always relevant; a Node event is relevant only if some Skyhook's node
// selector matches it — without that filter every kubelet heartbeat across the
// cluster would wake the heavy reconcile.
//
// We enqueue via AddAfter rather than a plain EnqueueRequestsFromMapFunc because
// the controller's priority queue only applies a delay on AddAfter/AddRateLimited:
// a plain Add lands the item in the ready tree immediately, leaving no window for
// the write storm a single pass produces across many Nodes to coalesce into one
// follow-up pass. The delay is that coalescing window.
type globalDelayHandler struct {
// logger receives errors raised while deciding whether an event is relevant.
logger logr.Logger
// dal supplies the current Skyhook list; it is consulted only for Node
// events, to test the Node's labels against each Skyhook's node selector.
dal dal.DAL
// delay is the duration passed to AddAfter for the sentinel request, i.e.
// the coalescing window described above.
delay time.Duration
}
// Compile-time proof that *globalDelayHandler satisfies handler.EventHandler;
// a typed nil pointer is all the assertion needs.
var _ handler.EventHandler = (*globalDelayHandler)(nil)
// Create schedules the global reconcile when the newly created object is
// relevant to at least one Skyhook.
func (h *globalDelayHandler) Create(ctx context.Context, e event.TypedCreateEvent[client.Object], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
	created := e.Object
	h.enqueue(ctx, created, q)
}
// Update schedules the global reconcile based solely on the post-update
// object; ObjectOld is intentionally not consulted, so relevance reflects
// current state only.
//
// NOTE(review): because the old object is ignored, a Node whose labels change
// so that it no longer matches any Skyhook does not trigger a pass from this
// event. Confirm that is acceptable (e.g. covered by a periodic resync).
func (h *globalDelayHandler) Update(ctx context.Context, e event.TypedUpdateEvent[client.Object], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
	current := e.ObjectNew
	h.enqueue(ctx, current, q)
}
// Delete schedules the global reconcile when the removed object (as last seen
// in the event) was relevant to at least one Skyhook.
func (h *globalDelayHandler) Delete(ctx context.Context, e event.TypedDeleteEvent[client.Object], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
	removed := e.Object
	h.enqueue(ctx, removed, q)
}
// Generic schedules the global reconcile for an externally triggered event
// whose object is relevant to at least one Skyhook.
func (h *globalDelayHandler) Generic(ctx context.Context, e event.TypedGenericEvent[client.Object], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
	subject := e.Object
	h.enqueue(ctx, subject, q)
}
// enqueue adds globalReconcileKey to queue after h.delay when object is
// relevant to at least one Skyhook (see relevant). Irrelevant objects are
// dropped silently; a failure to decide relevance is logged and the event is
// dropped.
func (h *globalDelayHandler) enqueue(ctx context.Context, object client.Object, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
	relevant, err := h.relevant(ctx, object)
	if err != nil {
		// Skip rather than enqueue on error: a later relevant event (or the
		// periodic resync, presumably configured on the manager) will schedule
		// the pass, so a transient list failure is only logged here.
		//
		// "kind" is the concrete Go type: GetObjectKind() returns an interface
		// whose TypeMeta is frequently empty for typed objects, which made the
		// previous value uninformative.
		h.logger.Error(err, "error deciding event relevance for global reconcile",
			"namespace", object.GetNamespace(),
			"name", object.GetName(),
			"kind", fmt.Sprintf("%T", object))
		return
	}
	if !relevant {
		return
	}
	// AddAfter (not Add) so that a burst of events lands in the delay window
	// and collapses onto the single sentinel key.
	queue.AddAfter(globalReconcileKey, h.delay)
}
// relevant decides whether an event on object should wake the heavy
// reconcile. Skyhooks always qualify. Nodes qualify only when at least one
// Skyhook's node selector matches their labels, which requires fetching the
// Skyhook list through h.dal (its error is returned to the caller). Every
// other object type is ignored.
func (h *globalDelayHandler) relevant(ctx context.Context, object client.Object) (bool, error) {
	if _, isSkyhook := object.(*v1alpha1.Skyhook); isSkyhook {
		return true, nil
	}

	node, isNode := object.(*corev1.Node)
	if !isNode {
		return false, nil
	}

	skyhooks, err := h.dal.GetSkyhooks(ctx)
	if err != nil {
		return false, err
	}
	if skyhooks == nil {
		// No Skyhook list means nothing can select this Node.
		return false, nil
	}

	matched := matchSelectors(skyhooks, node.Labels)
	return len(matched) > 0, nil
}
// matchSelectors returns the namespaced names of the Skyhooks in crs whose
// node selector matches lbs. The result is never nil; a nil crs yields an
// empty slice.
//
// A Skyhook whose selector cannot be converted by
// metav1.LabelSelectorAsSelector is treated as matching, so a malformed
// selector errs on the side of inclusion rather than silently excluding the
// Skyhook.
func matchSelectors(crs *v1alpha1.SkyhookList, lbs map[string]string) []types.NamespacedName {
	ret := make([]types.NamespacedName, 0)
	if crs == nil {
		return ret
	}

	// Convert once; the label set is the same for every Skyhook.
	nodeLabels := labels.Set(lbs)

	for i := range crs.Items {
		// Index into the slice instead of ranging by value so each Skyhook
		// object is not copied per iteration.
		cr := &crs.Items[i]

		selector, err := metav1.LabelSelectorAsSelector(&cr.Spec.NodeSelector)
		// LabelSelectorAsSelector returns a nil Selector alongside an error,
		// so Matches must only be called on success; short-circuiting here
		// avoids a nil-interface panic while keeping "error means match".
		if err != nil || selector.Matches(nodeLabels) {
			ret = append(ret, types.NamespacedName{
				Name:      cr.Name,
				Namespace: cr.Namespace,
			})
		}
	}
	return ret
}