From 147c13bec710aca567f9a97e6a7a3f111bd47b90 Mon Sep 17 00:00:00 2001 From: Cameron Wright Date: Wed, 26 Feb 2025 00:52:01 -0500 Subject: [PATCH 1/9] adding fix for koperator to add interBrokerListenerName one at a time --- pkg/resources/kafka/configmap.go | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/pkg/resources/kafka/configmap.go b/pkg/resources/kafka/configmap.go index f5c2db1e3..c4c3b6486 100644 --- a/pkg/resources/kafka/configmap.go +++ b/pkg/resources/kafka/configmap.go @@ -407,16 +407,7 @@ func generateListenerSpecificConfig(kcs *v1beta1.KafkaClusterSpec, serverPasses config := properties.NewProperties() l := kcs.ListenersConfig - r := kcs.ReadOnlyConfig - - securityProtocolSet := false - for _, broker := range kcs.Brokers { - b := broker.ReadOnlyConfig - if strings.Contains(b, kafkautils.KafkaConfigSecurityInterBrokerProtocol+"=") { - securityProtocolSet = true - break - } - } + //r := kcs.ReadOnlyConfig interBrokerListenerName, securityProtocolMapConfig, listenerConfig, internalListenerSSLConfig, externalListenerSSLConfig := getListenerSpecificConfig(&l, serverPasses, log) @@ -436,10 +427,11 @@ func generateListenerSpecificConfig(kcs *v1beta1.KafkaClusterSpec, serverPasses log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigListenerSecurityProtocolMap)) } - if !securityProtocolSet { - if !strings.Contains(r, kafkautils.KafkaConfigSecurityInterBrokerProtocol+"=") { + for _, broker := range kcs.Brokers { + b := broker.ReadOnlyConfig + if !strings.Contains(b, kafkautils.KafkaConfigSecurityInterBrokerProtocol+"=") { if err := config.Set(kafkautils.KafkaConfigInterBrokerListenerName, interBrokerListenerName); err != nil { - log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigInterBrokerListenerName)) + log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted in an error", kafkautils.KafkaConfigInterBrokerListenerName)) } } } From 79349dd4f16911ae1170466069de2e6bae68580a Mon Sep 17 00:00:00 2001 From: Cameron Wright Date: Wed, 26 Feb 2025 13:42:36 -0500 Subject: [PATCH 2/9] updating test to include inter.broker.listener.name --- pkg/resources/kafka/configmap_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/resources/kafka/configmap_test.go b/pkg/resources/kafka/configmap_test.go index e23f27e9a..caef59341 100644 --- a/pkg/resources/kafka/configmap_test.go +++ b/pkg/resources/kafka/configmap_test.go @@ -622,6 +622,7 @@ cruise.control.metrics.reporter.kubernetes.mode=true listener.security.protocol.map=INTERNAL:PLAINTEXT listeners=INTERNAL://:9092 security.inter.broker.protocol=SASL_SSL +inter.broker.listener.name=INTERNAL zookeeper.connect=example.zk:2181/`, }, } From b2c9776b738c77c8730ff8e5b8977ce666363387 Mon Sep 17 00:00:00 2001 From: Cameron Wright Date: Wed, 26 Feb 2025 18:30:50 -0500 Subject: [PATCH 3/9] fixing code that checks for inter.broker.listener.name --- pkg/resources/kafka/configmap.go | 61 ++++++++++++++++++++++++++------ 1 file changed, 50 insertions(+), 11 deletions(-) diff --git a/pkg/resources/kafka/configmap.go b/pkg/resources/kafka/configmap.go index c4c3b6486..c4da69e28 100644 --- a/pkg/resources/kafka/configmap.go +++ b/pkg/resources/kafka/configmap.go @@ -51,8 +51,16 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, broker v // } // Add listener configuration - listenerConf, _ := generateListenerSpecificConfig(&r.KafkaCluster.Spec, serverPasses, log) - config.Merge(listenerConf) + generalConfig, brokerConfigs, _ := generateListenerSpecificConfig(&r.KafkaCluster.Spec, serverPasses, log) + + brokerConfig, exists := brokerConfigs[broker.Id] + if !exists { + log.Error(nil, fmt.Sprintf("No specific listener config found for broker %d, using default config", broker.Id)) + brokerConfig = properties.NewProperties() + } + + config.Merge(brokerConfig) + config.Merge(generalConfig) // Cruise Control metrics reporter configuration r.configCCMetricsReporter(broker, config, clientPass, log) @@ -198,9 +206,19 @@ func configureBrokerKRaftMode(bConfig *v1beta1.BrokerConfig, brokerID int32, kaf } // Add listener configuration - listenerConf, listenerConfig := generateListenerSpecificConfig(&kafkaCluster.Spec, serverPasses, log) - config.Merge(listenerConf) + generalConfig, brokerConfigs, listenerConfig := generateListenerSpecificConfig(&kafkaCluster.Spec, serverPasses, log) + + brokerConfig, exists := brokerConfigs[brokerID] + if !exists { + log.Error(nil, fmt.Sprintf("No specific listener config found for broker %d, using default config", brokerID)) + brokerConfig = properties.NewProperties() + } else { + log.Info("Applying listener-specific config for broker", + "brokerID", brokerID, "config", brokerConfig.String()) + } + config.Merge(brokerConfig) + config.Merge(generalConfig) var advertisedListenerConf []string // only expose "advertised.listeners" when the node serves as a regular broker or a combined node if bConfig.IsBrokerNode() { @@ -257,9 +275,16 @@ func configureBrokerZKMode(brokerID int32, kafkaCluster *v1beta1.KafkaCluster, c } // Add listener configuration - listenerConf, _ := generateListenerSpecificConfig(&kafkaCluster.Spec, serverPasses, log) - config.Merge(listenerConf) + generalConfig, brokerConfigs, _ := generateListenerSpecificConfig(&kafkaCluster.Spec, serverPasses, log) + brokerConfig, exists := brokerConfigs[brokerID] + if !exists { + log.Error(nil, fmt.Sprintf("No specific listener config found for broker %d, using default config", brokerID)) + brokerConfig = properties.NewProperties() + } + + config.Merge(brokerConfig) + config.Merge(generalConfig) // Add advertised listener configuration advertisedListenerConf := generateAdvertisedListenerConfig(brokerID, kafkaCluster.Spec.ListenersConfig, extListenerStatuses, intListenerStatuses, controllerIntListenerStatuses) @@ -403,8 +428,9 @@ func generateControlPlaneListener(iListeners []v1beta1.InternalListenerConfig) s return controlPlaneListener } -func generateListenerSpecificConfig(kcs *v1beta1.KafkaClusterSpec, serverPasses map[string]string, log logr.Logger) (*properties.Properties, []string) { +func generateListenerSpecificConfig(kcs *v1beta1.KafkaClusterSpec, serverPasses map[string]string, log logr.Logger) (*properties.Properties, map[int32]*properties.Properties, []string) { config := properties.NewProperties() + brokerConfigs := make(map[int32]*properties.Properties) l := kcs.ListenersConfig //r := kcs.ReadOnlyConfig @@ -428,19 +454,32 @@ func generateListenerSpecificConfig(kcs *v1beta1.KafkaClusterSpec, serverPasses } for _, broker := range kcs.Brokers { + brokerConfig := properties.NewProperties() + brokerID := broker.Id + b := broker.ReadOnlyConfig - if !strings.Contains(b, kafkautils.KafkaConfigSecurityInterBrokerProtocol+"=") { - if err := config.Set(kafkautils.KafkaConfigInterBrokerListenerName, interBrokerListenerName); err != nil { - log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted in an error", kafkautils.KafkaConfigInterBrokerListenerName)) + trimmedConfig := strings.TrimSpace(b) + + if strings.Contains(trimmedConfig, kafkautils.KafkaConfigSecurityInterBrokerProtocol+"=") { + log.Info("Security InterBrokerProtocol is set for this broker, skipping config update", "broker", broker) + } else { + log.Info("Security InterBrokerProtocol NOT found for broker, setting inter.broker.listener.name", + "interBrokerListenerName", interBrokerListenerName, "broker", broker) + + if err := brokerConfig.Set(kafkautils.KafkaConfigInterBrokerListenerName, interBrokerListenerName); err != nil { + log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted in an error", + kafkautils.KafkaConfigInterBrokerListenerName)) } } + + brokerConfigs[brokerID] = brokerConfig } if err := config.Set(kafkautils.KafkaConfigListeners, listenerConfig); err != nil { log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigListeners)) } - return config, listenerConfig + return config, brokerConfigs, listenerConfig } func getListenerSpecificConfig(l *v1beta1.ListenersConfig, serverPasses map[string]string, log logr.Logger) (string, []string, []string, map[string]string, map[string]string) { From 866107998f6eaef102cb54328241c3ebf50d45c1 Mon Sep 17 00:00:00 2001 From: Cameron Wright Date: Thu, 27 Feb 2025 10:34:17 -0500 Subject: [PATCH 4/9] removing unecessary function calls --- pkg/resources/kafka/configmap.go | 40 +++++++++++++++++--------------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/pkg/resources/kafka/configmap.go b/pkg/resources/kafka/configmap.go index c4da69e28..8abe60027 100644 --- a/pkg/resources/kafka/configmap.go +++ b/pkg/resources/kafka/configmap.go @@ -206,19 +206,20 @@ func configureBrokerKRaftMode(bConfig *v1beta1.BrokerConfig, brokerID int32, kaf } // Add listener configuration - generalConfig, brokerConfigs, listenerConfig := generateListenerSpecificConfig(&kafkaCluster.Spec, serverPasses, log) + _, _, listenerConfig := generateListenerSpecificConfig(&kafkaCluster.Spec, serverPasses, log) + + // brokerConfig, exists := brokerConfigs[brokerID] + // if !exists { + // log.Error(nil, fmt.Sprintf("No specific listener config found for broker %d, using default config", brokerID)) + // brokerConfig = properties.NewProperties() + // } else { + // log.Info("Applying listener-specific config for broker", + // "brokerID", brokerID, "config", brokerConfig.String()) + // } - brokerConfig, exists := brokerConfigs[brokerID] - if !exists { - log.Error(nil, fmt.Sprintf("No specific listener config found for broker %d, using default config", brokerID)) - brokerConfig = properties.NewProperties() - } else { - log.Info("Applying listener-specific config for broker", - "brokerID", brokerID, "config", brokerConfig.String()) - } + // config.Merge(brokerConfig) + // config.Merge(generalConfig) - config.Merge(brokerConfig) - config.Merge(generalConfig) var advertisedListenerConf []string // only expose "advertised.listeners" when the node serves as a regular broker or a combined node if bConfig.IsBrokerNode() { @@ -275,16 +276,17 @@ func configureBrokerZKMode(brokerID int32, kafkaCluster *v1beta1.KafkaCluster, c } // Add listener configuration - generalConfig, brokerConfigs, _ := generateListenerSpecificConfig(&kafkaCluster.Spec, serverPasses, log) + // generalConfig, brokerConfigs, _ := generateListenerSpecificConfig(&kafkaCluster.Spec, serverPasses, log) - brokerConfig, exists := brokerConfigs[brokerID] - if !exists { - log.Error(nil, fmt.Sprintf("No specific listener config found for broker %d, using default config", brokerID)) - brokerConfig = properties.NewProperties() - } + // brokerConfig, exists := brokerConfigs[brokerID] + // if !exists { + // log.Error(nil, fmt.Sprintf("No specific listener config found for broker %d, using default config", brokerID)) + // brokerConfig = properties.NewProperties() + // } + + // config.Merge(brokerConfig) + // config.Merge(generalConfig) - config.Merge(brokerConfig) - config.Merge(generalConfig) // Add advertised listener configuration advertisedListenerConf := generateAdvertisedListenerConfig(brokerID, kafkaCluster.Spec.ListenersConfig, extListenerStatuses, intListenerStatuses, controllerIntListenerStatuses) From a71a575951fef829c1accfa93340c099192b6e9a Mon Sep 17 00:00:00 2001 From: Cameron Wright Date: Thu, 27 Feb 2025 10:54:02 -0500 Subject: [PATCH 5/9] updating test expectations to be alphabetized --- pkg/resources/kafka/configmap.go | 5 ++--- pkg/resources/kafka/configmap_test.go | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/resources/kafka/configmap.go b/pkg/resources/kafka/configmap.go index 8abe60027..61ecf52a6 100644 --- a/pkg/resources/kafka/configmap.go +++ b/pkg/resources/kafka/configmap.go @@ -71,7 +71,7 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, broker v if r.KafkaCluster.Spec.KRaftMode { configureBrokerKRaftMode(bConfig, broker.Id, r.KafkaCluster, config, quorumVoters, serverPasses, extListenerStatuses, intListenerStatuses, log, brokerReadOnlyConfig) } else { - configureBrokerZKMode(broker.Id, r.KafkaCluster, config, serverPasses, extListenerStatuses, intListenerStatuses, controllerIntListenerStatuses, log) + configureBrokerZKMode(broker.Id, r.KafkaCluster, config, extListenerStatuses, intListenerStatuses, controllerIntListenerStatuses, log) } // This logic prevents the removal of the mountPath from the broker configmap @@ -268,8 +268,7 @@ func shouldConfigureControllerQuorumForBroker(brokerReadOnlyConfig *properties.P return !found || migrationBrokerControllerQuorumConfigEnabled.Value() == "true" } -func configureBrokerZKMode(brokerID int32, kafkaCluster *v1beta1.KafkaCluster, config *properties.Properties, - serverPasses map[string]string, extListenerStatuses, intListenerStatuses, +func configureBrokerZKMode(brokerID int32, kafkaCluster *v1beta1.KafkaCluster, config *properties.Properties, extListenerStatuses, intListenerStatuses, controllerIntListenerStatuses map[string]v1beta1.ListenerStatusList, log logr.Logger) { if err := config.Set(kafkautils.KafkaConfigBrokerID, brokerID); err != nil { log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigBrokerID)) diff --git a/pkg/resources/kafka/configmap_test.go b/pkg/resources/kafka/configmap_test.go index caef59341..aacb31bdb 100644 --- a/pkg/resources/kafka/configmap_test.go +++ b/pkg/resources/kafka/configmap_test.go @@ -619,10 +619,10 @@ zookeeper.connect=example.zk:2181/`, broker.id=0 cruise.control.metrics.reporter.bootstrap.servers=kafka-all-broker.kafka.svc.cluster.local:9092 cruise.control.metrics.reporter.kubernetes.mode=true +inter.broker.listener.name=INTERNAL listener.security.protocol.map=INTERNAL:PLAINTEXT listeners=INTERNAL://:9092 security.inter.broker.protocol=SASL_SSL -inter.broker.listener.name=INTERNAL zookeeper.connect=example.zk:2181/`, }, } From 33b3458c2c396cb92296bd442c49f2d15aeb939c Mon Sep 17 00:00:00 2001 From: Cameron Wright Date: Thu, 27 Feb 2025 11:27:47 -0500 Subject: [PATCH 6/9] adding generateListenerSpecificConfig back --- pkg/resources/kafka/configmap.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/resources/kafka/configmap.go b/pkg/resources/kafka/configmap.go index 61ecf52a6..ca069a29e 100644 --- a/pkg/resources/kafka/configmap.go +++ b/pkg/resources/kafka/configmap.go @@ -71,7 +71,7 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, broker v if r.KafkaCluster.Spec.KRaftMode { configureBrokerKRaftMode(bConfig, broker.Id, r.KafkaCluster, config, quorumVoters, serverPasses, extListenerStatuses, intListenerStatuses, log, brokerReadOnlyConfig) } else { - configureBrokerZKMode(broker.Id, r.KafkaCluster, config, extListenerStatuses, intListenerStatuses, controllerIntListenerStatuses, log) + configureBrokerZKMode(broker.Id, r.KafkaCluster, config, serverPasses, extListenerStatuses, intListenerStatuses, controllerIntListenerStatuses, log) } // This logic prevents the removal of the mountPath from the broker configmap @@ -268,14 +268,14 @@ func shouldConfigureControllerQuorumForBroker(brokerReadOnlyConfig *properties.P return !found || migrationBrokerControllerQuorumConfigEnabled.Value() == "true" } -func configureBrokerZKMode(brokerID int32, kafkaCluster *v1beta1.KafkaCluster, config *properties.Properties, extListenerStatuses, intListenerStatuses, +func configureBrokerZKMode(brokerID int32, kafkaCluster *v1beta1.KafkaCluster, config *properties.Properties, serverPasses map[string]string, extListenerStatuses, intListenerStatuses, controllerIntListenerStatuses map[string]v1beta1.ListenerStatusList, log logr.Logger) { if err := config.Set(kafkautils.KafkaConfigBrokerID, brokerID); err != nil { log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigBrokerID)) } // Add listener configuration - // generalConfig, brokerConfigs, _ := generateListenerSpecificConfig(&kafkaCluster.Spec, serverPasses, log) + generalConfig, _, _ := generateListenerSpecificConfig(&kafkaCluster.Spec, serverPasses, log) // brokerConfig, exists := brokerConfigs[brokerID] // if !exists { @@ -284,7 +284,7 @@ func configureBrokerZKMode(brokerID int32, kafkaCluster *v1beta1.KafkaCluster, c // } // config.Merge(brokerConfig) - // config.Merge(generalConfig) + config.Merge(generalConfig) // Add advertised listener configuration advertisedListenerConf := generateAdvertisedListenerConfig(brokerID, kafkaCluster.Spec.ListenersConfig, From f118a442e4084d5526c5a2c6cad675d38d9566ae Mon Sep 17 00:00:00 2001 From: Cameron Wright Date: Thu, 27 Feb 2025 11:40:15 -0500 Subject: [PATCH 7/9] adding merges back in --- pkg/resources/kafka/configmap.go | 38 ++++++++++++++++---------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/pkg/resources/kafka/configmap.go b/pkg/resources/kafka/configmap.go index ca069a29e..624c0dd01 100644 --- a/pkg/resources/kafka/configmap.go +++ b/pkg/resources/kafka/configmap.go @@ -206,19 +206,19 @@ func configureBrokerKRaftMode(bConfig *v1beta1.BrokerConfig, brokerID int32, kaf } // Add listener configuration - _, _, listenerConfig := generateListenerSpecificConfig(&kafkaCluster.Spec, serverPasses, log) - - // brokerConfig, exists := brokerConfigs[brokerID] - // if !exists { - // log.Error(nil, fmt.Sprintf("No specific listener config found for broker %d, using default config", brokerID)) - // brokerConfig = properties.NewProperties() - // } else { - // log.Info("Applying listener-specific config for broker", - // "brokerID", brokerID, "config", brokerConfig.String()) - // } + generalConfig, brokerConfigs, listenerConfig := generateListenerSpecificConfig(&kafkaCluster.Spec, serverPasses, log) - // config.Merge(brokerConfig) - // config.Merge(generalConfig) + brokerConfig, exists := brokerConfigs[brokerID] + if !exists { + log.Error(nil, fmt.Sprintf("No specific listener config found for broker %d, using default config", brokerID)) + brokerConfig = properties.NewProperties() + } else { + log.Info("Applying listener-specific config for broker", + "brokerID", brokerID, "config", brokerConfig.String()) + } + + config.Merge(brokerConfig) + config.Merge(generalConfig) var advertisedListenerConf []string // only expose "advertised.listeners" when the node serves as a regular broker or a combined node @@ -275,15 +275,15 @@ func configureBrokerZKMode(brokerID int32, kafkaCluster *v1beta1.KafkaCluster, c } // Add listener configuration - generalConfig, _, _ := generateListenerSpecificConfig(&kafkaCluster.Spec, serverPasses, log) + generalConfig, brokerConfigs, _ := generateListenerSpecificConfig(&kafkaCluster.Spec, serverPasses, log) - // brokerConfig, exists := brokerConfigs[brokerID] - // if !exists { - // log.Error(nil, fmt.Sprintf("No specific listener config found for broker %d, using default config", brokerID)) - // brokerConfig = properties.NewProperties() - // } + brokerConfig, exists := brokerConfigs[brokerID] + if !exists { + log.Error(nil, fmt.Sprintf("No specific listener config found for broker %d, using default config", brokerID)) + brokerConfig = properties.NewProperties() + } - // config.Merge(brokerConfig) + config.Merge(brokerConfig) config.Merge(generalConfig) // Add advertised listener configuration From 4c79c755400d86059b1870aed9aa6f6e0fed4837 Mon Sep 17 00:00:00 2001 From: Cameron Wright Date: Thu, 27 Feb 2025 14:18:57 -0500 Subject: [PATCH 8/9] reverting back to having getConfigProperties only call generateListenerSpecificConfig --- pkg/resources/kafka/configmap.go | 44 ++++++++++++++++---------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/pkg/resources/kafka/configmap.go b/pkg/resources/kafka/configmap.go index 624c0dd01..f62e5405a 100644 --- a/pkg/resources/kafka/configmap.go +++ b/pkg/resources/kafka/configmap.go @@ -71,7 +71,7 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, broker v if r.KafkaCluster.Spec.KRaftMode { configureBrokerKRaftMode(bConfig, broker.Id, r.KafkaCluster, config, quorumVoters, serverPasses, extListenerStatuses, intListenerStatuses, log, brokerReadOnlyConfig) } else { - configureBrokerZKMode(broker.Id, r.KafkaCluster, config, serverPasses, extListenerStatuses, intListenerStatuses, controllerIntListenerStatuses, log) + configureBrokerZKMode(broker.Id, r.KafkaCluster, config, extListenerStatuses, intListenerStatuses, controllerIntListenerStatuses, log) } // This logic prevents the removal of the mountPath from the broker configmap @@ -205,20 +205,20 @@ func configureBrokerKRaftMode(bConfig *v1beta1.BrokerConfig, brokerID int32, kaf } } - // Add listener configuration - generalConfig, brokerConfigs, listenerConfig := generateListenerSpecificConfig(&kafkaCluster.Spec, serverPasses, log) + // // Add listener configuration + _, _, listenerConfig := generateListenerSpecificConfig(&kafkaCluster.Spec, serverPasses, log) - brokerConfig, exists := brokerConfigs[brokerID] - if !exists { - log.Error(nil, fmt.Sprintf("No specific listener config found for broker %d, using default config", brokerID)) - brokerConfig = properties.NewProperties() - } else { - log.Info("Applying listener-specific config for broker", - "brokerID", brokerID, "config", brokerConfig.String()) - } + // brokerConfig, exists := brokerConfigs[brokerID] + // if !exists { + // log.Error(nil, fmt.Sprintf("No specific listener config found for broker %d, using default config", brokerID)) + // brokerConfig = properties.NewProperties() + // } else { + // log.Info("Applying listener-specific config for broker", + // "brokerID", brokerID, "config", brokerConfig.String()) + // } - config.Merge(brokerConfig) - config.Merge(generalConfig) + // config.Merge(brokerConfig) + // config.Merge(generalConfig) var advertisedListenerConf []string // only expose "advertised.listeners" when the node serves as a regular broker or a combined node @@ -268,23 +268,23 @@ func shouldConfigureControllerQuorumForBroker(brokerReadOnlyConfig *properties.P return !found || migrationBrokerControllerQuorumConfigEnabled.Value() == "true" } -func configureBrokerZKMode(brokerID int32, kafkaCluster *v1beta1.KafkaCluster, config *properties.Properties, serverPasses map[string]string, extListenerStatuses, intListenerStatuses, +func configureBrokerZKMode(brokerID int32, kafkaCluster *v1beta1.KafkaCluster, config *properties.Properties, extListenerStatuses, intListenerStatuses, controllerIntListenerStatuses map[string]v1beta1.ListenerStatusList, log logr.Logger) { if err := config.Set(kafkautils.KafkaConfigBrokerID, brokerID); err != nil { log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigBrokerID)) } // Add listener configuration - generalConfig, brokerConfigs, _ := generateListenerSpecificConfig(&kafkaCluster.Spec, serverPasses, log) + // generalConfig, brokerConfigs, _ := generateListenerSpecificConfig(&kafkaCluster.Spec, serverPasses, log) - brokerConfig, exists := brokerConfigs[brokerID] - if !exists { - log.Error(nil, fmt.Sprintf("No specific listener config found for broker %d, using default config", brokerID)) - brokerConfig = properties.NewProperties() - } + // brokerConfig, exists := brokerConfigs[brokerID] + // if !exists { + // log.Error(nil, fmt.Sprintf("No specific listener config found for broker %d, using default config", brokerID)) + // brokerConfig = properties.NewProperties() + // } - config.Merge(brokerConfig) - config.Merge(generalConfig) + // config.Merge(brokerConfig) + // config.Merge(generalConfig) // Add advertised listener configuration advertisedListenerConf := generateAdvertisedListenerConfig(brokerID, kafkaCluster.Spec.ListenersConfig, From b1512a2a616f15279f6aef0f6786362a11086e31 Mon Sep 17 00:00:00 2001 From: Cameron Wright Date: Thu, 27 Feb 2025 14:29:22 -0500 Subject: [PATCH 9/9] removing unnecessary comments --- pkg/resources/kafka/configmap.go | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/pkg/resources/kafka/configmap.go b/pkg/resources/kafka/configmap.go index f62e5405a..a2ce6f527 100644 --- a/pkg/resources/kafka/configmap.go +++ b/pkg/resources/kafka/configmap.go @@ -208,18 +208,6 @@ func configureBrokerKRaftMode(bConfig *v1beta1.BrokerConfig, brokerID int32, kaf // // Add listener configuration _, _, listenerConfig := generateListenerSpecificConfig(&kafkaCluster.Spec, serverPasses, log) - // brokerConfig, exists := brokerConfigs[brokerID] - // if !exists { - // log.Error(nil, fmt.Sprintf("No specific listener config found for broker %d, using default config", brokerID)) - // brokerConfig = properties.NewProperties() - // } else { - // log.Info("Applying listener-specific config for broker", - // "brokerID", brokerID, "config", brokerConfig.String()) - // } - - // config.Merge(brokerConfig) - // config.Merge(generalConfig) - var advertisedListenerConf []string // only expose "advertised.listeners" when the node serves as a regular broker or a combined node if bConfig.IsBrokerNode() { @@ -274,18 +262,6 @@ func configureBrokerZKMode(brokerID int32, kafkaCluster *v1beta1.KafkaCluster, c log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigBrokerID)) } - // Add listener configuration - // generalConfig, brokerConfigs, _ := generateListenerSpecificConfig(&kafkaCluster.Spec, serverPasses, log) - - // brokerConfig, exists := brokerConfigs[brokerID] - // if !exists { - // log.Error(nil, fmt.Sprintf("No specific listener config found for broker %d, using default config", brokerID)) - // brokerConfig = properties.NewProperties() - // } - - // config.Merge(brokerConfig) - // config.Merge(generalConfig) - // Add advertised listener configuration advertisedListenerConf := generateAdvertisedListenerConfig(brokerID, kafkaCluster.Spec.ListenersConfig, extListenerStatuses, intListenerStatuses, controllerIntListenerStatuses)