Skip to content

Commit 8feced9

Browse files
cawright-rhCameron Wright
andauthored
adding fix for koperator to add interBrokerListenerName one at a time and check for security.inter.broker.protocol (#101)
* adding fix for koperator to add interBrokerListenerName one at a time * updating test to include inter.broker.listener.name * fixing code that checks for inter.broker.listener.name * removing unecessary function calls * updating test expectations to be alphabetized * adding generateListenerSpecificConfig back * adding merges back in * reverting back to having getConfigProperties only call generateListenerSpecificConfig * removing unnecessary comments --------- Co-authored-by: Cameron Wright <[email protected]>
1 parent 9978dfa commit 8feced9

File tree

2 files changed

+37
-28
lines changed

2 files changed

+37
-28
lines changed

pkg/resources/kafka/configmap.go

Lines changed: 36 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,16 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, broker v
5151
// }
5252

5353
// Add listener configuration
54-
listenerConf, _ := generateListenerSpecificConfig(&r.KafkaCluster.Spec, serverPasses, log)
55-
config.Merge(listenerConf)
54+
generalConfig, brokerConfigs, _ := generateListenerSpecificConfig(&r.KafkaCluster.Spec, serverPasses, log)
55+
56+
brokerConfig, exists := brokerConfigs[broker.Id]
57+
if !exists {
58+
log.Error(nil, fmt.Sprintf("No specific listener config found for broker %d, using default config", broker.Id))
59+
brokerConfig = properties.NewProperties()
60+
}
61+
62+
config.Merge(brokerConfig)
63+
config.Merge(generalConfig)
5664

5765
// Cruise Control metrics reporter configuration
5866
r.configCCMetricsReporter(broker, config, clientPass, log)
@@ -63,7 +71,7 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, broker v
6371
if r.KafkaCluster.Spec.KRaftMode {
6472
configureBrokerKRaftMode(bConfig, broker.Id, r.KafkaCluster, config, quorumVoters, serverPasses, extListenerStatuses, intListenerStatuses, log, brokerReadOnlyConfig)
6573
} else {
66-
configureBrokerZKMode(broker.Id, r.KafkaCluster, config, serverPasses, extListenerStatuses, intListenerStatuses, controllerIntListenerStatuses, log)
74+
configureBrokerZKMode(broker.Id, r.KafkaCluster, config, extListenerStatuses, intListenerStatuses, controllerIntListenerStatuses, log)
6775
}
6876

6977
// This logic prevents the removal of the mountPath from the broker configmap
@@ -197,9 +205,8 @@ func configureBrokerKRaftMode(bConfig *v1beta1.BrokerConfig, brokerID int32, kaf
197205
}
198206
}
199207

200-
// Add listener configuration
201-
listenerConf, listenerConfig := generateListenerSpecificConfig(&kafkaCluster.Spec, serverPasses, log)
202-
config.Merge(listenerConf)
208+
// // Add listener configuration
209+
_, _, listenerConfig := generateListenerSpecificConfig(&kafkaCluster.Spec, serverPasses, log)
203210

204211
var advertisedListenerConf []string
205212
// only expose "advertised.listeners" when the node serves as a regular broker or a combined node
@@ -249,17 +256,12 @@ func shouldConfigureControllerQuorumForBroker(brokerReadOnlyConfig *properties.P
249256
return !found || migrationBrokerControllerQuorumConfigEnabled.Value() == "true"
250257
}
251258

252-
func configureBrokerZKMode(brokerID int32, kafkaCluster *v1beta1.KafkaCluster, config *properties.Properties,
253-
serverPasses map[string]string, extListenerStatuses, intListenerStatuses,
259+
func configureBrokerZKMode(brokerID int32, kafkaCluster *v1beta1.KafkaCluster, config *properties.Properties, extListenerStatuses, intListenerStatuses,
254260
controllerIntListenerStatuses map[string]v1beta1.ListenerStatusList, log logr.Logger) {
255261
if err := config.Set(kafkautils.KafkaConfigBrokerID, brokerID); err != nil {
256262
log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigBrokerID))
257263
}
258264

259-
// Add listener configuration
260-
listenerConf, _ := generateListenerSpecificConfig(&kafkaCluster.Spec, serverPasses, log)
261-
config.Merge(listenerConf)
262-
263265
// Add advertised listener configuration
264266
advertisedListenerConf := generateAdvertisedListenerConfig(brokerID, kafkaCluster.Spec.ListenersConfig,
265267
extListenerStatuses, intListenerStatuses, controllerIntListenerStatuses)
@@ -403,20 +405,12 @@ func generateControlPlaneListener(iListeners []v1beta1.InternalListenerConfig) s
403405
return controlPlaneListener
404406
}
405407

406-
func generateListenerSpecificConfig(kcs *v1beta1.KafkaClusterSpec, serverPasses map[string]string, log logr.Logger) (*properties.Properties, []string) {
408+
func generateListenerSpecificConfig(kcs *v1beta1.KafkaClusterSpec, serverPasses map[string]string, log logr.Logger) (*properties.Properties, map[int32]*properties.Properties, []string) {
407409
config := properties.NewProperties()
410+
brokerConfigs := make(map[int32]*properties.Properties)
408411

409412
l := kcs.ListenersConfig
410-
r := kcs.ReadOnlyConfig
411-
412-
securityProtocolSet := false
413-
for _, broker := range kcs.Brokers {
414-
b := broker.ReadOnlyConfig
415-
if strings.Contains(b, kafkautils.KafkaConfigSecurityInterBrokerProtocol+"=") {
416-
securityProtocolSet = true
417-
break
418-
}
419-
}
413+
//r := kcs.ReadOnlyConfig
420414

421415
interBrokerListenerName, securityProtocolMapConfig, listenerConfig, internalListenerSSLConfig, externalListenerSSLConfig := getListenerSpecificConfig(&l, serverPasses, log)
422416

@@ -436,19 +430,33 @@ func generateListenerSpecificConfig(kcs *v1beta1.KafkaClusterSpec, serverPasses
436430
log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigListenerSecurityProtocolMap))
437431
}
438432

439-
if !securityProtocolSet {
440-
if !strings.Contains(r, kafkautils.KafkaConfigSecurityInterBrokerProtocol+"=") {
441-
if err := config.Set(kafkautils.KafkaConfigInterBrokerListenerName, interBrokerListenerName); err != nil {
442-
log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigInterBrokerListenerName))
433+
for _, broker := range kcs.Brokers {
434+
brokerConfig := properties.NewProperties()
435+
brokerID := broker.Id
436+
437+
b := broker.ReadOnlyConfig
438+
trimmedConfig := strings.TrimSpace(b)
439+
440+
if strings.Contains(trimmedConfig, kafkautils.KafkaConfigSecurityInterBrokerProtocol+"=") {
441+
log.Info("Security InterBrokerProtocol is set for this broker, skipping config update", "broker", broker)
442+
} else {
443+
log.Info("Security InterBrokerProtocol NOT found for broker, setting inter.broker.listener.name",
444+
"interBrokerListenerName", interBrokerListenerName, "broker", broker)
445+
446+
if err := brokerConfig.Set(kafkautils.KafkaConfigInterBrokerListenerName, interBrokerListenerName); err != nil {
447+
log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted in an error",
448+
kafkautils.KafkaConfigInterBrokerListenerName))
443449
}
444450
}
451+
452+
brokerConfigs[brokerID] = brokerConfig
445453
}
446454

447455
if err := config.Set(kafkautils.KafkaConfigListeners, listenerConfig); err != nil {
448456
log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigListeners))
449457
}
450458

451-
return config, listenerConfig
459+
return config, brokerConfigs, listenerConfig
452460
}
453461

454462
func getListenerSpecificConfig(l *v1beta1.ListenersConfig, serverPasses map[string]string, log logr.Logger) (string, []string, []string, map[string]string, map[string]string) {

pkg/resources/kafka/configmap_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -619,6 +619,7 @@ zookeeper.connect=example.zk:2181/`,
619619
broker.id=0
620620
cruise.control.metrics.reporter.bootstrap.servers=kafka-all-broker.kafka.svc.cluster.local:9092
621621
cruise.control.metrics.reporter.kubernetes.mode=true
622+
inter.broker.listener.name=INTERNAL
622623
listener.security.protocol.map=INTERNAL:PLAINTEXT
623624
listeners=INTERNAL://:9092
624625
security.inter.broker.protocol=SASL_SSL

0 commit comments

Comments
 (0)