diff --git a/cmd/agent/app/agent.go b/cmd/agent/app/agent.go index 0e12adbd4ba2..45796c1c2cef 100644 --- a/cmd/agent/app/agent.go +++ b/cmd/agent/app/agent.go @@ -280,7 +280,7 @@ func setupControllers(ctx context.Context, mgr controllerruntime.Manager, opts * rateLimiterGetter := util.GetClusterRateLimiterGetter().SetDefaultLimits(opts.ClusterAPIQPS, opts.ClusterAPIBurst) clusterClientOption := &util.ClientOption{RateLimiterGetter: rateLimiterGetter.GetRateLimiter} - objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSetForAgent, clusterClientOption, resourceInterpreter) + objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSetForAgent, clusterClientOption, resourceInterpreter, genericmanager.GetInstance()) controllerContext := controllerscontext.Context{ Mgr: mgr, ObjectWatcher: objectWatcher, @@ -346,12 +346,13 @@ func startClusterStatusController(ctx controllerscontext.Context) (bool, error) func startExecutionController(ctx controllerscontext.Context) (bool, error) { executionController := &execution.Controller{ - Client: ctx.Mgr.GetClient(), - EventRecorder: ctx.Mgr.GetEventRecorderFor(execution.ControllerName), //nolint:staticcheck // Note: GetEventRecorderFor is deprecated in controller-runtime v0.23.0 in favor of GetEventRecorder. This changes event API from v1 events to events.k8s.io. We need to migrate carefully, especially considering the impact on users and RBAC permission changes in installation/deployment tools. - RESTMapper: ctx.Mgr.GetRESTMapper(), - ObjectWatcher: ctx.ObjectWatcher, - InformerManager: genericmanager.GetInstance(), - RateLimiterOptions: ctx.Opts.RateLimiterOptions, + Client: ctx.Mgr.GetClient(), + EventRecorder: ctx.Mgr.GetEventRecorderFor(execution.ControllerName), //nolint:staticcheck // Note: GetEventRecorderFor is deprecated in controller-runtime v0.23.0 in favor of GetEventRecorder. This changes event API from v1 events to events.k8s.io. We need to migrate carefully, especially considering the impact on users and RBAC permission changes in installation/deployment tools. + RESTMapper: ctx.Mgr.GetRESTMapper(), + ObjectWatcher: ctx.ObjectWatcher, + InformerManager: genericmanager.GetInstance(), + RateLimiterOptions: ctx.Opts.RateLimiterOptions, + ClusterClientSetFunc: util.NewClusterDynamicClientSetForAgent, } if err := executionController.SetupWithManager(ctx.Mgr); err != nil { return false, err @@ -366,9 +367,7 @@ func startWorkStatusController(ctx controllerscontext.Context) (bool, error) { RESTMapper: ctx.Mgr.GetRESTMapper(), InformerManager: genericmanager.GetInstance(), Context: ctx.Context, - ObjectWatcher: ctx.ObjectWatcher, ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, - ClusterCacheSyncTimeout: ctx.Opts.ClusterCacheSyncTimeout, ConcurrentWorkStatusSyncs: ctx.Opts.ConcurrentWorkSyncs, RateLimiterOptions: ctx.Opts.RateLimiterOptions, ResourceInterpreter: ctx.ResourceInterpreter, diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go index 6032d3f84e2e..3e983031bfce 100644 --- a/cmd/controller-manager/app/controllermanager.go +++ b/cmd/controller-manager/app/controllermanager.go @@ -457,13 +457,15 @@ func startBindingStatusController(ctx controllerscontext.Context) (enabled bool, func startExecutionController(ctx controllerscontext.Context) (enabled bool, err error) { executionController := &execution.Controller{ - Client: ctx.Mgr.GetClient(), - EventRecorder: ctx.Mgr.GetEventRecorderFor(execution.ControllerName), //nolint:staticcheck // Note: GetEventRecorderFor is deprecated in controller-runtime v0.23.0 in favor of GetEventRecorder. This changes event API from v1 events to events.k8s.io. We need to migrate carefully, especially considering the impact on users and RBAC permission changes in installation/deployment tools. - RESTMapper: ctx.Mgr.GetRESTMapper(), - ObjectWatcher: ctx.ObjectWatcher, - WorkPredicateFunc: helper.WorkWithinPushClusterPredicate(ctx.Mgr), - InformerManager: genericmanager.GetInstance(), - RateLimiterOptions: ctx.Opts.RateLimiterOptions, + Client: ctx.Mgr.GetClient(), + EventRecorder: ctx.Mgr.GetEventRecorderFor(execution.ControllerName), //nolint:staticcheck // Note: GetEventRecorderFor is deprecated in controller-runtime v0.23.0 in favor of GetEventRecorder. This changes event API from v1 events to events.k8s.io. We need to migrate carefully, especially considering the impact on users and RBAC permission changes in installation/deployment tools. + RESTMapper: ctx.Mgr.GetRESTMapper(), + ObjectWatcher: ctx.ObjectWatcher, + WorkPredicateFunc: helper.WorkWithinPushClusterPredicate(ctx.Mgr), + InformerManager: genericmanager.GetInstance(), + RateLimiterOptions: ctx.Opts.RateLimiterOptions, + ClusterClientSetFunc: util.NewClusterDynamicClientSet, + ClusterClientOption: ctx.ClusterClientOption, } if err := executionController.SetupWithManager(ctx.Mgr); err != nil { return false, err @@ -479,11 +481,9 @@ func startWorkStatusController(ctx controllerscontext.Context) (enabled bool, er RESTMapper: ctx.Mgr.GetRESTMapper(), InformerManager: genericmanager.GetInstance(), Context: ctx.Context, - ObjectWatcher: ctx.ObjectWatcher, WorkPredicateFunc: helper.WorkWithinPushClusterPredicate(ctx.Mgr), ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet, ClusterClientOption: ctx.ClusterClientOption, - ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout, ConcurrentWorkStatusSyncs: opts.ConcurrentWorkSyncs, RateLimiterOptions: ctx.Opts.RateLimiterOptions, ResourceInterpreter: ctx.ResourceInterpreter, @@ -868,7 +868,7 @@ func setupControllers(ctx context.Context, mgr controllerruntime.Manager, opts * } rateLimiterGetter := util.GetClusterRateLimiterGetter().SetDefaultLimits(opts.ClusterAPIQPS, opts.ClusterAPIBurst) clusterClientOption := &util.ClientOption{RateLimiterGetter: rateLimiterGetter.GetRateLimiter} - objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSet, clusterClientOption, resourceInterpreter) + objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSet, clusterClientOption, resourceInterpreter, genericmanager.GetInstance()) resourceDetector := &detector.ResourceDetector{ DiscoveryClientSet: discoverClientSet, diff --git a/pkg/controllers/execution/execution_controller.go b/pkg/controllers/execution/execution_controller.go index 9eed47e27a50..40b53862631e 100644 --- a/pkg/controllers/execution/execution_controller.go +++ b/pkg/controllers/execution/execution_controller.go @@ -19,6 +19,8 @@ package execution import ( "context" "fmt" + "reflect" + "sync" "time" corev1 "k8s.io/api/core/v1" @@ -26,7 +28,9 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/retry" "k8s.io/klog/v2" @@ -36,15 +40,22 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" + workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" "github.com/karmada-io/karmada/pkg/detector" "github.com/karmada-io/karmada/pkg/events" "github.com/karmada-io/karmada/pkg/metrics" + "github.com/karmada-io/karmada/pkg/resourceinterpreter/default/native/prune" "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" "github.com/karmada-io/karmada/pkg/util" + "github.com/karmada-io/karmada/pkg/util/fedinformer" "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" "github.com/karmada-io/karmada/pkg/util/fedinformer/keys" "github.com/karmada-io/karmada/pkg/util/helper" @@ -62,18 +73,24 @@ const ( // workSuspendDispatchingConditionReason is the reason for the WorkDispatching condition when dispatching is suspended. workSuspendDispatchingConditionReason = "SuspendDispatching" // workDispatchingConditionReason is the reason for the WorkDispatching condition when dispatching is not suspended. - workDispatchingConditionReason = "Dispatching" + workDispatchingConditionReason = "Dispatching" + memberInformerNotSyncedRequeueAfter = 1 * time.Minute ) // Controller is to sync Work. type Controller struct { - client.Client // used to operate Work resources. - EventRecorder record.EventRecorder - RESTMapper meta.RESTMapper - ObjectWatcher objectwatcher.ObjectWatcher - WorkPredicateFunc predicate.Predicate - InformerManager genericmanager.MultiClusterInformerManager - RateLimiterOptions ratelimiterflag.Options + client.Client // used to operate Work resources. + EventRecorder record.EventRecorder + RESTMapper meta.RESTMapper + ObjectWatcher objectwatcher.ObjectWatcher + WorkPredicateFunc predicate.Predicate + InformerManager genericmanager.MultiClusterInformerManager + eventHandlerOnce sync.Once + eventHandler cache.ResourceEventHandler + eventChannel chan event.TypedGenericEvent[client.Object] + RateLimiterOptions ratelimiterflag.Options + ClusterClientSetFunc util.NewClusterDynamicClientSetFunc + ClusterClientOption *util.ClientOption } // Reconcile performs a full reconciliation for the object referred to by the Request. @@ -128,11 +145,28 @@ func (c *Controller) Reconcile(ctx context.Context, req controllerruntime.Reques return controllerruntime.Result{}, err } + gvrTargets, err := util.GetGVRsFromWork(c.RESTMapper, work) + if err != nil { + return controllerruntime.Result{}, err + } + + informersSynced, err := helper.EnsureInformerHandlersReady(cluster, gvrTargets, c.getEventHandler(), c.InformerManager, c.ClusterClientSetFunc, c.Client, c.ClusterClientOption) + if err != nil { + klog.ErrorS(err, "Failed to ensure informers for Work.", "namespace", work.GetNamespace(), "name", work.GetName()) + return controllerruntime.Result{}, err + } + if !informersSynced { + klog.V(4).InfoS("Member cluster informers are not synced yet.", "namespace", work.GetNamespace(), "name", work.GetName(), "cluster", cluster.Name) + return controllerruntime.Result{RequeueAfter: memberInformerNotSyncedRequeueAfter}, nil + } + return c.syncWork(ctx, clusterName, work) } // SetupWithManager creates a controller and register to controller manager. func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error { + c.eventChannel = make(chan event.TypedGenericEvent[client.Object], 1024) + ctrlBuilder := controllerruntime.NewControllerManagedBy(mgr).Named(ControllerName). WithEventFilter(predicate.GenerationChangedPredicate{}). WithOptions(controller.Options{ @@ -145,7 +179,10 @@ func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error { ctrlBuilder.For(&workv1alpha1.Work{}) } - return ctrlBuilder.Complete(c) + return ctrlBuilder.WatchesRawSource(source.Channel[client.Object]( + c.eventChannel, + handler.EnqueueRequestsFromMapFunc(c.mapWorkloadToWork), + )).Complete(c) } func (c *Controller) syncWork(ctx context.Context, clusterName string, work *workv1alpha1.Work) (controllerruntime.Result, error) { @@ -396,3 +433,148 @@ func (c *Controller) eventf(object *unstructured.Unstructured, eventType, reason } c.EventRecorder.Eventf(ref, eventType, reason, messageFmt, args...) } + +func (c *Controller) mapWorkloadToWork(_ context.Context, object client.Object) []reconcile.Request { + if object.GetLabels()[util.ManagedByKarmadaLabel] != util.ManagedByKarmadaLabelValue { + klog.V(5).InfoS("Ignore resource which is not managed by Karmada.", "kind", object.GetObjectKind().GroupVersionKind().Kind, "namespace", object.GetNamespace(), "name", object.GetName()) + return nil + } + + annotations := object.GetAnnotations() + workNamespace, nsExist := annotations[workv1alpha2.WorkNamespaceAnnotation] + workName, nameExist := annotations[workv1alpha2.WorkNameAnnotation] + if !nsExist || !nameExist { + klog.V(5).InfoS("Ignore resource missing Karmada work annotations.", "kind", object.GetObjectKind().GroupVersionKind().Kind, "namespace", object.GetNamespace(), "name", object.GetName()) + return nil + } + + return []reconcile.Request{{NamespacedName: client.ObjectKey{Namespace: workNamespace, Name: workName}}} +} + +func (c *Controller) enqueueWorkload(object runtime.Object) { + if c.eventChannel == nil { + klog.V(5).InfoS("Ignore member cluster object event as event channel is not initialized") + return + } + clientObject, ok := object.(client.Object) + if !ok { + klog.ErrorS(nil, "Ignore event as object does not implement client.Object", "object", object) + return + } + c.eventChannel <- event.TypedGenericEvent[client.Object]{Object: clientObject} +} + +// getEventHandler return callback function that knows how to handle events from the member cluster. +func (c *Controller) getEventHandler() cache.ResourceEventHandler { + c.eventHandlerOnce.Do(func() { + if c.eventHandler == nil { + c.eventHandler = fedinformer.NewHandlerOnEvents(nil, c.onUpdate, c.onDelete) + } + }) + return c.eventHandler +} + +func (c *Controller) onUpdate(old, cur any) { + oldObj, ok := old.(*unstructured.Unstructured) + if !ok { + klog.ErrorS(nil, "Failed to assert old object as Unstructured", "old", old) + return + } + // Skip resources not managed by Karmada. + if oldObj.GetLabels()[util.ManagedByKarmadaLabel] != util.ManagedByKarmadaLabelValue { + klog.V(5).InfoS("Skip resource not managed by Karmada", + "apiVersion", oldObj.GetAPIVersion(), + "kind", oldObj.GetKind(), + "namespace", oldObj.GetNamespace(), + "name", oldObj.GetName()) + return + } + annotations := oldObj.GetAnnotations() + workNS, nsExist := annotations[workv1alpha2.WorkNamespaceAnnotation] + _, nameExist := annotations[workv1alpha2.WorkNameAnnotation] + if !nsExist || !nameExist { + klog.V(5).InfoS("Skip resource missing Karmada work annotations", + "apiVersion", oldObj.GetAPIVersion(), + "kind", oldObj.GetKind(), + "namespace", oldObj.GetNamespace(), + "name", oldObj.GetName()) + return + } + + curObj, ok := cur.(*unstructured.Unstructured) + if !ok { + klog.ErrorS(nil, "Failed to assert cur object as Unstructured", "cur", cur) + return + } + + clusterName, err := names.GetClusterName(workNS) + if err != nil { + klog.ErrorS(err, "Failed to get cluster name from work namespace", + "workNamespace", workNS, + "apiVersion", curObj.GetAPIVersion(), + "kind", curObj.GetKind(), + "namespace", curObj.GetNamespace(), + "name", curObj.GetName()) + return + } + versionRecord, ok := c.ObjectWatcher.GetVersionRecord(clusterName, curObj) + if !ok { + // Wait for the initial Reconcile to establish a version record before processing events. + // The version record is set after the first successful sync. + return + } + // Skip if the resource version matches the recorded one, this event is a result of our own sync. + // However, this assessment is not always accurate, + // as the update event may reach the Informer faster than the act of recording the version. + if curObj.GetResourceVersion() == versionRecord { + return + } + + oldCopy := oldObj.DeepCopy() + // Remove irrelevant fields (e.g., managedFields, status) before comparing content. + // TODO: Consider a dedicated RemoveIrrelevantFields variant that preserves + // relevant changes like ownerReferences. + err = prune.RemoveIrrelevantFields(oldCopy, prune.RemoveJobTTLSeconds) + if err != nil { + klog.ErrorS(err, "Failed to remove irrelevant fields from old resource", + "apiVersion", curObj.GetAPIVersion(), + "kind", oldCopy.GetKind(), + "namespace", oldCopy.GetNamespace(), + "name", oldCopy.GetName(), + "clusterName", clusterName) + return + } + curCopy := curObj.DeepCopy() + err = prune.RemoveIrrelevantFields(curCopy, prune.RemoveJobTTLSeconds) + if err != nil { + klog.ErrorS(err, "Failed to remove irrelevant fields from current resource", + "apiVersion", curObj.GetAPIVersion(), + "kind", curCopy.GetKind(), + "namespace", curCopy.GetNamespace(), + "name", curCopy.GetName(), + "clusterName", clusterName) + return + } + + // No meaningful content difference after pruning irrelevant fields. + if reflect.DeepEqual(oldCopy, curCopy) { + return + } + + c.enqueueWorkload(curObj) +} + +func (c *Controller) onDelete(old any) { + if deleted, ok := old.(cache.DeletedFinalStateUnknown); ok { + old = deleted.Obj + if old == nil { + return + } + } + oldObj, ok := old.(runtime.Object) + if !ok { + klog.ErrorS(nil, "Failed to assert old object as runtime.Object", "old", old) + return + } + c.enqueueWorkload(oldObj) +} diff --git a/pkg/controllers/execution/execution_controller_test.go b/pkg/controllers/execution/execution_controller_test.go index 4a8e01cdde07..47463ecb3148 100644 --- a/pkg/controllers/execution/execution_controller_test.go +++ b/pkg/controllers/execution/execution_controller_test.go @@ -20,27 +20,33 @@ import ( "encoding/json" "fmt" "testing" + "time" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/uuid" dynamicfake "k8s.io/client-go/dynamic/fake" "k8s.io/client-go/kubernetes/scheme" + toolscache "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/utils/ptr" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/client/interceptor" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/reconcile" clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" + workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" "github.com/karmada-io/karmada/pkg/events" "github.com/karmada-io/karmada/pkg/resourceinterpreter" "github.com/karmada-io/karmada/pkg/resourceinterpreter/default/native" @@ -194,10 +200,6 @@ func TestExecutionController_Reconcile(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - t.Cleanup(func() { - genericmanager.GetInstance().Stop(clusterName) - }) - req := controllerruntime.Request{ NamespacedName: types.NamespacedName{ Name: "work", @@ -255,7 +257,7 @@ func newController(work *workv1alpha1.Work, recorder *record.FakeRecorder) Contr WithInterceptorFuncs(withGVKInterceptor(clientScheme)). Build() dynamicClientSet := dynamicfake.NewSimpleDynamicClient(scheme.Scheme, pod) - informerManager := genericmanager.GetInstance() + informerManager := genericmanager.NewMultiClusterInformerManager(context.Background()) informerManager.ForCluster(cluster.Name, dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods")) informerManager.Start(cluster.Name) informerManager.WaitForCacheSync(cluster.Name) @@ -271,7 +273,7 @@ func newController(work *workv1alpha1.Work, recorder *record.FakeRecorder) Contr InformerManager: informerManager, EventRecorder: recorder, RESTMapper: restMapper, - ObjectWatcher: objectwatcher.NewObjectWatcher(fakeClient, restMapper, clusterClientSetFunc, nil, resourceInterpreter), + ObjectWatcher: objectwatcher.NewObjectWatcher(fakeClient, restMapper, clusterClientSetFunc, nil, resourceInterpreter, informerManager), } } @@ -305,3 +307,276 @@ func newCluster(name string, clusterType string, clusterStatus metav1.ConditionS func (f FakeResourceInterpreter) Start(context.Context) error { return nil } + +func TestController_getEventHandlerIsMemoized(t *testing.T) { + c := &Controller{} + first := c.getEventHandler() + second := c.getEventHandler() + assert.NotNil(t, first) + assert.Same(t, first, second, "getEventHandler should return the same handler instance across calls") +} + +// stubObjectWatcher implements objectwatcher.ObjectWatcher for testing. +type stubObjectWatcher struct { + versionRecord string + recordExists bool +} + +func (s *stubObjectWatcher) Create(_ context.Context, _ string, _ *unstructured.Unstructured) error { + return nil +} +func (s *stubObjectWatcher) Update(_ context.Context, _ string, _, _ *unstructured.Unstructured) (objectwatcher.OperationResult, error) { + return objectwatcher.OperationResultNone, nil +} +func (s *stubObjectWatcher) Delete(_ context.Context, _ string, _ *unstructured.Unstructured) error { + return nil +} +func (s *stubObjectWatcher) GetVersionRecord(_ string, _ *unstructured.Unstructured) (string, bool) { + return s.versionRecord, s.recordExists +} + +func TestController_mapWorkloadToWork(t *testing.T) { + tests := []struct { + name string + labels map[string]string + annotations map[string]string + want []reconcile.Request + }{ + { + name: "non-Karmada managed resource is ignored", + labels: nil, + want: nil, + }, + { + name: "Karmada managed but missing both annotations returns nil", + labels: map[string]string{util.ManagedByKarmadaLabel: util.ManagedByKarmadaLabelValue}, + want: nil, + }, + { + name: "Karmada managed but missing name annotation returns nil", + labels: map[string]string{util.ManagedByKarmadaLabel: util.ManagedByKarmadaLabelValue}, + annotations: map[string]string{ + workv1alpha2.WorkNamespaceAnnotation: "karmada-es-cluster", + }, + want: nil, + }, + { + name: "Karmada managed but missing namespace annotation returns nil", + labels: map[string]string{util.ManagedByKarmadaLabel: util.ManagedByKarmadaLabelValue}, + annotations: map[string]string{ + workv1alpha2.WorkNameAnnotation: "work", + }, + want: nil, + }, + { + name: "both annotations present returns reconcile request", + labels: map[string]string{util.ManagedByKarmadaLabel: util.ManagedByKarmadaLabelValue}, + annotations: map[string]string{ + workv1alpha2.WorkNamespaceAnnotation: "karmada-es-cluster", + workv1alpha2.WorkNameAnnotation: "work", + }, + want: []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: "karmada-es-cluster", Name: "work"}}}, + }, + } + + c := &Controller{} + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + pod := testhelper.NewPod(podNamespace, podName) + pod.SetLabels(tt.labels) + pod.SetAnnotations(tt.annotations) + got := c.mapWorkloadToWork(context.Background(), pod) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestController_enqueueWorkload(t *testing.T) { + t.Run("nil channel drops event without panic", func(_ *testing.T) { + c := &Controller{} + c.enqueueWorkload(testhelper.NewPod(podNamespace, podName)) + }) + + t.Run("nil runtime.Object is ignored", func(t *testing.T) { + ch := make(chan event.TypedGenericEvent[client.Object], 10) + c := &Controller{eventChannel: ch} + c.enqueueWorkload(nil) + assert.Equal(t, 0, len(ch)) + }) + + t.Run("non-client.Object input is ignored", func(t *testing.T) { + ch := make(chan event.TypedGenericEvent[client.Object], 10) + c := &Controller{eventChannel: ch} + c.enqueueWorkload(&corev1.PodList{}) + assert.Equal(t, 0, len(ch)) + }) + + t.Run("client.Object is forwarded to eventChannel", func(t *testing.T) { + ch := make(chan event.TypedGenericEvent[client.Object], 10) + c := &Controller{eventChannel: ch} + pod := testhelper.NewPod(podNamespace, podName) + c.enqueueWorkload(pod) + select { + case ev := <-ch: + assert.Equal(t, pod.GetName(), ev.Object.GetName()) + assert.Equal(t, pod.GetNamespace(), ev.Object.GetNamespace()) + case <-time.After(time.Second): + t.Fatal("timeout waiting for enqueued event") + } + }) +} + +func TestController_onUpdate(t *testing.T) { + // newManagedUnstructuredPod creates an Unstructured Pod pre-configured with + // labels, annotations, and ResourceVersion required to pass the guard chain. + newManagedUnstructuredPod := func(rv string, spec, status map[string]any) *unstructured.Unstructured { + obj := &unstructured.Unstructured{} + obj.SetAPIVersion("v1") + obj.SetKind("Pod") + obj.SetNamespace(podNamespace) + obj.SetName(podName) + obj.SetResourceVersion(rv) + obj.SetLabels(map[string]string{util.ManagedByKarmadaLabel: util.ManagedByKarmadaLabelValue}) + obj.SetAnnotations(map[string]string{ + workv1alpha2.WorkNamespaceAnnotation: "karmada-es-cluster", + workv1alpha2.WorkNameAnnotation: "work", + }) + if spec != nil { + _ = unstructured.SetNestedMap(obj.Object, spec, "spec") + } + if status != nil { + _ = unstructured.SetNestedMap(obj.Object, status, "status") + } + return obj + } + + guardController := func() *Controller { + return &Controller{eventChannel: make(chan event.TypedGenericEvent[client.Object], 10)} + } + + versionedController := func(recordRV string) *Controller { + return &Controller{ + eventChannel: make(chan event.TypedGenericEvent[client.Object], 10), + ObjectWatcher: &stubObjectWatcher{versionRecord: recordRV, recordExists: true}, + } + } + + t.Run("non-Unstructured old is ignored", func(t *testing.T) { + c := guardController() + c.onUpdate(testhelper.NewPod(podNamespace, podName), nil) + assert.Equal(t, 0, len(c.eventChannel)) + }) + + t.Run("non-Karmada resource is skipped", func(t *testing.T) { + c := guardController() + obj := newManagedUnstructuredPod("v1", nil, nil) + obj.SetLabels(nil) + c.onUpdate(obj, obj) + assert.Equal(t, 0, len(c.eventChannel)) + }) + + t.Run("resource missing work annotations is skipped", func(t *testing.T) { + c := guardController() + obj := newManagedUnstructuredPod("v1", nil, nil) + obj.SetAnnotations(nil) + c.onUpdate(obj, obj) + assert.Equal(t, 0, len(c.eventChannel)) + }) + + t.Run("non-Unstructured cur is ignored", func(t *testing.T) { + c := guardController() + oldObj := newManagedUnstructuredPod("v1", nil, nil) + c.onUpdate(oldObj, testhelper.NewPod(podNamespace, podName)) + assert.Equal(t, 0, len(c.eventChannel)) + }) + + t.Run("resource without version record is skipped", func(t *testing.T) { + c := &Controller{ + eventChannel: make(chan event.TypedGenericEvent[client.Object], 10), + ObjectWatcher: &stubObjectWatcher{recordExists: false}, + } + oldObj := newManagedUnstructuredPod("v1", nil, nil) + curObj := newManagedUnstructuredPod("v2", map[string]any{"nodeName": "n1"}, nil) + c.onUpdate(oldObj, curObj) + assert.Equal(t, 0, len(c.eventChannel)) + }) + + t.Run("matching resource version is skipped", func(t *testing.T) { + c := versionedController("v1") + oldObj := newManagedUnstructuredPod("v1", nil, nil) + curObj := newManagedUnstructuredPod("v1", map[string]any{"nodeName": "n2"}, nil) + c.onUpdate(oldObj, curObj) + assert.Equal(t, 0, len(c.eventChannel)) + }) + + t.Run("identical old and cur is skipped", func(t *testing.T) { + c := versionedController("v1") + oldObj := newManagedUnstructuredPod("v2", map[string]any{"nodeName": "n1"}, nil) + curObj := newManagedUnstructuredPod("v2", map[string]any{"nodeName": "n1"}, nil) + c.onUpdate(oldObj, curObj) + assert.Equal(t, 0, len(c.eventChannel)) + }) + + t.Run("status-only diff is ignored", func(t *testing.T) { + c := versionedController("v1") + oldObj := newManagedUnstructuredPod("v2", map[string]any{"nodeName": "n1"}, map[string]any{"phase": "Pending"}) + curObj := newManagedUnstructuredPod("v3", map[string]any{"nodeName": "n1"}, map[string]any{"phase": "Running"}) + c.onUpdate(oldObj, curObj) + assert.Equal(t, 0, len(c.eventChannel)) + }) + + t.Run("non-status diff enqueues current object", func(t *testing.T) { + c := versionedController("v1") + oldObj := newManagedUnstructuredPod("v2", map[string]any{"nodeName": "n1"}, nil) + curObj := newManagedUnstructuredPod("v3", map[string]any{"nodeName": "n2"}, nil) + c.onUpdate(oldObj, curObj) + select { + case ev := <-c.eventChannel: + assert.Equal(t, podName, ev.Object.GetName()) + case <-time.After(time.Second): + t.Fatal("timeout waiting for enqueued event") + } + }) +} + +func TestController_onDelete(t *testing.T) { + t.Run("non-runtime.Object old is ignored", func(t *testing.T) { + ch := make(chan event.TypedGenericEvent[client.Object], 10) + c := &Controller{eventChannel: ch} + c.onDelete("not a runtime object") + assert.Equal(t, 0, len(ch)) + }) + + t.Run("plain object is enqueued", func(t *testing.T) { + ch := make(chan event.TypedGenericEvent[client.Object], 10) + c := &Controller{eventChannel: ch} + pod := testhelper.NewPod(podNamespace, podName) + c.onDelete(pod) + select { + case ev := <-ch: + assert.Equal(t, podName, ev.Object.GetName()) + case <-time.After(time.Second): + t.Fatal("timeout waiting for enqueued event") + } + }) + + t.Run("DeletedFinalStateUnknown wrapper is unwrapped", func(t *testing.T) { + ch := make(chan event.TypedGenericEvent[client.Object], 10) + c := &Controller{eventChannel: ch} + pod := testhelper.NewPod(podNamespace, podName) + c.onDelete(toolscache.DeletedFinalStateUnknown{Key: "k", Obj: pod}) + select { + case ev := <-ch: + assert.Equal(t, podName, ev.Object.GetName()) + case <-time.After(time.Second): + t.Fatal("timeout waiting for enqueued event") + } + }) + + t.Run("DeletedFinalStateUnknown with nil Obj is dropped", func(t *testing.T) { + ch := make(chan event.TypedGenericEvent[client.Object], 10) + c := &Controller{eventChannel: ch} + c.onDelete(toolscache.DeletedFinalStateUnknown{Key: "k", Obj: nil}) + assert.Equal(t, 0, len(ch)) + }) +} diff --git a/pkg/controllers/status/work_status_controller.go b/pkg/controllers/status/work_status_controller.go index 6a11be4d8178..c0539ce1ad4b 100644 --- a/pkg/controllers/status/work_status_controller.go +++ b/pkg/controllers/status/work_status_controller.go @@ -20,11 +20,12 @@ import ( "context" "fmt" "reflect" + "sync" + "time" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -44,7 +45,6 @@ import ( workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" "github.com/karmada-io/karmada/pkg/events" "github.com/karmada-io/karmada/pkg/features" - "github.com/karmada-io/karmada/pkg/metrics" "github.com/karmada-io/karmada/pkg/resourceinterpreter" "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" "github.com/karmada-io/karmada/pkg/util" @@ -53,29 +53,28 @@ import ( "github.com/karmada-io/karmada/pkg/util/fedinformer/keys" "github.com/karmada-io/karmada/pkg/util/helper" "github.com/karmada-io/karmada/pkg/util/names" - "github.com/karmada-io/karmada/pkg/util/objectwatcher" - "github.com/karmada-io/karmada/pkg/util/restmapper" ) // WorkStatusControllerName is the controller name that will be used when reporting events and metrics. const WorkStatusControllerName = "work-status-controller" +const memberInformerNotSyncedRequeueAfter = time.Minute + // WorkStatusController is to sync status of Work. type WorkStatusController struct { - client.Client // used to operate Work resources. - EventRecorder record.EventRecorder - RESTMapper meta.RESTMapper - InformerManager genericmanager.MultiClusterInformerManager - eventHandler cache.ResourceEventHandler // eventHandler knows how to handle events from the member cluster. - Context context.Context - worker util.AsyncPriorityWorker // worker process resources periodic from rateLimitingQueue. + client.Client // used to operate Work resources. + EventRecorder record.EventRecorder + RESTMapper meta.RESTMapper + InformerManager genericmanager.MultiClusterInformerManager + eventHandlerOnce sync.Once + eventHandler cache.ResourceEventHandler // eventHandler knows how to handle events from the member cluster. + Context context.Context + worker util.AsyncPriorityWorker // worker process resources periodic from rateLimitingQueue. // ConcurrentWorkStatusSyncs is the number of Work status that are allowed to sync concurrently. ConcurrentWorkStatusSyncs int - ObjectWatcher objectwatcher.ObjectWatcher WorkPredicateFunc predicate.Predicate ClusterDynamicClientSetFunc util.NewClusterDynamicClientSetFunc ClusterClientOption *util.ClientOption - ClusterCacheSyncTimeout metav1.Duration RateLimiterOptions ratelimiterflag.Options ResourceInterpreter resourceinterpreter.ResourceInterpreter } @@ -128,19 +127,29 @@ func (c *WorkStatusController) Reconcile(ctx context.Context, req controllerrunt // buildResourceInformers builds informer dynamically for managed resources in member cluster. // The created informer watches resource change and then sync to the relevant Work object. func (c *WorkStatusController) buildResourceInformers(cluster *clusterv1alpha1.Cluster, work *workv1alpha1.Work) (controllerruntime.Result, error) { - err := c.registerInformersAndStart(cluster, work) + gvrTargets, err := util.GetGVRsFromWork(c.RESTMapper, work) + if err != nil { + return controllerruntime.Result{}, err + } + informersSynced, err := helper.EnsureInformerHandlersReady(cluster, gvrTargets, c.getEventHandler(), c.InformerManager, c.ClusterDynamicClientSetFunc, c.Client, c.ClusterClientOption) if err != nil { klog.ErrorS(err, "Failed to register informer for Work.", "namespace", work.GetNamespace(), "name", work.GetName()) return controllerruntime.Result{}, err } + if !informersSynced { + klog.V(4).InfoS("Member cluster informers are not synced yet.", "namespace", work.GetNamespace(), "name", work.GetName(), "cluster", cluster.Name) + return controllerruntime.Result{RequeueAfter: memberInformerNotSyncedRequeueAfter}, nil + } return controllerruntime.Result{}, nil } // getEventHandler return callback function that knows how to handle events from the member cluster. func (c *WorkStatusController) getEventHandler() cache.ResourceEventHandler { - if c.eventHandler == nil { - c.eventHandler = fedinformer.NewHandlerOnEvents(c.onAdd, c.onUpdate, c.onDelete) - } + c.eventHandlerOnce.Do(func() { + if c.eventHandler == nil { + c.eventHandler = fedinformer.NewHandlerOnEvents(c.onAdd, c.onUpdate, c.onDelete) + } + }) return c.eventHandler } @@ -200,7 +209,7 @@ func (c *WorkStatusController) syncWorkStatus(key util.QueueKey) error { observedObj, err := helper.GetObjectFromCache(c.RESTMapper, c.InformerManager, fedKey) if err != nil { if apierrors.IsNotFound(err) { - return c.handleDeleteEvent(ctx, fedKey) + return nil } return err } @@ -229,135 +238,10 @@ func (c *WorkStatusController) syncWorkStatus(key util.QueueKey) error { return nil } - if err := c.updateResource(ctx, observedObj, workObject, fedKey); err != nil { - return err - } - klog.InfoS("Reflecting resource status to Work.", "kind", observedObj.GetKind(), "resource", observedObj.GetNamespace()+"/"+observedObj.GetName(), "namespace", workNamespace, "name", workName) return c.reflectStatus(ctx, workObject, observedObj) } -func (c *WorkStatusController) updateResource(ctx context.Context, observedObj *unstructured.Unstructured, workObject *workv1alpha1.Work, fedKey keys.FederatedKey) error { - if util.IsWorkSuspendDispatching(workObject) { - return nil - } - - desiredObj, err := c.getRawManifest(workObject.Spec.Workload.Manifests, observedObj) - if err != nil { - return err - } - - clusterName, err := names.GetClusterName(workObject.Namespace) - if err != nil { - klog.ErrorS(err, "Failed to get member cluster name", "cluster", workObject.Namespace) - return err - } - - // we should check if the observed status is consistent with the declaration to prevent accidental changes made - // in member clusters. - needUpdate := c.ObjectWatcher.NeedsUpdate(clusterName, desiredObj, observedObj) - if needUpdate { - operationResult, updateErr := c.ObjectWatcher.Update(ctx, clusterName, desiredObj, observedObj) - metrics.CountUpdateResourceToCluster(updateErr, desiredObj.GetAPIVersion(), desiredObj.GetKind(), clusterName, string(operationResult)) - if updateErr != nil { - klog.ErrorS(updateErr, "Updating resource failed", "resource", fedKey.String()) - return updateErr - } - // We can't return even after a success updates, because that might lose the chance to collect status. - // Not all updates are real, they might be no change, in that case there will be no more event for this update, - // this usually happens with those resources not enables 'metadata.generation', like 'Service'. - // When a Service's status changes, it's 'metadata.resourceVersion' will be increased, but 'metadata.generation' - // not increased(defaults to 0), the ObjectWatcher can't easily tell what happened to the object, so ObjectWatcher - // also needs to update again. The update operation will be a non-operation if the event triggered by Service's - // status changes. - } - return nil -} - -func (c *WorkStatusController) handleDeleteEvent(ctx context.Context, key keys.FederatedKey) error { - executionSpace := names.GenerateExecutionSpaceName(key.Cluster) - - // Given the workload might have been deleted from informer cache, so that we can't get work object by its label, - // we have to get work by naming rule as the work's name is generated by the workload's kind, name and namespace. - workName := names.GenerateWorkName(key.Kind, key.Name, key.Namespace) - work := &workv1alpha1.Work{} - if err := c.Client.Get(ctx, client.ObjectKey{Namespace: executionSpace, Name: workName}, work); err != nil { - // stop processing as the work object has been removed, assume it's a normal delete operation. - if apierrors.IsNotFound(err) { - return nil - } - - klog.ErrorS(err, "Failed to get Work from cache") - return err - } - - // stop processing as the work object being deleting. - if !work.DeletionTimestamp.IsZero() { - return nil - } - - // skip processing as the work object is suspended for dispatching. - if util.IsWorkSuspendDispatching(work) { - return nil - } - - reCreateErr := c.recreateResourceIfNeeded(ctx, work, key) - if reCreateErr != nil { - c.updateAppliedCondition(ctx, work, metav1.ConditionFalse, "ReCreateFailed", reCreateErr.Error()) - return reCreateErr - } - c.updateAppliedCondition(ctx, work, metav1.ConditionTrue, "ReCreateSuccessful", "Manifest has been successfully applied") - return nil -} - -func (c *WorkStatusController) recreateResourceIfNeeded(ctx context.Context, work *workv1alpha1.Work, workloadKey keys.FederatedKey) error { - for _, rawManifest := range work.Spec.Workload.Manifests { - manifest := &unstructured.Unstructured{} - if err := manifest.UnmarshalJSON(rawManifest.Raw); err != nil { - return err - } - - desiredGVK := schema.FromAPIVersionAndKind(manifest.GetAPIVersion(), manifest.GetKind()) - if reflect.DeepEqual(desiredGVK, workloadKey.GroupVersionKind()) && - manifest.GetNamespace() == workloadKey.Namespace && - manifest.GetName() == workloadKey.Name { - klog.InfoS("Recreating resource.", "resource", workloadKey.String()) - err := c.ObjectWatcher.Create(ctx, workloadKey.Cluster, manifest) - metrics.CountCreateResourceToCluster(err, workloadKey.GroupVersion().String(), workloadKey.Kind, workloadKey.Cluster, true) - if err != nil { - c.eventf(manifest, corev1.EventTypeWarning, events.EventReasonSyncWorkloadFailed, "Failed to create or update resource(%s/%s) in member cluster(%s): %v", manifest.GetNamespace(), manifest.GetName(), workloadKey.Cluster, err) - return err - } - c.eventf(manifest, corev1.EventTypeNormal, events.EventReasonSyncWorkloadSucceed, "Successfully applied resource(%s/%s) to cluster %s", manifest.GetNamespace(), manifest.GetName(), workloadKey.Cluster) - return nil - } - } - return nil -} - -// updateAppliedCondition update the condition for the given Work -func (c *WorkStatusController) updateAppliedCondition(ctx context.Context, work *workv1alpha1.Work, status metav1.ConditionStatus, reason, message string) { - newWorkAppliedCondition := metav1.Condition{ - Type: workv1alpha1.WorkApplied, - Status: status, - Reason: reason, - Message: message, - LastTransitionTime: metav1.Now(), - } - - err := retry.RetryOnConflict(retry.DefaultRetry, func() (err error) { - _, err = helper.UpdateStatus(ctx, c.Client, work, func() error { - meta.SetStatusCondition(&work.Status.Conditions, newWorkAppliedCondition) - return nil - }) - return err - }) - - if err != nil { - klog.ErrorS(err, "Failed to update condition of work.", "namespace", work.Namespace, "name", work.Name) - } -} - // reflectStatus grabs cluster object's running status then updates to its owner object(Work). func (c *WorkStatusController) reflectStatus(ctx context.Context, work *workv1alpha1.Work, clusterObj *unstructured.Unstructured) error { statusRaw, err := c.ResourceInterpreter.ReflectStatus(clusterObj) @@ -444,106 +328,6 @@ func (c *WorkStatusController) mergeStatus(_ []workv1alpha1.ManifestStatus, newS return []workv1alpha1.ManifestStatus{newStatus} } -func (c *WorkStatusController) getRawManifest(manifests []workv1alpha1.Manifest, clusterObj *unstructured.Unstructured) (*unstructured.Unstructured, error) { - for _, rawManifest := range manifests { - manifest := &unstructured.Unstructured{} - if err := manifest.UnmarshalJSON(rawManifest.Raw); err != nil { - return nil, err - } - - if manifest.GetAPIVersion() == clusterObj.GetAPIVersion() && - manifest.GetKind() == clusterObj.GetKind() && - manifest.GetNamespace() == clusterObj.GetNamespace() && - manifest.GetName() == clusterObj.GetName() { - return manifest, nil - } - } - - return nil, fmt.Errorf("no such manifest exist") -} - -// registerInformersAndStart builds informer manager for cluster if it doesn't exist, then constructs informers for gvr -// and start it. -func (c *WorkStatusController) registerInformersAndStart(cluster *clusterv1alpha1.Cluster, work *workv1alpha1.Work) error { - singleClusterInformerManager, err := c.getSingleClusterManager(cluster) - if err != nil { - return err - } - - gvrTargets, err := c.getGVRsFromWork(work) - if err != nil { - return err - } - - allSynced := true - for gvr := range gvrTargets { - if !singleClusterInformerManager.IsInformerSynced(gvr) || !singleClusterInformerManager.IsHandlerExist(gvr, c.getEventHandler()) { - allSynced = false - singleClusterInformerManager.ForResource(gvr, c.getEventHandler()) - } - } - if allSynced { - return nil - } - - c.InformerManager.Start(cluster.Name) - - if err := func() error { - synced := c.InformerManager.WaitForCacheSyncWithTimeout(cluster.Name, c.ClusterCacheSyncTimeout.Duration) - if synced == nil { - return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name) - } - for gvr := range gvrTargets { - if !synced[gvr] { - return fmt.Errorf("informer for %s hasn't synced", gvr) - } - } - return nil - }(); err != nil { - klog.ErrorS(err, "Failed to sync cache for cluster", "cluster", cluster.Name) - return err - } - - return nil -} - -// getGVRsFromWork traverses the manifests in work to find groupVersionResource list. -func (c *WorkStatusController) getGVRsFromWork(work *workv1alpha1.Work) (map[schema.GroupVersionResource]bool, error) { - gvrTargets := map[schema.GroupVersionResource]bool{} - for _, manifest := range work.Spec.Workload.Manifests { - workload := &unstructured.Unstructured{} - err := workload.UnmarshalJSON(manifest.Raw) - if err != nil { - klog.ErrorS(err, "Failed to unmarshal workload.") - return nil, err - } - gvr, err := restmapper.GetGroupVersionResource(c.RESTMapper, workload.GroupVersionKind()) - if err != nil { - klog.ErrorS(err, "Failed to get GVR from GVK for resource.", "namespace", workload.GetNamespace(), "name", workload.GetName()) - return nil, err - } - gvrTargets[gvr] = true - } - return gvrTargets, nil -} - -// getSingleClusterManager gets singleClusterInformerManager with clusterName. -// If manager is not exist, create it, otherwise gets it from map. -func (c *WorkStatusController) getSingleClusterManager(cluster *clusterv1alpha1.Cluster) (genericmanager.SingleClusterInformerManager, error) { - // TODO(chenxianpao): If cluster A is removed, then a new cluster that name also is A joins karmada, - // the cache in informer manager should be updated. - singleClusterInformerManager := c.InformerManager.GetSingleClusterManager(cluster.Name) - if singleClusterInformerManager == nil { - dynamicClusterClient, err := c.ClusterDynamicClientSetFunc(cluster.Name, c.Client, c.ClusterClientOption) - if err != nil { - klog.ErrorS(err, "Failed to build dynamic cluster client for cluster.", "cluster", cluster.Name) - return nil, err - } - singleClusterInformerManager = c.InformerManager.ForCluster(dynamicClusterClient.ClusterName, dynamicClusterClient.DynamicClientSet, 0) - } - return singleClusterInformerManager, nil -} - // SetupWithManager creates a controller and register to controller manager. func (c *WorkStatusController) SetupWithManager(mgr controllerruntime.Manager) error { ctrlBuilder := controllerruntime.NewControllerManagedBy(mgr).Named(WorkStatusControllerName). @@ -560,15 +344,6 @@ func (c *WorkStatusController) SetupWithManager(mgr controllerruntime.Manager) e return ctrlBuilder.Complete(c) } -func (c *WorkStatusController) eventf(object *unstructured.Unstructured, eventType, reason, messageFmt string, args ...any) { - ref, err := util.GenEventRef(object) - if err != nil { - klog.ErrorS(err, "Ignoring event. Failed to build event reference.", "reason", reason, "kind", object.GetKind(), "reference", klog.KObj(object)) - return - } - c.EventRecorder.Eventf(ref, eventType, reason, messageFmt, args...) -} - func (c *WorkStatusController) onAdd(obj any, isInInitialList bool) { curObj := obj.(runtime.Object) priority := util.ItemPriorityIfInInitialList(isInInitialList) diff --git a/pkg/controllers/status/work_status_controller_test.go b/pkg/controllers/status/work_status_controller_test.go index 9bd8abfccb82..98e093bbd37a 100644 --- a/pkg/controllers/status/work_status_controller_test.go +++ b/pkg/controllers/status/work_status_controller_test.go @@ -42,6 +42,7 @@ import ( controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/client/interceptor" clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" @@ -51,10 +52,8 @@ import ( "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" - "github.com/karmada-io/karmada/pkg/util/fedinformer/keys" "github.com/karmada-io/karmada/pkg/util/gclient" "github.com/karmada-io/karmada/pkg/util/helper" - "github.com/karmada-io/karmada/pkg/util/objectwatcher" testhelper "github.com/karmada-io/karmada/test/helper" ) @@ -78,7 +77,7 @@ func newCluster(name string, clusterType string, clusterStatus metav1.ConditionS func TestWorkStatusController_Reconcile(t *testing.T) { tests := []struct { name string - c WorkStatusController + c *WorkStatusController work *workv1alpha1.Work ns string expectRes controllerruntime.Result @@ -86,7 +85,7 @@ func TestWorkStatusController_Reconcile(t *testing.T) { }{ { name: "normal case", - c: WorkStatusController{ + c: &WorkStatusController{ Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects( &clusterv1alpha1.Cluster{ ObjectMeta: metav1.ObjectMeta{Name: "cluster"}, @@ -107,10 +106,9 @@ func TestWorkStatusController_Reconcile(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Namespace: "ns1", Name: "secret1"}, Data: map[string][]byte{clusterv1alpha1.SecretTokenKey: []byte("token"), clusterv1alpha1.SecretCADataKey: testCA}, }).Build(), - InformerManager: genericmanager.GetInstance(), + InformerManager: genericmanager.NewMultiClusterInformerManager(context.Background()), WorkPredicateFunc: helper.NewClusterPredicateOnAgent("test"), ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet, - ClusterCacheSyncTimeout: metav1.Duration{}, RateLimiterOptions: ratelimiterflag.Options{}, }, work: &workv1alpha1.Work{ @@ -133,12 +131,11 @@ func TestWorkStatusController_Reconcile(t *testing.T) { }, { name: "work not exists", - c: WorkStatusController{ + c: &WorkStatusController{ Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(), - InformerManager: genericmanager.GetInstance(), + InformerManager: genericmanager.NewMultiClusterInformerManager(context.Background()), WorkPredicateFunc: helper.NewClusterPredicateOnAgent("test"), ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, - ClusterCacheSyncTimeout: metav1.Duration{}, RateLimiterOptions: ratelimiterflag.Options{}, }, work: &workv1alpha1.Work{ @@ -161,12 +158,11 @@ func TestWorkStatusController_Reconcile(t *testing.T) { }, { name: "work's DeletionTimestamp isn't zero", - c: WorkStatusController{ + c: &WorkStatusController{ Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(), - InformerManager: genericmanager.GetInstance(), + InformerManager: genericmanager.NewMultiClusterInformerManager(context.Background()), WorkPredicateFunc: helper.NewClusterPredicateOnAgent("test"), ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, - ClusterCacheSyncTimeout: metav1.Duration{}, RateLimiterOptions: ratelimiterflag.Options{}, }, work: &workv1alpha1.Work{ @@ -174,6 +170,7 @@ func TestWorkStatusController_Reconcile(t *testing.T) { Name: "work", Namespace: "karmada-es-cluster", DeletionTimestamp: &metav1.Time{Time: time.Now()}, + Finalizers: []string{"karmada.io/block-deletion"}, }, Status: workv1alpha1.WorkStatus{ Conditions: []metav1.Condition{ @@ -190,12 +187,11 @@ func TestWorkStatusController_Reconcile(t *testing.T) { }, { name: "work's status is not applied", - c: WorkStatusController{ + c: &WorkStatusController{ Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(), - InformerManager: genericmanager.GetInstance(), + InformerManager: genericmanager.NewMultiClusterInformerManager(context.Background()), WorkPredicateFunc: helper.NewClusterPredicateOnAgent("test"), ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, - ClusterCacheSyncTimeout: metav1.Duration{}, RateLimiterOptions: ratelimiterflag.Options{}, }, work: &workv1alpha1.Work{ @@ -218,12 +214,11 @@ func TestWorkStatusController_Reconcile(t *testing.T) { }, { name: "failed to get cluster name", - c: WorkStatusController{ + c: &WorkStatusController{ Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(), - InformerManager: genericmanager.GetInstance(), + InformerManager: genericmanager.NewMultiClusterInformerManager(context.Background()), WorkPredicateFunc: helper.NewClusterPredicateOnAgent("test"), ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, - ClusterCacheSyncTimeout: metav1.Duration{}, RateLimiterOptions: ratelimiterflag.Options{}, }, work: &workv1alpha1.Work{ @@ -246,12 +241,11 @@ func TestWorkStatusController_Reconcile(t *testing.T) { }, { name: "failed to get cluster", - c: WorkStatusController{ + c: &WorkStatusController{ Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(newCluster("cluster1", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(), - InformerManager: genericmanager.GetInstance(), + InformerManager: genericmanager.NewMultiClusterInformerManager(context.Background()), WorkPredicateFunc: helper.NewClusterPredicateOnAgent("test"), ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, - ClusterCacheSyncTimeout: metav1.Duration{}, RateLimiterOptions: ratelimiterflag.Options{}, }, work: &workv1alpha1.Work{ @@ -274,12 +268,11 @@ func TestWorkStatusController_Reconcile(t *testing.T) { }, { name: "cluster is not ready", - c: WorkStatusController{ + c: &WorkStatusController{ Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse)).Build(), - InformerManager: genericmanager.GetInstance(), + InformerManager: genericmanager.NewMultiClusterInformerManager(context.Background()), WorkPredicateFunc: helper.NewClusterPredicateOnAgent("test"), ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, - ClusterCacheSyncTimeout: metav1.Duration{}, RateLimiterOptions: ratelimiterflag.Options{}, }, work: &workv1alpha1.Work{ @@ -311,10 +304,21 @@ func TestWorkStatusController_Reconcile(t *testing.T) { }, } + wantDeleted := !tt.work.DeletionTimestamp.IsZero() if err := tt.c.Client.Create(context.Background(), tt.work); err != nil { t.Fatalf("Failed to create cluster: %v", err) } + // The fake client strips DeletionTimestamp during Create. When the test + // case wants the Work to be in terminating state, issue a Delete after + // Create — the preset finalizer keeps the object around and the fake + // client sets DeletionTimestamp on the stored copy. + if wantDeleted { + if err := tt.c.Client.Delete(context.Background(), tt.work); err != nil { + t.Fatalf("Failed to delete work to set DeletionTimestamp: %v", err) + } + } + res, err := tt.c.Reconcile(context.Background(), req) assert.Equal(t, tt.expectRes, res) if tt.existErr { @@ -335,10 +339,9 @@ func TestWorkStatusController_getEventHandler(t *testing.T) { c := WorkStatusController{ Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse)).Build(), - InformerManager: genericmanager.GetInstance(), + InformerManager: genericmanager.NewMultiClusterInformerManager(context.Background()), WorkPredicateFunc: helper.NewClusterPredicateOnAgent("test"), ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, - ClusterCacheSyncTimeout: metav1.Duration{}, RateLimiterOptions: ratelimiterflag.Options{}, eventHandler: nil, worker: util.NewAsyncWorker(opt), @@ -351,10 +354,9 @@ func TestWorkStatusController_getEventHandler(t *testing.T) { func TestWorkStatusController_RunWorkQueue(_ *testing.T) { c := WorkStatusController{ Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse)).Build(), - InformerManager: genericmanager.GetInstance(), + InformerManager: genericmanager.NewMultiClusterInformerManager(context.Background()), WorkPredicateFunc: helper.NewClusterPredicateOnAgent("test"), ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, - ClusterCacheSyncTimeout: metav1.Duration{}, RateLimiterOptions: ratelimiterflag.Options{}, eventHandler: nil, Context: context.Background(), @@ -566,138 +568,122 @@ func newPod(workNs, workName string, wrongAnnotations ...bool) *corev1.Pod { return pod } +func newPodWithoutWorkNS(workName string) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod", + Namespace: "default", + Annotations: map[string]string{ + workv1alpha2.WorkNameAnnotation: workName, + }, + }, + } +} + func TestWorkStatusController_syncWorkStatus(t *testing.T) { cluster := newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse) workName := "work" workNs := "karmada-es-cluster" workUID := "92345678-1234-5678-1234-567812345678" + matchingManifest := []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`) + mismatchedManifest := []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod1","namespace":"default"}}`) tests := []struct { - name string - obj *unstructured.Unstructured - pod *corev1.Pod - raw []byte - controllerWithoutInformer bool - expectedError bool - wrongWorkNS bool - workApplyFunc func(work *workv1alpha1.Work) - assertFunc func(t *testing.T, dynamicClientSets *dynamicfake.FakeDynamicClient) + name string + obj *unstructured.Unstructured + pod *corev1.Pod + raw []byte + registerInformer bool + invalidKey util.QueueKey + clientGetError error + workApplyFunc func(work *workv1alpha1.Work) + expectedError bool + expectManifestStatus bool }{ { - name: "failed to exec NeedUpdate", - obj: newPodObj("karmada-es-cluster"), - pod: newPod(workNs, workName), - raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`), - controllerWithoutInformer: true, - expectedError: true, - }, - { - name: "invalid key, wrong WorkNamespaceLabel in obj", - obj: newPodObj("karmada-cluster"), - pod: newPod(workNs, workName), - raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`), - controllerWithoutInformer: true, - expectedError: true, + name: "key is not a FederatedKey, returns error", + invalidKey: util.QueueKey("not-a-federated-key"), + obj: newPodObj(workNs), + pod: newPod(workNs, workName), + raw: matchingManifest, + registerInformer: true, + expectedError: true, }, { - name: "failed to GetObjectFromCache, wrong InformerManager in WorkStatusController", - obj: newPodObj("karmada-es-cluster"), - pod: newPod(workNs, workName), - raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`), - controllerWithoutInformer: false, - expectedError: true, + name: "informer manager has no cluster, returns error", + obj: newPodObj(workNs), + pod: newPod(workNs, workName), + raw: matchingManifest, + registerInformer: false, + expectedError: true, }, { - name: "obj not found in informer, wrong dynamicClientSet without pod", - obj: newPodObj("karmada-es-cluster"), - raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`), - controllerWithoutInformer: true, - expectedError: false, + name: "obj not found in informer cache, returns nil", + obj: newPodObj(workNs), + raw: matchingManifest, + registerInformer: true, + expectedError: false, }, { - name: "workNamespace is zero, set wrong label 'test' in pod", - obj: newPodObj("karmada-es-cluster"), - pod: newPod(workNs, workName, true), - raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`), - controllerWithoutInformer: true, - expectedError: false, + name: "obj missing WorkNamespaceAnnotation, ignored", + obj: newPodObj(workNs), + pod: newPodWithoutWorkNS(workName), + raw: matchingManifest, + registerInformer: true, + expectedError: false, }, { - name: "failed to exec Client.Get, set wrong name in work", - obj: newPodObj("karmada-es-cluster"), - pod: newPod(workNs, workName), - raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`), - controllerWithoutInformer: true, - expectedError: false, + name: "work not found in client, returns nil", + obj: newPodObj(workNs), + pod: newPod(workNs, workName), + raw: matchingManifest, + registerInformer: true, + expectedError: false, workApplyFunc: func(work *workv1alpha1.Work) { work.SetName(fmt.Sprintf("%v-test", workNs)) }, }, { - name: "failed to getRawManifest, wrong Manifests in work", - obj: newPodObj("karmada-es-cluster"), - pod: newPod(workNs, workName), - raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod1","namespace":"default"}}`), - controllerWithoutInformer: true, - expectedError: true, - }, - { - name: "failed to exec GetClusterName, wrong workNamespace", - obj: newPodObj("karmada-es-cluster"), - pod: newPod(workNs, workName), - raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`), - controllerWithoutInformer: true, - expectedError: true, - wrongWorkNS: true, + name: "client.Get returns non-NotFound error, propagates", + obj: newPodObj(workNs), + pod: newPod(workNs, workName), + raw: matchingManifest, + registerInformer: true, + clientGetError: apierrors.NewServiceUnavailable("api down"), + expectedError: true, }, { - name: "skips work with suspend dispatching", - obj: newPodObj("karmada-es-cluster"), - pod: newPod(workNs, workName), - raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`), - controllerWithoutInformer: true, - expectedError: false, - workApplyFunc: func(work *workv1alpha1.Work) { - work.Spec.SuspendDispatching = ptr.To(true) - }, + name: "reflectStatus fails when manifest does not match observed object", + obj: newPodObj(workNs), + pod: newPod(workNs, workName), + raw: mismatchedManifest, + registerInformer: true, + expectedError: true, }, { - name: "skips work with deletion timestamp", - obj: newPodObj("karmada-es-cluster"), - pod: newPod(workNs, workName), - raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`), - controllerWithoutInformer: true, - expectedError: false, + name: "skips work with deletion timestamp", + obj: newPodObj(workNs), + pod: newPod(workNs, workName), + raw: matchingManifest, + registerInformer: true, + expectedError: false, workApplyFunc: func(work *workv1alpha1.Work) { work.SetDeletionTimestamp(ptr.To(metav1.Now())) }, }, { - name: "resource not found, work suspendDispatching true, should not recreate resource", - obj: newPodObj("karmada-es-cluster"), - pod: nil, // Simulate the resource does not exist in the member cluster - raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`), - controllerWithoutInformer: true, - expectedError: false, - workApplyFunc: func(work *workv1alpha1.Work) { - work.Spec.SuspendDispatching = ptr.To(true) - }, - assertFunc: func(t *testing.T, dynamicClientSets *dynamicfake.FakeDynamicClient) { - gvr := corev1.SchemeGroupVersion.WithResource("pods") - obj, err := dynamicClientSets.Resource(gvr).Namespace("default").Get(context.Background(), "pod", metav1.GetOptions{}) - assert.True(t, apierrors.IsNotFound(err), "expected a NotFound error but got: %s", err) - assert.Nil(t, obj) - }, + name: "happy path updates work ManifestStatuses", + obj: newPodObj(workNs), + pod: newPod(workNs, workName), + raw: matchingManifest, + registerInformer: true, + expectedError: false, + expectManifestStatus: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if tt.wrongWorkNS { - workNs = "karmada-cluster" - tt.pod = newPod(workNs, workName) - } - var dynamicClientSet *dynamicfake.FakeDynamicClient if tt.pod != nil { dynamicClientSet = dynamicfake.NewSimpleDynamicClient(scheme.Scheme, tt.pod) @@ -705,8 +691,8 @@ func TestWorkStatusController_syncWorkStatus(t *testing.T) { dynamicClientSet = dynamicfake.NewSimpleDynamicClient(scheme.Scheme) } - var c WorkStatusController - if tt.controllerWithoutInformer { + var c *WorkStatusController + if tt.registerInformer { c = newWorkStatusController(cluster, dynamicClientSet) } else { c = newWorkStatusController(cluster) @@ -716,13 +702,32 @@ func TestWorkStatusController_syncWorkStatus(t *testing.T) { if tt.workApplyFunc != nil { tt.workApplyFunc(work) } - - key, _ := generateKey(tt.obj) - if err := c.Client.Create(context.Background(), work); err != nil { t.Fatalf("Failed to create work: %v", err) } + if tt.clientGetError != nil { + c.Client = fake.NewClientBuilder(). + WithScheme(gclient.NewSchema()). + WithObjects(cluster, work). + WithStatusSubresource(&workv1alpha1.Work{}). + WithInterceptorFuncs(interceptor.Funcs{ + Get: func(_ context.Context, _ client.WithWatch, _ client.ObjectKey, obj client.Object, _ ...client.GetOption) error { + if _, ok := obj.(*workv1alpha1.Work); ok { + return tt.clientGetError + } + return nil + }, + }).Build() + } + + var key util.QueueKey + if tt.invalidKey != nil { + key = tt.invalidKey + } else { + key, _ = generateKey(tt.obj) + } + err := c.syncWorkStatus(key) if tt.expectedError { assert.Error(t, err) @@ -730,20 +735,54 @@ func TestWorkStatusController_syncWorkStatus(t *testing.T) { assert.NoError(t, err) } - if tt.assertFunc != nil { - tt.assertFunc(t, dynamicClientSet) + if tt.expectManifestStatus { + updated := &workv1alpha1.Work{} + assert.NoError(t, c.Client.Get(context.Background(), client.ObjectKey{Namespace: workNs, Name: workName}, updated)) + assert.Lenf(t, updated.Status.ManifestStatuses, 1, "expected one manifest status") + assert.Equal(t, "Pod", updated.Status.ManifestStatuses[0].Identifier.Kind) + assert.Equal(t, "pod", updated.Status.ManifestStatuses[0].Identifier.Name) } }) } } -func newWorkStatusController(cluster *clusterv1alpha1.Cluster, dynamicClientSets ...*dynamicfake.FakeDynamicClient) WorkStatusController { - c := WorkStatusController{ - Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(cluster).WithStatusSubresource().Build(), - InformerManager: genericmanager.GetInstance(), +func TestWorkStatusController_buildResourceInformers(t *testing.T) { + cluster := newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue) + raw := []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`) + work := testhelper.NewWork("work", "karmada-es-cluster", "92345678-1234-5678-1234-567812345678", raw) + dynamicClientSet := dynamicfake.NewSimpleDynamicClient(scheme.Scheme, newPod("karmada-es-cluster", "work")) + + t.Run("registers informer and requeues before cache sync", func(t *testing.T) { + c := newWorkStatusController(cluster) + c.ClusterDynamicClientSetFunc = func(clusterName string, _ client.Client, _ *util.ClientOption) (*util.DynamicClusterClient, error) { + return &util.DynamicClusterClient{ClusterName: clusterName, DynamicClientSet: dynamicClientSet}, nil + } + + res, err := c.buildResourceInformers(cluster, work) + assert.NoError(t, err) + assert.Equal(t, controllerruntime.Result{RequeueAfter: memberInformerNotSyncedRequeueAfter}, res) + + singleClusterManager := c.InformerManager.GetSingleClusterManager(cluster.Name) + assert.NotNil(t, singleClusterManager) + assert.True(t, singleClusterManager.IsHandlerExist(corev1.SchemeGroupVersion.WithResource("pods"), c.getEventHandler())) + }) + + t.Run("returns error for invalid manifest", func(t *testing.T) { + c := newWorkStatusController(cluster, dynamicClientSet) + invalidWork := testhelper.NewWork("work", "karmada-es-cluster", "92345678-1234-5678-1234-567812345678", []byte(`{"apiVersion":"v1","kind":"Pod"},`)) + + res, err := c.buildResourceInformers(cluster, invalidWork) + assert.Error(t, err) + assert.Equal(t, controllerruntime.Result{}, res) + }) +} + +func newWorkStatusController(cluster *clusterv1alpha1.Cluster, dynamicClientSets ...*dynamicfake.FakeDynamicClient) *WorkStatusController { + c := &WorkStatusController{ + Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(cluster).WithStatusSubresource(&workv1alpha1.Work{}).Build(), + InformerManager: genericmanager.NewMultiClusterInformerManager(context.Background()), WorkPredicateFunc: helper.NewClusterPredicateOnAgent("test"), ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, - ClusterCacheSyncTimeout: metav1.Duration{}, RateLimiterOptions: ratelimiterflag.Options{}, eventHandler: nil, EventRecorder: record.NewFakeRecorder(1024), @@ -756,10 +795,14 @@ func newWorkStatusController(cluster *clusterv1alpha1.Cluster, dynamicClientSets DefaultInterpreter: native.NewDefaultInterpreter(), }, } + c.worker = util.NewAsyncWorker(util.Options{ + Name: "work-status-test", + KeyFunc: generateKey, + ReconcileFunc: c.syncWorkStatus, + }) if len(dynamicClientSets) > 0 { c.ResourceInterpreter = FakeResourceInterpreter{DefaultInterpreter: native.NewDefaultInterpreter()} - c.ObjectWatcher = objectwatcher.NewObjectWatcher(c.Client, c.RESTMapper, util.NewClusterDynamicClientSetForAgent, nil, c.ResourceInterpreter) // Generate InformerManager clusterName := cluster.Name @@ -777,153 +820,12 @@ func newWorkStatusController(cluster *clusterv1alpha1.Cluster, dynamicClientSets return c } -func TestWorkStatusController_getSingleClusterManager(t *testing.T) { - clusterName := "cluster" - cluster := newCluster(clusterName, clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue) - - // Generate InformerManager - ctx := t.Context() - - dynamicClientSet := dynamicfake.NewSimpleDynamicClient(scheme.Scheme) - - tests := []struct { - name string - rightClusterName bool - expectInformer bool - expectError bool - wrongClusterDynamicClientSetFunc bool - }{ - { - name: "normal case", - rightClusterName: true, - expectInformer: true, - expectError: false, - }, - { - name: "failed to build dynamic cluster client", - rightClusterName: false, - expectInformer: false, - expectError: true, - wrongClusterDynamicClientSetFunc: true, - }, - { - name: "failed to get single cluster", - rightClusterName: false, - expectInformer: true, - expectError: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := newWorkStatusController(cluster) - m := genericmanager.NewMultiClusterInformerManager(ctx) - if tt.rightClusterName { - m.ForCluster(clusterName, dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods")) - } else { - m.ForCluster("test", dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods")) - } - m.Start(clusterName) - m.WaitForCacheSync(clusterName) - c.InformerManager = m - - if tt.wrongClusterDynamicClientSetFunc { - c.ClusterDynamicClientSetFunc = NewClusterDynamicClientSetForAgentWithError - } else { - c.ClusterDynamicClientSetFunc = util.NewClusterDynamicClientSet - c.Client = fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects( - &clusterv1alpha1.Cluster{ - ObjectMeta: metav1.ObjectMeta{Name: "cluster"}, - Spec: clusterv1alpha1.ClusterSpec{ - APIEndpoint: "https://127.0.0.1", - SecretRef: &clusterv1alpha1.LocalSecretReference{Namespace: "ns1", Name: "secret1"}, - }, - Status: clusterv1alpha1.ClusterStatus{ - Conditions: []metav1.Condition{ - { - Type: clusterv1alpha1.ClusterConditionReady, - Status: metav1.ConditionTrue, - }, - }, - }, - }, - &corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{Namespace: "ns1", Name: "secret1"}, - Data: map[string][]byte{clusterv1alpha1.SecretTokenKey: []byte("token"), clusterv1alpha1.SecretCADataKey: testCA}, - }).Build() - } - - informerManager, err := c.getSingleClusterManager(cluster) - - if tt.expectInformer { - assert.NotEmpty(t, informerManager) - } else { - assert.Empty(t, informerManager) - } - - if tt.expectError { - assert.NotEmpty(t, err) - } else { - assert.Empty(t, err) - } - }) - } -} - -func TestWorkStatusController_recreateResourceIfNeeded(t *testing.T) { - c := WorkStatusController{ - Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(), - InformerManager: genericmanager.GetInstance(), - WorkPredicateFunc: helper.NewClusterPredicateOnAgent("test"), - ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, - ClusterCacheSyncTimeout: metav1.Duration{}, - RateLimiterOptions: ratelimiterflag.Options{}, - } - - workUID := "92345678-1234-5678-1234-567812345678" - raw := []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`) - work := testhelper.NewWork("work", "default", workUID, raw) - - obj := &unstructured.Unstructured{ - Object: map[string]any{ - "apiVersion": "v1", - "kind": "Pod", - "metadata": map[string]any{ - "name": "pod1", - "namespace": "default", - "annotations": map[string]any{ - workv1alpha2.WorkNamespaceAnnotation: "karmada-es-cluster", - }, - }, - }, - } - - key, _ := generateKey(obj) - - fedKey, ok := key.(keys.FederatedKey) - if !ok { - t.Fatalf("Invalid key, key: %v", key) - } - - t.Run("normal case", func(t *testing.T) { - err := c.recreateResourceIfNeeded(context.Background(), work, fedKey) - assert.Empty(t, err) - }) - - t.Run("failed to UnmarshalJSON", func(t *testing.T) { - work.Spec.Workload.Manifests[0].RawExtension.Raw = []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}},`) - err := c.recreateResourceIfNeeded(context.Background(), work, fedKey) - assert.NotEmpty(t, err) - }) -} - func TestWorkStatusController_buildStatusIdentifier(t *testing.T) { c := WorkStatusController{ Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(), - InformerManager: genericmanager.GetInstance(), + InformerManager: genericmanager.NewMultiClusterInformerManager(context.Background()), WorkPredicateFunc: helper.NewClusterPredicateOnAgent("test"), ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, - ClusterCacheSyncTimeout: metav1.Duration{}, RateLimiterOptions: ratelimiterflag.Options{}, } @@ -981,10 +883,9 @@ func TestWorkStatusController_buildStatusIdentifier(t *testing.T) { func TestWorkStatusController_mergeStatus(t *testing.T) { c := WorkStatusController{ Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)).Build(), - InformerManager: genericmanager.GetInstance(), + InformerManager: genericmanager.NewMultiClusterInformerManager(context.Background()), WorkPredicateFunc: helper.NewClusterPredicateOnAgent("test"), ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, - ClusterCacheSyncTimeout: metav1.Duration{}, RateLimiterOptions: ratelimiterflag.Options{}, } @@ -995,63 +896,6 @@ func TestWorkStatusController_mergeStatus(t *testing.T) { assert.Equal(t, []workv1alpha1.ManifestStatus{newStatus}, actual) } -func TestWorkStatusController_registerInformersAndStart(t *testing.T) { - clusterName := "cluster" - cluster := newCluster(clusterName, clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue) - - // Generate InformerManager - ctx := t.Context() - dynamicClientSet := dynamicfake.NewSimpleDynamicClient(scheme.Scheme) - c := newWorkStatusController(cluster) - opt := util.Options{ - Name: "opt", - KeyFunc: nil, - ReconcileFunc: nil, - } - c.worker = util.NewAsyncWorker(opt) - - workUID := "92345678-1234-5678-1234-567812345678" - raw := []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`) - work := testhelper.NewWork("work", "default", workUID, raw) - - t.Run("normal case", func(t *testing.T) { - m := genericmanager.NewMultiClusterInformerManager(ctx) - m.ForCluster(clusterName, dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods")) // register pod informer - m.Start(clusterName) - m.WaitForCacheSync(clusterName) - c.InformerManager = m - - err := c.registerInformersAndStart(cluster, work) - assert.Empty(t, err) - }) - - t.Run("failed to getSingleClusterManager", func(t *testing.T) { - c := newWorkStatusController(cluster) - m := genericmanager.NewMultiClusterInformerManager(ctx) - m.ForCluster("test", dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods")) // register pod informer - m.Start(clusterName) - m.WaitForCacheSync(clusterName) - c.InformerManager = m - c.ClusterDynamicClientSetFunc = NewClusterDynamicClientSetForAgentWithError - - err := c.registerInformersAndStart(cluster, work) - assert.NotEmpty(t, err) - }) - - t.Run("failed to getGVRsFromWork", func(t *testing.T) { - work.Spec.Workload.Manifests[0].RawExtension.Raw = []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}},`) - - m := genericmanager.NewMultiClusterInformerManager(ctx) - m.ForCluster(clusterName, dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods")) // register pod informer - m.Start(clusterName) - m.WaitForCacheSync(clusterName) - c.InformerManager = m - - err := c.registerInformersAndStart(cluster, work) - assert.NotEmpty(t, err) - }) -} - func TestWorkStatusController_interpretHealth(t *testing.T) { tests := []struct { name string diff --git a/pkg/util/helper/cache.go b/pkg/util/helper/cache.go index 1f1b84c35a0e..052e36f35267 100644 --- a/pkg/util/helper/cache.go +++ b/pkg/util/helper/cache.go @@ -27,13 +27,75 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" + "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" + "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" "github.com/karmada-io/karmada/pkg/util/fedinformer/keys" "github.com/karmada-io/karmada/pkg/util/restmapper" ) +// EnsureInformerHandlersReady ensures the member cluster informers for the given resources exist +// and have the handler registered. It returns whether those informers have already synced without waiting. +func EnsureInformerHandlersReady( + cluster *clusterv1alpha1.Cluster, + gvrTargets []schema.GroupVersionResource, + handler cache.ResourceEventHandler, + manager genericmanager.MultiClusterInformerManager, + clusterClientSetFunc util.NewClusterDynamicClientSetFunc, + kubeClientSet client.Client, + clusterClientOption *util.ClientOption, +) (bool, error) { + singleClusterInformerManager, err := getSingleClusterManager(cluster, manager, clusterClientSetFunc, kubeClientSet, clusterClientOption) + if err != nil { + return false, err + } + + // We need an immediate check here and don't need to wait, the timeout is set to 0. + singleClusterInformerManager.WaitForCacheSyncWithTimeout(0) + allSynced := true + for _, gvr := range gvrTargets { + if !singleClusterInformerManager.IsHandlerExist(gvr, handler) { + allSynced = false + singleClusterInformerManager.ForResource(gvr, handler) + continue + } + + if !singleClusterInformerManager.IsInformerSynced(gvr) { + allSynced = false + } + } + if allSynced { + return true, nil + } + + manager.Start(cluster.Name) + return false, nil +} + +func getSingleClusterManager( + cluster *clusterv1alpha1.Cluster, + manager genericmanager.MultiClusterInformerManager, + clusterClientSetFunc util.NewClusterDynamicClientSetFunc, + kubeClientSet client.Client, + clusterClientOption *util.ClientOption, +) (genericmanager.SingleClusterInformerManager, error) { + singleClusterInformerManager := manager.GetSingleClusterManager(cluster.Name) + if singleClusterInformerManager != nil { + return singleClusterInformerManager, nil + } + + dynamicClusterClient, err := clusterClientSetFunc(cluster.Name, kubeClientSet, clusterClientOption) + if err != nil { + klog.ErrorS(err, "Failed to build dynamic cluster client.", "cluster", cluster.Name) + return nil, err + } + return manager.ForCluster(dynamicClusterClient.ClusterName, dynamicClusterClient.DynamicClientSet, 0), nil +} + // GetObjectFromCache gets full object information from cache by key in worker queue. func GetObjectFromCache( restMapper meta.RESTMapper, diff --git a/pkg/util/helper/cache_test.go b/pkg/util/helper/cache_test.go index 74ba68317c02..80151e84a01f 100644 --- a/pkg/util/helper/cache_test.go +++ b/pkg/util/helper/cache_test.go @@ -18,6 +18,7 @@ package helper import ( "context" + "errors" "reflect" "testing" @@ -28,11 +29,109 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic/fake" "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + controllerfake "sigs.k8s.io/controller-runtime/pkg/client/fake" + clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" + "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" "github.com/karmada-io/karmada/pkg/util/fedinformer/keys" + "github.com/karmada-io/karmada/pkg/util/gclient" ) +func TestEnsureInformerHandlersReady(t *testing.T) { + cluster := &clusterv1alpha1.Cluster{ObjectMeta: metav1.ObjectMeta{Name: "cluster"}} + gvr := corev1.SchemeGroupVersion.WithResource("pods") + handler := &cache.ResourceEventHandlerFuncs{} + dynamicClient := fake.NewSimpleDynamicClient(scheme.Scheme) + kubeClient := controllerfake.NewClientBuilder().WithScheme(gclient.NewSchema()).Build() + + tests := []struct { + name string + manager func(context.Context) genericmanager.MultiClusterInformerManager + clusterClientSetFunc util.NewClusterDynamicClientSetFunc + wantSynced bool + wantErr bool + }{ + { + name: "registers missing handler and starts informer", + manager: func(ctx context.Context) genericmanager.MultiClusterInformerManager { + m := genericmanager.NewMultiClusterInformerManager(ctx) + m.ForCluster(cluster.Name, dynamicClient, 0) + return m + }, + clusterClientSetFunc: func(string, client.Client, *util.ClientOption) (*util.DynamicClusterClient, error) { + return nil, errors.New("should use existing manager") + }, + wantSynced: false, + }, + { + name: "returns synced when handler exists and informer synced", + manager: func(ctx context.Context) genericmanager.MultiClusterInformerManager { + m := genericmanager.NewMultiClusterInformerManager(ctx) + m.ForCluster(cluster.Name, dynamicClient, 0).ForResource(gvr, handler) + m.Start(cluster.Name) + m.WaitForCacheSync(cluster.Name) + return m + }, + clusterClientSetFunc: func(string, client.Client, *util.ClientOption) (*util.DynamicClusterClient, error) { + return nil, errors.New("should use existing manager") + }, + wantSynced: true, + }, + { + name: "creates manager with cluster client when missing", + manager: func(ctx context.Context) genericmanager.MultiClusterInformerManager { + return genericmanager.NewMultiClusterInformerManager(ctx) + }, + clusterClientSetFunc: func(clusterName string, _ client.Client, _ *util.ClientOption) (*util.DynamicClusterClient, error) { + return &util.DynamicClusterClient{ClusterName: clusterName, DynamicClientSet: dynamicClient}, nil + }, + wantSynced: false, + }, + { + name: "returns error when cluster client creation fails", + manager: func(ctx context.Context) genericmanager.MultiClusterInformerManager { + return genericmanager.NewMultiClusterInformerManager(ctx) + }, + clusterClientSetFunc: func(string, client.Client, *util.ClientOption) (*util.DynamicClusterClient, error) { + return nil, errors.New("failed to build dynamic client") + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mgr := tt.manager(t.Context()) + + gotSynced, err := EnsureInformerHandlersReady(cluster, []schema.GroupVersionResource{gvr}, handler, mgr, tt.clusterClientSetFunc, kubeClient, nil) + if tt.wantErr { + if err == nil { + t.Fatalf("expected error") + } + return + } + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if gotSynced != tt.wantSynced { + t.Fatalf("EnsureInformerHandlersReady() synced = %v, want %v", gotSynced, tt.wantSynced) + } + + singleClusterManager := mgr.GetSingleClusterManager(cluster.Name) + if singleClusterManager == nil { + t.Fatalf("expected single cluster manager") + } + if !singleClusterManager.IsHandlerExist(gvr, handler) { + t.Fatalf("expected handler to be registered") + } + }) + } +} + func TestGetObjectFromCache(t *testing.T) { type args struct { restMapper meta.RESTMapper diff --git a/pkg/util/lifted/doc.go b/pkg/util/lifted/doc.go index 28965a9c0fee..99a3b81792ef 100644 --- a/pkg/util/lifted/doc.go +++ b/pkg/util/lifted/doc.go @@ -47,9 +47,6 @@ package lifted | genutils_test.go | https://github.com/kubernetes/kubernetes/blob/release-1.26/cmd/genutils/genutils_test.go#L37-L42 | func TestNotDir | N | | nodeaffinity.go | https://github.com/kubernetes/kubernetes/blob/release-1.26/staging/src/k8s.io/component-helpers/scheduling/corev1/nodeaffinity/nodeaffinity.go#L203-L242 | func NodeSelectorRequirementsAsSelector | Y | | nodeaffinity_test.go | https://github.com/kubernetes/kubernetes/blob/release-1.26/staging/src/k8s.io/component-helpers/scheduling/corev1/nodeaffinity/nodeaffinity_test.go#L297-L360 | func TestNodeSelectorRequirementsAsSelector | N | -| objectwatcher.go | https://github.com/kubernetes-sigs/kubefed/blob/master/pkg/controller/util/propagatedversion.go#L35-L43 | func ObjectVersion | N | -| objectwatcher.go | https://github.com/kubernetes-sigs/kubefed/blob/master/pkg/controller/util/propagatedversion.go#L45-L59 | func ObjectNeedsUpdate | N | -| objectwatcher.go | https://github.com/kubernetes-retired/kubefed/blob/master/pkg/controller/util/meta.go#L82-L103 | func objectMetaObjEquivalent | Y | | podtemplate.go | https://github.com/kubernetes/kubernetes/blob/release-1.26/pkg/controller/controller_utils.go#L466-L472 | func getPodsLabelSet | N | | podtemplate.go | https://github.com/kubernetes/kubernetes/blob/release-1.26/pkg/controller/controller_utils.go#L474-L478 | func getPodsFinalizers | N | | podtemplate.go | https://github.com/kubernetes/kubernetes/blob/release-1.26/pkg/controller/controller_utils.go#L480-L486 | func getPodsAnnotationSet | N | diff --git a/pkg/util/lifted/objectwatcher.go b/pkg/util/lifted/objectwatcher.go deleted file mode 100644 index 574ee652b851..000000000000 --- a/pkg/util/lifted/objectwatcher.go +++ /dev/null @@ -1,88 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// This code is lifted from the kubefed codebase. It's a list of functions to determine whether the provided cluster -// object needs to be updated according to the desired object and the recorded version. -// For reference: -// https://github.com/kubernetes-sigs/kubefed/blob/master/pkg/controller/util/propagatedversion.go#L30-L59 -// https://github.com/kubernetes-retired/kubefed/blob/master/pkg/controller/util/meta.go#L82-L103 - -package lifted - -import ( - "fmt" - "reflect" - "strings" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" -) - -const ( - generationPrefix = "gen:" - resourceVersionPrefix = "rv:" -) - -// +lifted:source=https://github.com/kubernetes-sigs/kubefed/blob/master/pkg/controller/util/propagatedversion.go#L35-L43 - -// ObjectVersion retrieves the field type-prefixed value used for -// determining currency of the given cluster object. -func ObjectVersion(clusterObj *unstructured.Unstructured) string { - generation := clusterObj.GetGeneration() - if generation != 0 { - return fmt.Sprintf("%s%d", generationPrefix, generation) - } - return fmt.Sprintf("%s%s", resourceVersionPrefix, clusterObj.GetResourceVersion()) -} - -// +lifted:source=https://github.com/kubernetes-sigs/kubefed/blob/master/pkg/controller/util/propagatedversion.go#L45-L59 - -// ObjectNeedsUpdate determines whether the 2 objects provided cluster -// object needs to be updated according to the desired object and the -// recorded version. -func ObjectNeedsUpdate(desiredObj, clusterObj *unstructured.Unstructured, recordedVersion string) bool { - targetVersion := ObjectVersion(clusterObj) - - if recordedVersion != targetVersion { - return true - } - - // If versions match and the version is sourced from the - // generation field, a further check of metadata equivalency is - // required. - return strings.HasPrefix(targetVersion, generationPrefix) && !objectMetaObjEquivalent(desiredObj, clusterObj) -} - -// +lifted:source=https://github.com/kubernetes-retired/kubefed/blob/master/pkg/controller/util/meta.go#L82-L103 -// +lifted:changed - -// objectMetaObjEquivalent checks if cluster-independent, user provided data in two given ObjectMeta are equal. If in -// the future the ObjectMeta structure is expanded then any field that is not populated -// by the api server should be included here. -func objectMetaObjEquivalent(a, b metav1.Object) bool { - if a.GetName() != b.GetName() { - return false - } - if a.GetNamespace() != b.GetNamespace() { - return false - } - aLabels := a.GetLabels() - bLabels := b.GetLabels() - if !reflect.DeepEqual(aLabels, bLabels) && (len(aLabels) != 0 || len(bLabels) != 0) { - return false - } - return true -} diff --git a/pkg/util/lifted/objectwatcher_test.go b/pkg/util/lifted/objectwatcher_test.go deleted file mode 100644 index 81b0032e365e..000000000000 --- a/pkg/util/lifted/objectwatcher_test.go +++ /dev/null @@ -1,167 +0,0 @@ -/* -Copyright 2023 The Karmada Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package lifted - -import ( - "fmt" - "testing" - - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" -) - -func TestObjectVersion(t *testing.T) { - t.Run("have generation", func(t *testing.T) { - obj := &unstructured.Unstructured{ - Object: map[string]interface{}{ - "metadata": map[string]interface{}{ - "generation": int64(1), - }, - }, - } - res := ObjectVersion(obj) - expect := fmt.Sprintf("%s%d", generationPrefix, 1) - if res != expect { - t.Errorf("expect %v, but got %v", expect, res) - } - }) - - t.Run("don't have generation", func(t *testing.T) { - obj := &unstructured.Unstructured{ - Object: map[string]interface{}{ - "metadata": map[string]interface{}{ - "resourceVersion": "version", - }, - }, - } - res := ObjectVersion(obj) - expect := fmt.Sprintf("%s%s", resourceVersionPrefix, "version") - if res != expect { - t.Errorf("expect %v, but got %v", expect, res) - } - }) -} - -func TestObjectMetaObjEquivalent(t *testing.T) { - tests := []struct { - name string - a *unstructured.Unstructured - b *unstructured.Unstructured - expect bool - }{ - { - name: "name not equal", - a: &unstructured.Unstructured{ - Object: map[string]interface{}{ - "metadata": map[string]interface{}{ - "name": "a", - }, - }, - }, - b: &unstructured.Unstructured{ - Object: map[string]interface{}{ - "metadata": map[string]interface{}{ - "name": "b", - }, - }, - }, - expect: false, - }, - { - name: "namespace not equal", - a: &unstructured.Unstructured{ - Object: map[string]interface{}{ - "metadata": map[string]interface{}{ - "namespace": "a", - }, - }, - }, - b: &unstructured.Unstructured{ - Object: map[string]interface{}{ - "metadata": map[string]interface{}{ - "namespace": "b", - }, - }, - }, - expect: false, - }, - { - name: "label not equal", - a: &unstructured.Unstructured{ - Object: map[string]interface{}{ - "metadata": map[string]interface{}{ - "labels": map[string]interface{}{"a": "b"}, - }, - }, - }, - b: &unstructured.Unstructured{ - Object: map[string]interface{}{ - "metadata": map[string]interface{}{ - "labels": map[string]interface{}{"c": "d"}, - }, - }, - }, - expect: false, - }, - { - name: "everything is the same", - a: &unstructured.Unstructured{ - Object: map[string]interface{}{ - "metadata": map[string]interface{}{ - "name": "a", - "namespace": "a", - "labels": "a", - }, - }, - }, - b: &unstructured.Unstructured{ - Object: map[string]interface{}{ - "metadata": map[string]interface{}{ - "name": "a", - "namespace": "a", - "labels": "a", - }, - }, - }, - expect: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - actual := objectMetaObjEquivalent(tt.a, tt.b) - if actual != tt.expect { - t.Errorf("expect %v but got %v", tt.expect, actual) - } - }) - } -} - -func TestObjectNeedsUpdate(t *testing.T) { - clusterObj := &unstructured.Unstructured{ - Object: map[string]interface{}{ - "metadata": map[string]interface{}{ - "generation": int64(1), - }, - }, - } - desiredObj := clusterObj - recordedVersion := fmt.Sprintf("%s%d", generationPrefix, 2) - actual := ObjectNeedsUpdate(desiredObj, clusterObj, recordedVersion) - if actual != true { - t.Errorf("expect %v, but got %v", true, actual) - } -} diff --git a/pkg/util/objectwatcher/objectwatcher.go b/pkg/util/objectwatcher/objectwatcher.go index 0a56d55e75ac..855dc8f1b9f2 100644 --- a/pkg/util/objectwatcher/objectwatcher.go +++ b/pkg/util/objectwatcher/objectwatcher.go @@ -37,7 +37,6 @@ import ( "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" "github.com/karmada-io/karmada/pkg/util/fedinformer/keys" "github.com/karmada-io/karmada/pkg/util/helper" - "github.com/karmada-io/karmada/pkg/util/lifted" "github.com/karmada-io/karmada/pkg/util/restmapper" ) @@ -58,7 +57,7 @@ type ObjectWatcher interface { Create(ctx context.Context, clusterName string, desireObj *unstructured.Unstructured) error Update(ctx context.Context, clusterName string, desireObj, clusterObj *unstructured.Unstructured) (operationResult OperationResult, err error) Delete(ctx context.Context, clusterName string, desireObj *unstructured.Unstructured) error - NeedsUpdate(clusterName string, desiredObj, clusterObj *unstructured.Unstructured) bool + GetVersionRecord(clusterName string, object *unstructured.Unstructured) (string, bool) } type objectWatcherImpl struct { @@ -73,7 +72,7 @@ type objectWatcherImpl struct { } // NewObjectWatcher returns an instance of ObjectWatcher -func NewObjectWatcher(kubeClientSet client.Client, restMapper meta.RESTMapper, clusterClientSetFunc util.NewClusterDynamicClientSetFunc, clusterClientOption *util.ClientOption, interpreter resourceinterpreter.ResourceInterpreter) ObjectWatcher { +func NewObjectWatcher(kubeClientSet client.Client, restMapper meta.RESTMapper, clusterClientSetFunc util.NewClusterDynamicClientSetFunc, clusterClientOption *util.ClientOption, interpreter resourceinterpreter.ResourceInterpreter, informerManager genericmanager.MultiClusterInformerManager) ObjectWatcher { return &objectWatcherImpl{ KubeClientSet: kubeClientSet, VersionRecord: make(map[string]map[string]string), @@ -81,7 +80,7 @@ func NewObjectWatcher(kubeClientSet client.Client, restMapper meta.RESTMapper, c ClusterClientSetFunc: clusterClientSetFunc, ClusterClientOption: clusterClientOption, resourceInterpreter: interpreter, - InformerManager: genericmanager.GetInstance(), + InformerManager: informerManager, } } @@ -193,6 +192,9 @@ func (o *objectWatcherImpl) Update(ctx context.Context, clusterName string, desi } // record version + // TODO: Occasionally, due to the nature of asynchronous processing, the event generated by an object update may + // reach the informer before the corresponding recording operation is executed. + // This can sometimes lead to redundant reconciliation by the execution controller. o.recordVersion(resource, clusterName) if clusterObj.GetResourceVersion() == resource.GetResourceVersion() { @@ -267,7 +269,7 @@ func (o *objectWatcherImpl) genObjectKey(obj *unstructured.Unstructured) string // recordVersion will add or update resource version records func (o *objectWatcherImpl) recordVersion(clusterObj *unstructured.Unstructured, clusterName string) { - objVersion := lifted.ObjectVersion(clusterObj) + objVersion := clusterObj.GetResourceVersion() objectKey := o.genObjectKey(clusterObj) o.Lock.Lock() @@ -294,10 +296,8 @@ func (o *objectWatcherImpl) deleteVersionRecord(clusterName, resourceName string delete(o.VersionRecord[clusterName], resourceName) } -func (o *objectWatcherImpl) NeedsUpdate(clusterName string, desiredObj, clusterObj *unstructured.Unstructured) bool { - // get resource version - version, _ := o.getVersionRecord(clusterName, desiredObj.GroupVersionKind().String()+"/"+desiredObj.GetNamespace()+"/"+desiredObj.GetName()) - return lifted.ObjectNeedsUpdate(desiredObj, clusterObj, version) +func (o *objectWatcherImpl) GetVersionRecord(clusterName string, object *unstructured.Unstructured) (string, bool) { + return o.getVersionRecord(clusterName, object.GroupVersionKind().String()+"/"+object.GetNamespace()+"/"+object.GetName()) } func (o *objectWatcherImpl) isManagedResource(clusterObj *unstructured.Unstructured) bool { diff --git a/pkg/util/work.go b/pkg/util/work.go index 9ca725880db4..7c9abe333e56 100644 --- a/pkg/util/work.go +++ b/pkg/util/work.go @@ -21,6 +21,7 @@ import ( "strconv" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" @@ -29,6 +30,7 @@ import ( workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" + "github.com/karmada-io/karmada/pkg/util/restmapper" ) // GenEventRef returns the event reference. sets the UID(.spec.uid) that might be missing for fire events. @@ -79,6 +81,41 @@ func IsWorkContains(manifests []workv1alpha1.Manifest, targetResource schema.Gro return false } +// GetGVRsFromWork returns the distinct GroupVersionResources referenced by Work manifests. +func GetGVRsFromWork(restMapper meta.RESTMapper, work *workv1alpha1.Work) ([]schema.GroupVersionResource, error) { + var gvrTargets []schema.GroupVersionResource + gvrSet := map[schema.GroupVersionResource]bool{} + for _, manifest := range work.Spec.Workload.Manifests { + workload := &unstructured.Unstructured{} + if err := workload.UnmarshalJSON(manifest.Raw); err != nil { + klog.ErrorS(err, "Failed to unmarshal workload.", + "workNamespace", work.GetNamespace(), + "workName", work.GetName(), + ) + return nil, err + } + gvk := workload.GroupVersionKind() + gvr, err := restmapper.GetGroupVersionResource(restMapper, gvk) + if err != nil { + klog.ErrorS(err, "Failed to get GVR from GVK for resource.", + "workNamespace", work.GetNamespace(), + "workName", work.GetName(), + "gvk", gvk.String(), + "apiVersion", gvk.GroupVersion().String(), + "kind", gvk.Kind, + "namespace", workload.GetNamespace(), + "name", workload.GetName(), + ) + return nil, err + } + if !gvrSet[gvr] { + gvrSet[gvr] = true + gvrTargets = append(gvrTargets, gvr) + } + } + return gvrTargets, nil +} + // IsWorkSuspendDispatching checks if the work is suspended from dispatching. func IsWorkSuspendDispatching(work *workv1alpha1.Work) bool { return ptr.Deref(work.Spec.SuspendDispatching, false) diff --git a/pkg/util/work_test.go b/pkg/util/work_test.go index bc4041986b65..ef5d02ba8976 100644 --- a/pkg/util/work_test.go +++ b/pkg/util/work_test.go @@ -21,6 +21,7 @@ import ( "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -213,6 +214,76 @@ func TestIsWorkContains(t *testing.T) { } } +func TestGetGVRsFromWork(t *testing.T) { + restMapper := meta.NewDefaultRESTMapper([]schema.GroupVersion{corev1.SchemeGroupVersion}) + restMapper.Add(corev1.SchemeGroupVersion.WithKind("Pod"), meta.RESTScopeNamespace) + restMapper.Add(corev1.SchemeGroupVersion.WithKind("Service"), meta.RESTScopeNamespace) + + pod := unstructured.Unstructured{ + Object: map[string]any{ + "apiVersion": "v1", + "kind": "Pod", + }, + } + podData, _ := pod.MarshalJSON() + + service := unstructured.Unstructured{ + Object: map[string]any{ + "apiVersion": "v1", + "kind": "Service", + }, + } + serviceData, _ := service.MarshalJSON() + + tests := []struct { + name string + work *workv1alpha1.Work + want []schema.GroupVersionResource + wantErr bool + }{ + { + name: "returns distinct GVRs in manifest order", + work: &workv1alpha1.Work{Spec: workv1alpha1.WorkSpec{Workload: workv1alpha1.WorkloadTemplate{Manifests: []workv1alpha1.Manifest{ + {RawExtension: runtime.RawExtension{Raw: podData}}, + {RawExtension: runtime.RawExtension{Raw: serviceData}}, + {RawExtension: runtime.RawExtension{Raw: podData}}, + }}}}, + want: []schema.GroupVersionResource{ + corev1.SchemeGroupVersion.WithResource("pods"), + corev1.SchemeGroupVersion.WithResource("services"), + }, + }, + { + name: "returns error for invalid manifest", + work: &workv1alpha1.Work{Spec: workv1alpha1.WorkSpec{Workload: workv1alpha1.WorkloadTemplate{Manifests: []workv1alpha1.Manifest{ + {RawExtension: runtime.RawExtension{Raw: []byte(`{"apiVersion":"v1","kind":"Pod"},`)}}, + }}}}, + wantErr: true, + }, + { + name: "returns error for unknown kind", + work: &workv1alpha1.Work{Spec: workv1alpha1.WorkSpec{Workload: workv1alpha1.WorkloadTemplate{Manifests: []workv1alpha1.Manifest{ + {RawExtension: runtime.RawExtension{Raw: []byte(`{"apiVersion":"apps/v1","kind":"Deployment"}`)}}, + }}}}, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := GetGVRsFromWork(restMapper, tt.work) + if tt.wantErr { + assert.Error(t, err) + assert.Nil(t, got) + return + } + + assert.NoError(t, err) + assert.Equal(t, tt.want, got) + }) + } +} + func TestIsWorkSuspendDispatching(t *testing.T) { tests := []struct { name string