Skip to content

Commit 8204640

Browse files
authored
Watch and sync changes to related resources (#149)
* Watch and sync changes to related resources change copyright year of related handlers On-behalf-of: SAP <iskren.pertov@sap.com> Signed-off-by: Iskren Petrov <iskren@kubermatic.com> Watch and sync changes to related resources * resolve Christoph's comments fix linter issue fix verify ci
1 parent 2db6919 commit 8204640

File tree

9 files changed

+526
-0
lines changed

9 files changed

+526
-0
lines changed

deploy/crd/kcp.io/syncagent.kcp.io_publishedresources.yaml

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -778,6 +778,70 @@ spec:
778778
Version is the API version of the related resource. This can be left blank to automatically
779779
use the preferred version.
780780
type: string
781+
watch:
782+
description: |-
783+
Watch configures how the agent identifies the owning primary object when a related
784+
resource with origin: kcp changes. When set, the agent sets up a watch on the related
785+
resource type and uses the configured rule to enqueue the correct primary object.
786+
Without this field, changes to origin:kcp related resources do not trigger reconciliation.
787+
properties:
788+
byOwner:
789+
description: |-
790+
ByOwner configures the watch handler to inspect the OwnerReferences of the changed
791+
object. When an OwnerReference with the given Kind is found, the referenced owner
792+
is enqueued as the primary object.
793+
type: object
794+
bySelector:
795+
description: |-
796+
BySelector configures the watch handler to list primary objects matching the given label
797+
selector. When a related object changes, all primary objects matching this selector
798+
are enqueued for reconciliation.
799+
properties:
800+
matchExpressions:
801+
description: matchExpressions is a list of label selector requirements. The requirements are ANDed.
802+
items:
803+
description: |-
804+
A label selector requirement is a selector that contains values, a key, and an operator that
805+
relates the key and values.
806+
properties:
807+
key:
808+
description: key is the label key that the selector applies to.
809+
type: string
810+
operator:
811+
description: |-
812+
operator represents a key's relationship to a set of values.
813+
Valid operators are In, NotIn, Exists and DoesNotExist.
814+
type: string
815+
values:
816+
description: |-
817+
values is an array of string values. If the operator is In or NotIn,
818+
the values array must be non-empty. If the operator is Exists or DoesNotExist,
819+
the values array must be empty. This array is replaced during a strategic
820+
merge patch.
821+
items:
822+
type: string
823+
type: array
824+
x-kubernetes-list-type: atomic
825+
required:
826+
- key
827+
- operator
828+
type: object
829+
type: array
830+
x-kubernetes-list-type: atomic
831+
matchLabels:
832+
additionalProperties:
833+
type: string
834+
description: |-
835+
matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels
836+
map is equivalent to an element of matchExpressions, whose key field is "key", the
837+
operator is "In", and the values array contains only "value". The requirements are ANDed.
838+
type: object
839+
type: object
840+
x-kubernetes-map-type: atomic
841+
type: object
842+
x-kubernetes-validations:
843+
- message: exactly one of byOwner or bySelector must be set
844+
rule: has(self.byOwner) != has(self.bySelector)
781845
required:
782846
- identifier
783847
- object

hack/tools.checksums

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
boilerplate|GOARCH=amd64;GOOS=linux|6f05fc3be207ae2ed99e125509a08df677cb007e197e16607c654a434b91d47f
2+
boilerplate|GOARCH=arm64;GOOS=darwin|3ac82c58f440ac8461746674e39311ba332d6d960966a060dd3be734b1111522
23
boilerplate|GOARCH=arm64;GOOS=linux|70253486ed7a803a35a9abb2bab4db2f1f7748d5266bf7a1c2ee298fda2b208a
34
etcd|GOARCH=amd64;GOOS=linux|435d74510f3216bab1932fb6d7a6b5fe8245301143fcd25f7e65dfb7dcf8904a
45
etcd|GOARCH=arm64;GOOS=linux|cc8c645e5a8df0f35f2a5c51d9b9383037eef0cf0167c52e648457b3971a7a09
56
gimps|GOARCH=amd64;GOOS=linux|b597efc7e2c72097a44c001b41a06ccca97610963e1f1aec74c3d99c0e0b6c11
67
gimps|GOARCH=arm64;GOOS=linux|2588daec997b4f4b3a8d8875f780fd6faf3c39c933519e7899e19a686476c8e4
78
golangci-lint|GOARCH=amd64;GOOS=linux|8a01a08dad47a14824d7d0f14af07c7144105fc079386c9c31fbe85f08f91643
9+
golangci-lint|GOARCH=arm64;GOOS=darwin|5fd0b6a09353eb0101d3ae81d5e3cf4707b77210c66fb92ae152d7280d959419
810
golangci-lint|GOARCH=arm64;GOOS=linux|2ed9cf2ad070dabc7947ba34cdc5142910be830306f063719898bc8fb44a7074
911
kube-apiserver|GOARCH=amd64;GOOS=linux|ca822082ec39e54a25836a4011ddb66e482e317a7a4f1a1f73882bbd2cf5a2a1
1012
kube-apiserver|GOARCH=arm64;GOOS=linux|6ade6c2646e2c01fde1095407452afc2b65e89d6da16da29ee39f6223ccaf63b
1113
kubectl|GOARCH=amd64;GOOS=linux|9591f3d75e1581f3f7392e6ad119aab2f28ae7d6c6e083dc5d22469667f27253
1214
kubectl|GOARCH=arm64;GOOS=linux|95df604e914941f3172a93fa8feeb1a1a50f4011dfbe0c01e01b660afc8f9b85
1315
yq|GOARCH=amd64;GOOS=linux|0c2b24e645b57d8e7c0566d18643a6d4f5580feeea3878127354a46f2a1e4598
16+
yq|GOARCH=arm64;GOOS=darwin|164e10e5f7df62990e4f3823205e7ea42ba5660523a428df07c7386c0b62e3d9
1417
yq|GOARCH=arm64;GOOS=linux|9477ac3cc447b6c083986129e35af8122eb2b938fe55c9c3e40436fb966e5813

internal/controller/sync/controller.go

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,16 @@ import (
3636

3737
corev1 "k8s.io/api/core/v1"
3838
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
39+
"k8s.io/apimachinery/pkg/api/meta"
3940
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
4041
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
4142
"k8s.io/apimachinery/pkg/labels"
43+
"k8s.io/apimachinery/pkg/runtime/schema"
4244
"k8s.io/apimachinery/pkg/types"
45+
"k8s.io/apimachinery/pkg/util/sets"
4346
"k8s.io/utils/ptr"
4447
ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
48+
"sigs.k8s.io/controller-runtime/pkg/cluster"
4549
"sigs.k8s.io/controller-runtime/pkg/handler"
4650
"sigs.k8s.io/controller-runtime/pkg/manager"
4751
"sigs.k8s.io/controller-runtime/pkg/predicate"
@@ -161,11 +165,191 @@ func Create(
161165
return nil, fmt.Errorf("failed to setup local-side watch: %w", err)
162166
}
163167

168+
if err := setupRelatedResourceWatches(c, localManager, remoteManager, pubRes, localDummy, remoteDummy, log); err != nil {
169+
return nil, err
170+
}
171+
164172
log.Info("Done setting up unmanaged controller.")
165173

166174
return c, nil
167175
}
168176

177+
// setupRelatedResourceWatches sets up watches for all related resources that have a Watch
178+
// config, on their respective origin side, so that changes trigger primary reconciliation.
179+
func setupRelatedResourceWatches(
180+
c mccontroller.Controller,
181+
localManager manager.Manager,
182+
remoteManager mcmanager.Manager,
183+
pubRes *syncagentv1alpha1.PublishedResource,
184+
localDummy, remoteDummy *unstructured.Unstructured,
185+
log *zap.SugaredLogger,
186+
) error {
187+
// Deduplication is per-origin to allow the same GVK on both sides.
188+
watchedKcpGVKs := sets.New[schema.GroupVersionKind]()
189+
watchedServiceGVKs := sets.New[schema.GroupVersionKind]()
190+
191+
for _, relRes := range pubRes.Spec.Related {
192+
if relRes.Watch == nil {
193+
continue
194+
}
195+
196+
gvr := schema.GroupVersionResource{
197+
Group: relRes.Group,
198+
Version: relRes.Version,
199+
Resource: relRes.Resource,
200+
}
201+
202+
// Use the REST mapper of the origin side: related resources may have projected GVKs
203+
// that differ between kcp and the service cluster, so we must resolve using the
204+
// mapper that actually knows about the GVR on that side.
205+
var originRESTMapper meta.RESTMapper
206+
if relRes.Origin == syncagentv1alpha1.RelatedResourceOriginKcp {
207+
originRESTMapper = remoteManager.GetLocalManager().GetRESTMapper()
208+
} else {
209+
originRESTMapper = localManager.GetRESTMapper()
210+
}
211+
212+
gvk, err := originRESTMapper.KindFor(gvr)
213+
if err != nil {
214+
return fmt.Errorf("failed to determine Kind for related resource %v (origin: %s): %w", gvr, relRes.Origin, err)
215+
}
216+
217+
relatedDummy := &unstructured.Unstructured{}
218+
relatedDummy.SetGroupVersionKind(gvk)
219+
220+
if relRes.Origin == syncagentv1alpha1.RelatedResourceOriginKcp {
221+
if watchedKcpGVKs.Has(gvk) {
222+
continue
223+
}
224+
watchedKcpGVKs.Insert(gvk)
225+
226+
enqueueForRelated, err := buildKcpRelatedHandler(relRes.Watch, gvk, remoteDummy, log)
227+
if err != nil {
228+
return err
229+
}
230+
231+
if err := c.MultiClusterWatch(mcsource.TypedKind(relatedDummy, enqueueForRelated)); err != nil {
232+
return fmt.Errorf("failed to setup watch for kcp-origin related resource %v: %w", gvk, err)
233+
}
234+
} else {
235+
if watchedServiceGVKs.Has(gvk) {
236+
continue
237+
}
238+
watchedServiceGVKs.Insert(gvk)
239+
240+
enqueueForRelated, err := buildServiceRelatedHandler(relRes.Watch, gvk, localDummy, localManager, log)
241+
if err != nil {
242+
return err
243+
}
244+
245+
if err := c.Watch(source.TypedKind(localManager.GetCache(), relatedDummy, enqueueForRelated)); err != nil {
246+
return fmt.Errorf("failed to setup watch for service-origin related resource %v: %w", gvk, err)
247+
}
248+
}
249+
250+
log.Infow("Set up watch for related resource", "gvk", gvk, "origin", relRes.Origin)
251+
}
252+
253+
return nil
254+
}
255+
256+
// buildKcpRelatedHandler constructs the per-cluster event handler for a kcp-origin related resource.
257+
func buildKcpRelatedHandler(
258+
watch *syncagentv1alpha1.RelatedResourceWatch,
259+
gvk schema.GroupVersionKind,
260+
remoteDummy *unstructured.Unstructured,
261+
log *zap.SugaredLogger,
262+
) (mchandler.TypedEventHandlerFunc[*unstructured.Unstructured, mcreconcile.Request], error) {
263+
switch {
264+
case watch.ByOwner != nil:
265+
ownerGVK := remoteDummy.GroupVersionKind()
266+
return func(clusterName string, _ cluster.Cluster) handler.TypedEventHandler[*unstructured.Unstructured, mcreconcile.Request] {
267+
return &byOwnerEventHandler{
268+
clusterName: clusterName,
269+
ownerGVK: ownerGVK,
270+
}
271+
}, nil
272+
273+
case watch.BySelector != nil:
274+
labelSelector := watch.BySelector
275+
primaryDummy := remoteDummy.DeepCopy()
276+
return func(clusterName string, cl cluster.Cluster) handler.TypedEventHandler[*unstructured.Unstructured, mcreconcile.Request] {
277+
return &bySelectorEventHandler{
278+
clusterName: clusterName,
279+
client: cl.GetClient(),
280+
primaryDummy: primaryDummy,
281+
labelSelector: labelSelector,
282+
log: log,
283+
}
284+
}, nil
285+
286+
default:
287+
return nil, fmt.Errorf("related resource %v (origin: kcp) has Watch set but neither byOwner nor bySelector configured", gvk)
288+
}
289+
}
290+
291+
// buildServiceRelatedHandler constructs the event handler for a service-cluster-origin related resource.
292+
// It maps the changed related resource back to the remote (kcp) primary via sync metadata on the local primary.
293+
func buildServiceRelatedHandler(
294+
watch *syncagentv1alpha1.RelatedResourceWatch,
295+
gvk schema.GroupVersionKind,
296+
localDummy *unstructured.Unstructured,
297+
localManager manager.Manager,
298+
log *zap.SugaredLogger,
299+
) (handler.TypedEventHandler[*unstructured.Unstructured, mcreconcile.Request], error) {
300+
localClient := localManager.GetClient()
301+
302+
switch {
303+
case watch.ByOwner != nil:
304+
ownerGVK := localDummy.GroupVersionKind()
305+
primaryDummy := localDummy.DeepCopy()
306+
return handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, obj *unstructured.Unstructured) []mcreconcile.Request {
307+
for _, ref := range obj.GetOwnerReferences() {
308+
refGV, err := schema.ParseGroupVersion(ref.APIVersion)
309+
if err != nil || refGV.Group != ownerGVK.Group || refGV.Version != ownerGVK.Version || ref.Kind != ownerGVK.Kind {
310+
continue
311+
}
312+
localPrimary := primaryDummy.DeepCopy()
313+
if err := localClient.Get(ctx, types.NamespacedName{Namespace: obj.GetNamespace(), Name: ref.Name}, localPrimary); err != nil {
314+
log.Warnw("Failed to fetch local primary for byOwner watch", "owner", ref.Name, "error", err)
315+
return nil
316+
}
317+
if req := sync.RemoteNameForLocalObject(localPrimary); req != nil {
318+
return []mcreconcile.Request{*req}
319+
}
320+
return nil
321+
}
322+
return nil
323+
}), nil
324+
325+
case watch.BySelector != nil:
326+
selector, err := metav1.LabelSelectorAsSelector(watch.BySelector)
327+
if err != nil {
328+
return nil, fmt.Errorf("failed to convert bySelector for service-origin related resource %v: %w", gvk, err)
329+
}
330+
primaryDummy := localDummy.DeepCopy()
331+
return handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, _ *unstructured.Unstructured) []mcreconcile.Request {
332+
primaryList := &unstructured.UnstructuredList{}
333+
primaryList.SetAPIVersion(primaryDummy.GetAPIVersion())
334+
primaryList.SetKind(primaryDummy.GetKind() + "List")
335+
if err := localClient.List(ctx, primaryList, &ctrlruntimeclient.ListOptions{LabelSelector: selector}); err != nil {
336+
log.Warnw("Failed to list local primary objects for bySelector watch", "selector", selector.String(), "error", err)
337+
return nil
338+
}
339+
var reqs []mcreconcile.Request
340+
for i := range primaryList.Items {
341+
if req := sync.RemoteNameForLocalObject(&primaryList.Items[i]); req != nil {
342+
reqs = append(reqs, *req)
343+
}
344+
}
345+
return reqs
346+
}), nil
347+
348+
default:
349+
return nil, fmt.Errorf("related resource %v (origin: service) has Watch set but neither byOwner nor bySelector configured", gvk)
350+
}
351+
}
352+
169353
func (r *Reconciler) Reconcile(ctx context.Context, request mcreconcile.Request) (reconcile.Result, error) {
170354
log := r.log.With("cluster", request.ClusterName, "request", request.NamespacedName)
171355
log.Debug("Processing")

0 commit comments

Comments
 (0)