@@ -17,6 +17,7 @@ package k8sutil
1717import (
1818 "context"
1919 "fmt"
20+ "strings"
2021 "time"
2122
2223 "emperror.dev/errors"
@@ -42,54 +43,10 @@ func IsMarkedForDeletion(m metav1.ObjectMeta) bool {
4243}
4344
4445// UpdateBrokerStatus updates the broker status with rack and configuration infos
45- func UpdateBrokerStatus (c client.Client , brokerId string , cluster * v1beta1.KafkaCluster , state interface {}, logger logr.Logger ) error {
46+ func UpdateBrokerStatus (c client.Client , brokerIds [] string , cluster * v1beta1.KafkaCluster , state interface {}, logger logr.Logger ) error {
4647 typeMeta := cluster .TypeMeta
4748
48- if cluster .Status .BrokersState == nil {
49- switch s := state .(type ) {
50- case banzaicloudv1beta1.RackAwarenessState :
51- cluster .Status .BrokersState = map [string ]banzaicloudv1beta1.BrokerState {brokerId : {RackAwarenessState : s }}
52- case banzaicloudv1beta1.GracefulActionState :
53- cluster .Status .BrokersState = map [string ]banzaicloudv1beta1.BrokerState {brokerId : {GracefulActionState : s }}
54- case banzaicloudv1beta1.ConfigurationState :
55- cluster .Status .BrokersState = map [string ]banzaicloudv1beta1.BrokerState {brokerId : {ConfigurationState : s }}
56- }
57- } else if val , ok := cluster .Status .BrokersState [brokerId ]; ok {
58- switch s := state .(type ) {
59- case banzaicloudv1beta1.RackAwarenessState :
60- val .RackAwarenessState = s
61- case banzaicloudv1beta1.GracefulActionState :
62- val .GracefulActionState = s
63- case banzaicloudv1beta1.ConfigurationState :
64- val .ConfigurationState = s
65- }
66- cluster .Status .BrokersState [brokerId ] = val
67- } else {
68- switch s := state .(type ) {
69- case banzaicloudv1beta1.RackAwarenessState :
70- cluster .Status .BrokersState [brokerId ] = banzaicloudv1beta1.BrokerState {RackAwarenessState : s }
71- case banzaicloudv1beta1.GracefulActionState :
72- cluster .Status .BrokersState [brokerId ] = banzaicloudv1beta1.BrokerState {GracefulActionState : s }
73- case banzaicloudv1beta1.ConfigurationState :
74- cluster .Status .BrokersState [brokerId ] = banzaicloudv1beta1.BrokerState {ConfigurationState : s }
75- }
76- }
77-
78- err := c .Status ().Update (context .Background (), cluster )
79- if apierrors .IsNotFound (err ) {
80- err = c .Update (context .Background (), cluster )
81- }
82- if err != nil {
83- if ! apierrors .IsConflict (err ) {
84- return errors .WrapIff (err , "could not update Kafka broker %s state" , brokerId )
85- }
86- err := c .Get (context .TODO (), types.NamespacedName {
87- Namespace : cluster .Namespace ,
88- Name : cluster .Name ,
89- }, cluster )
90- if err != nil {
91- return errors .WrapIf (err , "could not get config for updating status" )
92- }
49+ for _ , brokerId := range brokerIds {
9350
9451 if cluster .Status .BrokersState == nil {
9552 switch s := state .(type ) {
@@ -120,13 +77,63 @@ func UpdateBrokerStatus(c client.Client, brokerId string, cluster *v1beta1.Kafka
12077 cluster .Status .BrokersState [brokerId ] = banzaicloudv1beta1.BrokerState {ConfigurationState : s }
12178 }
12279 }
80+ }
81+
82+ err := c .Status ().Update (context .Background (), cluster )
83+ if apierrors .IsNotFound (err ) {
84+ err = c .Update (context .Background (), cluster )
85+ }
86+ if err != nil {
87+ if ! apierrors .IsConflict (err ) {
88+ return errors .WrapIff (err , "could not update Kafka broker(s) %s state" , strings .Join (brokerIds , "," ))
89+ }
90+ err := c .Get (context .TODO (), types.NamespacedName {
91+ Namespace : cluster .Namespace ,
92+ Name : cluster .Name ,
93+ }, cluster )
94+ if err != nil {
95+ return errors .WrapIf (err , "could not get config for updating status" )
96+ }
97+
98+ for _ , brokerId := range brokerIds {
99+
100+ if cluster .Status .BrokersState == nil {
101+ switch s := state .(type ) {
102+ case banzaicloudv1beta1.RackAwarenessState :
103+ cluster .Status .BrokersState = map [string ]banzaicloudv1beta1.BrokerState {brokerId : {RackAwarenessState : s }}
104+ case banzaicloudv1beta1.GracefulActionState :
105+ cluster .Status .BrokersState = map [string ]banzaicloudv1beta1.BrokerState {brokerId : {GracefulActionState : s }}
106+ case banzaicloudv1beta1.ConfigurationState :
107+ cluster .Status .BrokersState = map [string ]banzaicloudv1beta1.BrokerState {brokerId : {ConfigurationState : s }}
108+ }
109+ } else if val , ok := cluster .Status .BrokersState [brokerId ]; ok {
110+ switch s := state .(type ) {
111+ case banzaicloudv1beta1.RackAwarenessState :
112+ val .RackAwarenessState = s
113+ case banzaicloudv1beta1.GracefulActionState :
114+ val .GracefulActionState = s
115+ case banzaicloudv1beta1.ConfigurationState :
116+ val .ConfigurationState = s
117+ }
118+ cluster .Status .BrokersState [brokerId ] = val
119+ } else {
120+ switch s := state .(type ) {
121+ case banzaicloudv1beta1.RackAwarenessState :
122+ cluster .Status .BrokersState [brokerId ] = banzaicloudv1beta1.BrokerState {RackAwarenessState : s }
123+ case banzaicloudv1beta1.GracefulActionState :
124+ cluster .Status .BrokersState [brokerId ] = banzaicloudv1beta1.BrokerState {GracefulActionState : s }
125+ case banzaicloudv1beta1.ConfigurationState :
126+ cluster .Status .BrokersState [brokerId ] = banzaicloudv1beta1.BrokerState {ConfigurationState : s }
127+ }
128+ }
129+ }
123130
124131 err = c .Status ().Update (context .Background (), cluster )
125132 if apierrors .IsNotFound (err ) {
126133 err = c .Update (context .Background (), cluster )
127134 }
128135 if err != nil {
129- return errors .WrapIff (err , "could not update Kafka clusters broker %s state" , brokerId )
136+ return errors .WrapIff (err , "could not update Kafka clusters broker(s) %s state" , strings . Join ( brokerIds , "," ) )
130137 }
131138 }
132139 // update loses the typeMeta of the config that's used later when setting ownerrefs
0 commit comments