Skip to content

Commit cb840ef

Browse files
authored
fix: Wait for full deployment rollout before marking IntegrationSink Ready (#8858)
Previously, IntegrationSink would mark DeploymentReady=True when any replica was available, even if old pods with stale AUTH_POLICIES were still running during a rollout. This caused race conditions where: 1. Tests would send requests hitting old pods with outdated policies, resulting in 403 errors and CI flakes 2. Production clients could see Ready=True but hit pods without the latest EventPolicy changes Now PropagateDeploymentStatus checks that: - ObservedGeneration == Generation (controller processed latest spec) - UpdatedReplicas == Replicas (all pods updated to new template) - AvailableReplicas == Replicas (all updated pods are ready) This ensures IntegrationSink only becomes Ready when all pods have the current AUTH_POLICIES configuration, fixing both test flakes and the production correctness issue.
1 parent 86c43a6 commit cb840ef

File tree

6 files changed

+81
-28
lines changed

6 files changed

+81
-28
lines changed

pkg/apis/sinks/v1alpha1/integration_sink_lifecycle.go

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -106,22 +106,51 @@ func (s *IntegrationSinkStatus) MarkEventPoliciesTrueWithReason(reason, messageF
106106
IntegrationSinkCondSet.Manage(s).MarkTrueWithReason(IntegrationSinkConditionEventPoliciesReady, reason, messageFormat, messageA...)
107107
}
108108

109-
func (s *IntegrationSinkStatus) PropagateDeploymentStatus(d *appsv1.DeploymentStatus) {
109+
func (s *IntegrationSinkStatus) PropagateDeploymentStatus(d *appsv1.Deployment) {
110+
// A deployment is fully rolled out when:
111+
// 1. ObservedGeneration == Generation: controller has observed the latest spec
112+
// 2. UpdatedReplicas == Replicas: all pods updated to the new pod template
113+
// 3. AvailableReplicas == Replicas: all updated pods are ready
114+
// This ensures EventPolicy changes have propagated to all pods before marking Ready.
115+
desiredReplicas := int32(1)
116+
if d.Spec.Replicas != nil {
117+
desiredReplicas = *d.Spec.Replicas
118+
}
119+
120+
deploymentFullyRolledOut := d.Status.ObservedGeneration == d.Generation &&
121+
d.Status.UpdatedReplicas == desiredReplicas &&
122+
d.Status.AvailableReplicas == desiredReplicas
123+
124+
if deploymentFullyRolledOut {
125+
IntegrationSinkCondSet.Manage(s).MarkTrue(IntegrationSinkConditionDeploymentReady)
126+
return
127+
}
128+
129+
// Check deployment conditions for error states
110130
deploymentAvailableFound := false
111-
for _, cond := range d.Conditions {
131+
for _, cond := range d.Status.Conditions {
112132
if cond.Type == appsv1.DeploymentAvailable {
113133
deploymentAvailableFound = true
114-
if cond.Status == corev1.ConditionTrue {
115-
IntegrationSinkCondSet.Manage(s).MarkTrue(IntegrationSinkConditionDeploymentReady)
116-
} else if cond.Status == corev1.ConditionFalse {
134+
if cond.Status == corev1.ConditionFalse {
117135
IntegrationSinkCondSet.Manage(s).MarkFalse(IntegrationSinkConditionDeploymentReady, cond.Reason, cond.Message)
118-
} else if cond.Status == corev1.ConditionUnknown {
119-
IntegrationSinkCondSet.Manage(s).MarkUnknown(IntegrationSinkConditionDeploymentReady, cond.Reason, cond.Message)
136+
return
120137
}
121138
}
139+
// Also check Progressing condition for failures (e.g., ImagePullBackOff, insufficient quota)
140+
if cond.Type == appsv1.DeploymentProgressing && cond.Status == corev1.ConditionFalse {
141+
IntegrationSinkCondSet.Manage(s).MarkFalse(IntegrationSinkConditionDeploymentReady, cond.Reason, cond.Message)
142+
return
143+
}
122144
}
123-
if !deploymentAvailableFound {
124-
IntegrationSinkCondSet.Manage(s).MarkUnknown(IntegrationSinkConditionDeploymentReady, "DeploymentUnavailable", "The Deployment '%s' is unavailable.", d)
145+
146+
// Deployment is progressing but not fully rolled out yet
147+
if deploymentAvailableFound {
148+
IntegrationSinkCondSet.Manage(s).MarkUnknown(IntegrationSinkConditionDeploymentReady,
149+
"DeploymentRollingOut",
150+
"Deployment rollout in progress: %d/%d replicas updated and available",
151+
d.Status.AvailableReplicas, desiredReplicas)
152+
} else {
153+
IntegrationSinkCondSet.Manage(s).MarkUnknown(IntegrationSinkConditionDeploymentReady, "DeploymentUnavailable", "The Deployment is unavailable")
125154
}
126155
}
127156

pkg/reconciler/integration/sink/integrationsink.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ func (r *Reconciler) reconcileDeployment(ctx context.Context, sink *sinks.Integr
177177
logging.FromContext(ctx).Debugw("Reusing existing Deployment", zap.Any("Deployment", deployment))
178178
}
179179

180-
sink.Status.PropagateDeploymentStatus(&deployment.Status)
180+
sink.Status.PropagateDeploymentStatus(deployment)
181181
return deployment, nil
182182
}
183183

pkg/reconciler/integration/sink/integrationsink_test.go

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -193,24 +193,35 @@ func makeDeployment(sink *sinksv1alpha1.IntegrationSink, ready *corev1.Condition
193193
}
194194
if *ready == corev1.ConditionTrue {
195195
status.ReadyReplicas = 1
196+
status.UpdatedReplicas = 1
197+
status.AvailableReplicas = 1
198+
status.Replicas = 1
199+
status.ObservedGeneration = 1
196200
}
197201
}
198202

203+
objectMeta := metav1.ObjectMeta{
204+
Name: deploymentName,
205+
Namespace: sink.Namespace,
206+
OwnerReferences: []metav1.OwnerReference{
207+
*kmeta.NewControllerRef(sink),
208+
},
209+
Labels: integration.Labels(sink.Name),
210+
}
211+
// Simulate what Kubernetes API server would set for existing deployments
212+
if ready != nil {
213+
objectMeta.Generation = 1
214+
}
215+
199216
d := &appsv1.Deployment{
200217
TypeMeta: metav1.TypeMeta{
201218
APIVersion: "apps/v1",
202219
Kind: "Deployment",
203220
},
204-
ObjectMeta: metav1.ObjectMeta{
205-
Name: deploymentName,
206-
Namespace: sink.Namespace,
207-
OwnerReferences: []metav1.OwnerReference{
208-
*kmeta.NewControllerRef(sink),
209-
},
210-
Labels: integration.Labels(sink.Name),
211-
},
212-
Status: status,
221+
ObjectMeta: objectMeta,
222+
Status: status,
213223
Spec: appsv1.DeploymentSpec{
224+
Replicas: ptr.To(int32(1)),
214225
Selector: &metav1.LabelSelector{
215226
MatchLabels: integration.Labels(sink.Name),
216227
},
@@ -376,12 +387,23 @@ func makeIntegrationSinkSpec() sinksv1alpha1.IntegrationSinkSpec {
376387
}
377388
}
378389

379-
func makeDeploymentStatus(ready *corev1.ConditionStatus) *appsv1.DeploymentStatus {
380-
return &appsv1.DeploymentStatus{
381-
Conditions: []appsv1.DeploymentCondition{{
382-
Type: appsv1.DeploymentAvailable,
383-
Status: *ready,
384-
}},
385-
Replicas: 1,
390+
func makeDeploymentStatus(ready *corev1.ConditionStatus) *appsv1.Deployment {
391+
return &appsv1.Deployment{
392+
ObjectMeta: metav1.ObjectMeta{
393+
Generation: 1,
394+
},
395+
Spec: appsv1.DeploymentSpec{
396+
Replicas: ptr.To(int32(1)),
397+
},
398+
Status: appsv1.DeploymentStatus{
399+
Conditions: []appsv1.DeploymentCondition{{
400+
Type: appsv1.DeploymentAvailable,
401+
Status: *ready,
402+
}},
403+
Replicas: 1,
404+
UpdatedReplicas: 1,
405+
AvailableReplicas: 1,
406+
ObservedGeneration: 1,
407+
},
386408
}
387409
}

pkg/reconciler/integration/sink/resources/container_image.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ func MakeDeploymentSpec(sink *v1alpha1.IntegrationSink, authProxyImage string, f
6262
Labels: integration.Labels(sink.Name),
6363
},
6464
Spec: appsv1.DeploymentSpec{
65+
Replicas: ptr.To(int32(1)),
6566
Selector: &metav1.LabelSelector{
6667
MatchLabels: integration.Labels(sink.Name),
6768
},

pkg/reconciler/integration/sink/resources/container_image_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ func TestNewSQSContainerSink(t *testing.T) {
8888
Labels: integration.Labels(testName),
8989
},
9090
Spec: appsv1.DeploymentSpec{
91+
Replicas: ptr.To(int32(1)),
9192
Selector: &metav1.LabelSelector{
9293
MatchLabels: integration.Labels(testName),
9394
},

pkg/reconciler/testing/v1alpha1/integrationsink.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,9 @@ func WithInitIntegrationSinkConditions(s *v1alpha1.IntegrationSink) {
5757
s.Status.InitializeConditions()
5858
}
5959

60-
func WithIntegrationSinkPropagateDeploymenteStatus(status *appsv1.DeploymentStatus) IntegrationSinkOption {
60+
func WithIntegrationSinkPropagateDeploymenteStatus(deployment *appsv1.Deployment) IntegrationSinkOption {
6161
return func(s *v1alpha1.IntegrationSink) {
62-
s.Status.PropagateDeploymentStatus(status)
62+
s.Status.PropagateDeploymentStatus(deployment)
6363
}
6464
}
6565

0 commit comments

Comments
 (0)