Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 2 additions & 25 deletions pkg/controller/deployment/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ import (
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
appslisters "k8s.io/client-go/listers/apps/v1"
toolscache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -83,19 +81,6 @@ func Add(mgr manager.Manager) error {

// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) {
cacher := mgr.GetCache()
dInformer, err := cacher.GetInformerForKind(context.TODO(), appsv1.SchemeGroupVersion.WithKind("Deployment"))
if err != nil {
return nil, err
}
rsInformer, err := cacher.GetInformerForKind(context.TODO(), appsv1.SchemeGroupVersion.WithKind("ReplicaSet"))
if err != nil {
return nil, err
}

// Lister
dLister := appslisters.NewDeploymentLister(dInformer.(toolscache.SharedIndexInformer).GetIndexer())
rsLister := appslisters.NewReplicaSetLister(rsInformer.(toolscache.SharedIndexInformer).GetIndexer())

// Client & Recorder
genericClient := clientutil.GetGenericClientWithName("advanced-deployment-controller")
Expand All @@ -106,11 +91,9 @@ func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) {

// Deployment controller factory
factory := &controllerFactory{
client: genericClient.KubeClient,
runtimeClient: mgr.GetClient(),
eventBroadcaster: eventBroadcaster,
eventRecorder: recorder,
dLister: dLister,
rsLister: rsLister,
}
return &ReconcileDeployment{Client: mgr.GetClient(), controllerFactory: factory}, nil
}
Expand Down Expand Up @@ -203,10 +186,6 @@ func (r *ReconcileDeployment) Reconcile(_ context.Context, request reconcile.Req
if err != nil {
errList = append(errList, field.InternalError(field.NewPath("syncDeployment"), err))
}
err = dc.patchExtraStatus(deployment)
if err != nil {
errList = append(errList, field.InternalError(field.NewPath("patchExtraStatus"), err))
}
if len(errList) > 0 {
return ctrl.Result{}, errList.ToAggregate()
}
Expand Down Expand Up @@ -267,11 +246,9 @@ func (f *controllerFactory) NewController(deployment *appsv1.Deployment) *Deploy
klog.V(4).Infof("Processing deployment %v strategy %v", klog.KObj(deployment), string(marshaled))

return &DeploymentController{
client: f.client,
runtimeClient: f.runtimeClient,
eventBroadcaster: f.eventBroadcaster,
eventRecorder: f.eventRecorder,
dLister: f.dLister,
rsLister: f.rsLister,
strategy: strategy,
}
}
71 changes: 15 additions & 56 deletions pkg/controller/deployment/deployment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,19 @@ package deployment

import (
"context"
"encoding/json"
"fmt"
"reflect"
"strings"
"time"

utilclient "github.com/openkruise/rollouts/pkg/util/client"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
appslisters "k8s.io/client-go/listers/apps/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"

rolloutsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1"
deploymentutil "github.com/openkruise/rollouts/pkg/controller/deployment/util"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
Expand All @@ -56,18 +52,13 @@ var controllerKind = apps.SchemeGroupVersion.WithKind("Deployment")
// DeploymentController is responsible for synchronizing Deployment objects stored
// in the system with actual running replica sets and pods.
type DeploymentController struct {
// client is the typed Kubernetes clientset; code in this file uses it to
// update Deployment status and to patch Deployment annotations.
client clientset.Interface

// eventBroadcaster is copied from the controller factory.
// NOTE(review): no direct use is visible in this part of the file — confirm.
eventBroadcaster record.EventBroadcaster
// eventRecorder emits events against the Deployment being synced
// (for example the "SelectingAll" warning).
eventRecorder record.EventRecorder

// dLister can list/get deployments from the shared informer's store
dLister appslisters.DeploymentLister
// rsLister can list/get replica sets from the shared informer's store
rsLister appslisters.ReplicaSetLister

// strategy is used in place of the deployment's own spec.strategy; its
// Partition feeds the expected-updated-replicas calculation.
strategy rolloutsv1alpha1.DeploymentStrategy

// runtimeClient is the controller-runtime client; code in this file uses it
// to list ReplicaSets and to update Deployment status.
runtimeClient client.Client
}

// getReplicaSetsForDeployment uses ControllerRefManager to reconcile
Expand All @@ -78,15 +69,18 @@ func (dc *DeploymentController) getReplicaSetsForDeployment(ctx context.Context,
if err != nil {
return nil, fmt.Errorf("deployment %s/%s has invalid label selector: %v", d.Namespace, d.Name, err)
}
// List all ReplicaSets to find those we own but that no longer match our
// selector. They will be orphaned by ClaimReplicaSets().
allRSs, err := dc.rsLister.ReplicaSets(d.Namespace).List(deploymentSelector)

// List all ReplicaSets using runtimeClient
rsList := &apps.ReplicaSetList{}
err = dc.runtimeClient.List(ctx, rsList, client.InNamespace(d.Namespace), client.MatchingLabelsSelector{Selector: deploymentSelector}, utilclient.DisableDeepCopy)
if err != nil {
return nil, fmt.Errorf("list %s/%s rs failed:%v", d.Namespace, d.Name, err)
}

// select rs owner by current deployment
ownedRSs := make([]*apps.ReplicaSet, 0)
for _, rs := range allRSs {
for i := range rsList.Items {
rs := &rsList.Items[i]
if !rs.DeletionTimestamp.IsZero() {
continue
}
Expand Down Expand Up @@ -116,7 +110,10 @@ func (dc *DeploymentController) syncDeployment(ctx context.Context, deployment *
dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
if d.Status.ObservedGeneration < d.Generation {
d.Status.ObservedGeneration = d.Generation
dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
err := dc.runtimeClient.Status().Update(ctx, d)
if err != nil {
klog.Errorf("Failed to update deployment status: %v", err)
}
}
return nil
}
Expand Down Expand Up @@ -147,41 +144,3 @@ func (dc *DeploymentController) syncDeployment(ctx context.Context, deployment *

return dc.rolloutRolling(ctx, d, rsList)
}

// patchExtraStatus recomputes the extra (advanced) status of the deployment
// and, when it differs from the value currently stored under
// DeploymentExtraStatusAnnotation, writes it back with a merge patch.
//
// It returns the error from listing the deployment's ReplicaSets or from the
// patch request. A failure to marshal the status is only logged and reported
// as nil, because retrying would not help.
func (dc *DeploymentController) patchExtraStatus(deployment *apps.Deployment) error {
	ownedRSs, err := dc.getReplicaSetsForDeployment(context.TODO(), deployment)
	if err != nil {
		return err
	}

	// Ready replicas of the new ReplicaSet; stays zero when there is none yet.
	var readyOnNewRS int32
	if latest := deploymentutil.FindNewReplicaSet(deployment, ownedRSs); latest != nil {
		readyOnNewRS = latest.Status.ReadyReplicas
	}

	raw, err := json.Marshal(&rolloutsv1alpha1.DeploymentExtraStatus{
		UpdatedReadyReplicas:    readyOnNewRS,
		ExpectedUpdatedReplicas: deploymentutil.NewRSReplicasLimit(dc.strategy.Partition, deployment),
	})
	if err != nil {
		klog.Errorf("Failed to marshal extra status for Deployment %v, err: %v", klog.KObj(deployment), err)
		return nil // no need to retry
	}

	// Skip the API call when the annotation already carries this exact value.
	desired := string(raw)
	if current := deployment.Annotations[rolloutsv1alpha1.DeploymentExtraStatusAnnotation]; current == desired {
		return nil // no need to update
	}

	// The status JSON is embedded as a string value inside the patch document,
	// so its double quotes have to be escaped.
	escaped := strings.ReplaceAll(desired, `"`, `\"`)
	patch := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`,
		rolloutsv1alpha1.DeploymentExtraStatusAnnotation,
		escaped)

	_, err = dc.client.AppsV1().Deployments(deployment.Namespace).
		Patch(context.TODO(), deployment.Name, types.MergePatchType, []byte(patch), metav1.PatchOptions{})
	return err
}
148 changes: 77 additions & 71 deletions pkg/controller/deployment/deployment_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ import (
"strconv"
"strings"
"testing"
"time"

apps "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/api/errors"
intstrutil "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
appslisters "k8s.io/client-go/listers/apps/v1"
"k8s.io/client-go/tools/record"
"k8s.io/utils/pointer"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
ctrlfake "sigs.k8s.io/controller-runtime/pkg/client/fake"

rolloutsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1"
"github.com/openkruise/rollouts/pkg/controller/deployment/util"
Expand Down Expand Up @@ -126,78 +126,61 @@ func TestSyncDeployment(t *testing.T) {

for name, test := range tests {
t.Run(name, func(t *testing.T) {
fakeClient := fake.NewSimpleClientset()
fakeRecord := record.NewFakeRecorder(10)
informers := informers.NewSharedInformerFactory(fakeClient, 0)
rsInformer := informers.Apps().V1().ReplicaSets().Informer()
dInformer := informers.Apps().V1().Deployments().Informer()

var deployment apps.Deployment
var newRS apps.ReplicaSet
{
deployment = generateDeployment("busybox")
deployment.Spec.Replicas = pointer.Int32(test.dReplicas)
deployment.Status.ReadyReplicas = test.newRSReplicas
availableReplicas := test.newRSAvailable
for _, available := range test.oldRSsAvailable {
availableReplicas += available
}
deployment.Status.UpdatedReplicas = test.newRSReplicas
deployment.Status.Replicas = availableReplicas
deployment.Status.AvailableReplicas = availableReplicas
dInformer.GetIndexer().Add(&deployment)
_, err := fakeClient.AppsV1().Deployments(deployment.Namespace).Create(context.TODO(), &deployment, metav1.CreateOptions{})
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
}
{
for index, replicas := range test.oldRSsReplicas {
rs := generateRS(deployment)
rs.SetName(fmt.Sprintf("rs-%d", index))
rs.Spec.Replicas = pointer.Int32(replicas)
rs.Status.Replicas = replicas
if strings.HasPrefix(name, "scale") {
rs.Annotations = map[string]string{
util.ReplicasAnnotation: strconv.Itoa(-1),
util.MaxReplicasAnnotation: strconv.Itoa(int(test.dAvailable + test.maxSurge.IntVal)),
}
}
rs.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("old-version-%d", index)
rs.Status.ReadyReplicas = test.oldRSsAvailable[index]
rs.Status.AvailableReplicas = test.oldRSsAvailable[index]
rsInformer.GetIndexer().Add(&rs)
_, err := fakeClient.AppsV1().ReplicaSets(rs.Namespace).Create(context.TODO(), &rs, metav1.CreateOptions{})
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
}
deployment := generateDeployment("busybox")
deployment.Spec.Replicas = pointer.Int32(test.dReplicas)
deployment.Status.ReadyReplicas = test.newRSReplicas
availableReplicas := test.newRSAvailable
for _, available := range test.oldRSsAvailable {
availableReplicas += available
}
{
newRS = generateRS(deployment)
newRS.SetName("rs-new")
newRS.Spec.Replicas = pointer.Int32(test.newRSReplicas)
deployment.Status.UpdatedReplicas = test.newRSReplicas
deployment.Status.Replicas = availableReplicas
deployment.Status.AvailableReplicas = availableReplicas

var allObjects []ctrlclient.Object
allObjects = append(allObjects, &deployment)

for index, replicas := range test.oldRSsReplicas {
rs := generateRS(deployment)
rs.SetName(fmt.Sprintf("rs-%d", index))
rs.Spec.Replicas = pointer.Int32(replicas)
rs.Status.Replicas = replicas
if strings.HasPrefix(name, "scale") {
newRS.Annotations = map[string]string{
rs.Annotations = map[string]string{
util.ReplicasAnnotation: strconv.Itoa(-1),
util.MaxReplicasAnnotation: strconv.Itoa(int(test.dAvailable + test.maxSurge.IntVal)),
}
}
newRS.Status.Replicas = test.newRSReplicas
newRS.Status.ReadyReplicas = test.newRSAvailable
newRS.Status.AvailableReplicas = test.newRSAvailable
rsInformer.GetIndexer().Add(&newRS)
_, err := fakeClient.AppsV1().ReplicaSets(newRS.Namespace).Create(context.TODO(), &newRS, metav1.CreateOptions{})
if err != nil {
t.Fatalf("got unexpected error: %v", err)
rs.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("old-version-%d", index)
rs.Status.ReadyReplicas = test.oldRSsAvailable[index]
rs.Status.AvailableReplicas = test.oldRSsAvailable[index]
allObjects = append(allObjects, &rs)
}

newRS := generateRS(deployment)
newRS.SetName("rs-new")
newRS.Spec.Replicas = pointer.Int32(test.newRSReplicas)
if strings.HasPrefix(name, "scale") {
newRS.Annotations = map[string]string{
util.ReplicasAnnotation: strconv.Itoa(-1),
util.MaxReplicasAnnotation: strconv.Itoa(int(test.dAvailable + test.maxSurge.IntVal)),
}
}
newRS.Status.Replicas = test.newRSReplicas
newRS.Status.ReadyReplicas = test.newRSAvailable
newRS.Status.AvailableReplicas = test.newRSAvailable
allObjects = append(allObjects, &newRS)

fakeCtrlClient := ctrlfake.NewClientBuilder().
WithObjects(allObjects...).
Build()

// Create a mock event recorder
fakeRecord := record.NewFakeRecorder(10)

dc := &DeploymentController{
client: fakeClient,
eventRecorder: fakeRecord,
dLister: appslisters.NewDeploymentLister(dInformer.GetIndexer()),
rsLister: appslisters.NewReplicaSetLister(rsInformer.GetIndexer()),
runtimeClient: fakeCtrlClient,
strategy: rolloutsv1alpha1.DeploymentStrategy{
RollingUpdate: &apps.RollingUpdateDeployment{
MaxSurge: &test.maxSurge,
Expand All @@ -207,17 +190,40 @@ func TestSyncDeployment(t *testing.T) {
},
}

err := dc.syncDeployment(context.TODO(), &deployment)
if err != nil {
t.Fatalf("got unexpected error: %v", err)
// Retry syncDeployment to handle potential resource conflicts gracefully
// This simulates the behavior of controller-runtime's reconcile loop
var err error
maxRetries := 10
for i := 0; i < maxRetries; i++ {
err = dc.syncDeployment(context.TODO(), &deployment)
if err == nil {
break
}

// Check if it's a conflict error (409)
if errors.IsConflict(err) {
if i < maxRetries-1 {
// Wait a bit before retrying, simulating the reconcile delay
time.Sleep(1 * time.Second)
continue
}
}

// For non-conflict errors or after max retries, break
break
}
rss, err := dc.client.AppsV1().ReplicaSets(deployment.Namespace).List(context.TODO(), metav1.ListOptions{})

if err != nil {
t.Fatalf("got unexpected error: %v", err)
t.Fatalf("got unexpected error after retries: %v", err)
}

var rsList apps.ReplicaSetList
if err := fakeCtrlClient.List(context.TODO(), &rsList); err != nil {
t.Fatalf("list rs error: %v", err)
}
resultOld := int32(0)
resultNew := int32(0)
for _, rs := range rss.Items {
for _, rs := range rsList.Items {
if rs.GetName() != "rs-new" {
resultOld += *rs.Spec.Replicas
} else {
Expand Down
Loading