Skip to content

Commit 8e18f30

Browse files
authored
Merge pull request #5802 from CharlesQQ/resource-detector-optimization
feat(detector): remove type convert in detector controller by controller-runtime
2 parents 4eb2d27 + 6ac0f2c commit 8e18f30

File tree

9 files changed

+219
-322
lines changed

9 files changed

+219
-322
lines changed

cmd/controller-manager/app/controllermanager.go

+1
Original file line numberDiff line numberDiff line change
@@ -786,6 +786,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
786786
DiscoveryClientSet: discoverClientSet,
787787
Client: mgr.GetClient(),
788788
InformerManager: controlPlaneInformerManager,
789+
ControllerRuntimeCache: mgr.GetCache(),
789790
RESTMapper: mgr.GetRESTMapper(),
790791
DynamicClient: dynamicClientSet,
791792
SkippedResourceConfig: skippedResourceConfig,

pkg/detector/detector.go

+72-135
Large diffs are not rendered by default.

pkg/detector/detector_test.go

+9-118
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,9 @@ import (
2929
"k8s.io/apimachinery/pkg/api/meta"
3030
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3131
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
32-
"k8s.io/apimachinery/pkg/labels"
3332
"k8s.io/apimachinery/pkg/runtime"
3433
"k8s.io/apimachinery/pkg/runtime/schema"
3534
dynamicfake "k8s.io/client-go/dynamic/fake"
36-
"k8s.io/client-go/tools/cache"
3735
"k8s.io/client-go/tools/record"
3836
"sigs.k8s.io/controller-runtime/pkg/client"
3937
"sigs.k8s.io/controller-runtime/pkg/client/fake"
@@ -634,7 +632,7 @@ func TestLookForMatchedPolicy(t *testing.T) {
634632
tests := []struct {
635633
name string
636634
object *unstructured.Unstructured
637-
policies []*policyv1alpha1.PropagationPolicy
635+
policies []client.Object
638636
expectedPolicy *policyv1alpha1.PropagationPolicy
639637
}{
640638
{
@@ -649,8 +647,8 @@ func TestLookForMatchedPolicy(t *testing.T) {
649647
},
650648
},
651649
},
652-
policies: []*policyv1alpha1.PropagationPolicy{
653-
{
650+
policies: []client.Object{
651+
&policyv1alpha1.PropagationPolicy{
654652
ObjectMeta: metav1.ObjectMeta{
655653
Name: "policy-1",
656654
Namespace: "default",
@@ -689,9 +687,7 @@ func TestLookForMatchedPolicy(t *testing.T) {
689687

690688
d := &ResourceDetector{
691689
DynamicClient: fakeClient,
692-
propagationPolicyLister: &mockPropagationPolicyLister{
693-
policies: tt.policies,
694-
},
690+
Client: fake.NewClientBuilder().WithScheme(scheme).WithObjects(tt.policies...).Build(),
695691
}
696692

697693
objectKey := keys.ClusterWideKey{
@@ -725,7 +721,7 @@ func TestLookForMatchedClusterPolicy(t *testing.T) {
725721
tests := []struct {
726722
name string
727723
object *unstructured.Unstructured
728-
policies []*policyv1alpha1.ClusterPropagationPolicy
724+
policies []client.Object
729725
expectedPolicy *policyv1alpha1.ClusterPropagationPolicy
730726
}{
731727
{
@@ -740,8 +736,8 @@ func TestLookForMatchedClusterPolicy(t *testing.T) {
740736
},
741737
},
742738
},
743-
policies: []*policyv1alpha1.ClusterPropagationPolicy{
744-
{
739+
policies: []client.Object{
740+
&policyv1alpha1.ClusterPropagationPolicy{
745741
ObjectMeta: metav1.ObjectMeta{
746742
Name: "cluster-policy-1",
747743
},
@@ -778,9 +774,7 @@ func TestLookForMatchedClusterPolicy(t *testing.T) {
778774

779775
d := &ResourceDetector{
780776
DynamicClient: fakeClient,
781-
clusterPropagationPolicyLister: &mockClusterPropagationPolicyLister{
782-
policies: tt.policies,
783-
},
777+
Client: fake.NewClientBuilder().WithScheme(scheme).WithObjects(tt.policies...).Build(),
784778
}
785779

786780
objectKey := keys.ClusterWideKey{
@@ -980,6 +974,7 @@ func setupTestScheme() *runtime.Scheme {
980974
scheme := runtime.NewScheme()
981975
_ = workv1alpha2.Install(scheme)
982976
_ = corev1.AddToScheme(scheme)
977+
_ = policyv1alpha1.Install(scheme)
983978
return scheme
984979
}
985980

@@ -1046,110 +1041,6 @@ func (m *mockRESTMapper) ResourceSingularizer(resource string) (string, error) {
10461041
return resource, nil
10471042
}
10481043

1049-
// mockPropagationPolicyLister is a mock implementation of the PropagationPolicyLister
1050-
type mockPropagationPolicyLister struct {
1051-
policies []*policyv1alpha1.PropagationPolicy
1052-
}
1053-
1054-
func (m *mockPropagationPolicyLister) List(_ labels.Selector) (ret []runtime.Object, err error) {
1055-
var result []runtime.Object
1056-
for _, p := range m.policies {
1057-
u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(p)
1058-
if err != nil {
1059-
return nil, err
1060-
}
1061-
result = append(result, &unstructured.Unstructured{Object: u})
1062-
}
1063-
return result, nil
1064-
}
1065-
1066-
func (m *mockPropagationPolicyLister) Get(name string) (runtime.Object, error) {
1067-
for _, p := range m.policies {
1068-
if p.Name == name {
1069-
u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(p)
1070-
if err != nil {
1071-
return nil, err
1072-
}
1073-
return &unstructured.Unstructured{Object: u}, nil
1074-
}
1075-
}
1076-
return nil, nil
1077-
}
1078-
1079-
func (m *mockPropagationPolicyLister) ByNamespace(namespace string) cache.GenericNamespaceLister {
1080-
return &mockGenericNamespaceLister{
1081-
policies: m.policies,
1082-
namespace: namespace,
1083-
}
1084-
}
1085-
1086-
// mockGenericNamespaceLister is a mock implementation of cache.GenericNamespaceLister
1087-
type mockGenericNamespaceLister struct {
1088-
policies []*policyv1alpha1.PropagationPolicy
1089-
namespace string
1090-
}
1091-
1092-
func (m *mockGenericNamespaceLister) List(_ labels.Selector) (ret []runtime.Object, err error) {
1093-
var result []runtime.Object
1094-
for _, p := range m.policies {
1095-
if p.Namespace == m.namespace {
1096-
u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(p)
1097-
if err != nil {
1098-
return nil, err
1099-
}
1100-
result = append(result, &unstructured.Unstructured{Object: u})
1101-
}
1102-
}
1103-
return result, nil
1104-
}
1105-
1106-
func (m *mockGenericNamespaceLister) Get(name string) (runtime.Object, error) {
1107-
for _, p := range m.policies {
1108-
if p.Name == name && p.Namespace == m.namespace {
1109-
u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(p)
1110-
if err != nil {
1111-
return nil, err
1112-
}
1113-
return &unstructured.Unstructured{Object: u}, nil
1114-
}
1115-
}
1116-
return nil, nil
1117-
}
1118-
1119-
// mockClusterPropagationPolicyLister is a mock implementation of the ClusterPropagationPolicyLister
1120-
type mockClusterPropagationPolicyLister struct {
1121-
policies []*policyv1alpha1.ClusterPropagationPolicy
1122-
}
1123-
1124-
func (m *mockClusterPropagationPolicyLister) List(_ labels.Selector) (ret []runtime.Object, err error) {
1125-
var result []runtime.Object
1126-
for _, p := range m.policies {
1127-
u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(p)
1128-
if err != nil {
1129-
return nil, err
1130-
}
1131-
result = append(result, &unstructured.Unstructured{Object: u})
1132-
}
1133-
return result, nil
1134-
}
1135-
1136-
func (m *mockClusterPropagationPolicyLister) Get(name string) (runtime.Object, error) {
1137-
for _, p := range m.policies {
1138-
if p.Name == name {
1139-
u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(p)
1140-
if err != nil {
1141-
return nil, err
1142-
}
1143-
return &unstructured.Unstructured{Object: u}, nil
1144-
}
1145-
}
1146-
return nil, nil
1147-
}
1148-
1149-
func (m *mockClusterPropagationPolicyLister) ByNamespace(_ string) cache.GenericNamespaceLister {
1150-
return nil // ClusterPropagationPolicies are not namespaced
1151-
}
1152-
11531044
// mockResourceInterpreter is a mock implementation of the ResourceInterpreter interface
11541045
type mockResourceInterpreter struct{}
11551046

pkg/detector/handler.go

+5
Original file line numberDiff line numberDiff line change
@@ -59,3 +59,8 @@ func ResourceItemKeyFunc(obj interface{}) (util.QueueKey, error) {
5959

6060
return key, nil
6161
}
62+
63+
// NamespacedKeyFunc generates a NamespacedKey for object.
64+
func NamespacedKeyFunc(obj interface{}) (util.QueueKey, error) {
65+
return keys.NamespacedKeyFunc(obj)
66+
}

pkg/detector/policy.go

+4-16
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,8 @@ func (d *ResourceDetector) propagateResource(object *unstructured.Unstructured,
107107

108108
func (d *ResourceDetector) getAndApplyPolicy(object *unstructured.Unstructured, objectKey keys.ClusterWideKey,
109109
resourceChangeByKarmada bool, policyNamespace, policyName, claimedID string) error {
110-
policyObject, err := d.propagationPolicyLister.ByNamespace(policyNamespace).Get(policyName)
110+
matchedPropagationPolicy := &policyv1alpha1.PropagationPolicy{}
111+
err := d.Client.Get(context.TODO(), client.ObjectKey{Namespace: policyNamespace, Name: policyName}, matchedPropagationPolicy)
111112
if err != nil {
112113
if apierrors.IsNotFound(err) {
113114
klog.V(4).Infof("PropagationPolicy(%s/%s) has been removed.", policyNamespace, policyName)
@@ -116,13 +117,6 @@ func (d *ResourceDetector) getAndApplyPolicy(object *unstructured.Unstructured,
116117
klog.Errorf("Failed to get claimed policy(%s/%s),: %v", policyNamespace, policyName, err)
117118
return err
118119
}
119-
120-
matchedPropagationPolicy := &policyv1alpha1.PropagationPolicy{}
121-
if err = helper.ConvertToTypedObject(policyObject, matchedPropagationPolicy); err != nil {
122-
klog.Errorf("Failed to convert PropagationPolicy from unstructured object: %v", err)
123-
return err
124-
}
125-
126120
// Some resources are available in more than one group in the same kubernetes version.
127121
// Therefore, the following scenarios occurs:
128122
// In v1.21 kubernetes cluster, Ingress are available in both networking.k8s.io and extensions groups.
@@ -145,7 +139,8 @@ func (d *ResourceDetector) getAndApplyPolicy(object *unstructured.Unstructured,
145139

146140
func (d *ResourceDetector) getAndApplyClusterPolicy(object *unstructured.Unstructured, objectKey keys.ClusterWideKey,
147141
resourceChangeByKarmada bool, policyName, policyID string) error {
148-
policyObject, err := d.clusterPropagationPolicyLister.Get(policyName)
142+
matchedClusterPropagationPolicy := &policyv1alpha1.ClusterPropagationPolicy{}
143+
err := d.Client.Get(context.TODO(), client.ObjectKey{Name: policyName}, matchedClusterPropagationPolicy)
149144
if err != nil {
150145
if apierrors.IsNotFound(err) {
151146
klog.V(4).Infof("ClusterPropagationPolicy(%s) has been removed.", policyName)
@@ -155,13 +150,6 @@ func (d *ResourceDetector) getAndApplyClusterPolicy(object *unstructured.Unstruc
155150
klog.Errorf("Failed to get claimed policy(%s),: %v", policyName, err)
156151
return err
157152
}
158-
159-
matchedClusterPropagationPolicy := &policyv1alpha1.ClusterPropagationPolicy{}
160-
if err = helper.ConvertToTypedObject(policyObject, matchedClusterPropagationPolicy); err != nil {
161-
klog.Errorf("Failed to convert ClusterPropagationPolicy from unstructured object: %v", err)
162-
return err
163-
}
164-
165153
// Some resources are available in more than one group in the same kubernetes version.
166154
// Therefore, the following scenarios occurs:
167155
// In v1.21 kubernetes cluster, Ingress are available in both networking.k8s.io and extensions groups.

pkg/detector/preemption.go

+15-32
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@ import (
2424
corev1 "k8s.io/api/core/v1"
2525
apierrors "k8s.io/apimachinery/pkg/api/errors"
2626
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
27-
"k8s.io/apimachinery/pkg/labels"
2827
"k8s.io/apimachinery/pkg/runtime"
2928
utilerrors "k8s.io/apimachinery/pkg/util/errors"
3029
"k8s.io/klog/v2"
30+
"sigs.k8s.io/controller-runtime/pkg/client"
3131

3232
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
3333
"github.com/karmada-io/karmada/pkg/events"
@@ -118,18 +118,13 @@ func (d *ResourceDetector) preemptPropagationPolicy(resourceTemplate *unstructur
118118
return nil
119119
}
120120

121-
claimedPolicyObj, err := d.propagationPolicyLister.ByNamespace(claimedPolicyNamespace).Get(claimedPolicyName)
121+
claimedPolicy := &policyv1alpha1.PropagationPolicy{}
122+
err = d.Client.Get(context.TODO(), client.ObjectKey{Namespace: claimedPolicyNamespace, Name: claimedPolicyName}, claimedPolicy)
122123
if err != nil {
123124
klog.Errorf("Failed to retrieve claimed propagation policy(%s/%s): %v.", claimedPolicyNamespace, claimedPolicyName, err)
124125
return err
125126
}
126127

127-
claimedPolicy := &policyv1alpha1.PropagationPolicy{}
128-
if err = helper.ConvertToTypedObject(claimedPolicyObj, claimedPolicy); err != nil {
129-
klog.Errorf("Failed to convert PropagationPolicy from unstructured object: %v.", err)
130-
return err
131-
}
132-
133128
if policy.ExplicitPriority() <= claimedPolicy.ExplicitPriority() {
134129
klog.V(2).Infof("Propagation policy(%s/%s) cannot preempt another propagation policy(%s/%s) due to insufficient priority.",
135130
policy.Namespace, policy.Name, claimedPolicyNamespace, claimedPolicyName)
@@ -196,18 +191,12 @@ func (d *ResourceDetector) preemptClusterPropagationPolicy(resourceTemplate *uns
196191
return nil
197192
}
198193

199-
claimedPolicyObj, err := d.clusterPropagationPolicyLister.Get(claimedPolicyName)
194+
claimedPolicy := &policyv1alpha1.ClusterPropagationPolicy{}
195+
err = d.Client.Get(context.TODO(), client.ObjectKey{Name: claimedPolicyName}, claimedPolicy)
200196
if err != nil {
201197
klog.Errorf("Failed to retrieve claimed cluster propagation policy(%s): %v.", claimedPolicyName, err)
202198
return err
203199
}
204-
205-
claimedPolicy := &policyv1alpha1.ClusterPropagationPolicy{}
206-
if err = helper.ConvertToTypedObject(claimedPolicyObj, claimedPolicy); err != nil {
207-
klog.Errorf("Failed to convert ClusterPropagationPolicy from unstructured object: %v.", err)
208-
return err
209-
}
210-
211200
if policy.ExplicitPriority() <= claimedPolicy.ExplicitPriority() {
212201
klog.V(2).Infof("Cluster propagation policy(%s) cannot preempt another cluster propagation policy(%s) due to insufficient priority.",
213202
policy.Name, claimedPolicyName)
@@ -263,25 +252,23 @@ func (d *ResourceDetector) fetchResourceTemplate(rs policyv1alpha1.ResourceSelec
263252
// and put the PropagationPolicy in the queue to trigger preemption.
264253
func (d *ResourceDetector) HandleDeprioritizedPropagationPolicy(oldPolicy policyv1alpha1.PropagationPolicy, newPolicy policyv1alpha1.PropagationPolicy) {
265254
klog.Infof("PropagationPolicy(%s/%s) priority changed from %d to %d", newPolicy.GetNamespace(), newPolicy.GetName(), *oldPolicy.Spec.Priority, *newPolicy.Spec.Priority)
266-
policies, err := d.propagationPolicyLister.ByNamespace(newPolicy.GetNamespace()).List(labels.Everything())
255+
policyList := &policyv1alpha1.PropagationPolicyList{}
256+
err := d.Client.List(context.TODO(), policyList, &client.ListOptions{
257+
Namespace: newPolicy.GetNamespace(),
258+
})
267259
if err != nil {
268260
klog.Errorf("Failed to list PropagationPolicy from namespace: %s, error: %v", newPolicy.GetNamespace(), err)
269261
return
270262
}
271-
if len(policies) == 0 {
263+
if len(policyList.Items) == 0 {
272264
klog.Infof("No PropagationPolicy to preempt the PropagationPolicy(%s/%s).", newPolicy.GetNamespace(), newPolicy.GetName())
273265
}
274266

275267
// Use the priority queue to sort the listed policies to ensure the
276268
// higher priority PropagationPolicy be process first to avoid possible
277269
// multiple preemption.
278270
sortedPotentialKeys := pq.NewWith(priorityDescendingComparator)
279-
for i := range policies {
280-
var potentialPolicy policyv1alpha1.PropagationPolicy
281-
if err = helper.ConvertToTypedObject(policies[i], &potentialPolicy); err != nil {
282-
klog.Errorf("Failed to convert typed PropagationPolicy: %v", err)
283-
continue
284-
}
271+
for _, potentialPolicy := range policyList.Items {
285272
// Re-queue the polies that enables preemption and with the priority
286273
// in range (new priority, old priority).
287274
// For the polices with higher priority than old priority, it can
@@ -309,25 +296,21 @@ func (d *ResourceDetector) HandleDeprioritizedPropagationPolicy(oldPolicy policy
309296
func (d *ResourceDetector) HandleDeprioritizedClusterPropagationPolicy(oldPolicy policyv1alpha1.ClusterPropagationPolicy, newPolicy policyv1alpha1.ClusterPropagationPolicy) {
310297
klog.Infof("ClusterPropagationPolicy(%s) priority changed from %d to %d",
311298
newPolicy.GetName(), *oldPolicy.Spec.Priority, *newPolicy.Spec.Priority)
312-
policies, err := d.clusterPropagationPolicyLister.List(labels.Everything())
299+
policyList := &policyv1alpha1.ClusterPropagationPolicyList{}
300+
err := d.Client.List(context.TODO(), policyList)
313301
if err != nil {
314302
klog.Errorf("Failed to list ClusterPropagationPolicy, error: %v", err)
315303
return
316304
}
317-
if len(policies) == 0 {
305+
if len(policyList.Items) == 0 {
318306
klog.Infof("No ClusterPropagationPolicy to preempt the ClusterPropagationPolicy(%s).", newPolicy.GetName())
319307
}
320308

321309
// Use the priority queue to sort the listed policies to ensure the
322310
// higher priority ClusterPropagationPolicy be process first to avoid possible
323311
// multiple preemption.
324312
sortedPotentialKeys := pq.NewWith(priorityDescendingComparator)
325-
for i := range policies {
326-
var potentialPolicy policyv1alpha1.ClusterPropagationPolicy
327-
if err = helper.ConvertToTypedObject(policies[i], &potentialPolicy); err != nil {
328-
klog.Errorf("Failed to convert typed ClusterPropagationPolicy: %v", err)
329-
continue
330-
}
313+
for _, potentialPolicy := range policyList.Items {
331314
// Re-queue the polies that enables preemption and with the priority
332315
// in range (new priority, old priority).
333316
// For the polices with higher priority than old priority, it can

0 commit comments

Comments
 (0)