Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions cmd/agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func setupControllers(ctx context.Context, mgr controllerruntime.Manager, opts *
rateLimiterGetter := util.GetClusterRateLimiterGetter().SetDefaultLimits(opts.ClusterAPIQPS, opts.ClusterAPIBurst)
clusterClientOption := &util.ClientOption{RateLimiterGetter: rateLimiterGetter.GetRateLimiter}

objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSetForAgent, clusterClientOption, resourceInterpreter)
objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSetForAgent, clusterClientOption, resourceInterpreter, genericmanager.GetInstance())
controllerContext := controllerscontext.Context{
Mgr: mgr,
ObjectWatcher: objectWatcher,
Expand Down Expand Up @@ -346,12 +346,13 @@ func startClusterStatusController(ctx controllerscontext.Context) (bool, error)

func startExecutionController(ctx controllerscontext.Context) (bool, error) {
executionController := &execution.Controller{
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(execution.ControllerName), //nolint:staticcheck // Note: GetEventRecorderFor is deprecated in controller-runtime v0.23.0 in favor of GetEventRecorder. This changes event API from v1 events to events.k8s.io. We need to migrate carefully, especially considering the impact on users and RBAC permission changes in installation/deployment tools.
RESTMapper: ctx.Mgr.GetRESTMapper(),
ObjectWatcher: ctx.ObjectWatcher,
InformerManager: genericmanager.GetInstance(),
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(execution.ControllerName), //nolint:staticcheck // Note: GetEventRecorderFor is deprecated in controller-runtime v0.23.0 in favor of GetEventRecorder. This changes event API from v1 events to events.k8s.io. We need to migrate carefully, especially considering the impact on users and RBAC permission changes in installation/deployment tools.
RESTMapper: ctx.Mgr.GetRESTMapper(),
ObjectWatcher: ctx.ObjectWatcher,
InformerManager: genericmanager.GetInstance(),
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
ClusterClientSetFunc: util.NewClusterDynamicClientSetForAgent,
}
if err := executionController.SetupWithManager(ctx.Mgr); err != nil {
return false, err
Expand All @@ -366,9 +367,7 @@ func startWorkStatusController(ctx controllerscontext.Context) (bool, error) {
RESTMapper: ctx.Mgr.GetRESTMapper(),
InformerManager: genericmanager.GetInstance(),
Context: ctx.Context,
ObjectWatcher: ctx.ObjectWatcher,
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
ClusterCacheSyncTimeout: ctx.Opts.ClusterCacheSyncTimeout,
ConcurrentWorkStatusSyncs: ctx.Opts.ConcurrentWorkSyncs,
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
ResourceInterpreter: ctx.ResourceInterpreter,
Expand Down
20 changes: 10 additions & 10 deletions cmd/controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,13 +457,15 @@ func startBindingStatusController(ctx controllerscontext.Context) (enabled bool,

func startExecutionController(ctx controllerscontext.Context) (enabled bool, err error) {
executionController := &execution.Controller{
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(execution.ControllerName), //nolint:staticcheck // Note: GetEventRecorderFor is deprecated in controller-runtime v0.23.0 in favor of GetEventRecorder. This changes event API from v1 events to events.k8s.io. We need to migrate carefully, especially considering the impact on users and RBAC permission changes in installation/deployment tools.
RESTMapper: ctx.Mgr.GetRESTMapper(),
ObjectWatcher: ctx.ObjectWatcher,
WorkPredicateFunc: helper.WorkWithinPushClusterPredicate(ctx.Mgr),
InformerManager: genericmanager.GetInstance(),
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(execution.ControllerName), //nolint:staticcheck // Note: GetEventRecorderFor is deprecated in controller-runtime v0.23.0 in favor of GetEventRecorder. This changes event API from v1 events to events.k8s.io. We need to migrate carefully, especially considering the impact on users and RBAC permission changes in installation/deployment tools.
RESTMapper: ctx.Mgr.GetRESTMapper(),
ObjectWatcher: ctx.ObjectWatcher,
WorkPredicateFunc: helper.WorkWithinPushClusterPredicate(ctx.Mgr),
InformerManager: genericmanager.GetInstance(),
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
ClusterClientSetFunc: util.NewClusterDynamicClientSet,
ClusterClientOption: ctx.ClusterClientOption,
}
if err := executionController.SetupWithManager(ctx.Mgr); err != nil {
return false, err
Expand All @@ -479,11 +481,9 @@ func startWorkStatusController(ctx controllerscontext.Context) (enabled bool, er
RESTMapper: ctx.Mgr.GetRESTMapper(),
InformerManager: genericmanager.GetInstance(),
Context: ctx.Context,
ObjectWatcher: ctx.ObjectWatcher,
WorkPredicateFunc: helper.WorkWithinPushClusterPredicate(ctx.Mgr),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
ClusterClientOption: ctx.ClusterClientOption,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
ConcurrentWorkStatusSyncs: opts.ConcurrentWorkSyncs,
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
ResourceInterpreter: ctx.ResourceInterpreter,
Expand Down Expand Up @@ -868,7 +868,7 @@ func setupControllers(ctx context.Context, mgr controllerruntime.Manager, opts *
}
rateLimiterGetter := util.GetClusterRateLimiterGetter().SetDefaultLimits(opts.ClusterAPIQPS, opts.ClusterAPIBurst)
clusterClientOption := &util.ClientOption{RateLimiterGetter: rateLimiterGetter.GetRateLimiter}
objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSet, clusterClientOption, resourceInterpreter)
objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSet, clusterClientOption, resourceInterpreter, genericmanager.GetInstance())

resourceDetector := &detector.ResourceDetector{
DiscoveryClientSet: discoverClientSet,
Expand Down
200 changes: 191 additions & 9 deletions pkg/controllers/execution/execution_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@ package execution
import (
"context"
"fmt"
"reflect"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
Expand All @@ -36,15 +40,22 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/detector"
"github.com/karmada-io/karmada/pkg/events"
"github.com/karmada-io/karmada/pkg/metrics"
"github.com/karmada-io/karmada/pkg/resourceinterpreter/default/native/prune"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
"github.com/karmada-io/karmada/pkg/util/helper"
Expand All @@ -62,18 +73,24 @@ const (
// workSuspendDispatchingConditionReason is the reason for the WorkDispatching condition when dispatching is suspended.
workSuspendDispatchingConditionReason = "SuspendDispatching"
// workDispatchingConditionReason is the reason for the WorkDispatching condition when dispatching is not suspended.
workDispatchingConditionReason = "Dispatching"
workDispatchingConditionReason = "Dispatching"
memberInformerNotSyncedRequeueAfter = 1 * time.Minute
)

// Controller is to sync Work.
type Controller struct {
client.Client // used to operate Work resources.
EventRecorder record.EventRecorder
RESTMapper meta.RESTMapper
ObjectWatcher objectwatcher.ObjectWatcher
WorkPredicateFunc predicate.Predicate
InformerManager genericmanager.MultiClusterInformerManager
RateLimiterOptions ratelimiterflag.Options
client.Client // used to operate Work resources.
EventRecorder record.EventRecorder
RESTMapper meta.RESTMapper
ObjectWatcher objectwatcher.ObjectWatcher
WorkPredicateFunc predicate.Predicate
InformerManager genericmanager.MultiClusterInformerManager
eventHandlerOnce sync.Once
eventHandler cache.ResourceEventHandler
eventChannel chan event.TypedGenericEvent[client.Object]
RateLimiterOptions ratelimiterflag.Options
ClusterClientSetFunc util.NewClusterDynamicClientSetFunc
ClusterClientOption *util.ClientOption
}

// Reconcile performs a full reconciliation for the object referred to by the Request.
Expand Down Expand Up @@ -128,11 +145,28 @@ func (c *Controller) Reconcile(ctx context.Context, req controllerruntime.Reques
return controllerruntime.Result{}, err
}

gvrTargets, err := util.GetGVRsFromWork(c.RESTMapper, work)
if err != nil {
return controllerruntime.Result{}, err
}

informersSynced, err := helper.EnsureInformerHandlersReady(cluster, gvrTargets, c.getEventHandler(), c.InformerManager, c.ClusterClientSetFunc, c.Client, c.ClusterClientOption)
if err != nil {
klog.ErrorS(err, "Failed to ensure informers for Work.", "namespace", work.GetNamespace(), "name", work.GetName())
return controllerruntime.Result{}, err
}
if !informersSynced {
klog.V(4).InfoS("Member cluster informers are not synced yet.", "namespace", work.GetNamespace(), "name", work.GetName(), "cluster", cluster.Name)
return controllerruntime.Result{RequeueAfter: memberInformerNotSyncedRequeueAfter}, nil
}

return c.syncWork(ctx, clusterName, work)
}

// SetupWithManager creates a controller and register to controller manager.
func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error {
c.eventChannel = make(chan event.TypedGenericEvent[client.Object], 1024)

ctrlBuilder := controllerruntime.NewControllerManagedBy(mgr).Named(ControllerName).
WithEventFilter(predicate.GenerationChangedPredicate{}).
WithOptions(controller.Options{
Expand All @@ -145,7 +179,10 @@ func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error {
ctrlBuilder.For(&workv1alpha1.Work{})
}

return ctrlBuilder.Complete(c)
return ctrlBuilder.WatchesRawSource(source.Channel[client.Object](
c.eventChannel,
handler.EnqueueRequestsFromMapFunc(c.mapWorkloadToWork),
)).Complete(c)
}

func (c *Controller) syncWork(ctx context.Context, clusterName string, work *workv1alpha1.Work) (controllerruntime.Result, error) {
Expand Down Expand Up @@ -396,3 +433,148 @@ func (c *Controller) eventf(object *unstructured.Unstructured, eventType, reason
}
c.EventRecorder.Eventf(ref, eventType, reason, messageFmt, args...)
}

// mapWorkloadToWork converts a member cluster object received from the event
// channel into a reconcile request for the Work named in its annotations.
//
// Objects that do not carry the Karmada managed-by label, or that lack either
// the work namespace or the work name annotation, produce no request.
func (c *Controller) mapWorkloadToWork(_ context.Context, object client.Object) []reconcile.Request {
	managedBy := object.GetLabels()[util.ManagedByKarmadaLabel]
	if managedBy != util.ManagedByKarmadaLabelValue {
		klog.V(5).InfoS("Ignore resource which is not managed by Karmada.", "kind", object.GetObjectKind().GroupVersionKind().Kind, "namespace", object.GetNamespace(), "name", object.GetName())
		return nil
	}

	// Both annotations are required to locate the owning Work.
	workAnnotations := object.GetAnnotations()
	namespace, hasNamespace := workAnnotations[workv1alpha2.WorkNamespaceAnnotation]
	name, hasName := workAnnotations[workv1alpha2.WorkNameAnnotation]
	if !(hasNamespace && hasName) {
		klog.V(5).InfoS("Ignore resource missing Karmada work annotations.", "kind", object.GetObjectKind().GroupVersionKind().Kind, "namespace", object.GetNamespace(), "name", object.GetName())
		return nil
	}

	request := reconcile.Request{
		NamespacedName: client.ObjectKey{Namespace: namespace, Name: name},
	}
	return []reconcile.Request{request}
}

func (c *Controller) enqueueWorkload(object runtime.Object) {
if c.eventChannel == nil {
klog.V(5).InfoS("Ignore member cluster object event as event channel is not initialized")
return
}
clientObject, ok := object.(client.Object)
if !ok {
klog.ErrorS(nil, "Ignore event as object does not implement client.Object", "object", object)
return
}
c.eventChannel <- event.TypedGenericEvent[client.Object]{Object: clientObject}
Comment thread
zach593 marked this conversation as resolved.
Comment thread
zach593 marked this conversation as resolved.
}

// getEventHandler returns the handler used for events coming from member
// clusters. The handler is built lazily, at most once, and wires update events
// to onUpdate and delete events to onDelete; the first argument passed to
// NewHandlerOnEvents is nil (presumably the add callback — confirm against
// fedinformer). A handler already stored on the controller is kept untouched.
func (c *Controller) getEventHandler() cache.ResourceEventHandler {
	initHandler := func() {
		if c.eventHandler != nil {
			// Keep a handler that was set before the first call.
			return
		}
		c.eventHandler = fedinformer.NewHandlerOnEvents(nil, c.onUpdate, c.onDelete)
	}
	c.eventHandlerOnce.Do(initHandler)
	return c.eventHandler
}

func (c *Controller) onUpdate(old, cur any) {
oldObj, ok := old.(*unstructured.Unstructured)
if !ok {
klog.ErrorS(nil, "Failed to assert old object as Unstructured", "old", old)
return
}
// Skip resources not managed by Karmada.
if oldObj.GetLabels()[util.ManagedByKarmadaLabel] != util.ManagedByKarmadaLabelValue {
klog.V(5).InfoS("Skip resource not managed by Karmada",
"apiVersion", oldObj.GetAPIVersion(),
"kind", oldObj.GetKind(),
"namespace", oldObj.GetNamespace(),
"name", oldObj.GetName())
return
}
annotations := oldObj.GetAnnotations()
workNS, nsExist := annotations[workv1alpha2.WorkNamespaceAnnotation]
_, nameExist := annotations[workv1alpha2.WorkNameAnnotation]
if !nsExist || !nameExist {
klog.V(5).InfoS("Skip resource missing Karmada work annotations",
"apiVersion", oldObj.GetAPIVersion(),
"kind", oldObj.GetKind(),
"namespace", oldObj.GetNamespace(),
"name", oldObj.GetName())
return
}
Comment thread
zach593 marked this conversation as resolved.

curObj, ok := cur.(*unstructured.Unstructured)
if !ok {
klog.ErrorS(nil, "Failed to assert cur object as Unstructured", "cur", cur)
return
}

clusterName, err := names.GetClusterName(workNS)
if err != nil {
klog.ErrorS(err, "Failed to get cluster name from work namespace",
"workNamespace", workNS,
"apiVersion", curObj.GetAPIVersion(),
"kind", curObj.GetKind(),
"namespace", curObj.GetNamespace(),
"name", curObj.GetName())
return
}
versionRecord, ok := c.ObjectWatcher.GetVersionRecord(clusterName, curObj)
if !ok {
// Wait for the initial Reconcile to establish a version record before processing events.
// The version record is set after the first successful sync.
return
}
// Skip if the resource version matches the recorded one, this event is a result of our own sync.
// However, this assessment is not always accurate,
// as the update event may reach the Informer faster than the act of recording the version.
if curObj.GetResourceVersion() == versionRecord {
return
}

oldCopy := oldObj.DeepCopy()
// Remove irrelevant fields (e.g., managedFields, status) before comparing content.
// TODO: Consider a dedicated RemoveIrrelevantFields variant that preserves
// relevant changes like ownerReferences.
err = prune.RemoveIrrelevantFields(oldCopy, prune.RemoveJobTTLSeconds)
if err != nil {
klog.ErrorS(err, "Failed to remove irrelevant fields from old resource",
"apiVersion", curObj.GetAPIVersion(),
"kind", oldCopy.GetKind(),
"namespace", oldCopy.GetNamespace(),
"name", oldCopy.GetName(),
"clusterName", clusterName)
return
}
curCopy := curObj.DeepCopy()
err = prune.RemoveIrrelevantFields(curCopy, prune.RemoveJobTTLSeconds)
if err != nil {
klog.ErrorS(err, "Failed to remove irrelevant fields from current resource",
"apiVersion", curObj.GetAPIVersion(),
"kind", curCopy.GetKind(),
"namespace", curCopy.GetNamespace(),
"name", curCopy.GetName(),
"clusterName", clusterName)
return
}

// No meaningful content difference after pruning irrelevant fields.
if reflect.DeepEqual(oldCopy, curCopy) {
return
}

c.enqueueWorkload(curObj)
}

// onDelete handles delete events for member cluster resources and enqueues the
// deleted object. A cache.DeletedFinalStateUnknown tombstone is unwrapped
// first; a tombstone carrying no object is silently ignored. Anything that is
// not a runtime.Object is logged and dropped.
func (c *Controller) onDelete(old any) {
	if tombstone, isTombstone := old.(cache.DeletedFinalStateUnknown); isTombstone {
		if tombstone.Obj == nil {
			return
		}
		old = tombstone.Obj
	}

	switch deletedObj := old.(type) {
	case runtime.Object:
		c.enqueueWorkload(deletedObj)
	default:
		klog.ErrorS(nil, "Failed to assert old object as runtime.Object", "old", old)
	}
}
Loading