Skip to content

Commit 63e6eb5

Browse files
committed
Replicate logicalclusters containing synctargets
Signed-off-by: David Festal <[email protected]>
1 parent 5e15fe7 commit 63e6eb5

File tree

3 files changed

+137
-0
lines changed

3 files changed

+137
-0
lines changed
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
Copyright 2023 The KCP Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package replicatelogicalcluster
18+
19+
import (
20+
"fmt"
21+
22+
kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache"
23+
"github.com/kcp-dev/logicalcluster/v3"
24+
25+
apierrors "k8s.io/apimachinery/pkg/api/errors"
26+
"k8s.io/apimachinery/pkg/util/runtime"
27+
"k8s.io/client-go/tools/cache"
28+
29+
"github.com/kcp-dev/kcp/pkg/reconciler/cache/labellogicalcluster"
30+
"github.com/kcp-dev/kcp/pkg/reconciler/cache/replication"
31+
corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1"
32+
"github.com/kcp-dev/kcp/sdk/apis/workload"
33+
workloadv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/workload/v1alpha1"
34+
kcpclientset "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/cluster"
35+
corev1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/core/v1alpha1"
36+
workloadv1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/workload/v1alpha1"
37+
)
38+
39+
const (
40+
ControllerName = "kcp-workload-replicate-logicalcluster"
41+
)
42+
43+
// NewController returns a new controller for labelling LogicalClusters that should be replicated.
44+
45+
func NewController(
46+
kcpClusterClient kcpclientset.ClusterInterface,
47+
logicalClusterInformer corev1alpha1informers.LogicalClusterClusterInformer,
48+
syncTargetInformer workloadv1alpha1informers.SyncTargetClusterInformer,
49+
) labellogicalcluster.Controller {
50+
logicalClusterLister := logicalClusterInformer.Lister()
51+
syncTargetIndexer := syncTargetInformer.Informer().GetIndexer()
52+
53+
c := labellogicalcluster.NewController(
54+
ControllerName,
55+
workload.GroupName,
56+
func(cluster *corev1alpha1.LogicalCluster) bool {
57+
// If there are any SyncTargets for this logical cluster, then the LogicalCluster object should be replicated.
58+
keys, err := syncTargetIndexer.IndexKeys(kcpcache.ClusterIndexName, kcpcache.ClusterIndexKey(logicalcluster.From(cluster)))
59+
if err != nil {
60+
runtime.HandleError(fmt.Errorf("failed to list SyncTargets: %v", err))
61+
return false
62+
}
63+
return len(keys) > 0
64+
},
65+
kcpClusterClient,
66+
logicalClusterInformer,
67+
)
68+
69+
// enqueue the logical cluster every time the APIExport changes
70+
enqueueSyncTarget := func(obj interface{}) {
71+
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
72+
obj = tombstone.Obj
73+
}
74+
75+
syncTarget, ok := obj.(*workloadv1alpha1.SyncTarget)
76+
if !ok {
77+
runtime.HandleError(fmt.Errorf("unexpected object type: %T", obj))
78+
return
79+
}
80+
81+
cluster, err := logicalClusterLister.Cluster(logicalcluster.From(syncTarget)).Get(corev1alpha1.LogicalClusterName)
82+
if err != nil && !apierrors.IsNotFound(err) {
83+
runtime.HandleError(fmt.Errorf("failed to get logical cluster: %v", err))
84+
return
85+
} else if apierrors.IsNotFound(err) {
86+
return
87+
}
88+
89+
c.EnqueueLogicalCluster(cluster, "reason", "SyncTarget changed", "synctarget", syncTarget.Name)
90+
}
91+
92+
syncTargetInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
93+
FilterFunc: replication.IsNoSystemClusterName,
94+
Handler: cache.ResourceEventHandlerFuncs{
95+
AddFunc: func(obj interface{}) {
96+
enqueueSyncTarget(obj)
97+
},
98+
DeleteFunc: func(obj interface{}) {
99+
enqueueSyncTarget(obj)
100+
},
101+
},
102+
})
103+
104+
return c
105+
}

tmc/pkg/server/controllers.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
workloadplacement "github.com/kcp-dev/kcp/pkg/reconciler/workload/placement"
4040
workloadreplicateclusterrole "github.com/kcp-dev/kcp/pkg/reconciler/workload/replicateclusterrole"
4141
workloadreplicateclusterrolebinding "github.com/kcp-dev/kcp/pkg/reconciler/workload/replicateclusterrolebinding"
42+
workloadreplicatelogicalcluster "github.com/kcp-dev/kcp/pkg/reconciler/workload/replicatelogicalcluster"
4243
workloadresource "github.com/kcp-dev/kcp/pkg/reconciler/workload/resource"
4344
synctargetcontroller "github.com/kcp-dev/kcp/pkg/reconciler/workload/synctarget"
4445
"github.com/kcp-dev/kcp/pkg/reconciler/workload/synctargetexports"
@@ -459,3 +460,30 @@ func (s *Server) installWorkloadReplicateClusterRoleBindingControllers(ctx conte
459460
return nil
460461
})
461462
}
463+
464+
func (s *Server) installWorkloadReplicateLogicalClusterControllers(ctx context.Context, config *rest.Config) error {
465+
config = rest.CopyConfig(config)
466+
config = rest.AddUserAgent(config, workloadreplicatelogicalcluster.ControllerName)
467+
kcpClusterClient, err := kcpclientset.NewForConfig(config)
468+
if err != nil {
469+
return err
470+
}
471+
472+
c := workloadreplicatelogicalcluster.NewController(
473+
kcpClusterClient,
474+
s.Core.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters(),
475+
s.Core.KcpSharedInformerFactory.Workload().V1alpha1().SyncTargets(),
476+
)
477+
478+
return s.Core.AddPostStartHook(postStartHookName(workloadreplicatelogicalcluster.ControllerName), func(hookContext genericapiserver.PostStartHookContext) error {
479+
logger := klog.FromContext(ctx).WithValues("postStartHook", postStartHookName(workloadreplicatelogicalcluster.ControllerName))
480+
if err := s.Core.WaitForSync(hookContext.StopCh); err != nil {
481+
logger.Error(err, "failed to finish post-start-hook")
482+
return nil // don't klog.Fatal. This only happens when context is cancelled.
483+
}
484+
485+
go c.Start(goContext(hookContext), 2)
486+
487+
return nil
488+
})
489+
}

tmc/pkg/server/server.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,10 @@ func (s *Server) Run(ctx context.Context) error {
110110
if err := s.installWorkloadReplicateClusterRoleBindingControllers(ctx, controllerConfig); err != nil {
111111
return err
112112
}
113+
114+
if err := s.installWorkloadReplicateLogicalClusterControllers(ctx, controllerConfig); err != nil {
115+
return err
116+
}
113117
}
114118

115119
if s.Options.Core.Controllers.EnableAll || enabled.Has("resource-scheduler") {

0 commit comments

Comments
 (0)