@@ -143,6 +143,21 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
143143
144144 log .V (1 ).Info ("Reconciling" )
145145
146+ //TODO remove this in a future release (baluchicken)
147+ // this is only required to keep backward compatibility
148+ var brokerIdWithDeprecatedStatus []string
149+ for bId , bStatus := range r .KafkaCluster .Status .BrokersState {
150+ if bStatus .GracefulActionState .CruiseControlState == "GracefulUpdateNotRequired" {
151+ brokerIdWithDeprecatedStatus = append (brokerIdWithDeprecatedStatus , bId )
152+ }
153+ }
154+ statusErr := k8sutil .UpdateBrokerStatus (r .Client , brokerIdWithDeprecatedStatus , r .KafkaCluster ,
155+ v1beta1.GracefulActionState {CruiseControlState : v1beta1 .GracefulUpscaleSucceeded }, log )
156+
157+ if statusErr != nil {
158+ return errorfactory .New (errorfactory.StatusUpdateError {}, statusErr , "updating deprecated status failed" )
159+ }
160+
146161 if r .KafkaCluster .Spec .HeadlessServiceEnabled {
147162 o := r .headlessService ()
148163 err := k8sutil .Reconcile (log , r .Client , o , r .KafkaCluster )
@@ -569,7 +584,7 @@ func (r *Reconciler) reconcileKafkaPod(log logr.Logger, desiredPod *corev1.Pod)
569584 statusErr := k8sutil .UpdateBrokerStatus (r .Client , []string {desiredPod .Labels ["brokerId" ]}, r .KafkaCluster , v1beta1 .ConfigInSync , log )
570585
571586 if statusErr != nil {
572- return errorfactory .New (errorfactory.StatusUpdateError {}, err , "updating status for resource failed" , "kind" , desiredType )
587+ return errorfactory .New (errorfactory.StatusUpdateError {}, statusErr , "updating status for resource failed" , "kind" , desiredType )
573588 }
574589
575590 if val , ok := r .KafkaCluster .Status .BrokersState [desiredPod .Labels ["brokerId" ]]; ok && val .GracefulActionState .CruiseControlState != v1beta1 .GracefulUpscaleSucceeded {
@@ -580,7 +595,7 @@ func (r *Reconciler) reconcileKafkaPod(log logr.Logger, desiredPod *corev1.Pod)
580595 }
581596 statusErr = k8sutil .UpdateBrokerStatus (r .Client , []string {desiredPod .Labels ["brokerId" ]}, r .KafkaCluster , gracefulActionState , log )
582597 if statusErr != nil {
583- return errorfactory .New (errorfactory.StatusUpdateError {}, err , "could not update broker graceful action state" )
598+ return errorfactory .New (errorfactory.StatusUpdateError {}, statusErr , "could not update broker graceful action state" )
584599 }
585600 }
586601 log .Info ("resource created" )
@@ -598,7 +613,7 @@ func (r *Reconciler) reconcileKafkaPod(log logr.Logger, desiredPod *corev1.Pod)
598613 }
599614 statusErr := k8sutil .UpdateBrokerStatus (r .Client , []string {brokerId }, r .KafkaCluster , rackAwarenessState , log )
600615 if statusErr != nil {
601- return errorfactory .New (errorfactory.StatusUpdateError {}, err , "updating status for resource failed" , "kind" , desiredType )
616+ return errorfactory .New (errorfactory.StatusUpdateError {}, statusErr , "updating status for resource failed" , "kind" , desiredType )
602617 }
603618 }
604619 } else {
0 commit comments