Skip to content

Commit 838d4e7

Browse files
fix(trafficrouting): ensure DestinationRule is updated before SetWeight on rollback (#4612)
* fix(istio): ensure DestinationRule is updated before SetWeight on rollback - Return error from UpdateHash when delaying DR switch so SetWeight is not called, preventing VS weight update before DR subset switch (fixes rollback traffic hitting wrong subsets). - When shifting traffic to stable only (abort or dynamic rollback), only require stable RS availability so abort still completes when canary is failed (no regression of #4128). Signed-off-by: Andrew Brown <andrew.brown@wealthsimple.com> * chore: add trafficrouting test for #4612 Signed-off-by: Andrew Brown <andrew.brown@wealthsimple.com> * refactor Signed-off-by: Andrew Brown <andrew.brown@wealthsimple.com> * trigger build Signed-off-by: Andrew Brown <andrew.brown@wealthsimple.com> * reduce cognitive complexity Signed-off-by: Andrew Brown <andrew.brown@wealthsimple.com> * remove unused function Signed-off-by: Andrew Brown <andrew.brown@wealthsimple.com> --------- Signed-off-by: Andrew Brown <andrew.brown@wealthsimple.com> Signed-off-by: Zach Aller <zachaller@users.noreply.github.com> Co-authored-by: Zach Aller <zachaller@users.noreply.github.com>
1 parent 48079f5 commit 838d4e7

9 files changed

Lines changed: 467 additions & 83 deletions

File tree

rollout/canary.go

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -259,28 +259,6 @@ func (c *rolloutContext) scaleDownOldReplicaSetsForCanary(oldRSs []*appsv1.Repli
259259
return totalScaledDown, nil
260260
}
261261

262-
// isDynamicallyRollingBackToStable returns true if we were in the middle of an canary update with
263-
// dynamic stable scaling, but was interrupted and are now rolling back to stable RS. This is similar
264-
// to, but different than aborting. With abort, desired hash != stable hash and so we know the
265-
// two hashes to balance traffic against. But with dynamically rolling back to stable, the
266-
// desired hash == stable hash, and so we must use the *previous* desired hash and balance traffic
267-
// between previous desired vs. stable hash, in order to safely shift traffic back to stable.
268-
// This function also returns the previous desired hash (where we are weighted to)
269-
func isDynamicallyRollingBackToStable(ro *v1alpha1.Rollout, desiredRS *appsv1.ReplicaSet) (bool, string) {
270-
if rolloututil.IsFullyPromoted(ro) && ro.Spec.Strategy.Canary.TrafficRouting != nil && ro.Spec.Strategy.Canary.DynamicStableScale {
271-
if ro.Status.Canary.Weights != nil {
272-
currSelector := ro.Status.Canary.Weights.Canary.PodTemplateHash
273-
desiredSelector := replicasetutil.GetPodTemplateHash(desiredRS)
274-
if currSelector != desiredSelector {
275-
if desiredRS.Status.AvailableReplicas < *ro.Spec.Replicas {
276-
return true, currSelector
277-
}
278-
}
279-
}
280-
}
281-
return false, ""
282-
}
283-
284262
// canProceedWithScaleDownAnnotation returns whether or not it is safe to proceed with annotating
285263
// old replicasets with the scale-down-deadline in the traffic-routed canary strategy.
286264
// This method only matters with ALB canary + the target group verification feature.

rollout/canary_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2107,7 +2107,7 @@ func TestIsDynamicallyRollingBackToStable(t *testing.T) {
21072107
desiredRS.Status.AvailableReplicas = *tc.rsAvailableReplicas
21082108
}
21092109

2110-
rbToStable, _ := isDynamicallyRollingBackToStable(ro, desiredRS)
2110+
rbToStable, _ := rolloututil.IsDynamicallyRollingBackToStable(ro, desiredRS)
21112111

21122112
assert.Equal(t, tc.expectedResult, rbToStable)
21132113
})

rollout/controller_test.go

Lines changed: 86 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,8 @@ type fixture struct {
119119
// Objects from here preloaded into NewSimpleFake.
120120
kubeobjects []runtime.Object
121121
objects []runtime.Object
122+
// dynamicOnlyObjects: added to dynamic client only (not Argo client). Use for Istio VS/DR so listers don't see unstructured as rollout.
123+
dynamicOnlyObjects []runtime.Object
122124
// Acquire 'enqueuedObjectsLock' before accessing enqueuedObjects
123125
enqueuedObjects map[string]int
124126
enqueuedObjectsLock sync.Mutex
@@ -127,6 +129,10 @@ type fixture struct {
127129
// events holds all the K8s Event Reasons emitted during the run
128130
events []string
129131
fakeTrafficRouting *mocks.TrafficRoutingReconciler
132+
// reseedRolloutMutator, if set, is applied to the rollout when re-seeding between syncs (for multi-sync tests).
133+
reseedRolloutMutator func(*v1alpha1.Rollout)
134+
// allowErrorOnLastSync, if set, do not fail the test when the final sync returns an error (e.g. "delaying destination rule switch").
135+
allowErrorOnLastSync bool
130136
}
131137

132138
func newFixture(t *testing.T) *fixture {
@@ -562,6 +568,11 @@ func (f *fixture) newController(resync resyncFunc) (*Controller, informers.Share
562568
f.client = fake.NewSimpleClientset(f.objects...)
563569
f.kubeclient = k8sfake.NewSimpleClientset(f.kubeobjects...)
564570

571+
dynamicClientObjects := f.objects
572+
if len(f.dynamicOnlyObjects) > 0 {
573+
dynamicClientObjects = append(append([]runtime.Object{}, f.objects...), f.dynamicOnlyObjects...)
574+
}
575+
565576
i := informers.NewSharedInformerFactory(f.client, resync())
566577
k8sI := kubeinformers.NewSharedInformerFactory(f.kubeclient, resync())
567578

@@ -581,7 +592,7 @@ func (f *fixture) newController(resync resyncFunc) (*Controller, informers.Share
581592
destGVR: destGVR.Resource + "List",
582593
}
583594

584-
dynamicClient := dynamicfake.NewSimpleDynamicClientWithCustomListKinds(scheme, listMapping, f.objects...)
595+
dynamicClient := dynamicfake.NewSimpleDynamicClientWithCustomListKinds(scheme, listMapping, dynamicClientObjects...)
585596
dynamicInformerFactory := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 0)
586597
istioVirtualServiceInformer := dynamicInformerFactory.ForResource(istioutil.GetIstioVirtualServiceGVR()).Informer()
587598
istioDestinationRuleInformer := dynamicInformerFactory.ForResource(istioutil.GetIstioDestinationRuleGVR()).Informer()
@@ -647,13 +658,15 @@ func (f *fixture) newController(resync resyncFunc) (*Controller, informers.Share
647658
c.enqueueRolloutAfter = func(obj any, duration time.Duration) {
648659
c.enqueueRollout(obj)
649660
}
650-
c.newTrafficRoutingReconciler = func(roCtx *rolloutContext) ([]trafficrouting.TrafficRoutingReconciler, error) {
651-
if roCtx.rollout.Spec.Strategy.Canary == nil || roCtx.rollout.Spec.Strategy.Canary.TrafficRouting == nil {
652-
return nil, nil
661+
if f.fakeTrafficRouting != nil {
662+
c.newTrafficRoutingReconciler = func(roCtx *rolloutContext) ([]trafficrouting.TrafficRoutingReconciler, error) {
663+
if roCtx.rollout.Spec.Strategy.Canary == nil || roCtx.rollout.Spec.Strategy.Canary.TrafficRouting == nil {
664+
return nil, nil
665+
}
666+
var reconcilers = []trafficrouting.TrafficRoutingReconciler{}
667+
reconcilers = append(reconcilers, f.fakeTrafficRouting)
668+
return reconcilers, nil
653669
}
654-
var reconcilers = []trafficrouting.TrafficRoutingReconciler{}
655-
reconcilers = append(reconcilers, f.fakeTrafficRouting)
656-
return reconcilers, nil
657670
}
658671

659672
for _, r := range f.rolloutLister {
@@ -698,11 +711,68 @@ func (f *fixture) run(rolloutName string) {
698711
f.runController(rolloutName, true, false, c, i, k8sI)
699712
}
700713

714+
// runWithSyncs runs the controller syncHandler n times (e.g. 2 for two reconciliation loops) then verifies actions.
715+
func (f *fixture) runWithSyncs(rolloutName string, syncs int) {
716+
c, i, k8sI := f.newController(noResyncPeriodFunc)
717+
f.runControllerWithSyncs(rolloutName, syncs, true, false, c, i, k8sI)
718+
}
719+
701720
func (f *fixture) runExpectError(rolloutName string, startInformers bool) {
702721
c, i, k8sI := f.newController(noResyncPeriodFunc)
703722
f.runController(rolloutName, startInformers, true, c, i, k8sI)
704723
}
705724

725+
func (f *fixture) runControllerWithSyncs(rolloutName string, syncs int, startInformers bool, expectError bool, c *Controller, i informers.SharedInformerFactory, k8sI kubeinformers.SharedInformerFactory) *Controller {
726+
if startInformers {
727+
stopCh := make(chan struct{})
728+
defer close(stopCh)
729+
i.Start(stopCh)
730+
k8sI.Start(stopCh)
731+
assert.True(f.t, cache.WaitForCacheSync(stopCh, c.replicaSetSynced, c.rolloutsSynced))
732+
}
733+
734+
for n := 0; n < syncs; n++ {
735+
err := c.syncHandler(context.Background(), rolloutName)
736+
allowErr := f.allowErrorOnLastSync && (n == syncs-1)
737+
f.assertSyncHandlerResult(n, syncs, err, expectError, allowErr)
738+
f.reseedRolloutInInformerIfNeeded(c, rolloutName, n, syncs)
739+
}
740+
741+
return f.verifyActionsAndReturn(c)
742+
}
743+
744+
func (f *fixture) assertSyncHandlerResult(n, syncs int, err error, expectError, allowErr bool) {
745+
if !expectError && err != nil && !allowErr {
746+
f.t.Errorf("error syncing rollout (sync %d/%d): %v", n+1, syncs, err)
747+
return
748+
}
749+
if expectError && err == nil {
750+
f.t.Error("expected error syncing rollout, got nil")
751+
}
752+
}
753+
754+
// reseedRolloutInInformer re-seeds the rollout in the informer so the next sync sees a typed Rollout
755+
// (controller writes Unstructured via persistRolloutToInformer). No-op when syncs <= 1 or on the last sync.
756+
func (f *fixture) reseedRolloutInInformerIfNeeded(c *Controller, rolloutName string, n, syncs int) {
757+
if syncs <= 1 || n >= syncs-1 {
758+
return
759+
}
760+
namespace, name, err := cache.SplitMetaNamespaceKey(rolloutName)
761+
if err != nil {
762+
f.t.Fatalf("re-seed rollout: split key: %v", err)
763+
}
764+
ro, err := f.client.ArgoprojV1alpha1().Rollouts(namespace).Get(context.Background(), name, metav1.GetOptions{})
765+
if err != nil {
766+
f.t.Fatalf("re-seed rollout: get: %v", err)
767+
}
768+
if f.reseedRolloutMutator != nil {
769+
f.reseedRolloutMutator(ro)
770+
}
771+
if err := c.rolloutsIndexer.Update(ro); err != nil {
772+
f.t.Fatalf("re-seed rollout: update indexer: %v", err)
773+
}
774+
}
775+
706776
func (f *fixture) runController(rolloutName string, startInformers bool, expectError bool, c *Controller, i informers.SharedInformerFactory, k8sI kubeinformers.SharedInformerFactory) *Controller {
707777
if startInformers {
708778
stopCh := make(chan struct{})
@@ -720,6 +790,10 @@ func (f *fixture) runController(rolloutName string, startInformers bool, expectE
720790
f.t.Error("expected error syncing rollout, got nil")
721791
}
722792

793+
return f.verifyActionsAndReturn(c)
794+
}
795+
796+
func (f *fixture) verifyActionsAndReturn(c *Controller) *Controller {
723797
actions := filterInformerActions(f.client.Actions())
724798
for i, action := range actions {
725799
if len(f.actions) < i+1 {
@@ -752,7 +826,6 @@ func (f *fixture) runController(rolloutName string, startInformers bool, expectE
752826
f.t.Errorf("%d expected actions did not happen:%+v", len(f.kubeactions)-len(k8sActions), f.kubeactions[len(k8sActions):])
753827
}
754828
fakeRecorder := c.recorder.(*record.FakeEventRecorder)
755-
756829
f.events = fakeRecorder.Events()
757830
return c
758831
}
@@ -850,6 +923,11 @@ func (f *fixture) expectUpdatePodAction(p *corev1.Pod) int {
850923
return len
851924
}
852925

926+
func (f *fixture) expectGetRolloutAction(rollout *v1alpha1.Rollout) int {
927+
len := len(f.actions)
928+
f.actions = append(f.actions, core.NewGetAction(v1alpha1.RolloutGVR, rollout.Namespace, rollout.Name))
929+
return len
930+
}
853931
func (f *fixture) expectCreateExperimentAction(ex *v1alpha1.Experiment) int {
854932
action := core.NewCreateAction(schema.GroupVersionResource{Resource: "experiments"}, ex.Namespace, ex)
855933
len := len(f.actions)

rollout/service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ func (c *rolloutContext) reconcileStableAndCanaryService() error {
269269
return nil
270270
}
271271

272-
if dynamicallyRollingBackToStable, currSelector := isDynamicallyRollingBackToStable(c.rollout, c.newRS); dynamicallyRollingBackToStable {
272+
if dynamicallyRollingBackToStable, currSelector := rolloututils.IsDynamicallyRollingBackToStable(c.rollout, c.newRS); dynamicallyRollingBackToStable {
273273
// User may have interrupted an update in order go back to stableRS, and is using dynamic
274274
// stable scaling. If that is the case, the stableRS might be undersized and if we blindly
275275
// switch service selector we could overwhelm stableRS pods.

rollout/trafficrouting.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ func (c *rolloutContext) reconcileTrafficRouting() error {
189189
canaryHash = c.newRS.Labels[v1alpha1.DefaultRolloutUniqueLabelKey]
190190
}
191191

192-
if dynamicallyRollingBackToStable, prevDesiredHash := isDynamicallyRollingBackToStable(c.rollout, c.newRS); dynamicallyRollingBackToStable {
192+
if dynamicallyRollingBackToStable, prevDesiredHash := rolloututil.IsDynamicallyRollingBackToStable(c.rollout, c.newRS); dynamicallyRollingBackToStable {
193193
desiredWeight = c.calculateDesiredWeightOnAbortOrStableRollback()
194194
// Since stableRS == desiredRS, we must balance traffic between the
195195
// *previous desired* vs. stable (as opposed to current desired vs. stable).

rollout/trafficrouting/istio/istio.go

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
istioutil "github.com/argoproj/argo-rollouts/utils/istio"
2828
logutil "github.com/argoproj/argo-rollouts/utils/log"
2929
"github.com/argoproj/argo-rollouts/utils/record"
30+
rolloututil "github.com/argoproj/argo-rollouts/utils/rollout"
3031
)
3132

3233
const Http = "http"
@@ -36,12 +37,11 @@ const Type = "Istio"
3637

3738
const SpecHttpNotFound = "spec.http not found"
3839

39-
// NewReconciler returns a reconciler struct that brings the Virtual Service into the desired state
40+
// NewReconciler returns a reconciler struct that brings the Virtual Service into the desired state.
4041
func NewReconciler(r *v1alpha1.Rollout, client dynamic.Interface, recorder record.EventRecorder, virtualServiceLister, destinationRuleLister dynamiclister.Lister, replicaSets []*appsv1.ReplicaSet) *Reconciler {
4142
return &Reconciler{
42-
rollout: r,
43-
log: logutil.WithRollout(r),
44-
43+
rollout: r,
44+
log: logutil.WithRollout(r),
4545
client: client,
4646
recorder: recorder,
4747
virtualServiceLister: virtualServiceLister,
@@ -350,21 +350,36 @@ func (r *Reconciler) reconcileVirtualService(obj *unstructured.Unstructured, vsv
350350
return newObj, len(patches) > 0, err
351351
}
352352

353-
func (r *Reconciler) UpdateHash(canaryHash, stableHash string, additionalDestinations ...v1alpha1.WeightDestination) error {
354-
// We need to check if the replicasets are ready here as well if we didn't define any services in the rollout
355-
// See: https://github.com/argoproj/argo-rollouts/issues/2507
356-
if r.rollout.Spec.Strategy.Canary.CanaryService == "" && r.rollout.Spec.Strategy.Canary.StableService == "" {
357-
358-
for _, rs := range r.replicaSets {
359-
if *rs.Spec.Replicas > 0 {
360-
rsHash := rs.Labels[v1alpha1.DefaultRolloutUniqueLabelKey]
361-
// Only check availability for ReplicaSets that will receive traffic
362-
if (rsHash == stableHash || rsHash == canaryHash) && rsHash != "" && !replicasetutil.IsReplicaSetAvailable(rs) {
363-
r.log.Infof("delaying destination rule switch: ReplicaSet %s not fully available", rs.Name)
364-
return nil
365-
}
366-
}
353+
// shouldDelayDestinationRuleUpdate returns true if updating the DestinationRule should be
354+
// delayed because a traffic-receiving ReplicaSet is not yet fully available.
355+
// See: https://github.com/argoproj/argo-rollouts/issues/2507
356+
func (r *Reconciler) shouldDelayDestinationRuleUpdate(canaryHash, stableHash string) (bool, string) {
357+
if r.rollout.Spec.Strategy.Canary.CanaryService != "" || r.rollout.Spec.Strategy.Canary.StableService != "" {
358+
return false, ""
359+
}
360+
abortOrDynamicRollbackToStable := rolloututil.AbortOrDynamicRollbackToStable(r.rollout, r.replicaSets, stableHash)
361+
for _, rs := range r.replicaSets {
362+
if *rs.Spec.Replicas == 0 {
363+
continue
364+
}
365+
rsHash := rs.Labels[v1alpha1.DefaultRolloutUniqueLabelKey]
366+
if rsHash == "" || (rsHash != stableHash && rsHash != canaryHash) {
367+
continue
367368
}
369+
if abortOrDynamicRollbackToStable && rsHash == canaryHash {
370+
continue
371+
}
372+
if !replicasetutil.IsReplicaSetAvailable(rs) {
373+
return true, rs.Name
374+
}
375+
}
376+
return false, ""
377+
}
378+
379+
func (r *Reconciler) UpdateHash(canaryHash, stableHash string, additionalDestinations ...v1alpha1.WeightDestination) error {
380+
if shouldDelay, rsName := r.shouldDelayDestinationRuleUpdate(canaryHash, stableHash); shouldDelay {
381+
r.log.Infof("delaying destination rule switch: ReplicaSet %s not fully available", rsName)
382+
return fmt.Errorf("delaying destination rule switch: ReplicaSet %s not fully available", rsName)
368383
}
369384

370385
dRuleSpec := r.rollout.Spec.Strategy.Canary.TrafficRouting.Istio.DestinationRule

0 commit comments

Comments (0)