@@ -57,9 +57,11 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, broker v
5757 // Cruise Control metrics reporter configuration
5858 r .configCCMetricsReporter (broker , config , clientPass , log )
5959
60+ brokerReadOnlyConfig := getBrokerReadOnlyConfig (broker , r .KafkaCluster , log )
61+
6062 // Kafka Broker configurations
6163 if r .KafkaCluster .Spec .KRaftMode {
62- configureBrokerKRaftMode (bConfig , broker .Id , r .KafkaCluster , config , quorumVoters , serverPasses , extListenerStatuses , intListenerStatuses , log )
64+ configureBrokerKRaftMode (bConfig , broker .Id , r .KafkaCluster , config , quorumVoters , serverPasses , extListenerStatuses , intListenerStatuses , log , brokerReadOnlyConfig )
6365 } else {
6466 configureBrokerZKMode (broker .Id , r .KafkaCluster , config , serverPasses , extListenerStatuses , intListenerStatuses , controllerIntListenerStatuses , log )
6567 }
@@ -148,23 +150,42 @@ func (r *Reconciler) configCCMetricsReporter(broker v1beta1.Broker, config *prop
148150}
149151
150152func configureBrokerKRaftMode (bConfig * v1beta1.BrokerConfig , brokerID int32 , kafkaCluster * v1beta1.KafkaCluster , config * properties.Properties ,
151- quorumVoters []string , serverPasses map [string ]string , extListenerStatuses , intListenerStatuses map [string ]v1beta1.ListenerStatusList , log logr.Logger ) {
152- if err := config .Set (kafkautils .KafkaConfigNodeID , brokerID ); err != nil {
153- log .Error (err , fmt .Sprintf (kafkautils .BrokerConfigErrorMsgTemplate , kafkautils .KafkaConfigNodeID ))
154- }
153+ quorumVoters []string , serverPasses map [string ]string , extListenerStatuses , intListenerStatuses map [string ]v1beta1.ListenerStatusList , log logr.Logger ,
154+ brokerReadOnlyConfig * properties.Properties ) {
155+ controllerListenerName := generateControlPlaneListener (kafkaCluster .Spec .ListenersConfig .InternalListeners )
155156
156- if err := config .Set (kafkautils .KafkaConfigProcessRoles , bConfig .Roles ); err != nil {
157- log .Error (err , fmt .Sprintf (kafkautils .BrokerConfigErrorMsgTemplate , kafkautils .KafkaConfigProcessRoles ))
158- }
157+ // when kRaft is enabled for the cluster, brokers can still be configured to use zookeeper for metadata.
158+ // this is to support the zk to kRaft migration where both zookeeper and kRaft controllers are running in parallel.
159+ if shouldUseKRaftModeForBroker (brokerReadOnlyConfig ) {
160+ if err := config .Set (kafkautils .KafkaConfigNodeID , brokerID ); err != nil {
161+ log .Error (err , fmt .Sprintf (kafkautils .BrokerConfigErrorMsgTemplate , kafkautils .KafkaConfigNodeID ))
162+ }
159163
160- if err := config .Set (kafkautils .KafkaConfigControllerQuorumVoters , quorumVoters ); err != nil {
161- log .Error (err , fmt .Sprintf (kafkautils .BrokerConfigErrorMsgTemplate , kafkautils .KafkaConfigControllerQuorumVoters ))
164+ if err := config .Set (kafkautils .KafkaConfigProcessRoles , bConfig .Roles ); err != nil {
165+ log .Error (err , fmt .Sprintf (kafkautils .BrokerConfigErrorMsgTemplate , kafkautils .KafkaConfigProcessRoles ))
166+ }
167+ } else { // use zk mode for broker.
168+ // when in zk mode, "broker.id" and "zookeeper.connect" are configured so it will communicate with zookeeper
169+ // control.plane.listener.name will not be set in zk mode. There for it will default to the interbroker listener.
170+ if err := config .Set (kafkautils .KafkaConfigBrokerID , brokerID ); err != nil {
171+ log .Error (err , fmt .Sprintf (kafkautils .BrokerConfigErrorMsgTemplate , kafkautils .KafkaConfigBrokerID ))
172+ }
173+
174+ if err := config .Set (kafkautils .KafkaConfigZooKeeperConnect , zookeeperutils .PrepareConnectionAddress (
175+ kafkaCluster .Spec .ZKAddresses , kafkaCluster .Spec .GetZkPath ())); err != nil {
176+ log .Error (err , fmt .Sprintf (kafkautils .BrokerConfigErrorMsgTemplate , kafkautils .KafkaConfigZooKeeperConnect ))
177+ }
162178 }
163179
164- controllerListenerName := generateControlPlaneListener (kafkaCluster .Spec .ListenersConfig .InternalListeners )
165- if controllerListenerName != "" {
166- if err := config .Set (kafkautils .KafkaConfigControllerListenerName , controllerListenerName ); err != nil {
167- log .Error (err , fmt .Sprintf (kafkautils .BrokerConfigErrorMsgTemplate , kafkautils .KafkaConfigControllerListenerName ))
180+ if shouldConfigureControllerQuorumForBroker (brokerReadOnlyConfig ) {
181+ if err := config .Set (kafkautils .KafkaConfigControllerQuorumVoters , quorumVoters ); err != nil {
182+ log .Error (err , fmt .Sprintf (kafkautils .BrokerConfigErrorMsgTemplate , kafkautils .KafkaConfigControllerQuorumVoters ))
183+ }
184+
185+ if controllerListenerName != "" {
186+ if err := config .Set (kafkautils .KafkaConfigControllerListenerName , controllerListenerName ); err != nil {
187+ log .Error (err , fmt .Sprintf (kafkautils .BrokerConfigErrorMsgTemplate , kafkautils .KafkaConfigControllerListenerName ))
188+ }
168189 }
169190 }
170191
@@ -214,6 +235,20 @@ func configureBrokerKRaftMode(bConfig *v1beta1.BrokerConfig, brokerID int32, kaf
214235 }
215236}
216237
238+ // Returns true by default (not in migration configured) OR when MigrationBrokerKRaftMode is set and 'true'.
239+ // this is to support the zk to kRaft migration
240+ func shouldUseKRaftModeForBroker (brokerReadOnlyConfig * properties.Properties ) bool {
241+ migrationBrokerKRaftMode , found := brokerReadOnlyConfig .Get (kafkautils .MigrationBrokerKRaftMode )
242+ return ! found || migrationBrokerKRaftMode .Value () == "true"
243+ }
244+
245+ // Returns true by default (not in migration) OR when MigrationBrokerControllerQuorumConfigEnabled is set and 'true'.
246+ // this is to support the zk to kRaft migration
247+ func shouldConfigureControllerQuorumForBroker (brokerReadOnlyConfig * properties.Properties ) bool {
248+ migrationBrokerControllerQuorumConfigEnabled , found := brokerReadOnlyConfig .Get (kafkautils .MigrationBrokerControllerQuorumConfigEnabled )
249+ return ! found || migrationBrokerControllerQuorumConfigEnabled .Value () == "true"
250+ }
251+
217252func configureBrokerZKMode (brokerID int32 , kafkaCluster * v1beta1.KafkaCluster , config * properties.Properties ,
218253 serverPasses map [string ]string , extListenerStatuses , intListenerStatuses ,
219254 controllerIntListenerStatuses map [string ]v1beta1.ListenerStatusList , log logr.Logger ) {
@@ -547,6 +582,10 @@ func (r Reconciler) generateBrokerConfig(broker v1beta1.Broker, brokerConfig *v1
547582 finalBrokerConfig .Merge (opGenConf )
548583 }
549584
585+ // Remove the migration broker configuration since its only used as flags to derive other configs
586+ finalBrokerConfig .Delete (kafkautils .MigrationBrokerControllerQuorumConfigEnabled )
587+ finalBrokerConfig .Delete (kafkautils .MigrationBrokerKRaftMode )
588+
550589 finalBrokerConfig .Sort ()
551590
552591 return finalBrokerConfig .String ()
0 commit comments