Skip to content

Commit b2c9776

Browse files
author
Cameron Wright
committed
fixing code that checks for inter.broker.listener.name
1 parent 79349dd commit b2c9776

File tree

1 file changed

+50
-11
lines changed

1 file changed

+50
-11
lines changed

pkg/resources/kafka/configmap.go

Lines changed: 50 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,16 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, broker v
5151
// }
5252

5353
// Add listener configuration
54-
listenerConf, _ := generateListenerSpecificConfig(&r.KafkaCluster.Spec, serverPasses, log)
55-
config.Merge(listenerConf)
54+
generalConfig, brokerConfigs, _ := generateListenerSpecificConfig(&r.KafkaCluster.Spec, serverPasses, log)
55+
56+
brokerConfig, exists := brokerConfigs[broker.Id]
57+
if !exists {
58+
log.Error(nil, fmt.Sprintf("No specific listener config found for broker %d, using default config", broker.Id))
59+
brokerConfig = properties.NewProperties()
60+
}
61+
62+
config.Merge(brokerConfig)
63+
config.Merge(generalConfig)
5664

5765
// Cruise Control metrics reporter configuration
5866
r.configCCMetricsReporter(broker, config, clientPass, log)
@@ -198,9 +206,19 @@ func configureBrokerKRaftMode(bConfig *v1beta1.BrokerConfig, brokerID int32, kaf
198206
}
199207

200208
// Add listener configuration
201-
listenerConf, listenerConfig := generateListenerSpecificConfig(&kafkaCluster.Spec, serverPasses, log)
202-
config.Merge(listenerConf)
209+
generalConfig, brokerConfigs, listenerConfig := generateListenerSpecificConfig(&kafkaCluster.Spec, serverPasses, log)
210+
211+
brokerConfig, exists := brokerConfigs[brokerID]
212+
if !exists {
213+
log.Error(nil, fmt.Sprintf("No specific listener config found for broker %d, using default config", brokerID))
214+
brokerConfig = properties.NewProperties()
215+
} else {
216+
log.Info("Applying listener-specific config for broker",
217+
"brokerID", brokerID, "config", brokerConfig.String())
218+
}
203219

220+
config.Merge(brokerConfig)
221+
config.Merge(generalConfig)
204222
var advertisedListenerConf []string
205223
// only expose "advertised.listeners" when the node serves as a regular broker or a combined node
206224
if bConfig.IsBrokerNode() {
@@ -257,9 +275,16 @@ func configureBrokerZKMode(brokerID int32, kafkaCluster *v1beta1.KafkaCluster, c
257275
}
258276

259277
// Add listener configuration
260-
listenerConf, _ := generateListenerSpecificConfig(&kafkaCluster.Spec, serverPasses, log)
261-
config.Merge(listenerConf)
278+
generalConfig, brokerConfigs, _ := generateListenerSpecificConfig(&kafkaCluster.Spec, serverPasses, log)
262279

280+
brokerConfig, exists := brokerConfigs[brokerID]
281+
if !exists {
282+
log.Error(nil, fmt.Sprintf("No specific listener config found for broker %d, using default config", brokerID))
283+
brokerConfig = properties.NewProperties()
284+
}
285+
286+
config.Merge(brokerConfig)
287+
config.Merge(generalConfig)
263288
// Add advertised listener configuration
264289
advertisedListenerConf := generateAdvertisedListenerConfig(brokerID, kafkaCluster.Spec.ListenersConfig,
265290
extListenerStatuses, intListenerStatuses, controllerIntListenerStatuses)
@@ -403,8 +428,9 @@ func generateControlPlaneListener(iListeners []v1beta1.InternalListenerConfig) s
403428
return controlPlaneListener
404429
}
405430

406-
func generateListenerSpecificConfig(kcs *v1beta1.KafkaClusterSpec, serverPasses map[string]string, log logr.Logger) (*properties.Properties, []string) {
431+
func generateListenerSpecificConfig(kcs *v1beta1.KafkaClusterSpec, serverPasses map[string]string, log logr.Logger) (*properties.Properties, map[int32]*properties.Properties, []string) {
407432
config := properties.NewProperties()
433+
brokerConfigs := make(map[int32]*properties.Properties)
408434

409435
l := kcs.ListenersConfig
410436
//r := kcs.ReadOnlyConfig
@@ -428,19 +454,32 @@ func generateListenerSpecificConfig(kcs *v1beta1.KafkaClusterSpec, serverPasses
428454
}
429455

430456
for _, broker := range kcs.Brokers {
457+
brokerConfig := properties.NewProperties()
458+
brokerID := broker.Id
459+
431460
b := broker.ReadOnlyConfig
432-
if !strings.Contains(b, kafkautils.KafkaConfigSecurityInterBrokerProtocol+"=") {
433-
if err := config.Set(kafkautils.KafkaConfigInterBrokerListenerName, interBrokerListenerName); err != nil {
434-
log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted in an error", kafkautils.KafkaConfigInterBrokerListenerName))
461+
trimmedConfig := strings.TrimSpace(b)
462+
463+
if strings.Contains(trimmedConfig, kafkautils.KafkaConfigSecurityInterBrokerProtocol+"=") {
464+
log.Info("Security InterBrokerProtocol is set for this broker, skipping config update", "broker", broker)
465+
} else {
466+
log.Info("Security InterBrokerProtocol NOT found for broker, setting inter.broker.listener.name",
467+
"interBrokerListenerName", interBrokerListenerName, "broker", broker)
468+
469+
if err := brokerConfig.Set(kafkautils.KafkaConfigInterBrokerListenerName, interBrokerListenerName); err != nil {
470+
log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted in an error",
471+
kafkautils.KafkaConfigInterBrokerListenerName))
435472
}
436473
}
474+
475+
brokerConfigs[brokerID] = brokerConfig
437476
}
438477

439478
if err := config.Set(kafkautils.KafkaConfigListeners, listenerConfig); err != nil {
440479
log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigListeners))
441480
}
442481

443-
return config, listenerConfig
482+
return config, brokerConfigs, listenerConfig
444483
}
445484

446485
func getListenerSpecificConfig(l *v1beta1.ListenersConfig, serverPasses map[string]string, log logr.Logger) (string, []string, []string, map[string]string, map[string]string) {

0 commit comments

Comments
 (0)