Skip to content
80 changes: 56 additions & 24 deletions pkg/resources/kafka/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,16 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, broker v
// }

// Add listener configuration
listenerConf, _ := generateListenerSpecificConfig(&r.KafkaCluster.Spec, serverPasses, log)
config.Merge(listenerConf)
generalConfig, brokerConfigs, _ := generateListenerSpecificConfig(&r.KafkaCluster.Spec, serverPasses, log)

brokerConfig, exists := brokerConfigs[broker.Id]
if !exists {
log.Error(nil, fmt.Sprintf("No specific listener config found for broker %d, using default config", broker.Id))
brokerConfig = properties.NewProperties()
}

config.Merge(brokerConfig)
config.Merge(generalConfig)

// Cruise Control metrics reporter configuration
r.configCCMetricsReporter(broker, config, clientPass, log)
Expand Down Expand Up @@ -198,8 +206,19 @@ func configureBrokerKRaftMode(bConfig *v1beta1.BrokerConfig, brokerID int32, kaf
}

// Add listener configuration
listenerConf, listenerConfig := generateListenerSpecificConfig(&kafkaCluster.Spec, serverPasses, log)
config.Merge(listenerConf)
generalConfig, brokerConfigs, listenerConfig := generateListenerSpecificConfig(&kafkaCluster.Spec, serverPasses, log)

brokerConfig, exists := brokerConfigs[brokerID]
if !exists {
log.Error(nil, fmt.Sprintf("No specific listener config found for broker %d, using default config", brokerID))
brokerConfig = properties.NewProperties()
} else {
log.Info("Applying listener-specific config for broker",
"brokerID", brokerID, "config", brokerConfig.String())
}

config.Merge(brokerConfig)
config.Merge(generalConfig)

var advertisedListenerConf []string
// only expose "advertised.listeners" when the node serves as a regular broker or a combined node
Expand Down Expand Up @@ -249,16 +268,23 @@ func shouldConfigureControllerQuorumForBroker(brokerReadOnlyConfig *properties.P
return !found || migrationBrokerControllerQuorumConfigEnabled.Value() == "true"
}

func configureBrokerZKMode(brokerID int32, kafkaCluster *v1beta1.KafkaCluster, config *properties.Properties,
serverPasses map[string]string, extListenerStatuses, intListenerStatuses,
func configureBrokerZKMode(brokerID int32, kafkaCluster *v1beta1.KafkaCluster, config *properties.Properties, serverPasses map[string]string, extListenerStatuses, intListenerStatuses,
controllerIntListenerStatuses map[string]v1beta1.ListenerStatusList, log logr.Logger) {
if err := config.Set(kafkautils.KafkaConfigBrokerID, brokerID); err != nil {
log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigBrokerID))
}

// Add listener configuration
listenerConf, _ := generateListenerSpecificConfig(&kafkaCluster.Spec, serverPasses, log)
config.Merge(listenerConf)
generalConfig, brokerConfigs, _ := generateListenerSpecificConfig(&kafkaCluster.Spec, serverPasses, log)

brokerConfig, exists := brokerConfigs[brokerID]
if !exists {
log.Error(nil, fmt.Sprintf("No specific listener config found for broker %d, using default config", brokerID))
brokerConfig = properties.NewProperties()
}

config.Merge(brokerConfig)
config.Merge(generalConfig)

// Add advertised listener configuration
advertisedListenerConf := generateAdvertisedListenerConfig(brokerID, kafkaCluster.Spec.ListenersConfig,
Expand Down Expand Up @@ -403,20 +429,12 @@ func generateControlPlaneListener(iListeners []v1beta1.InternalListenerConfig) s
return controlPlaneListener
}

func generateListenerSpecificConfig(kcs *v1beta1.KafkaClusterSpec, serverPasses map[string]string, log logr.Logger) (*properties.Properties, []string) {
func generateListenerSpecificConfig(kcs *v1beta1.KafkaClusterSpec, serverPasses map[string]string, log logr.Logger) (*properties.Properties, map[int32]*properties.Properties, []string) {
config := properties.NewProperties()
brokerConfigs := make(map[int32]*properties.Properties)

l := kcs.ListenersConfig
r := kcs.ReadOnlyConfig

securityProtocolSet := false
for _, broker := range kcs.Brokers {
b := broker.ReadOnlyConfig
if strings.Contains(b, kafkautils.KafkaConfigSecurityInterBrokerProtocol+"=") {
securityProtocolSet = true
break
}
}
//r := kcs.ReadOnlyConfig

interBrokerListenerName, securityProtocolMapConfig, listenerConfig, internalListenerSSLConfig, externalListenerSSLConfig := getListenerSpecificConfig(&l, serverPasses, log)

Expand All @@ -436,19 +454,33 @@ func generateListenerSpecificConfig(kcs *v1beta1.KafkaClusterSpec, serverPasses
log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigListenerSecurityProtocolMap))
}

if !securityProtocolSet {
if !strings.Contains(r, kafkautils.KafkaConfigSecurityInterBrokerProtocol+"=") {
if err := config.Set(kafkautils.KafkaConfigInterBrokerListenerName, interBrokerListenerName); err != nil {
log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigInterBrokerListenerName))
for _, broker := range kcs.Brokers {
brokerConfig := properties.NewProperties()
brokerID := broker.Id

b := broker.ReadOnlyConfig
trimmedConfig := strings.TrimSpace(b)

if strings.Contains(trimmedConfig, kafkautils.KafkaConfigSecurityInterBrokerProtocol+"=") {
log.Info("Security InterBrokerProtocol is set for this broker, skipping config update", "broker", broker)
} else {
log.Info("Security InterBrokerProtocol NOT found for broker, setting inter.broker.listener.name",
"interBrokerListenerName", interBrokerListenerName, "broker", broker)

if err := brokerConfig.Set(kafkautils.KafkaConfigInterBrokerListenerName, interBrokerListenerName); err != nil {
log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted in an error",
kafkautils.KafkaConfigInterBrokerListenerName))
}
}

brokerConfigs[brokerID] = brokerConfig
}

if err := config.Set(kafkautils.KafkaConfigListeners, listenerConfig); err != nil {
log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigListeners))
}

return config, listenerConfig
return config, brokerConfigs, listenerConfig
}

func getListenerSpecificConfig(l *v1beta1.ListenersConfig, serverPasses map[string]string, log logr.Logger) (string, []string, []string, map[string]string, map[string]string) {
Expand Down
1 change: 1 addition & 0 deletions pkg/resources/kafka/configmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,7 @@ zookeeper.connect=example.zk:2181/`,
broker.id=0
cruise.control.metrics.reporter.bootstrap.servers=kafka-all-broker.kafka.svc.cluster.local:9092
cruise.control.metrics.reporter.kubernetes.mode=true
inter.broker.listener.name=INTERNAL
listener.security.protocol.map=INTERNAL:PLAINTEXT
listeners=INTERNAL://:9092
security.inter.broker.protocol=SASL_SSL
Expand Down
Loading