From 51cebc1d44f5bf4adbfc7ade18b1e5aaba6e7391 Mon Sep 17 00:00:00 2001 From: zhihao jian Date: Wed, 2 Jul 2025 15:00:20 +0800 Subject: [PATCH 1/4] change workload controller to use patch instead of update Signed-off-by: zhihao jian fix test use MergeFrom func to patch data get latest rs fix unit test fix lint do not get latest status before patch --- pkg/controller/deployment/controller.go | 2 + .../deployment/deployment_controller.go | 3 + .../deployment/deployment_controller_test.go | 258 +++++++++++++----- pkg/controller/deployment/rolling_test.go | 94 +++++-- pkg/controller/deployment/sync.go | 43 ++- pkg/util/patch/patch_utils_test.go | 1 - 6 files changed, 300 insertions(+), 101 deletions(-) diff --git a/pkg/controller/deployment/controller.go b/pkg/controller/deployment/controller.go index 89515ea7..313f8732 100644 --- a/pkg/controller/deployment/controller.go +++ b/pkg/controller/deployment/controller.go @@ -107,6 +107,7 @@ func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) { // Deployment controller factory factory := &controllerFactory{ client: genericClient.KubeClient, + runtimeClient: mgr.GetClient(), eventBroadcaster: eventBroadcaster, eventRecorder: recorder, dLister: dLister, @@ -268,6 +269,7 @@ func (f *controllerFactory) NewController(deployment *appsv1.Deployment) *Deploy return &DeploymentController{ client: f.client, + runtimeClient: f.runtimeClient, eventBroadcaster: f.eventBroadcaster, eventRecorder: f.eventRecorder, dLister: f.dLister, diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index 5259c2d4..f56f7cda 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -39,6 +39,7 @@ import ( rolloutsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1" deploymentutil "github.com/openkruise/rollouts/pkg/controller/deployment/util" + "sigs.k8s.io/controller-runtime/pkg/client" ) const ( @@ -68,6 
+69,8 @@ type DeploymentController struct { // we will use this strategy to replace spec.strategy of deployment strategy rolloutsv1alpha1.DeploymentStrategy + + runtimeClient client.Client } // getReplicaSetsForDeployment uses ControllerRefManager to reconcile diff --git a/pkg/controller/deployment/deployment_controller_test.go b/pkg/controller/deployment/deployment_controller_test.go index 11d02a9b..53e84592 100644 --- a/pkg/controller/deployment/deployment_controller_test.go +++ b/pkg/controller/deployment/deployment_controller_test.go @@ -24,18 +24,138 @@ import ( "testing" apps "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" intstrutil "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" appslisters "k8s.io/client-go/listers/apps/v1" "k8s.io/client-go/tools/record" "k8s.io/utils/pointer" + ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" + ctrlfake "sigs.k8s.io/controller-runtime/pkg/client/fake" rolloutsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1" "github.com/openkruise/rollouts/pkg/controller/deployment/util" ) +// mockReplicaSetLister implements appslisters.ReplicaSetLister for testing +type mockReplicaSetLister struct { + client ctrlclient.Client +} + +func (m *mockReplicaSetLister) List(selector labels.Selector) ([]*apps.ReplicaSet, error) { + var rsList apps.ReplicaSetList + if err := m.client.List(context.TODO(), &rsList); err != nil { + return nil, err + } + + var result []*apps.ReplicaSet + for i := range rsList.Items { + if selector.Matches(labels.Set(rsList.Items[i].Labels)) { + result = append(result, &rsList.Items[i]) + } + } + return result, nil +} + +func (m *mockReplicaSetLister) ReplicaSets(namespace string) appslisters.ReplicaSetNamespaceLister { + return &mockReplicaSetNamespaceLister{ + client: m.client, + namespace: namespace, + } +} + +func (m *mockReplicaSetLister) GetPodReplicaSets(pod 
*v1.Pod) ([]*apps.ReplicaSet, error) { + // For testing purposes, return empty list + return []*apps.ReplicaSet{}, nil +} + +// mockDeploymentLister implements appslisters.DeploymentLister for testing +type mockDeploymentLister struct { + client ctrlclient.Client +} + +func (m *mockDeploymentLister) List(selector labels.Selector) ([]*apps.Deployment, error) { + var deploymentList apps.DeploymentList + if err := m.client.List(context.TODO(), &deploymentList); err != nil { + return nil, err + } + + var result []*apps.Deployment + for i := range deploymentList.Items { + if selector.Matches(labels.Set(deploymentList.Items[i].Labels)) { + result = append(result, &deploymentList.Items[i]) + } + } + return result, nil +} + +func (m *mockDeploymentLister) Deployments(namespace string) appslisters.DeploymentNamespaceLister { + return &mockDeploymentNamespaceLister{ + client: m.client, + namespace: namespace, + } +} + +// mockDeploymentNamespaceLister implements appslisters.DeploymentNamespaceLister for testing +type mockDeploymentNamespaceLister struct { + client ctrlclient.Client + namespace string +} + +func (m *mockDeploymentNamespaceLister) List(selector labels.Selector) ([]*apps.Deployment, error) { + var deploymentList apps.DeploymentList + if err := m.client.List(context.TODO(), &deploymentList, ctrlclient.InNamespace(m.namespace)); err != nil { + return nil, err + } + + var result []*apps.Deployment + for i := range deploymentList.Items { + if selector.Matches(labels.Set(deploymentList.Items[i].Labels)) { + result = append(result, &deploymentList.Items[i]) + } + } + return result, nil +} + +func (m *mockDeploymentNamespaceLister) Get(name string) (*apps.Deployment, error) { + var deployment apps.Deployment + if err := m.client.Get(context.TODO(), ctrlclient.ObjectKey{Namespace: m.namespace, Name: name}, &deployment); err != nil { + return nil, err + } + return &deployment, nil +} + +// mockReplicaSetNamespaceLister implements appslisters.ReplicaSetNamespaceLister 
for testing +type mockReplicaSetNamespaceLister struct { + client ctrlclient.Client + namespace string +} + +func (m *mockReplicaSetNamespaceLister) List(selector labels.Selector) ([]*apps.ReplicaSet, error) { + var rsList apps.ReplicaSetList + if err := m.client.List(context.TODO(), &rsList, ctrlclient.InNamespace(m.namespace)); err != nil { + return nil, err + } + + var result []*apps.ReplicaSet + for i := range rsList.Items { + if selector.Matches(labels.Set(rsList.Items[i].Labels)) { + result = append(result, &rsList.Items[i]) + } + } + return result, nil +} + +func (m *mockReplicaSetNamespaceLister) Get(name string) (*apps.ReplicaSet, error) { + var rs apps.ReplicaSet + if err := m.client.Get(context.TODO(), ctrlclient.ObjectKey{Namespace: m.namespace, Name: name}, &rs); err != nil { + return nil, err + } + return &rs, nil +} + func TestSyncDeployment(t *testing.T) { tests := map[string]struct { oldRSsReplicas []int32 @@ -126,78 +246,83 @@ func TestSyncDeployment(t *testing.T) { for name, test := range tests { t.Run(name, func(t *testing.T) { - fakeClient := fake.NewSimpleClientset() - fakeRecord := record.NewFakeRecorder(10) - informers := informers.NewSharedInformerFactory(fakeClient, 0) - rsInformer := informers.Apps().V1().ReplicaSets().Informer() - dInformer := informers.Apps().V1().Deployments().Informer() - - var deployment apps.Deployment - var newRS apps.ReplicaSet - { - deployment = generateDeployment("busybox") - deployment.Spec.Replicas = pointer.Int32(test.dReplicas) - deployment.Status.ReadyReplicas = test.newRSReplicas - availableReplicas := test.newRSAvailable - for _, available := range test.oldRSsAvailable { - availableReplicas += available - } - deployment.Status.UpdatedReplicas = test.newRSReplicas - deployment.Status.Replicas = availableReplicas - deployment.Status.AvailableReplicas = availableReplicas - dInformer.GetIndexer().Add(&deployment) - _, err := fakeClient.AppsV1().Deployments(deployment.Namespace).Create(context.TODO(), 
&deployment, metav1.CreateOptions{}) - if err != nil { - t.Fatalf("got unexpected error: %v", err) - } + deployment := generateDeployment("busybox") + deployment.Spec.Replicas = pointer.Int32(test.dReplicas) + deployment.Status.ReadyReplicas = test.newRSReplicas + availableReplicas := test.newRSAvailable + for _, available := range test.oldRSsAvailable { + availableReplicas += available } - { - for index, replicas := range test.oldRSsReplicas { - rs := generateRS(deployment) - rs.SetName(fmt.Sprintf("rs-%d", index)) - rs.Spec.Replicas = pointer.Int32(replicas) - rs.Status.Replicas = replicas - if strings.HasPrefix(name, "scale") { - rs.Annotations = map[string]string{ - util.ReplicasAnnotation: strconv.Itoa(-1), - util.MaxReplicasAnnotation: strconv.Itoa(int(test.dAvailable + test.maxSurge.IntVal)), - } - } - rs.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("old-version-%d", index) - rs.Status.ReadyReplicas = test.oldRSsAvailable[index] - rs.Status.AvailableReplicas = test.oldRSsAvailable[index] - rsInformer.GetIndexer().Add(&rs) - _, err := fakeClient.AppsV1().ReplicaSets(rs.Namespace).Create(context.TODO(), &rs, metav1.CreateOptions{}) - if err != nil { - t.Fatalf("got unexpected error: %v", err) - } - } - } - { - newRS = generateRS(deployment) - newRS.SetName("rs-new") - newRS.Spec.Replicas = pointer.Int32(test.newRSReplicas) + deployment.Status.UpdatedReplicas = test.newRSReplicas + deployment.Status.Replicas = availableReplicas + deployment.Status.AvailableReplicas = availableReplicas + + var allObjects []ctrlclient.Object + allObjects = append(allObjects, &deployment) + + for index, replicas := range test.oldRSsReplicas { + rs := generateRS(deployment) + rs.SetName(fmt.Sprintf("rs-%d", index)) + rs.Spec.Replicas = pointer.Int32(replicas) + rs.Status.Replicas = replicas if strings.HasPrefix(name, "scale") { - newRS.Annotations = map[string]string{ + rs.Annotations = map[string]string{ util.ReplicasAnnotation: strconv.Itoa(-1), 
util.MaxReplicasAnnotation: strconv.Itoa(int(test.dAvailable + test.maxSurge.IntVal)), } } - newRS.Status.Replicas = test.newRSReplicas - newRS.Status.ReadyReplicas = test.newRSAvailable - newRS.Status.AvailableReplicas = test.newRSAvailable - rsInformer.GetIndexer().Add(&newRS) - _, err := fakeClient.AppsV1().ReplicaSets(newRS.Namespace).Create(context.TODO(), &newRS, metav1.CreateOptions{}) - if err != nil { - t.Fatalf("got unexpected error: %v", err) + rs.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("old-version-%d", index) + rs.Status.ReadyReplicas = test.oldRSsAvailable[index] + rs.Status.AvailableReplicas = test.oldRSsAvailable[index] + allObjects = append(allObjects, &rs) + } + + newRS := generateRS(deployment) + newRS.SetName("rs-new") + newRS.Spec.Replicas = pointer.Int32(test.newRSReplicas) + if strings.HasPrefix(name, "scale") { + newRS.Annotations = map[string]string{ + util.ReplicasAnnotation: strconv.Itoa(-1), + util.MaxReplicasAnnotation: strconv.Itoa(int(test.dAvailable + test.maxSurge.IntVal)), + } + } + newRS.Status.Replicas = test.newRSReplicas + newRS.Status.ReadyReplicas = test.newRSAvailable + newRS.Status.AvailableReplicas = test.newRSAvailable + allObjects = append(allObjects, &newRS) + + fakeCtrlClient := ctrlfake.NewClientBuilder(). + WithObjects(allObjects...). 
+ Build() + + // Create a mock event recorder + fakeRecord := record.NewFakeRecorder(10) + + // Create a mock deployment lister + mockDeploymentLister := &mockDeploymentLister{client: fakeCtrlClient} + + // Create a fake client with the same objects + fakeClient := fake.NewSimpleClientset() + _, err := fakeClient.AppsV1().Deployments(deployment.Namespace).Create(context.TODO(), &deployment, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("failed to create deployment in fake client: %v", err) + } + + for _, obj := range allObjects { + if rs, ok := obj.(*apps.ReplicaSet); ok { + _, err := fakeClient.AppsV1().ReplicaSets(rs.Namespace).Create(context.TODO(), rs, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("failed to create replicaset in fake client: %v", err) + } } } dc := &DeploymentController{ client: fakeClient, eventRecorder: fakeRecord, - dLister: appslisters.NewDeploymentLister(dInformer.GetIndexer()), - rsLister: appslisters.NewReplicaSetLister(rsInformer.GetIndexer()), + dLister: mockDeploymentLister, + rsLister: &mockReplicaSetLister{client: fakeCtrlClient}, + runtimeClient: fakeCtrlClient, strategy: rolloutsv1alpha1.DeploymentStrategy{ RollingUpdate: &apps.RollingUpdateDeployment{ MaxSurge: &test.maxSurge, @@ -207,17 +332,18 @@ func TestSyncDeployment(t *testing.T) { }, } - err := dc.syncDeployment(context.TODO(), &deployment) + err = dc.syncDeployment(context.TODO(), &deployment) if err != nil { t.Fatalf("got unexpected error: %v", err) } - rss, err := dc.client.AppsV1().ReplicaSets(deployment.Namespace).List(context.TODO(), metav1.ListOptions{}) - if err != nil { - t.Fatalf("got unexpected error: %v", err) + + var rsList apps.ReplicaSetList + if err := fakeCtrlClient.List(context.TODO(), &rsList); err != nil { + t.Fatalf("list rs error: %v", err) } resultOld := int32(0) resultNew := int32(0) - for _, rs := range rss.Items { + for _, rs := range rsList.Items { if rs.GetName() != "rs-new" { resultOld += *rs.Spec.Replicas } else { diff --git 
a/pkg/controller/deployment/rolling_test.go b/pkg/controller/deployment/rolling_test.go index b6f3c7fb..642844ea 100644 --- a/pkg/controller/deployment/rolling_test.go +++ b/pkg/controller/deployment/rolling_test.go @@ -33,6 +33,8 @@ import ( "k8s.io/utils/pointer" rolloutsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1" + ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" + ctrlfake "sigs.k8s.io/controller-runtime/pkg/client/fake" ) func newDControllerRef(d *apps.Deployment) *metav1.OwnerReference { @@ -255,16 +257,6 @@ func TestReconcileNewReplicaSet(t *testing.T) { t.Run(name, func(t *testing.T) { fakeClient := fake.NewSimpleClientset() fakeRecord := record.NewFakeRecorder(10) - dc := &DeploymentController{ - client: fakeClient, - eventRecorder: fakeRecord, - strategy: rolloutsv1alpha1.DeploymentStrategy{ - RollingUpdate: &apps.RollingUpdateDeployment{ - MaxSurge: &test.maxSurge, - }, - Partition: test.partition, - }, - } var deployment apps.Deployment var newRS apps.ReplicaSet @@ -299,16 +291,44 @@ func TestReconcileNewReplicaSet(t *testing.T) { t.Fatalf("got unexpected error: %v", err) } } + + // Create controller-runtime client with all objects + var allObjects []ctrlclient.Object + allObjects = append(allObjects, &deployment) + for _, rs := range allRSs { + allObjects = append(allObjects, rs) + } + + fakeCtrlClient := ctrlfake.NewClientBuilder(). + WithObjects(allObjects...). 
+ Build() + + dc := &DeploymentController{ + client: fakeClient, + eventRecorder: fakeRecord, + dLister: &mockDeploymentLister{client: fakeCtrlClient}, + rsLister: &mockReplicaSetLister{client: fakeCtrlClient}, + runtimeClient: fakeCtrlClient, + strategy: rolloutsv1alpha1.DeploymentStrategy{ + RollingUpdate: &apps.RollingUpdateDeployment{ + MaxSurge: &test.maxSurge, + }, + Partition: test.partition, + }, + } + _, err := dc.reconcileNewReplicaSet(context.TODO(), allRSs, &newRS, &deployment) if err != nil { t.Fatalf("got unexpected error: %v", err) } - result, err := dc.client.AppsV1().ReplicaSets(deployment.Namespace).Get(context.TODO(), newRS.Name, metav1.GetOptions{}) - if err != nil { + + // Check result from runtimeClient instead of fakeClient + var resultRS apps.ReplicaSet + if err := fakeCtrlClient.Get(context.TODO(), ctrlclient.ObjectKey{Namespace: newRS.Namespace, Name: newRS.Name}, &resultRS); err != nil { t.Fatalf("got unexpected error: %v", err) } - if *result.Spec.Replicas != test.expect { - t.Fatalf("expect %d, but got %d", test.expect, *result.Spec.Replicas) + if *resultRS.Spec.Replicas != test.expect { + t.Fatalf("expect %d, but got %d", test.expect, *resultRS.Spec.Replicas) } }) } @@ -381,17 +401,6 @@ func TestReconcileOldReplicaSet(t *testing.T) { t.Run(name, func(t *testing.T) { fakeClient := fake.NewSimpleClientset() fakeRecord := record.NewFakeRecorder(10) - dc := &DeploymentController{ - client: fakeClient, - eventRecorder: fakeRecord, - strategy: rolloutsv1alpha1.DeploymentStrategy{ - RollingUpdate: &apps.RollingUpdateDeployment{ - MaxSurge: &test.maxSurge, - MaxUnavailable: &test.maxUnavailable, - }, - Partition: test.partition, - }, - } var deployment apps.Deployment var newRS apps.ReplicaSet @@ -443,16 +452,45 @@ func TestReconcileOldReplicaSet(t *testing.T) { t.Fatalf("got unexpected error: %v", err) } } + + // Create controller-runtime client with all objects + var allObjects []ctrlclient.Object + allObjects = append(allObjects, 
&deployment) + for _, rs := range allRSs { + allObjects = append(allObjects, rs) + } + + fakeCtrlClient := ctrlfake.NewClientBuilder(). + WithObjects(allObjects...). + Build() + + dc := &DeploymentController{ + client: fakeClient, + eventRecorder: fakeRecord, + dLister: &mockDeploymentLister{client: fakeCtrlClient}, + rsLister: &mockReplicaSetLister{client: fakeCtrlClient}, + runtimeClient: fakeCtrlClient, + strategy: rolloutsv1alpha1.DeploymentStrategy{ + RollingUpdate: &apps.RollingUpdateDeployment{ + MaxSurge: &test.maxSurge, + MaxUnavailable: &test.maxUnavailable, + }, + Partition: test.partition, + }, + } + _, err := dc.reconcileOldReplicaSets(context.TODO(), allRSs, oldRSs, &newRS, &deployment) if err != nil { t.Fatalf("got unexpected error: %v", err) } - rss, err := dc.client.AppsV1().ReplicaSets(deployment.Namespace).List(context.TODO(), metav1.ListOptions{}) - if err != nil { + + // Check result from runtimeClient instead of fakeClient + var rsList apps.ReplicaSetList + if err := fakeCtrlClient.List(context.TODO(), &rsList); err != nil { t.Fatalf("got unexpected error: %v", err) } result := int32(0) - for _, rs := range rss.Items { + for _, rs := range rsList.Items { if rs.GetName() != "rs-new" { result += *rs.Spec.Replicas } diff --git a/pkg/controller/deployment/sync.go b/pkg/controller/deployment/sync.go index ee85337a..26eac086 100644 --- a/pkg/controller/deployment/sync.go +++ b/pkg/controller/deployment/sync.go @@ -30,6 +30,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" "k8s.io/utils/integer" + "sigs.k8s.io/controller-runtime/pkg/client" rolloutsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1" deploymentutil "github.com/openkruise/rollouts/pkg/controller/deployment/util" @@ -117,8 +118,20 @@ func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.De annotationsUpdated := deploymentutil.SetNewReplicaSetAnnotations(d, rsCopy, &dc.strategy, newRevision, true, maxRevHistoryLengthInChars) 
minReadySecondsNeedsUpdate := rsCopy.Spec.MinReadySeconds != d.Spec.MinReadySeconds if annotationsUpdated || minReadySecondsNeedsUpdate { - rsCopy.Spec.MinReadySeconds = d.Spec.MinReadySeconds - return dc.client.AppsV1().ReplicaSets(rsCopy.ObjectMeta.Namespace).Update(ctx, rsCopy, metav1.UpdateOptions{}) + // Use existing state directly for patching, let API Server handle conflicts + rsCopy = existingNewRS.DeepCopy() + deploymentutil.SetNewReplicaSetAnnotations(d, rsCopy, &dc.strategy, newRevision, true, maxRevHistoryLengthInChars) + if minReadySecondsNeedsUpdate { + rsCopy.Spec.MinReadySeconds = d.Spec.MinReadySeconds + } + + // Use MergeFrom for patching, if ResourceVersion conflicts, API Server will return 409 error + // Controller-runtime will automatically reschedule for reconciliation + err := dc.runtimeClient.Patch(ctx, rsCopy, client.MergeFrom(existingNewRS)) + if err != nil { + return nil, err + } + return rsCopy, nil } // Should use the revision in existingNewRS's annotation, since it set by before @@ -410,11 +423,29 @@ func (dc *DeploymentController) scaleReplicaSet(ctx context.Context, rs *apps.Re var err error if sizeNeedsUpdate || annotationsNeedUpdate { oldScale := *(rs.Spec.Replicas) + + // Use existing state directly for patching, let API Server handle conflicts rsCopy := rs.DeepCopy() - *(rsCopy.Spec.Replicas) = newScale - deploymentutil.SetReplicasAnnotations(rsCopy, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+deploymentutil.MaxSurge(deployment, &dc.strategy)) - rs, err = dc.client.AppsV1().ReplicaSets(rsCopy.Namespace).Update(ctx, rsCopy, metav1.UpdateOptions{}) - if err == nil && sizeNeedsUpdate { + + if sizeNeedsUpdate { + rsCopy.Spec.Replicas = &newScale + } + if annotationsNeedUpdate { + // Set the annotations that need to be updated + desiredReplicas := *(deployment.Spec.Replicas) + maxReplicas := *(deployment.Spec.Replicas) + deploymentutil.MaxSurge(deployment, &dc.strategy) + deploymentutil.SetReplicasAnnotations(rsCopy, 
desiredReplicas, maxReplicas) + } + + // Use MergeFrom for patching, if ResourceVersion conflicts, API Server will return 409 error + // Controller-runtime will automatically reschedule for reconciliation + err = dc.runtimeClient.Patch(ctx, rsCopy, client.MergeFrom(rs)) + if err != nil { + return scaled, rs, err + } + + rs = rsCopy + if sizeNeedsUpdate { scaled = true dc.eventRecorder.Eventf(deployment, v1.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d from %d", scalingOperation, rs.Name, newScale, oldScale) } diff --git a/pkg/util/patch/patch_utils_test.go b/pkg/util/patch/patch_utils_test.go index 6a9b8b36..fc39dfe7 100644 --- a/pkg/util/patch/patch_utils_test.go +++ b/pkg/util/patch/patch_utils_test.go @@ -41,5 +41,4 @@ func TestCommonPatch(t *testing.T) { if !reflect.DeepEqual(patchReq.String(), expectedPatchBody) { t.Fatalf("Not equal: \n%s \n%s", expectedPatchBody, patchReq.String()) } - } From 20bc6e9c74a1a26034834b1be6f04887768b1eb4 Mon Sep 17 00:00:00 2001 From: zhihao jian Date: Mon, 14 Jul 2025 19:37:01 +0800 Subject: [PATCH 2/4] use runtimeClient to operate deployment uniformly Signed-off-by: zhihao jian remove dupl SetNewReplicaSetAnnotations use UnsafeDisableDeepCopy to optimize performance use optimisticLock for patch fix patch extra status always failed fix unit test --- pkg/controller/deployment/controller.go | 21 --- .../deployment/deployment_controller.go | 58 +++--- .../deployment/deployment_controller_test.go | 172 +++--------------- pkg/controller/deployment/progress.go | 6 +- pkg/controller/deployment/rolling_test.go | 19 -- pkg/controller/deployment/sync.go | 57 +++--- 6 files changed, 93 insertions(+), 240 deletions(-) diff --git a/pkg/controller/deployment/controller.go b/pkg/controller/deployment/controller.go index 313f8732..313f6d0a 100644 --- a/pkg/controller/deployment/controller.go +++ b/pkg/controller/deployment/controller.go @@ -31,8 +31,6 @@ import ( "k8s.io/apimachinery/pkg/util/validation/field" 
"k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" - appslisters "k8s.io/client-go/listers/apps/v1" - toolscache "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" @@ -83,19 +81,6 @@ func Add(mgr manager.Manager) error { // newReconciler returns a new reconcile.Reconciler func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) { - cacher := mgr.GetCache() - dInformer, err := cacher.GetInformerForKind(context.TODO(), appsv1.SchemeGroupVersion.WithKind("Deployment")) - if err != nil { - return nil, err - } - rsInformer, err := cacher.GetInformerForKind(context.TODO(), appsv1.SchemeGroupVersion.WithKind("ReplicaSet")) - if err != nil { - return nil, err - } - - // Lister - dLister := appslisters.NewDeploymentLister(dInformer.(toolscache.SharedIndexInformer).GetIndexer()) - rsLister := appslisters.NewReplicaSetLister(rsInformer.(toolscache.SharedIndexInformer).GetIndexer()) // Client & Recorder genericClient := clientutil.GetGenericClientWithName("advanced-deployment-controller") @@ -106,12 +91,9 @@ func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) { // Deployment controller factory factory := &controllerFactory{ - client: genericClient.KubeClient, runtimeClient: mgr.GetClient(), eventBroadcaster: eventBroadcaster, eventRecorder: recorder, - dLister: dLister, - rsLister: rsLister, } return &ReconcileDeployment{Client: mgr.GetClient(), controllerFactory: factory}, nil } @@ -268,12 +250,9 @@ func (f *controllerFactory) NewController(deployment *appsv1.Deployment) *Deploy klog.V(4).Infof("Processing deployment %v strategy %v", klog.KObj(deployment), string(marshaled)) return &DeploymentController{ - client: f.client, runtimeClient: f.runtimeClient, eventBroadcaster: f.eventBroadcaster, eventRecorder: f.eventRecorder, - dLister: f.dLister, - rsLister: f.rsLister, strategy: strategy, } } diff --git 
a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index f56f7cda..584e3fc6 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -25,15 +25,12 @@ import ( "encoding/json" "fmt" "reflect" - "strings" "time" + utilclient "github.com/openkruise/rollouts/pkg/util/client" apps "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - clientset "k8s.io/client-go/kubernetes" - appslisters "k8s.io/client-go/listers/apps/v1" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" @@ -57,16 +54,9 @@ var controllerKind = apps.SchemeGroupVersion.WithKind("Deployment") // DeploymentController is responsible for synchronizing Deployment objects stored // in the system with actual running replica sets and pods. type DeploymentController struct { - client clientset.Interface - eventBroadcaster record.EventBroadcaster eventRecorder record.EventRecorder - // dLister can list/get deployments from the shared informer's store - dLister appslisters.DeploymentLister - // rsLister can list/get replica sets from the shared informer's store - rsLister appslisters.ReplicaSetLister - // we will use this strategy to replace spec.strategy of deployment strategy rolloutsv1alpha1.DeploymentStrategy @@ -81,15 +71,18 @@ func (dc *DeploymentController) getReplicaSetsForDeployment(ctx context.Context, if err != nil { return nil, fmt.Errorf("deployment %s/%s has invalid label selector: %v", d.Namespace, d.Name, err) } - // List all ReplicaSets to find those we own but that no longer match our - // selector. They will be orphaned by ClaimReplicaSets(). 
- allRSs, err := dc.rsLister.ReplicaSets(d.Namespace).List(deploymentSelector) + + // List all ReplicaSets using runtimeClient + rsList := &apps.ReplicaSetList{} + err = dc.runtimeClient.List(ctx, rsList, client.InNamespace(d.Namespace), client.MatchingLabelsSelector{Selector: deploymentSelector}, utilclient.DisableDeepCopy) if err != nil { return nil, fmt.Errorf("list %s/%s rs failed:%v", d.Namespace, d.Name, err) } + // select rs owner by current deployment ownedRSs := make([]*apps.ReplicaSet, 0) - for _, rs := range allRSs { + for i := range rsList.Items { + rs := &rsList.Items[i] if !rs.DeletionTimestamp.IsZero() { continue } @@ -119,7 +112,10 @@ func (dc *DeploymentController) syncDeployment(ctx context.Context, deployment * dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.") if d.Status.ObservedGeneration < d.Generation { d.Status.ObservedGeneration = d.Generation - dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}) + err := dc.runtimeClient.Status().Update(ctx, d) + if err != nil { + klog.Errorf("Failed to update deployment status: %v", err) + } } return nil } @@ -153,38 +149,46 @@ func (dc *DeploymentController) syncDeployment(ctx context.Context, deployment * // patchExtraStatus will update extra status for advancedStatus func (dc *DeploymentController) patchExtraStatus(deployment *apps.Deployment) error { - rsList, err := dc.getReplicaSetsForDeployment(context.TODO(), deployment) + latestDeployment := &apps.Deployment{} + err := dc.runtimeClient.Get(context.TODO(), client.ObjectKeyFromObject(deployment), latestDeployment) + if err != nil { + klog.Errorf("Failed to get deployment: %v", err) + return err + } + rsList, err := dc.getReplicaSetsForDeployment(context.TODO(), latestDeployment) if err != nil { return err } updatedReadyReplicas := int32(0) - newRS := deploymentutil.FindNewReplicaSet(deployment, rsList) + newRS := 
deploymentutil.FindNewReplicaSet(latestDeployment, rsList) if newRS != nil { updatedReadyReplicas = newRS.Status.ReadyReplicas } extraStatus := &rolloutsv1alpha1.DeploymentExtraStatus{ UpdatedReadyReplicas: updatedReadyReplicas, - ExpectedUpdatedReplicas: deploymentutil.NewRSReplicasLimit(dc.strategy.Partition, deployment), + ExpectedUpdatedReplicas: deploymentutil.NewRSReplicasLimit(dc.strategy.Partition, latestDeployment), } extraStatusByte, err := json.Marshal(extraStatus) if err != nil { - klog.Errorf("Failed to marshal extra status for Deployment %v, err: %v", klog.KObj(deployment), err) + klog.Errorf("Failed to marshal extra status for Deployment %v, err: %v", klog.KObj(latestDeployment), err) return nil // no need to retry } extraStatusAnno := string(extraStatusByte) - if deployment.Annotations[rolloutsv1alpha1.DeploymentExtraStatusAnnotation] == extraStatusAnno { + if latestDeployment.Annotations[rolloutsv1alpha1.DeploymentExtraStatusAnnotation] == extraStatusAnno { return nil // no need to update } + deploymentCopy := latestDeployment.DeepCopy() + deploymentCopy.Annotations[rolloutsv1alpha1.DeploymentExtraStatusAnnotation] = extraStatusAnno - body := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, - rolloutsv1alpha1.DeploymentExtraStatusAnnotation, - strings.Replace(extraStatusAnno, `"`, `\"`, -1)) - - _, err = dc.client.AppsV1().Deployments(deployment.Namespace). 
- Patch(context.TODO(), deployment.Name, types.MergePatchType, []byte(body), metav1.PatchOptions{}) + patch := client.MergeFromWithOptions(latestDeployment, client.MergeFromWithOptimisticLock{}) + err = dc.runtimeClient.Patch(context.TODO(), deploymentCopy, patch) + if err != nil { + klog.Errorf("Failed to patch deployment extra status: %v", err) + return err + } return err } diff --git a/pkg/controller/deployment/deployment_controller_test.go b/pkg/controller/deployment/deployment_controller_test.go index 53e84592..20d48d04 100644 --- a/pkg/controller/deployment/deployment_controller_test.go +++ b/pkg/controller/deployment/deployment_controller_test.go @@ -22,14 +22,11 @@ import ( "strconv" "strings" "testing" + "time" apps "k8s.io/api/apps/v1" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/api/errors" intstrutil "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/client-go/kubernetes/fake" - appslisters "k8s.io/client-go/listers/apps/v1" "k8s.io/client-go/tools/record" "k8s.io/utils/pointer" ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -39,123 +36,6 @@ import ( "github.com/openkruise/rollouts/pkg/controller/deployment/util" ) -// mockReplicaSetLister implements appslisters.ReplicaSetLister for testing -type mockReplicaSetLister struct { - client ctrlclient.Client -} - -func (m *mockReplicaSetLister) List(selector labels.Selector) ([]*apps.ReplicaSet, error) { - var rsList apps.ReplicaSetList - if err := m.client.List(context.TODO(), &rsList); err != nil { - return nil, err - } - - var result []*apps.ReplicaSet - for i := range rsList.Items { - if selector.Matches(labels.Set(rsList.Items[i].Labels)) { - result = append(result, &rsList.Items[i]) - } - } - return result, nil -} - -func (m *mockReplicaSetLister) ReplicaSets(namespace string) appslisters.ReplicaSetNamespaceLister { - return &mockReplicaSetNamespaceLister{ - client: m.client, - namespace: namespace, - } -} 
- -func (m *mockReplicaSetLister) GetPodReplicaSets(pod *v1.Pod) ([]*apps.ReplicaSet, error) { - // For testing purposes, return empty list - return []*apps.ReplicaSet{}, nil -} - -// mockDeploymentLister implements appslisters.DeploymentLister for testing -type mockDeploymentLister struct { - client ctrlclient.Client -} - -func (m *mockDeploymentLister) List(selector labels.Selector) ([]*apps.Deployment, error) { - var deploymentList apps.DeploymentList - if err := m.client.List(context.TODO(), &deploymentList); err != nil { - return nil, err - } - - var result []*apps.Deployment - for i := range deploymentList.Items { - if selector.Matches(labels.Set(deploymentList.Items[i].Labels)) { - result = append(result, &deploymentList.Items[i]) - } - } - return result, nil -} - -func (m *mockDeploymentLister) Deployments(namespace string) appslisters.DeploymentNamespaceLister { - return &mockDeploymentNamespaceLister{ - client: m.client, - namespace: namespace, - } -} - -// mockDeploymentNamespaceLister implements appslisters.DeploymentNamespaceLister for testing -type mockDeploymentNamespaceLister struct { - client ctrlclient.Client - namespace string -} - -func (m *mockDeploymentNamespaceLister) List(selector labels.Selector) ([]*apps.Deployment, error) { - var deploymentList apps.DeploymentList - if err := m.client.List(context.TODO(), &deploymentList, ctrlclient.InNamespace(m.namespace)); err != nil { - return nil, err - } - - var result []*apps.Deployment - for i := range deploymentList.Items { - if selector.Matches(labels.Set(deploymentList.Items[i].Labels)) { - result = append(result, &deploymentList.Items[i]) - } - } - return result, nil -} - -func (m *mockDeploymentNamespaceLister) Get(name string) (*apps.Deployment, error) { - var deployment apps.Deployment - if err := m.client.Get(context.TODO(), ctrlclient.ObjectKey{Namespace: m.namespace, Name: name}, &deployment); err != nil { - return nil, err - } - return &deployment, nil -} - -// 
mockReplicaSetNamespaceLister implements appslisters.ReplicaSetNamespaceLister for testing -type mockReplicaSetNamespaceLister struct { - client ctrlclient.Client - namespace string -} - -func (m *mockReplicaSetNamespaceLister) List(selector labels.Selector) ([]*apps.ReplicaSet, error) { - var rsList apps.ReplicaSetList - if err := m.client.List(context.TODO(), &rsList, ctrlclient.InNamespace(m.namespace)); err != nil { - return nil, err - } - - var result []*apps.ReplicaSet - for i := range rsList.Items { - if selector.Matches(labels.Set(rsList.Items[i].Labels)) { - result = append(result, &rsList.Items[i]) - } - } - return result, nil -} - -func (m *mockReplicaSetNamespaceLister) Get(name string) (*apps.ReplicaSet, error) { - var rs apps.ReplicaSet - if err := m.client.Get(context.TODO(), ctrlclient.ObjectKey{Namespace: m.namespace, Name: name}, &rs); err != nil { - return nil, err - } - return &rs, nil -} - func TestSyncDeployment(t *testing.T) { tests := map[string]struct { oldRSsReplicas []int32 @@ -298,30 +178,8 @@ func TestSyncDeployment(t *testing.T) { // Create a mock event recorder fakeRecord := record.NewFakeRecorder(10) - // Create a mock deployment lister - mockDeploymentLister := &mockDeploymentLister{client: fakeCtrlClient} - - // Create a fake client with the same objects - fakeClient := fake.NewSimpleClientset() - _, err := fakeClient.AppsV1().Deployments(deployment.Namespace).Create(context.TODO(), &deployment, metav1.CreateOptions{}) - if err != nil { - t.Fatalf("failed to create deployment in fake client: %v", err) - } - - for _, obj := range allObjects { - if rs, ok := obj.(*apps.ReplicaSet); ok { - _, err := fakeClient.AppsV1().ReplicaSets(rs.Namespace).Create(context.TODO(), rs, metav1.CreateOptions{}) - if err != nil { - t.Fatalf("failed to create replicaset in fake client: %v", err) - } - } - } - dc := &DeploymentController{ - client: fakeClient, eventRecorder: fakeRecord, - dLister: mockDeploymentLister, - rsLister: 
&mockReplicaSetLister{client: fakeCtrlClient}, runtimeClient: fakeCtrlClient, strategy: rolloutsv1alpha1.DeploymentStrategy{ RollingUpdate: &apps.RollingUpdateDeployment{ @@ -332,9 +190,31 @@ func TestSyncDeployment(t *testing.T) { }, } - err = dc.syncDeployment(context.TODO(), &deployment) + // Retry syncDeployment to handle potential resource conflicts gracefully + // This simulates the behavior of controller-runtime's reconcile loop + var err error + maxRetries := 10 + for i := 0; i < maxRetries; i++ { + err = dc.syncDeployment(context.TODO(), &deployment) + if err == nil { + break + } + + // Check if it's a conflict error (409) + if errors.IsConflict(err) { + if i < maxRetries-1 { + // Wait a bit before retrying, simulating the reconcile delay + time.Sleep(1 * time.Second) + continue + } + } + + // For non-conflict errors or after max retries, break + break + } + if err != nil { - t.Fatalf("got unexpected error: %v", err) + t.Fatalf("got unexpected error after retries: %v", err) } var rsList apps.ReplicaSetList diff --git a/pkg/controller/deployment/progress.go b/pkg/controller/deployment/progress.go index d8dc2e56..73199e2e 100644 --- a/pkg/controller/deployment/progress.go +++ b/pkg/controller/deployment/progress.go @@ -25,7 +25,6 @@ import ( apps "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" "github.com/openkruise/rollouts/pkg/controller/deployment/util" @@ -115,7 +114,10 @@ func (dc *DeploymentController) syncRolloutStatus(ctx context.Context, allRSs [] newDeployment := d newDeployment.Status = newStatus - _, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(ctx, newDeployment, metav1.UpdateOptions{}) + err := dc.runtimeClient.Status().Update(ctx, newDeployment) + if err != nil { + klog.Errorf("Failed to update deployment status in progress: %v", err) + } return err } diff --git a/pkg/controller/deployment/rolling_test.go b/pkg/controller/deployment/rolling_test.go 
index 642844ea..04f1f7a7 100644 --- a/pkg/controller/deployment/rolling_test.go +++ b/pkg/controller/deployment/rolling_test.go @@ -304,10 +304,7 @@ func TestReconcileNewReplicaSet(t *testing.T) { Build() dc := &DeploymentController{ - client: fakeClient, eventRecorder: fakeRecord, - dLister: &mockDeploymentLister{client: fakeCtrlClient}, - rsLister: &mockReplicaSetLister{client: fakeCtrlClient}, runtimeClient: fakeCtrlClient, strategy: rolloutsv1alpha1.DeploymentStrategy{ RollingUpdate: &apps.RollingUpdateDeployment{ @@ -399,7 +396,6 @@ func TestReconcileOldReplicaSet(t *testing.T) { for name, test := range tests { t.Run(name, func(t *testing.T) { - fakeClient := fake.NewSimpleClientset() fakeRecord := record.NewFakeRecorder(10) var deployment apps.Deployment @@ -417,10 +413,6 @@ func TestReconcileOldReplicaSet(t *testing.T) { deployment.Status.UpdatedReplicas = test.newRSReplicas deployment.Status.Replicas = availableReplicas deployment.Status.AvailableReplicas = availableReplicas - _, err := fakeClient.AppsV1().Deployments(deployment.Namespace).Create(context.TODO(), &deployment, metav1.CreateOptions{}) - if err != nil { - t.Fatalf("got unexpected error: %v", err) - } } { for index, replicas := range test.oldRSsReplicas { @@ -433,10 +425,6 @@ func TestReconcileOldReplicaSet(t *testing.T) { rs.Status.AvailableReplicas = test.oldRSsAvailable[index] allRSs = append(allRSs, &rs) oldRSs = append(oldRSs, &rs) - _, err := fakeClient.AppsV1().ReplicaSets(rs.Namespace).Create(context.TODO(), &rs, metav1.CreateOptions{}) - if err != nil { - t.Fatalf("got unexpected error: %v", err) - } } } { @@ -447,10 +435,6 @@ func TestReconcileOldReplicaSet(t *testing.T) { newRS.Status.ReadyReplicas = test.newRSAvailable newRS.Status.AvailableReplicas = test.newRSAvailable allRSs = append(allRSs, &newRS) - _, err := fakeClient.AppsV1().ReplicaSets(newRS.Namespace).Create(context.TODO(), &newRS, metav1.CreateOptions{}) - if err != nil { - t.Fatalf("got unexpected error: %v", err) - } } 
// Create controller-runtime client with all objects @@ -465,10 +449,7 @@ func TestReconcileOldReplicaSet(t *testing.T) { Build() dc := &DeploymentController{ - client: fakeClient, eventRecorder: fakeRecord, - dLister: &mockDeploymentLister{client: fakeCtrlClient}, - rsLister: &mockReplicaSetLister{client: fakeCtrlClient}, runtimeClient: fakeCtrlClient, strategy: rolloutsv1alpha1.DeploymentStrategy{ RollingUpdate: &apps.RollingUpdateDeployment{ diff --git a/pkg/controller/deployment/sync.go b/pkg/controller/deployment/sync.go index 26eac086..8a0a63de 100644 --- a/pkg/controller/deployment/sync.go +++ b/pkg/controller/deployment/sync.go @@ -118,16 +118,15 @@ func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.De annotationsUpdated := deploymentutil.SetNewReplicaSetAnnotations(d, rsCopy, &dc.strategy, newRevision, true, maxRevHistoryLengthInChars) minReadySecondsNeedsUpdate := rsCopy.Spec.MinReadySeconds != d.Spec.MinReadySeconds if annotationsUpdated || minReadySecondsNeedsUpdate { - // Use existing state directly for patching, let API Server handle conflicts - rsCopy = existingNewRS.DeepCopy() - deploymentutil.SetNewReplicaSetAnnotations(d, rsCopy, &dc.strategy, newRevision, true, maxRevHistoryLengthInChars) + // Update the copy with the new minReadySeconds if minReadySecondsNeedsUpdate { rsCopy.Spec.MinReadySeconds = d.Spec.MinReadySeconds } - // Use MergeFrom for patching, if ResourceVersion conflicts, API Server will return 409 error + // Use MergeFrom with optimistic lock for patching, if ResourceVersion conflicts, API Server will return 409 error // Controller-runtime will automatically reschedule for reconciliation - err := dc.runtimeClient.Patch(ctx, rsCopy, client.MergeFrom(existingNewRS)) + patch := client.MergeFromWithOptions(existingNewRS, client.MergeFromWithOptimisticLock{}) + err := dc.runtimeClient.Patch(ctx, rsCopy, patch) if err != nil { return nil, err } @@ -149,7 +148,7 @@ func (dc *DeploymentController) 
getNewReplicaSet(ctx context.Context, d *apps.De if needsUpdate { var err error - if _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}); err != nil { + if err = dc.runtimeClient.Status().Update(ctx, d); err != nil { return nil, err } } @@ -201,14 +200,19 @@ func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.De // hash collisions. If there is any other error, we need to report it in the status of // the Deployment. alreadyExists := false - createdRS, err := dc.client.AppsV1().ReplicaSets(d.Namespace).Create(ctx, &newRS, metav1.CreateOptions{}) + var createdRS *apps.ReplicaSet + err = dc.runtimeClient.Create(ctx, &newRS) + if err == nil { + createdRS = &newRS + } switch { // We may end up hitting this due to a slow cache or a fast resync of the Deployment. case errors.IsAlreadyExists(err): alreadyExists = true // Fetch a copy of the ReplicaSet. - rs, rsErr := dc.rsLister.ReplicaSets(newRS.Namespace).Get(newRS.Name) + rs := &apps.ReplicaSet{} + rsErr := dc.runtimeClient.Get(ctx, client.ObjectKey{Namespace: newRS.Namespace, Name: newRS.Name}, rs) if rsErr != nil { return nil, rsErr } @@ -233,9 +237,11 @@ func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.De *d.Status.CollisionCount++ // Update the collisionCount for the Deployment and let it requeue by returning the original // error. 
- _, dErr := dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}) + dErr := dc.runtimeClient.Status().Update(ctx, d) if dErr == nil { klog.V(2).Infof("Found a hash collision for deployment %q - bumping collisionCount (%d->%d) to resolve it", d.Name, preCollisionCount, *d.Status.CollisionCount) + } else { + klog.Errorf("Failed to update deployment collision count: %v", dErr) } return nil, err case errors.HasStatusCause(err, v1.NamespaceTerminatingCause): @@ -249,7 +255,9 @@ func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.De // We don't really care about this error at this point, since we have a bigger issue to report. // TODO: Identify which errors are permanent and switch DeploymentIsFailed to take into account // these reasons as well. Related issue: https://github.com/kubernetes/kubernetes/issues/18568 - _, _ = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}) + if updateErr := dc.runtimeClient.Status().Update(ctx, d); updateErr != nil { + klog.Errorf("Failed to update deployment status after RS creation failure: %v", updateErr) + } } dc.eventRecorder.Eventf(d, v1.EventTypeWarning, deploymentutil.FailedRSCreateReason, msg) return nil, err @@ -266,7 +274,10 @@ func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.De needsUpdate = true } if needsUpdate { - _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}) + if updateErr := dc.runtimeClient.Status().Update(ctx, d); updateErr != nil { + klog.Errorf("Failed to update deployment status: %v", updateErr) + err = updateErr + } } return createdRS, err } @@ -426,20 +437,13 @@ func (dc *DeploymentController) scaleReplicaSet(ctx context.Context, rs *apps.Re // Use existing state directly for patching, let API Server handle conflicts rsCopy := rs.DeepCopy() + *(rsCopy.Spec.Replicas) = newScale + deploymentutil.SetReplicasAnnotations(rsCopy, 
*(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+deploymentutil.MaxSurge(deployment, &dc.strategy)) - if sizeNeedsUpdate { - rsCopy.Spec.Replicas = &newScale - } - if annotationsNeedUpdate { - // Set the annotations that need to be updated - desiredReplicas := *(deployment.Spec.Replicas) - maxReplicas := *(deployment.Spec.Replicas) + deploymentutil.MaxSurge(deployment, &dc.strategy) - deploymentutil.SetReplicasAnnotations(rsCopy, desiredReplicas, maxReplicas) - } - - // Use MergeFrom for patching, if ResourceVersion conflicts, API Server will return 409 error + // Use MergeFrom with optimistic lock for patching, if ResourceVersion conflicts, API Server will return 409 error // Controller-runtime will automatically reschedule for reconciliation - err = dc.runtimeClient.Patch(ctx, rsCopy, client.MergeFrom(rs)) + patch := client.MergeFromWithOptions(rs, client.MergeFromWithOptimisticLock{}) + err = dc.runtimeClient.Patch(ctx, rsCopy, patch) if err != nil { return scaled, rs, err } @@ -482,7 +486,7 @@ func (dc *DeploymentController) cleanupDeployment(ctx context.Context, oldRSs [] continue } klog.V(4).Infof("Trying to cleanup replica set %q for deployment %q", rs.Name, deployment.Name) - if err := dc.client.AppsV1().ReplicaSets(rs.Namespace).Delete(ctx, rs.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) { + if err := dc.runtimeClient.Delete(ctx, rs); err != nil && !errors.IsNotFound(err) { // Return error instead of aggregating and continuing DELETEs on the theory // that we may be overloading the api server. 
return err @@ -502,7 +506,10 @@ func (dc *DeploymentController) syncDeploymentStatus(ctx context.Context, allRSs newDeployment := d newDeployment.Status = newStatus - _, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(ctx, newDeployment, metav1.UpdateOptions{}) + err := dc.runtimeClient.Status().Update(ctx, newDeployment) + if err != nil { + klog.Errorf("Failed to sync deployment status: %v", err) + } return err } From 884be19fa3b3d8291158f2c63c2829392b7d145b Mon Sep 17 00:00:00 2001 From: zhihao jian Date: Mon, 14 Jul 2025 19:37:01 +0800 Subject: [PATCH 3/4] use runtimeClient to operate deployment uniformly Signed-off-by: zhihao jian remove dupl SetNewReplicaSetAnnotations use UnsafeDisableDeepCopy to optimize performance use optimisticLock for patch fix patch extra status always failed fix unit test add comment --- pkg/controller/deployment/controller.go | 21 --- .../deployment/deployment_controller.go | 63 ++++--- .../deployment/deployment_controller_test.go | 172 +++--------------- pkg/controller/deployment/progress.go | 6 +- pkg/controller/deployment/rolling_test.go | 19 -- pkg/controller/deployment/sync.go | 57 +++--- 6 files changed, 98 insertions(+), 240 deletions(-) diff --git a/pkg/controller/deployment/controller.go b/pkg/controller/deployment/controller.go index 313f8732..313f6d0a 100644 --- a/pkg/controller/deployment/controller.go +++ b/pkg/controller/deployment/controller.go @@ -31,8 +31,6 @@ import ( "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" - appslisters "k8s.io/client-go/listers/apps/v1" - toolscache "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" @@ -83,19 +81,6 @@ func Add(mgr manager.Manager) error { // newReconciler returns a new reconcile.Reconciler func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) { - cacher := mgr.GetCache() - 
dInformer, err := cacher.GetInformerForKind(context.TODO(), appsv1.SchemeGroupVersion.WithKind("Deployment")) - if err != nil { - return nil, err - } - rsInformer, err := cacher.GetInformerForKind(context.TODO(), appsv1.SchemeGroupVersion.WithKind("ReplicaSet")) - if err != nil { - return nil, err - } - - // Lister - dLister := appslisters.NewDeploymentLister(dInformer.(toolscache.SharedIndexInformer).GetIndexer()) - rsLister := appslisters.NewReplicaSetLister(rsInformer.(toolscache.SharedIndexInformer).GetIndexer()) // Client & Recorder genericClient := clientutil.GetGenericClientWithName("advanced-deployment-controller") @@ -106,12 +91,9 @@ func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) { // Deployment controller factory factory := &controllerFactory{ - client: genericClient.KubeClient, runtimeClient: mgr.GetClient(), eventBroadcaster: eventBroadcaster, eventRecorder: recorder, - dLister: dLister, - rsLister: rsLister, } return &ReconcileDeployment{Client: mgr.GetClient(), controllerFactory: factory}, nil } @@ -268,12 +250,9 @@ func (f *controllerFactory) NewController(deployment *appsv1.Deployment) *Deploy klog.V(4).Infof("Processing deployment %v strategy %v", klog.KObj(deployment), string(marshaled)) return &DeploymentController{ - client: f.client, runtimeClient: f.runtimeClient, eventBroadcaster: f.eventBroadcaster, eventRecorder: f.eventRecorder, - dLister: f.dLister, - rsLister: f.rsLister, strategy: strategy, } } diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index f56f7cda..5f584da5 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -25,15 +25,12 @@ import ( "encoding/json" "fmt" "reflect" - "strings" "time" + utilclient "github.com/openkruise/rollouts/pkg/util/client" apps "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - 
"k8s.io/apimachinery/pkg/types" - clientset "k8s.io/client-go/kubernetes" - appslisters "k8s.io/client-go/listers/apps/v1" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" @@ -57,16 +54,9 @@ var controllerKind = apps.SchemeGroupVersion.WithKind("Deployment") // DeploymentController is responsible for synchronizing Deployment objects stored // in the system with actual running replica sets and pods. type DeploymentController struct { - client clientset.Interface - eventBroadcaster record.EventBroadcaster eventRecorder record.EventRecorder - // dLister can list/get deployments from the shared informer's store - dLister appslisters.DeploymentLister - // rsLister can list/get replica sets from the shared informer's store - rsLister appslisters.ReplicaSetLister - // we will use this strategy to replace spec.strategy of deployment strategy rolloutsv1alpha1.DeploymentStrategy @@ -81,15 +71,18 @@ func (dc *DeploymentController) getReplicaSetsForDeployment(ctx context.Context, if err != nil { return nil, fmt.Errorf("deployment %s/%s has invalid label selector: %v", d.Namespace, d.Name, err) } - // List all ReplicaSets to find those we own but that no longer match our - // selector. They will be orphaned by ClaimReplicaSets(). 
- allRSs, err := dc.rsLister.ReplicaSets(d.Namespace).List(deploymentSelector) + + // List all ReplicaSets using runtimeClient + rsList := &apps.ReplicaSetList{} + err = dc.runtimeClient.List(ctx, rsList, client.InNamespace(d.Namespace), client.MatchingLabelsSelector{Selector: deploymentSelector}, utilclient.DisableDeepCopy) if err != nil { return nil, fmt.Errorf("list %s/%s rs failed:%v", d.Namespace, d.Name, err) } + // select rs owner by current deployment ownedRSs := make([]*apps.ReplicaSet, 0) - for _, rs := range allRSs { + for i := range rsList.Items { + rs := &rsList.Items[i] if !rs.DeletionTimestamp.IsZero() { continue } @@ -119,7 +112,10 @@ func (dc *DeploymentController) syncDeployment(ctx context.Context, deployment * dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.") if d.Status.ObservedGeneration < d.Generation { d.Status.ObservedGeneration = d.Generation - dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}) + err := dc.runtimeClient.Status().Update(ctx, d) + if err != nil { + klog.Errorf("Failed to update deployment status: %v", err) + } } return nil } @@ -153,38 +149,51 @@ func (dc *DeploymentController) syncDeployment(ctx context.Context, deployment * // patchExtraStatus will update extra status for advancedStatus func (dc *DeploymentController) patchExtraStatus(deployment *apps.Deployment) error { - rsList, err := dc.getReplicaSetsForDeployment(context.TODO(), deployment) + // It is necessary to fetch the latest Deployment here because previous steps in the reconcile loop + // may update the condition fields (such as lastTransitionTime and lastUpdateTime), causing the + // resourceVersion to change. This can lead to patch failures if we do not use the latest object. + // The deployment passed in here has an old resourceVersion, so we need to fetch the latest deployment + // to ensure patch success. 
+ latestDeployment := &apps.Deployment{} + err := dc.runtimeClient.Get(context.TODO(), client.ObjectKeyFromObject(deployment), latestDeployment) + if err != nil { + klog.Errorf("Failed to get deployment: %v", err) + return err + } + rsList, err := dc.getReplicaSetsForDeployment(context.TODO(), latestDeployment) if err != nil { return err } updatedReadyReplicas := int32(0) - newRS := deploymentutil.FindNewReplicaSet(deployment, rsList) + newRS := deploymentutil.FindNewReplicaSet(latestDeployment, rsList) if newRS != nil { updatedReadyReplicas = newRS.Status.ReadyReplicas } extraStatus := &rolloutsv1alpha1.DeploymentExtraStatus{ UpdatedReadyReplicas: updatedReadyReplicas, - ExpectedUpdatedReplicas: deploymentutil.NewRSReplicasLimit(dc.strategy.Partition, deployment), + ExpectedUpdatedReplicas: deploymentutil.NewRSReplicasLimit(dc.strategy.Partition, latestDeployment), } extraStatusByte, err := json.Marshal(extraStatus) if err != nil { - klog.Errorf("Failed to marshal extra status for Deployment %v, err: %v", klog.KObj(deployment), err) + klog.Errorf("Failed to marshal extra status for Deployment %v, err: %v", klog.KObj(latestDeployment), err) return nil // no need to retry } extraStatusAnno := string(extraStatusByte) - if deployment.Annotations[rolloutsv1alpha1.DeploymentExtraStatusAnnotation] == extraStatusAnno { + if latestDeployment.Annotations[rolloutsv1alpha1.DeploymentExtraStatusAnnotation] == extraStatusAnno { return nil // no need to update } + deploymentCopy := latestDeployment.DeepCopy() + deploymentCopy.Annotations[rolloutsv1alpha1.DeploymentExtraStatusAnnotation] = extraStatusAnno - body := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, - rolloutsv1alpha1.DeploymentExtraStatusAnnotation, - strings.Replace(extraStatusAnno, `"`, `\"`, -1)) - - _, err = dc.client.AppsV1().Deployments(deployment.Namespace). 
- Patch(context.TODO(), deployment.Name, types.MergePatchType, []byte(body), metav1.PatchOptions{}) + patch := client.MergeFromWithOptions(latestDeployment, client.MergeFromWithOptimisticLock{}) + err = dc.runtimeClient.Patch(context.TODO(), deploymentCopy, patch) + if err != nil { + klog.Errorf("Failed to patch deployment extra status: %v", err) + return err + } return err } diff --git a/pkg/controller/deployment/deployment_controller_test.go b/pkg/controller/deployment/deployment_controller_test.go index 53e84592..20d48d04 100644 --- a/pkg/controller/deployment/deployment_controller_test.go +++ b/pkg/controller/deployment/deployment_controller_test.go @@ -22,14 +22,11 @@ import ( "strconv" "strings" "testing" + "time" apps "k8s.io/api/apps/v1" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/api/errors" intstrutil "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/client-go/kubernetes/fake" - appslisters "k8s.io/client-go/listers/apps/v1" "k8s.io/client-go/tools/record" "k8s.io/utils/pointer" ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -39,123 +36,6 @@ import ( "github.com/openkruise/rollouts/pkg/controller/deployment/util" ) -// mockReplicaSetLister implements appslisters.ReplicaSetLister for testing -type mockReplicaSetLister struct { - client ctrlclient.Client -} - -func (m *mockReplicaSetLister) List(selector labels.Selector) ([]*apps.ReplicaSet, error) { - var rsList apps.ReplicaSetList - if err := m.client.List(context.TODO(), &rsList); err != nil { - return nil, err - } - - var result []*apps.ReplicaSet - for i := range rsList.Items { - if selector.Matches(labels.Set(rsList.Items[i].Labels)) { - result = append(result, &rsList.Items[i]) - } - } - return result, nil -} - -func (m *mockReplicaSetLister) ReplicaSets(namespace string) appslisters.ReplicaSetNamespaceLister { - return &mockReplicaSetNamespaceLister{ - client: m.client, - namespace: namespace, - } -} 
- -func (m *mockReplicaSetLister) GetPodReplicaSets(pod *v1.Pod) ([]*apps.ReplicaSet, error) { - // For testing purposes, return empty list - return []*apps.ReplicaSet{}, nil -} - -// mockDeploymentLister implements appslisters.DeploymentLister for testing -type mockDeploymentLister struct { - client ctrlclient.Client -} - -func (m *mockDeploymentLister) List(selector labels.Selector) ([]*apps.Deployment, error) { - var deploymentList apps.DeploymentList - if err := m.client.List(context.TODO(), &deploymentList); err != nil { - return nil, err - } - - var result []*apps.Deployment - for i := range deploymentList.Items { - if selector.Matches(labels.Set(deploymentList.Items[i].Labels)) { - result = append(result, &deploymentList.Items[i]) - } - } - return result, nil -} - -func (m *mockDeploymentLister) Deployments(namespace string) appslisters.DeploymentNamespaceLister { - return &mockDeploymentNamespaceLister{ - client: m.client, - namespace: namespace, - } -} - -// mockDeploymentNamespaceLister implements appslisters.DeploymentNamespaceLister for testing -type mockDeploymentNamespaceLister struct { - client ctrlclient.Client - namespace string -} - -func (m *mockDeploymentNamespaceLister) List(selector labels.Selector) ([]*apps.Deployment, error) { - var deploymentList apps.DeploymentList - if err := m.client.List(context.TODO(), &deploymentList, ctrlclient.InNamespace(m.namespace)); err != nil { - return nil, err - } - - var result []*apps.Deployment - for i := range deploymentList.Items { - if selector.Matches(labels.Set(deploymentList.Items[i].Labels)) { - result = append(result, &deploymentList.Items[i]) - } - } - return result, nil -} - -func (m *mockDeploymentNamespaceLister) Get(name string) (*apps.Deployment, error) { - var deployment apps.Deployment - if err := m.client.Get(context.TODO(), ctrlclient.ObjectKey{Namespace: m.namespace, Name: name}, &deployment); err != nil { - return nil, err - } - return &deployment, nil -} - -// 
mockReplicaSetNamespaceLister implements appslisters.ReplicaSetNamespaceLister for testing -type mockReplicaSetNamespaceLister struct { - client ctrlclient.Client - namespace string -} - -func (m *mockReplicaSetNamespaceLister) List(selector labels.Selector) ([]*apps.ReplicaSet, error) { - var rsList apps.ReplicaSetList - if err := m.client.List(context.TODO(), &rsList, ctrlclient.InNamespace(m.namespace)); err != nil { - return nil, err - } - - var result []*apps.ReplicaSet - for i := range rsList.Items { - if selector.Matches(labels.Set(rsList.Items[i].Labels)) { - result = append(result, &rsList.Items[i]) - } - } - return result, nil -} - -func (m *mockReplicaSetNamespaceLister) Get(name string) (*apps.ReplicaSet, error) { - var rs apps.ReplicaSet - if err := m.client.Get(context.TODO(), ctrlclient.ObjectKey{Namespace: m.namespace, Name: name}, &rs); err != nil { - return nil, err - } - return &rs, nil -} - func TestSyncDeployment(t *testing.T) { tests := map[string]struct { oldRSsReplicas []int32 @@ -298,30 +178,8 @@ func TestSyncDeployment(t *testing.T) { // Create a mock event recorder fakeRecord := record.NewFakeRecorder(10) - // Create a mock deployment lister - mockDeploymentLister := &mockDeploymentLister{client: fakeCtrlClient} - - // Create a fake client with the same objects - fakeClient := fake.NewSimpleClientset() - _, err := fakeClient.AppsV1().Deployments(deployment.Namespace).Create(context.TODO(), &deployment, metav1.CreateOptions{}) - if err != nil { - t.Fatalf("failed to create deployment in fake client: %v", err) - } - - for _, obj := range allObjects { - if rs, ok := obj.(*apps.ReplicaSet); ok { - _, err := fakeClient.AppsV1().ReplicaSets(rs.Namespace).Create(context.TODO(), rs, metav1.CreateOptions{}) - if err != nil { - t.Fatalf("failed to create replicaset in fake client: %v", err) - } - } - } - dc := &DeploymentController{ - client: fakeClient, eventRecorder: fakeRecord, - dLister: mockDeploymentLister, - rsLister: 
&mockReplicaSetLister{client: fakeCtrlClient}, runtimeClient: fakeCtrlClient, strategy: rolloutsv1alpha1.DeploymentStrategy{ RollingUpdate: &apps.RollingUpdateDeployment{ @@ -332,9 +190,31 @@ func TestSyncDeployment(t *testing.T) { }, } - err = dc.syncDeployment(context.TODO(), &deployment) + // Retry syncDeployment to handle potential resource conflicts gracefully + // This simulates the behavior of controller-runtime's reconcile loop + var err error + maxRetries := 10 + for i := 0; i < maxRetries; i++ { + err = dc.syncDeployment(context.TODO(), &deployment) + if err == nil { + break + } + + // Check if it's a conflict error (409) + if errors.IsConflict(err) { + if i < maxRetries-1 { + // Wait a bit before retrying, simulating the reconcile delay + time.Sleep(1 * time.Second) + continue + } + } + + // For non-conflict errors or after max retries, break + break + } + if err != nil { - t.Fatalf("got unexpected error: %v", err) + t.Fatalf("got unexpected error after retries: %v", err) } var rsList apps.ReplicaSetList diff --git a/pkg/controller/deployment/progress.go b/pkg/controller/deployment/progress.go index d8dc2e56..73199e2e 100644 --- a/pkg/controller/deployment/progress.go +++ b/pkg/controller/deployment/progress.go @@ -25,7 +25,6 @@ import ( apps "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" "github.com/openkruise/rollouts/pkg/controller/deployment/util" @@ -115,7 +114,10 @@ func (dc *DeploymentController) syncRolloutStatus(ctx context.Context, allRSs [] newDeployment := d newDeployment.Status = newStatus - _, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(ctx, newDeployment, metav1.UpdateOptions{}) + err := dc.runtimeClient.Status().Update(ctx, newDeployment) + if err != nil { + klog.Errorf("Failed to update deployment status in progress: %v", err) + } return err } diff --git a/pkg/controller/deployment/rolling_test.go b/pkg/controller/deployment/rolling_test.go 
index 642844ea..04f1f7a7 100644 --- a/pkg/controller/deployment/rolling_test.go +++ b/pkg/controller/deployment/rolling_test.go @@ -304,10 +304,7 @@ func TestReconcileNewReplicaSet(t *testing.T) { Build() dc := &DeploymentController{ - client: fakeClient, eventRecorder: fakeRecord, - dLister: &mockDeploymentLister{client: fakeCtrlClient}, - rsLister: &mockReplicaSetLister{client: fakeCtrlClient}, runtimeClient: fakeCtrlClient, strategy: rolloutsv1alpha1.DeploymentStrategy{ RollingUpdate: &apps.RollingUpdateDeployment{ @@ -399,7 +396,6 @@ func TestReconcileOldReplicaSet(t *testing.T) { for name, test := range tests { t.Run(name, func(t *testing.T) { - fakeClient := fake.NewSimpleClientset() fakeRecord := record.NewFakeRecorder(10) var deployment apps.Deployment @@ -417,10 +413,6 @@ func TestReconcileOldReplicaSet(t *testing.T) { deployment.Status.UpdatedReplicas = test.newRSReplicas deployment.Status.Replicas = availableReplicas deployment.Status.AvailableReplicas = availableReplicas - _, err := fakeClient.AppsV1().Deployments(deployment.Namespace).Create(context.TODO(), &deployment, metav1.CreateOptions{}) - if err != nil { - t.Fatalf("got unexpected error: %v", err) - } } { for index, replicas := range test.oldRSsReplicas { @@ -433,10 +425,6 @@ func TestReconcileOldReplicaSet(t *testing.T) { rs.Status.AvailableReplicas = test.oldRSsAvailable[index] allRSs = append(allRSs, &rs) oldRSs = append(oldRSs, &rs) - _, err := fakeClient.AppsV1().ReplicaSets(rs.Namespace).Create(context.TODO(), &rs, metav1.CreateOptions{}) - if err != nil { - t.Fatalf("got unexpected error: %v", err) - } } } { @@ -447,10 +435,6 @@ func TestReconcileOldReplicaSet(t *testing.T) { newRS.Status.ReadyReplicas = test.newRSAvailable newRS.Status.AvailableReplicas = test.newRSAvailable allRSs = append(allRSs, &newRS) - _, err := fakeClient.AppsV1().ReplicaSets(newRS.Namespace).Create(context.TODO(), &newRS, metav1.CreateOptions{}) - if err != nil { - t.Fatalf("got unexpected error: %v", err) - } } 
// Create controller-runtime client with all objects @@ -465,10 +449,7 @@ func TestReconcileOldReplicaSet(t *testing.T) { Build() dc := &DeploymentController{ - client: fakeClient, eventRecorder: fakeRecord, - dLister: &mockDeploymentLister{client: fakeCtrlClient}, - rsLister: &mockReplicaSetLister{client: fakeCtrlClient}, runtimeClient: fakeCtrlClient, strategy: rolloutsv1alpha1.DeploymentStrategy{ RollingUpdate: &apps.RollingUpdateDeployment{ diff --git a/pkg/controller/deployment/sync.go b/pkg/controller/deployment/sync.go index 26eac086..8a0a63de 100644 --- a/pkg/controller/deployment/sync.go +++ b/pkg/controller/deployment/sync.go @@ -118,16 +118,15 @@ func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.De annotationsUpdated := deploymentutil.SetNewReplicaSetAnnotations(d, rsCopy, &dc.strategy, newRevision, true, maxRevHistoryLengthInChars) minReadySecondsNeedsUpdate := rsCopy.Spec.MinReadySeconds != d.Spec.MinReadySeconds if annotationsUpdated || minReadySecondsNeedsUpdate { - // Use existing state directly for patching, let API Server handle conflicts - rsCopy = existingNewRS.DeepCopy() - deploymentutil.SetNewReplicaSetAnnotations(d, rsCopy, &dc.strategy, newRevision, true, maxRevHistoryLengthInChars) + // Update the copy with the new minReadySeconds if minReadySecondsNeedsUpdate { rsCopy.Spec.MinReadySeconds = d.Spec.MinReadySeconds } - // Use MergeFrom for patching, if ResourceVersion conflicts, API Server will return 409 error + // Use MergeFrom with optimistic lock for patching, if ResourceVersion conflicts, API Server will return 409 error // Controller-runtime will automatically reschedule for reconciliation - err := dc.runtimeClient.Patch(ctx, rsCopy, client.MergeFrom(existingNewRS)) + patch := client.MergeFromWithOptions(existingNewRS, client.MergeFromWithOptimisticLock{}) + err := dc.runtimeClient.Patch(ctx, rsCopy, patch) if err != nil { return nil, err } @@ -149,7 +148,7 @@ func (dc *DeploymentController) 
getNewReplicaSet(ctx context.Context, d *apps.De if needsUpdate { var err error - if _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}); err != nil { + if err = dc.runtimeClient.Status().Update(ctx, d); err != nil { return nil, err } } @@ -201,14 +200,19 @@ func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.De // hash collisions. If there is any other error, we need to report it in the status of // the Deployment. alreadyExists := false - createdRS, err := dc.client.AppsV1().ReplicaSets(d.Namespace).Create(ctx, &newRS, metav1.CreateOptions{}) + var createdRS *apps.ReplicaSet + err = dc.runtimeClient.Create(ctx, &newRS) + if err == nil { + createdRS = &newRS + } switch { // We may end up hitting this due to a slow cache or a fast resync of the Deployment. case errors.IsAlreadyExists(err): alreadyExists = true // Fetch a copy of the ReplicaSet. - rs, rsErr := dc.rsLister.ReplicaSets(newRS.Namespace).Get(newRS.Name) + rs := &apps.ReplicaSet{} + rsErr := dc.runtimeClient.Get(ctx, client.ObjectKey{Namespace: newRS.Namespace, Name: newRS.Name}, rs) if rsErr != nil { return nil, rsErr } @@ -233,9 +237,11 @@ func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.De *d.Status.CollisionCount++ // Update the collisionCount for the Deployment and let it requeue by returning the original // error. 
- _, dErr := dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}) + dErr := dc.runtimeClient.Status().Update(ctx, d) if dErr == nil { klog.V(2).Infof("Found a hash collision for deployment %q - bumping collisionCount (%d->%d) to resolve it", d.Name, preCollisionCount, *d.Status.CollisionCount) + } else { + klog.Errorf("Failed to update deployment collision count: %v", dErr) } return nil, err case errors.HasStatusCause(err, v1.NamespaceTerminatingCause): @@ -249,7 +255,9 @@ func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.De // We don't really care about this error at this point, since we have a bigger issue to report. // TODO: Identify which errors are permanent and switch DeploymentIsFailed to take into account // these reasons as well. Related issue: https://github.com/kubernetes/kubernetes/issues/18568 - _, _ = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}) + if updateErr := dc.runtimeClient.Status().Update(ctx, d); updateErr != nil { + klog.Errorf("Failed to update deployment status after RS creation failure: %v", updateErr) + } } dc.eventRecorder.Eventf(d, v1.EventTypeWarning, deploymentutil.FailedRSCreateReason, msg) return nil, err @@ -266,7 +274,10 @@ func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.De needsUpdate = true } if needsUpdate { - _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}) + if updateErr := dc.runtimeClient.Status().Update(ctx, d); updateErr != nil { + klog.Errorf("Failed to update deployment status: %v", updateErr) + err = updateErr + } } return createdRS, err } @@ -426,20 +437,13 @@ func (dc *DeploymentController) scaleReplicaSet(ctx context.Context, rs *apps.Re // Use existing state directly for patching, let API Server handle conflicts rsCopy := rs.DeepCopy() + *(rsCopy.Spec.Replicas) = newScale + deploymentutil.SetReplicasAnnotations(rsCopy, 
*(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+deploymentutil.MaxSurge(deployment, &dc.strategy)) - if sizeNeedsUpdate { - rsCopy.Spec.Replicas = &newScale - } - if annotationsNeedUpdate { - // Set the annotations that need to be updated - desiredReplicas := *(deployment.Spec.Replicas) - maxReplicas := *(deployment.Spec.Replicas) + deploymentutil.MaxSurge(deployment, &dc.strategy) - deploymentutil.SetReplicasAnnotations(rsCopy, desiredReplicas, maxReplicas) - } - - // Use MergeFrom for patching, if ResourceVersion conflicts, API Server will return 409 error + // Use MergeFrom with optimistic lock for patching, if ResourceVersion conflicts, API Server will return 409 error // Controller-runtime will automatically reschedule for reconciliation - err = dc.runtimeClient.Patch(ctx, rsCopy, client.MergeFrom(rs)) + patch := client.MergeFromWithOptions(rs, client.MergeFromWithOptimisticLock{}) + err = dc.runtimeClient.Patch(ctx, rsCopy, patch) if err != nil { return scaled, rs, err } @@ -482,7 +486,7 @@ func (dc *DeploymentController) cleanupDeployment(ctx context.Context, oldRSs [] continue } klog.V(4).Infof("Trying to cleanup replica set %q for deployment %q", rs.Name, deployment.Name) - if err := dc.client.AppsV1().ReplicaSets(rs.Namespace).Delete(ctx, rs.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) { + if err := dc.runtimeClient.Delete(ctx, rs); err != nil && !errors.IsNotFound(err) { // Return error instead of aggregating and continuing DELETEs on the theory // that we may be overloading the api server. 
return err @@ -502,7 +506,10 @@ func (dc *DeploymentController) syncDeploymentStatus(ctx context.Context, allRSs newDeployment := d newDeployment.Status = newStatus - _, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(ctx, newDeployment, metav1.UpdateOptions{}) + err := dc.runtimeClient.Status().Update(ctx, newDeployment) + if err != nil { + klog.Errorf("Failed to sync deployment status: %v", err) + } return err } From 0c2faf967301bc196d03736030c2618ae7a7bcc3 Mon Sep 17 00:00:00 2001 From: zhihao jian Date: Fri, 18 Jul 2025 17:02:17 +0800 Subject: [PATCH 4/4] combine patchExtraStatus and syncDeploymentStatus Signed-off-by: zhihao jian --- pkg/controller/deployment/controller.go | 4 - .../deployment/deployment_controller.go | 48 ------------ pkg/controller/deployment/progress.go | 22 +++--- pkg/controller/deployment/sync.go | 73 +++++++++++++++++-- 4 files changed, 78 insertions(+), 69 deletions(-) diff --git a/pkg/controller/deployment/controller.go b/pkg/controller/deployment/controller.go index 313f6d0a..48ec2509 100644 --- a/pkg/controller/deployment/controller.go +++ b/pkg/controller/deployment/controller.go @@ -186,10 +186,6 @@ func (r *ReconcileDeployment) Reconcile(_ context.Context, request reconcile.Req if err != nil { errList = append(errList, field.InternalError(field.NewPath("syncDeployment"), err)) } - err = dc.patchExtraStatus(deployment) - if err != nil { - errList = append(errList, field.InternalError(field.NewPath("patchExtraStatus"), err)) - } if len(errList) > 0 { return ctrl.Result{}, errList.ToAggregate() } diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index 584e3fc6..4569d51a 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -22,7 +22,6 @@ package deployment import ( "context" - "encoding/json" "fmt" "reflect" "time" @@ -35,7 +34,6 @@ import ( "k8s.io/klog/v2" 
rolloutsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1" - deploymentutil "github.com/openkruise/rollouts/pkg/controller/deployment/util" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -146,49 +144,3 @@ func (dc *DeploymentController) syncDeployment(ctx context.Context, deployment * return dc.rolloutRolling(ctx, d, rsList) } - -// patchExtraStatus will update extra status for advancedStatus -func (dc *DeploymentController) patchExtraStatus(deployment *apps.Deployment) error { - latestDeployment := &apps.Deployment{} - err := dc.runtimeClient.Get(context.TODO(), client.ObjectKeyFromObject(deployment), latestDeployment) - if err != nil { - klog.Errorf("Failed to get deployment: %v", err) - return err - } - rsList, err := dc.getReplicaSetsForDeployment(context.TODO(), latestDeployment) - if err != nil { - return err - } - - updatedReadyReplicas := int32(0) - newRS := deploymentutil.FindNewReplicaSet(latestDeployment, rsList) - if newRS != nil { - updatedReadyReplicas = newRS.Status.ReadyReplicas - } - - extraStatus := &rolloutsv1alpha1.DeploymentExtraStatus{ - UpdatedReadyReplicas: updatedReadyReplicas, - ExpectedUpdatedReplicas: deploymentutil.NewRSReplicasLimit(dc.strategy.Partition, latestDeployment), - } - - extraStatusByte, err := json.Marshal(extraStatus) - if err != nil { - klog.Errorf("Failed to marshal extra status for Deployment %v, err: %v", klog.KObj(latestDeployment), err) - return nil // no need to retry - } - - extraStatusAnno := string(extraStatusByte) - if latestDeployment.Annotations[rolloutsv1alpha1.DeploymentExtraStatusAnnotation] == extraStatusAnno { - return nil // no need to update - } - deploymentCopy := latestDeployment.DeepCopy() - deploymentCopy.Annotations[rolloutsv1alpha1.DeploymentExtraStatusAnnotation] = extraStatusAnno - - patch := client.MergeFromWithOptions(latestDeployment, client.MergeFromWithOptimisticLock{}) - err = dc.runtimeClient.Patch(context.TODO(), deploymentCopy, patch) - if err != nil { - klog.Errorf("Failed to 
patch deployment extra status: %v", err) - return err - } - return err -} diff --git a/pkg/controller/deployment/progress.go b/pkg/controller/deployment/progress.go index 73199e2e..09d78114 100644 --- a/pkg/controller/deployment/progress.go +++ b/pkg/controller/deployment/progress.go @@ -20,7 +20,6 @@ package deployment import ( "context" "fmt" - "reflect" "time" apps "k8s.io/api/apps/v1" @@ -105,20 +104,21 @@ func (dc *DeploymentController) syncRolloutStatus(ctx context.Context, allRSs [] util.RemoveDeploymentCondition(&newStatus, apps.DeploymentReplicaFailure) } - // Do not update if there is nothing new to add. - if reflect.DeepEqual(d.Status, newStatus) { - // Requeue the deployment if required. - dc.requeueStuckDeployment(d, newStatus) - return nil + // Calculate extra status annotation + extraStatusAnno, err := dc.updateDeploymentExtraStatus(ctx, newRS, d) + if err != nil { + return nil // no need to retry } - newDeployment := d - newDeployment.Status = newStatus - err := dc.runtimeClient.Status().Update(ctx, newDeployment) + // Update both status and annotation + err = dc.patchDeploymentStatusAndAnnotation(ctx, d, newStatus, extraStatusAnno) if err != nil { - klog.Errorf("Failed to update deployment status in progress: %v", err) + return err } - return err + + // Requeue the deployment if required. 
+ dc.requeueStuckDeployment(d, newStatus) + return nil } // getReplicaFailures will convert replica failure conditions from replica sets diff --git a/pkg/controller/deployment/sync.go b/pkg/controller/deployment/sync.go index 8a0a63de..84695936 100644 --- a/pkg/controller/deployment/sync.go +++ b/pkg/controller/deployment/sync.go @@ -19,6 +19,7 @@ package deployment import ( "context" + "encoding/json" "fmt" "reflect" "sort" @@ -497,20 +498,80 @@ func (dc *DeploymentController) cleanupDeployment(ctx context.Context, oldRSs [] } // syncDeploymentStatus checks if the status is up-to-date and sync it if necessary +// It also updates the extra status annotation for advanced deployment func (dc *DeploymentController) syncDeploymentStatus(ctx context.Context, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error { newStatus := calculateStatus(allRSs, newRS, d, &dc.strategy) - if reflect.DeepEqual(d.Status, newStatus) { + // Calculate extra status annotation + extraStatusAnno, err := dc.updateDeploymentExtraStatus(ctx, newRS, d) + if err != nil { + return nil // no need to retry + } + + // Update both status and annotation + return dc.patchDeploymentStatusAndAnnotation(ctx, d, newStatus, extraStatusAnno) +} + +// updateDeploymentExtraStatus updates the extra status annotation for advanced deployment +func (dc *DeploymentController) updateDeploymentExtraStatus(ctx context.Context, newRS *apps.ReplicaSet, d *apps.Deployment) (string, error) { + // For consistency with original logic, we can optionally get the latest deployment + // if the passed deployment might be stale. However, in most cases, the passed data + // should be fresh enough since it comes from the current reconciliation cycle. 
+ + updatedReadyReplicas := int32(0) + if newRS != nil { + updatedReadyReplicas = newRS.Status.ReadyReplicas + } + + extraStatus := &rolloutsv1alpha1.DeploymentExtraStatus{ + UpdatedReadyReplicas: updatedReadyReplicas, + ExpectedUpdatedReplicas: deploymentutil.NewRSReplicasLimit(dc.strategy.Partition, d), + } + + extraStatusByte, err := json.Marshal(extraStatus) + if err != nil { + klog.Errorf("Failed to marshal extra status for Deployment %v, err: %v", klog.KObj(d), err) + return "", err + } + + return string(extraStatusByte), nil +} + +// patchDeploymentStatusAndAnnotation updates both status and annotation in one operation +func (dc *DeploymentController) patchDeploymentStatusAndAnnotation(ctx context.Context, d *apps.Deployment, newStatus apps.DeploymentStatus, extraStatusAnno string) error { + statusNeedsUpdate := !reflect.DeepEqual(d.Status, newStatus) + annotationNeedsUpdate := d.Annotations[rolloutsv1alpha1.DeploymentExtraStatusAnnotation] != extraStatusAnno + + // If neither status nor annotation needs update, return early + if !statusNeedsUpdate && !annotationNeedsUpdate { return nil } - newDeployment := d - newDeployment.Status = newStatus - err := dc.runtimeClient.Status().Update(ctx, newDeployment) + // Create a copy for updating both status and annotation + deploymentCopy := d.DeepCopy() + + // Update status if needed + if statusNeedsUpdate { + deploymentCopy.Status = newStatus + } + + // Update annotation if needed + if annotationNeedsUpdate { + if deploymentCopy.Annotations == nil { + deploymentCopy.Annotations = make(map[string]string) + } + deploymentCopy.Annotations[rolloutsv1alpha1.DeploymentExtraStatusAnnotation] = extraStatusAnno + } + + // Use a JSON merge patch (client.MergeFrom) to update both status and annotation in one operation + patch := client.MergeFrom(d) + err := dc.runtimeClient.Patch(ctx, deploymentCopy, patch) if err != nil { - klog.Errorf("Failed to sync deployment status: %v", err) + klog.Errorf("Failed to patch deployment status and
annotation: %v", err) + return err } - return err + + return nil } // calculateStatus calculates the latest status for the provided deployment by looking into the provided replica sets.