
Commit 7d84bf7

WIP: Implement asynchronous watch registration
Previously, when a ResourceGraphDefinition had issues (e.g., missing RBAC permissions), the `Register()` method would block indefinitely on `cache.WaitForCacheSync()` while holding a global lock, causing all other RGDs to stall. This change makes registration non-blocking by moving the cache sync wait into a goroutine and using channel-based notification when sync completes.
1 parent: f8a4eb4

24 files changed: +461 -30
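
At its core, the change described above is a standard Go pattern: start the informer, move the cache.WaitForCacheSync() call into a goroutine, and report completion over a channel so the caller returns immediately. Below is a minimal, self-contained sketch of that pattern; the names startNonBlocking and syncedCh (and the fake hasSynced func) are illustrative stand-ins, not the kro API.

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/tools/cache"
)

// startNonBlocking kicks off the cache-sync wait in the background and
// returns immediately; the caller learns about completion via syncedCh.
func startNonBlocking(hasSynced cache.InformerSynced, stopCh <-chan struct{}, syncedCh chan<- string, name string) {
	go func() {
		if cache.WaitForCacheSync(stopCh, hasSynced) {
			select {
			case syncedCh <- name: // notify the consumer
			case <-stopCh: // give up if shutting down
			}
		}
	}()
}

func main() {
	stopCh := make(chan struct{})
	syncedCh := make(chan string, 1)

	// Simulate an informer whose cache becomes synced after 50ms.
	start := time.Now()
	hasSynced := func() bool { return time.Since(start) > 50*time.Millisecond }

	startNonBlocking(hasSynced, stopCh, syncedCh, "deployments.apps")
	fmt.Println("registration returned without blocking")

	fmt.Println("synced:", <-syncedCh)
	close(stopCh)
}

With this shape, a slow or never-syncing cache only delays its own notification instead of stalling every other registration behind a shared lock.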

Makefile

Lines changed: 21 additions & 1 deletion
@@ -269,6 +269,18 @@ deploy-kind: ko ## Deploy kro to a kind cluster
 	kubectl wait --for=condition=ready --timeout=1m pod -n kro-system -l app.kubernetes.io/component=controller
 	$(KUBECTL) --context kind-${KIND_CLUSTER_NAME} get pods -A
 
+.PHONY: deploy-kind-aggregation
+deploy-kind-aggregation: export KO_DOCKER_REPO=kind.local
+deploy-kind-aggregation: ko ## Deploy kro to a kind cluster with aggregation RBAC mode
+	$(KIND) delete clusters ${KIND_CLUSTER_NAME} || true
+	$(KIND) create cluster --name ${KIND_CLUSTER_NAME}
+	$(KUBECTL) --context kind-$(KIND_CLUSTER_NAME) create namespace kro-system
+	make install
+	# Deploy with aggregation RBAC mode - requires ClusterRoles with rbac.kro.run/aggregate-to-controller label
+	${HELM} template kro ./helm --namespace kro-system --set image.pullPolicy=Never --set image.ko=true --set config.allowCRDDeletion=true --set rbac.mode=aggregation | $(KO) apply -f -
+	kubectl wait --for=condition=ready --timeout=1m pod -n kro-system -l app.kubernetes.io/component=controller
+	$(KUBECTL) --context kind-${KIND_CLUSTER_NAME} get pods -A
+
 .PHONY: ko-apply
 ko-apply: ko
 	${HELM} template kro ./helm --namespace kro-system --set image.pullPolicy=Never --set image.ko=true | $(KO) apply -f -
@@ -288,4 +300,12 @@ test-e2e: chainsaw ## Run e2e tests
 
 .PHONY: test-e2e-kind
 test-e2e-kind: deploy-kind
-	make test-e2e
+	make test-e2e
+
+.PHONY: test-e2e-kind-aggregation
+test-e2e-kind-aggregation: deploy-kind-aggregation ## Run e2e tests with aggregation RBAC mode
+	make test-e2e
+
+.PHONY: test-e2e-non-blocking
+test-e2e-non-blocking: chainsaw ## Run the non-blocking RGD test (requires aggregation RBAC mode)
+	$(CHAINSAW) test ./test/e2e/chainsaw/check-non-blocking-rgd

api/v1alpha1/resourcegraphdefinition_types.go

Lines changed: 3 additions & 0 deletions
@@ -179,6 +179,9 @@ const (
 	ResourceGraphDefinitionStateActive ResourceGraphDefinitionState = "Active"
 	// ResourceGraphDefinitionStateInactive represents the inactive state of the resource graph definition
 	ResourceGraphDefinitionStateInactive ResourceGraphDefinitionState = "Inactive"
+	// ResourceGraphDefinitionStateInitializing represents the initializing state where the controller
+	// is registered but waiting for the informer cache to sync.
+	ResourceGraphDefinitionStateInitializing ResourceGraphDefinitionState = "Initializing"
 )
 
 // ResourceGraphDefinitionStatus defines the observed state of ResourceGraphDefinition.

go.mod

Lines changed: 1 addition & 1 deletion
@@ -17,7 +17,6 @@ require (
 	golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56
 	golang.org/x/sync v0.12.0
 	golang.org/x/time v0.9.0
-	google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb
 	k8s.io/api v0.34.1
 	k8s.io/apiextensions-apiserver v0.31.0
 	k8s.io/apimachinery v0.34.1
@@ -92,6 +91,7 @@ require (
 	golang.org/x/text v0.23.0 // indirect
 	golang.org/x/tools v0.28.0 // indirect
 	gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
+	google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb // indirect
 	google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb // indirect
 	google.golang.org/protobuf v1.36.5 // indirect
 	gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect

pkg/controller/resourcegraphdefinition/controller.go

Lines changed: 37 additions & 0 deletions
@@ -17,10 +17,12 @@ package resourcegraphdefinition
 import (
 	"context"
 	"errors"
+	"fmt"
 
 	"github.com/go-logr/logr"
 	extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
 	"k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/builder"
@@ -30,6 +32,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/handler"
 	"sigs.k8s.io/controller-runtime/pkg/predicate"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
+	"sigs.k8s.io/controller-runtime/pkg/source"
 
 	"github.com/kubernetes-sigs/kro/api/v1alpha1"
 	kroclient "github.com/kubernetes-sigs/kro/pkg/client"
@@ -95,6 +98,10 @@ func (r *ResourceGraphDefinitionReconciler) SetupWithManager(mgr ctrl.Manager) e
 		return log
 	}
 
+	// Create a channel source to watch for sync events from the dynamic controller
+	syncEventsChan := make(chan event.GenericEvent)
+	go r.watchSyncEvents(syncEventsChan)
+
 	return ctrl.NewControllerManagedBy(mgr).
 		Named("ResourceGraphDefinition").
 		For(&v1alpha1.ResourceGraphDefinition{}).
@@ -120,9 +127,39 @@ func (r *ResourceGraphDefinitionReconciler) SetupWithManager(mgr ctrl.Manager) e
 				},
 			}),
 		).
+		WatchesRawSource(
+			source.Channel(syncEventsChan, &handler.EnqueueRequestForObject{}),
+		).
 		Complete(reconcile.AsReconciler[*v1alpha1.ResourceGraphDefinition](mgr.GetClient(), r))
 }
 
+// watchSyncEvents listens for sync events from the dynamic controller and
+// converts them to generic events that trigger RGD reconciliation.
+func (r *ResourceGraphDefinitionReconciler) watchSyncEvents(eventsChan chan<- event.GenericEvent) {
+	for gvr := range r.dynamicController.SyncEvents() {
+		crdName := fmt.Sprintf("%s.%s", gvr.Resource, gvr.Group)
+
+		crd := &metav1.PartialObjectMetadata{}
+		crd.SetGroupVersionKind(extv1.SchemeGroupVersion.WithKind("CustomResourceDefinition"))
+		if err := r.Client.Get(context.Background(), client.ObjectKey{Name: crdName}, crd); err != nil {
+			continue
+		}
+
+		if !metadata.IsKROOwned(crd) {
+			continue
+		}
+		rgdName, ok := crd.GetLabels()[metadata.ResourceGraphDefinitionNameLabel]
+		if !ok {
+			continue
+		}
+
+		rgd := &v1alpha1.ResourceGraphDefinition{}
+		rgd.SetName(rgdName)
+
+		eventsChan <- event.GenericEvent{Object: rgd}
+	}
+}
+
 // findRGDsForCRD returns a list of reconcile requests for the ResourceGraphDefinition
 // that owns the given CRD. It is used to trigger reconciliation when a CRD is updated.
 func (r *ResourceGraphDefinitionReconciler) findRGDsForCRD(ctx context.Context, obj client.Object) []reconcile.Request {

pkg/controller/resourcegraphdefinition/controller_reconcile.go

Lines changed: 10 additions & 3 deletions
@@ -76,11 +76,18 @@ func (r *ResourceGraphDefinitionReconciler) reconcileResourceGraphDefinition(
 	// TODO: the context that is passed here is tied to the reconciliation of the rgd, we might need to make
 	// a new context with our own cancel function here to allow us to cleanly term the dynamic controller
 	// rather than have it ignore this context and use the background context.
-	if err := r.reconcileResourceGraphDefinitionMicroController(ctx, processedRGD, graphExecLabeler); err != nil {
+	gvr := processedRGD.Instance.GetGroupVersionResource()
+	if err := r.reconcileResourceGraphDefinitionMicroController(ctx, gvr, processedRGD, graphExecLabeler); err != nil {
 		mark.ControllerFailedToStart(err.Error())
 		return processedRGD.TopologicalOrder, resourcesInfo, err
 	}
-	mark.ControllerRunning()
+
+	// Set condition based on sync state
+	if r.dynamicController.HasSynced(gvr) {
+		mark.ControllerRunning()
+	} else {
+		mark.ControllerSyncing()
+	}
 
 	return processedRGD.TopologicalOrder, resourcesInfo, nil
 }
@@ -168,6 +175,7 @@ func (r *ResourceGraphDefinitionReconciler) reconcileResourceGraphDefinitionCRD(
 // reconcileResourceGraphDefinitionMicroController starts the microcontroller for handling the resources
 func (r *ResourceGraphDefinitionReconciler) reconcileResourceGraphDefinitionMicroController(
 	ctx context.Context,
+	gvr schema.GroupVersionResource,
 	processedRGD *graph.Graph,
 	graphExecLabeler metadata.Labeler,
 ) error {
@@ -179,7 +187,6 @@ func (r *ResourceGraphDefinitionReconciler) reconcileResourceGraphDefinitionMicr
 	controller := r.setupMicroController(processedRGD, graphExecLabeler)
 
 	ctrl.LoggerFrom(ctx).V(1).Info("reconciling resource graph definition micro controller")
-	gvr := processedRGD.Instance.GetGroupVersionResource()
 
 	err := r.dynamicController.Register(ctx, gvr, controller.Reconcile, resourceGVRsToWatch...)
 	if err != nil {

pkg/controller/resourcegraphdefinition/controller_status.go

Lines changed: 20 additions & 2 deletions
@@ -40,9 +40,12 @@ func (r *ResourceGraphDefinitionReconciler) updateStatus(
 	log, _ := logr.FromContext(ctx)
 	log.V(1).Info("calculating resource graph definition status and conditions")
 
-	// Set status.state.
-	if rgdConditionTypes.For(o).IsRootReady() {
+	// Set status.state based on conditions.
+	cs := rgdConditionTypes.For(o)
+	if cs.IsRootReady() {
 		o.Status.State = v1alpha1.ResourceGraphDefinitionStateActive
+	} else if isControllerSyncing(o) {
+		o.Status.State = v1alpha1.ResourceGraphDefinitionStateInitializing
 	} else {
 		o.Status.State = v1alpha1.ResourceGraphDefinitionStateInactive
 	}
@@ -164,7 +167,22 @@ func (m *ConditionsMarker) ControllerFailedToStart(msg string) {
 	m.cs.SetFalse(ControllerReady, "FailedToStart", msg)
 }
 
+// ControllerSyncing signals the microcontroller is registered but waiting for informer cache to sync.
+func (m *ConditionsMarker) ControllerSyncing() {
+	m.cs.SetFalse(ControllerReady, "Syncing", "waiting for informer cache to sync")
+}
+
 // ControllerRunning signals the microcontroller is up and running for this RGD-Kind.
 func (m *ConditionsMarker) ControllerRunning() {
 	m.cs.SetTrueWithReason(ControllerReady, "Running", "controller is running")
 }
+
+// isControllerSyncing returns true if the ControllerReady condition is False with reason "Syncing".
+func isControllerSyncing(o *v1alpha1.ResourceGraphDefinition) bool {
+	for _, c := range o.Status.Conditions {
+		if string(c.Type) == ControllerReady {
+			return c.Reason != nil && *c.Reason == "Syncing"
+		}
+	}
+	return false
+}

pkg/dynamiccontroller/dynamic_controller.go

Lines changed: 28 additions & 4 deletions
@@ -199,6 +199,10 @@ type DynamicController struct {
 	// queue is the work queue used to process items received via watches.
 	// The queue is shared between all informers and is used to propagate events to the handlers.
 	queue workqueue.TypedRateLimitingInterface[ObjectIdentifiers]
+
+	// syncedGVRs receives notifications when parent informers finish syncing.
+	// This channel is passed to parent LazyInformers and exposed via SyncEvents().
+	syncedGVRs chan schema.GroupVersionResource
 }
 
 // NewDynamicController creates a new DynamicController.
@@ -221,7 +225,26 @@ func NewDynamicController(
 			workqueue.NewTypedItemExponentialFailureRateLimiter[ObjectIdentifiers](config.MinRetryDelay, config.MaxRetryDelay),
 			&workqueue.TypedBucketRateLimiter[ObjectIdentifiers]{Limiter: rate.NewLimiter(rate.Limit(config.RateLimit), config.BurstLimit)},
 		), workqueue.TypedRateLimitingQueueConfig[ObjectIdentifiers]{Name: "dynamic-controller-queue"}),
+		syncedGVRs: make(chan schema.GroupVersionResource, 100),
+	}
+}
+
+// SyncEvents returns a channel that receives GVRs when their informers finish syncing.
+// Consumers should use this to watch for GVR sync completion.
+func (dc *DynamicController) SyncEvents() <-chan schema.GroupVersionResource {
+	return dc.syncedGVRs
+}
+
+// HasSynced returns true if the informer for the given GVR has synced.
+func (dc *DynamicController) HasSynced(gvr schema.GroupVersionResource) bool {
+	dc.mu.Lock()
+	defer dc.mu.Unlock()
+
+	w, exists := dc.watches[gvr]
+	if !exists {
+		return false
 	}
+	return w.HasSynced()
 }
 
 // Start starts workers and blocks until ctx.Done().
@@ -434,13 +457,14 @@ func (dc *DynamicController) Deregister(_ context.Context, parent schema.GroupVe
 
 func (dc *DynamicController) ensureWatchLocked(
 	gvr schema.GroupVersionResource,
+	syncedCh chan<- schema.GroupVersionResource,
 ) *internal.LazyInformer {
 	if w, ok := dc.watches[gvr]; ok {
 		return w
 	}
 
 	// Create per-GVR watch wrapper (informer created lazily on first handler)
-	w := internal.NewLazyInformer(dc.client, gvr, dc.config.ResyncPeriod, nil, dc.log)
+	w := internal.NewLazyInformer(dc.client, gvr, dc.config.ResyncPeriod, nil, dc.log, syncedCh)
 	dc.watches[gvr] = w
 	return w
 }
@@ -471,8 +495,8 @@ func (dc *DynamicController) reconcileParentLocked(
 		return nil
 	}
 
-	// ensure watch
-	w := dc.ensureWatchLocked(parent)
+	// ensure watch - parent gets sync notifications
+	w := dc.ensureWatchLocked(parent, dc.syncedGVRs)
 
 	// create handler if missing
 	if reg.parentHandlerID == "" {
@@ -533,7 +557,7 @@ func (dc *DynamicController) reconcileChildrenLocked(
 		if _, exists := reg.childHandlerIDs[child]; exists {
 			continue
 		}
-		w := dc.ensureWatchLocked(child)
+		w := dc.ensureWatchLocked(child, nil) // children don't need sync notifications
 		childHandlerID := childHandlerID(parent, child)
 		if err := w.AddHandler(dc.ctx, childHandlerID, dc.handlerForChildGVR(parent, child)); err != nil {
 			return fmt.Errorf("add child handler %s: %w", child, err)
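
To make the new DynamicController surface concrete, the toy model below mirrors the SyncEvents contract: informer goroutines (producers) send on a buffered channel, and a single consumer ranges over the read-only view, much as watchSyncEvents does in controller.go above. The miniController type is a hypothetical stand-in for illustration, not the kro implementation.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/runtime/schema"
)

type miniController struct {
	synced chan schema.GroupVersionResource
}

// SyncEvents mirrors the read-only accessor: consumers can only receive.
func (m *miniController) SyncEvents() <-chan schema.GroupVersionResource {
	return m.synced
}

func main() {
	// Buffered, like syncedGVRs above, so producers rarely block.
	mc := &miniController{synced: make(chan schema.GroupVersionResource, 100)}

	// Producer side: an informer goroutine reports that its cache synced.
	mc.synced <- schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}
	close(mc.synced)

	// Consumer side: range until the channel is closed.
	for gvr := range mc.SyncEvents() {
		fmt.Printf("informer for %s synced\n", gvr.String())
	}
}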

pkg/dynamiccontroller/dynamic_controller_test.go

Lines changed: 9 additions & 1 deletion
@@ -349,6 +349,11 @@ func TestInstanceUpdatePolicy(t *testing.T) {
 	err := dc.Register(t.Context(), gvr, handlerFunc)
 	assert.NoError(t, err)
 
+	// Wait for cache to sync (Register is now non-blocking)
+	require.Eventually(t, func() bool {
+		return dc.HasSynced(gvr)
+	}, 5*time.Second, 10*time.Millisecond, "cache should sync")
+
 	// simulate reconciling the instances
 	for dc.queue.Len() > 0 {
 		item, _ := dc.queue.Get()
@@ -361,7 +366,10 @@ func TestInstanceUpdatePolicy(t *testing.T) {
 	assert.NoError(t, err)
 
 	// check if the expected objects are queued
-	assert.Equal(t, dc.queue.Len(), 2)
+	// Note: On re-register, the existing 2 objects should be re-queued
+	require.Eventually(t, func() bool {
+		return dc.queue.Len() == 2
+	}, 5*time.Second, 10*time.Millisecond, "expected 2 items in queue after re-register")
 	for dc.queue.Len() > 0 {
 		name, _ := dc.queue.Get()
 		_, ok := objs[name.String()]

pkg/dynamiccontroller/internal/gvr_watch.go

Lines changed: 35 additions & 13 deletions
@@ -44,6 +44,10 @@ type LazyInformer struct {
 	done   <-chan struct{}
 	cancel context.CancelFunc
 
+	// syncedCh receives notification when the informer finishes syncing.
+	// Only the GVR is sent - the receiver is responsible for any name mapping.
+	syncedCh chan<- schema.GroupVersionResource
+
 	log logr.Logger
 }
 
@@ -53,6 +57,7 @@ func NewLazyInformer(
 	resync time.Duration,
 	tweak metadatainformer.TweakListOptionsFunc,
 	logger logr.Logger,
+	syncedCh chan<- schema.GroupVersionResource,
 ) *LazyInformer {
 	li := &LazyInformer{
 		gvr: gvr,
@@ -61,6 +66,7 @@ func NewLazyInformer(
 		tweak:    tweak,
 		handlers: make(map[string]cache.ResourceEventHandlerRegistration),
 		log:      logger.WithValues("gvr", gvr.String()),
+		syncedCh: syncedCh,
 	}
 	return li
 }
@@ -90,13 +96,14 @@ func (w *LazyInformer) ensureInformer() {
 }
 
 // AddHandler registers a new event handler and starts the informer if needed.
+// This method is non-blocking - it starts the informer but does not wait for cache sync.
+// When the informer syncs, the GVR is sent to syncedCh (if set at construction).
 func (w *LazyInformer) AddHandler(ctx context.Context, id string, h cache.ResourceEventHandler) error {
 	w.mu.Lock()
 	defer w.mu.Unlock()
 
 	// If informer was fully stopped, reset context before re-creating it.
 	if w.cancel == nil {
-		// recreate context
 		w.resetContext(ctx)
 	}
 
@@ -110,22 +117,37 @@ func (w *LazyInformer) AddHandler(ctx context.Context, id string, h cache.Resour
 
 	// Start informer if first handler
 	if len(w.handlers) == 1 {
-		go w.run()
-
-		if !cache.WaitForCacheSync(w.done, w.informer.HasSynced) {
-			w.log.Error(fmt.Errorf("cache sync failed"), "lazy informer sync failure", "gvr", w.gvr)
-			w.cancel()
-			w.cancel = nil
-			w.informer = nil
-			w.handlers = make(map[string]cache.ResourceEventHandlerRegistration)
-			return fmt.Errorf("failed to sync informer for %s", w.gvr)
-		}
+		// Capture values before releasing the lock to avoid races
+		informer := w.informer
+		done := w.done
+		syncedCh := w.syncedCh
+		gvr := w.gvr
+		log := w.log
+		go func() {
+			// Start the informer
+			go informer.Run(done)
+
+			// Wait for cache to sync and notify via channel
+			if cache.WaitForCacheSync(done, informer.HasSynced) {
+				if syncedCh != nil {
+					select {
+					case syncedCh <- gvr:
+					case <-done:
+					}
+				}
+			} else {
+				log.Error(fmt.Errorf("cache sync failed"), "lazy informer sync failure", "gvr", gvr)
+			}
+		}()
 	}
 	return nil
 }
 
-func (w *LazyInformer) run() {
-	w.informer.Run(w.done)
+// HasSynced returns true if the informer cache has synced.
+func (w *LazyInformer) HasSynced() bool {
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.informer != nil && w.informer.HasSynced()
 }
 
 // RemoveHandler unregisters a handler. Returns true if informer was stopped.
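
One detail worth noting in the AddHandler rewrite above: the goroutine works only on copies (informer, done, syncedCh, gvr, log) captured while w.mu is still held, because the struct fields can be reset concurrently once the lock is released. A minimal, hypothetical illustration of that capture-under-lock pattern:

package main

import (
	"fmt"
	"sync"
)

type watcher struct {
	mu   sync.Mutex
	name string
}

func (w *watcher) start(out chan<- string) {
	w.mu.Lock()
	defer w.mu.Unlock()

	name := w.name // capture while holding the lock
	go func() {
		out <- name // safe: uses the captured copy, never touches w.name
	}()
}

func main() {
	out := make(chan string, 1)
	w := &watcher{name: "deployments.apps"}
	w.start(out)
	fmt.Println(<-out)
}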
