Skip to content

Commit 7a039f9

Browse files
committed
watch origin:kcp related resources and sync changes to service cluster
1 parent 2db6919 commit 7a039f9

File tree

5 files changed

+189
-1
lines changed

5 files changed

+189
-1
lines changed

internal/controller/sync/controller.go

Lines changed: 95 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,14 @@ import (
3939
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
4040
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
4141
"k8s.io/apimachinery/pkg/labels"
42+
"k8s.io/apimachinery/pkg/runtime/schema"
4243
"k8s.io/apimachinery/pkg/types"
44+
"k8s.io/apimachinery/pkg/util/sets"
45+
"k8s.io/client-go/util/workqueue"
4346
"k8s.io/utils/ptr"
4447
ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
48+
"sigs.k8s.io/controller-runtime/pkg/cluster"
49+
"sigs.k8s.io/controller-runtime/pkg/event"
4550
"sigs.k8s.io/controller-runtime/pkg/handler"
4651
"sigs.k8s.io/controller-runtime/pkg/manager"
4752
"sigs.k8s.io/controller-runtime/pkg/predicate"
@@ -67,6 +72,41 @@ type Reconciler struct {
6772
localCRD *apiextensionsv1.CustomResourceDefinition
6873
stateNamespace string
6974
agentName string
75+
relatedIndex *sync.RelatedObjectIndex
76+
}
77+
78+
// relatedResourceEventHandler enqueues the primary object whenever a kcp-origin
79+
// related resource changes. It uses the RelatedObjectIndex to reverse-map the
80+
// changed related object back to its owning primary object.
81+
type relatedResourceEventHandler struct {
82+
clusterName string
83+
relatedIndex *sync.RelatedObjectIndex
84+
group string
85+
resource string
86+
}
87+
88+
func (h *relatedResourceEventHandler) Create(_ context.Context, evt event.TypedCreateEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
89+
h.enqueue(evt.Object, q)
90+
}
91+
92+
func (h *relatedResourceEventHandler) Update(_ context.Context, evt event.TypedUpdateEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
93+
h.enqueue(evt.ObjectNew, q)
94+
}
95+
96+
func (h *relatedResourceEventHandler) Delete(_ context.Context, evt event.TypedDeleteEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
97+
h.enqueue(evt.Object, q)
98+
}
99+
100+
func (h *relatedResourceEventHandler) Generic(_ context.Context, evt event.TypedGenericEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
101+
h.enqueue(evt.Object, q)
102+
}
103+
104+
func (h *relatedResourceEventHandler) enqueue(obj *unstructured.Unstructured, q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
105+
req, ok := h.relatedIndex.Get(h.clusterName, h.group, h.resource, obj.GetNamespace(), obj.GetName())
106+
if !ok {
107+
return
108+
}
109+
q.Add(req)
70110
}
71111

72112
// Create creates a new controller and importantly does *not* add it to the manager,
@@ -120,6 +160,7 @@ func Create(
120160
stateNamespace: stateNamespace,
121161
agentName: agentName,
122162
localCRD: localCRD,
163+
relatedIndex: sync.NewRelatedObjectIndex(),
123164
}
124165

125166
ctrlOptions := mccontroller.Options{
@@ -161,6 +202,59 @@ func Create(
161202
return nil, fmt.Errorf("failed to setup local-side watch: %w", err)
162203
}
163204

205+
// Watch origin:kcp related resources in the virtual workspace so that changes
206+
// to them trigger reconciliation of their owning primary object.
207+
watchedGVKs := sets.New[schema.GroupVersionKind]()
208+
for _, relRes := range pubRes.Spec.Related {
209+
if relRes.Origin != syncagentv1alpha1.RelatedResourceOriginKcp {
210+
continue
211+
}
212+
213+
gvr := schema.GroupVersionResource{
214+
Group: relRes.Group,
215+
Version: relRes.Version,
216+
Resource: relRes.Resource,
217+
}
218+
219+
// Use the local REST mapper to determine the Kind. Core resources (ConfigMap,
220+
// Secret, …) and CRDs installed on the service cluster are covered by this.
221+
gvk, err := localManager.GetRESTMapper().KindFor(gvr)
222+
if err != nil {
223+
log.Warnw("failed to determine Kind for origin:kcp related resource, skipping watch", "gvr", gvr, "error", err)
224+
continue
225+
}
226+
227+
// Deduplicate: only set up one watch per GVK.
228+
if watchedGVKs.Has(gvk) {
229+
continue
230+
}
231+
232+
watchedGVKs.Insert(gvk)
233+
234+
relatedDummy := &unstructured.Unstructured{}
235+
relatedDummy.SetGroupVersionKind(gvk)
236+
237+
group := relRes.Group
238+
resource := relRes.Resource
239+
240+
enqueueForRelated := mchandler.TypedEventHandlerFunc[*unstructured.Unstructured, mcreconcile.Request](
241+
func(clusterName string, _ cluster.Cluster) handler.TypedEventHandler[*unstructured.Unstructured, mcreconcile.Request] {
242+
return &relatedResourceEventHandler{
243+
clusterName: clusterName,
244+
relatedIndex: reconciler.relatedIndex,
245+
group: group,
246+
resource: resource,
247+
}
248+
},
249+
)
250+
251+
if err := c.MultiClusterWatch(mcsource.TypedKind(relatedDummy, enqueueForRelated)); err != nil {
252+
return nil, fmt.Errorf("failed to setup watch for origin:kcp related resource %v: %w", gvk, err)
253+
}
254+
255+
log.Infow("Set up watch for origin:kcp related resource", "gvk", gvk)
256+
}
257+
164258
log.Info("Done setting up unmanaged controller.")
165259

166260
return c, nil
@@ -226,7 +320,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, request mcreconcile.Request)
226320
}
227321

228322
// sync main object
229-
syncer, err := sync.NewResourceSyncer(log, r.localClient, vwClient, r.pubRes, r.localCRD, mutation.NewMutator, r.stateNamespace, r.agentName)
323+
syncer, err := sync.NewResourceSyncer(log, r.localClient, vwClient, r.pubRes, r.localCRD, mutation.NewMutator, r.stateNamespace, r.agentName, r.relatedIndex)
230324
if err != nil {
231325
recorder.Event(remoteObj, corev1.EventTypeWarning, "ReconcilingError", "Failed to process object: a provider-side issue has occurred.")
232326
return reconcile.Result{}, fmt.Errorf("failed to create syncer: %w", err)

internal/sync/related_index.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
Copyright 2025 The KCP Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package sync
18+
19+
import (
20+
gosync "sync"
21+
22+
mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile"
23+
)
24+
25+
type relatedObjectKey struct {
26+
cluster string
27+
group string
28+
resource string
29+
namespace string
30+
name string
31+
}
32+
33+
// RelatedObjectIndex maps kcp-side related objects back to their owning primary objects.
34+
// It is populated during reconciliation and used by watch handlers to trigger
35+
// reconciliation when a related resource changes in kcp.
36+
type RelatedObjectIndex struct {
37+
mu gosync.RWMutex
38+
index map[relatedObjectKey]mcreconcile.Request
39+
}
40+
41+
// NewRelatedObjectIndex creates a new empty RelatedObjectIndex.
42+
func NewRelatedObjectIndex() *RelatedObjectIndex {
43+
return &RelatedObjectIndex{
44+
index: make(map[relatedObjectKey]mcreconcile.Request),
45+
}
46+
}
47+
48+
// Set stores the mapping from a related object to its owning primary object.
49+
func (i *RelatedObjectIndex) Set(cluster, group, resource, namespace, name string, primary mcreconcile.Request) {
50+
i.mu.Lock()
51+
defer i.mu.Unlock()
52+
i.index[relatedObjectKey{cluster, group, resource, namespace, name}] = primary
53+
}
54+
55+
// Get looks up the primary object that owns the given related object.
56+
func (i *RelatedObjectIndex) Get(cluster, group, resource, namespace, name string) (mcreconcile.Request, bool) {
57+
i.mu.RLock()
58+
defer i.mu.RUnlock()
59+
req, ok := i.index[relatedObjectKey{cluster, group, resource, namespace, name}]
60+
return req, ok
61+
}

internal/sync/syncer.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ type ResourceSyncer struct {
5454

5555
agentName string
5656

57+
// relatedIndex is populated during reconciliation to enable watches on origin:kcp
58+
// related resources to trigger primary object reconciliation.
59+
relatedIndex *RelatedObjectIndex
60+
5761
// newObjectStateStore is used for testing purposes
5862
newObjectStateStore newObjectStateStoreFunc
5963
}
@@ -69,6 +73,7 @@ func NewResourceSyncer(
6973
mutatorCreator MutatorCreatorFunc,
7074
stateNamespace string,
7175
agentName string,
76+
relatedIndex *RelatedObjectIndex,
7277
) (*ResourceSyncer, error) {
7378
// create a dummy that represents the type used on the local service cluster
7479
localGVK, err := projection.PublishedResourceSourceGVK(localCRD, pubRes)
@@ -127,6 +132,7 @@ func NewResourceSyncer(
127132
primaryMutator: primaryMutator,
128133
relatedMutators: relatedMutators,
129134
agentName: agentName,
135+
relatedIndex: relatedIndex,
130136
newObjectStateStore: newKubernetesStateStoreCreator(stateNamespace),
131137
}, nil
132138
}

internal/sync/syncer_related.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ import (
3838
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
3939
"k8s.io/apimachinery/pkg/types"
4040
ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
41+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
42+
mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile"
4143
)
4244

4345
func (s *ResourceSyncer) processRelatedResources(ctx context.Context, log *zap.SugaredLogger, stateStore ObjectStateStore, remote, local syncSide, primaryDeleting bool) (requeue bool, err error) {
@@ -91,6 +93,29 @@ func (s *ResourceSyncer) processRelatedResource(ctx context.Context, log *zap.Su
9193
return false, nil
9294
}
9395

96+
// Populate the reverse index for origin:kcp related resources so that watches
97+
// on those resources can trigger reconciliation of the owning primary object.
98+
if relRes.Origin == syncagentv1alpha1.RelatedResourceOriginKcp && s.relatedIndex != nil {
99+
for _, resolved := range resolvedObjects {
100+
s.relatedIndex.Set(
101+
string(remote.clusterName),
102+
relRes.Group,
103+
relRes.Resource,
104+
resolved.original.GetNamespace(),
105+
resolved.original.GetName(),
106+
mcreconcile.Request{
107+
ClusterName: string(remote.clusterName),
108+
Request: reconcile.Request{
109+
NamespacedName: types.NamespacedName{
110+
Namespace: remote.object.GetNamespace(),
111+
Name: remote.object.GetName(),
112+
},
113+
},
114+
},
115+
)
116+
}
117+
}
118+
94119
slices.SortStableFunc(resolvedObjects, func(a, b resolvedObject) int {
95120
aKey := ctrlruntimeclient.ObjectKeyFromObject(a.original).String()
96121
bKey := ctrlruntimeclient.ObjectKeyFromObject(b.original).String()

internal/sync/syncer_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -923,6 +923,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
923923
},
924924
stateNamespace,
925925
"textor-the-doctor",
926+
nil,
926927
)
927928
if err != nil {
928929
t.Fatalf("Failed to create syncer: %v", err)
@@ -1231,6 +1232,7 @@ func TestSyncerProcessingSingleResourceWithStatus(t *testing.T) {
12311232
},
12321233
stateNamespace,
12331234
"textor-the-doctor",
1235+
nil,
12341236
)
12351237
if err != nil {
12361238
t.Fatalf("Failed to create syncer: %v", err)

0 commit comments

Comments
 (0)