@@ -407,16 +407,7 @@ func generateListenerSpecificConfig(kcs *v1beta1.KafkaClusterSpec, serverPasses
407407 config := properties .NewProperties ()
408408
409409 l := kcs .ListenersConfig
410- r := kcs .ReadOnlyConfig
411-
412- securityProtocolSet := false
413- for _ , broker := range kcs .Brokers {
414- b := broker .ReadOnlyConfig
415- if strings .Contains (b , kafkautils .KafkaConfigSecurityInterBrokerProtocol + "=" ) {
416- securityProtocolSet = true
417- break
418- }
419- }
410+ //r := kcs.ReadOnlyConfig
420411
421412 interBrokerListenerName , securityProtocolMapConfig , listenerConfig , internalListenerSSLConfig , externalListenerSSLConfig := getListenerSpecificConfig (& l , serverPasses , log )
422413
@@ -436,10 +427,11 @@ func generateListenerSpecificConfig(kcs *v1beta1.KafkaClusterSpec, serverPasses
436427 log .Error (err , fmt .Sprintf ("setting '%s' parameter in broker configuration resulted an error" , kafkautils .KafkaConfigListenerSecurityProtocolMap ))
437428 }
438429
439- if ! securityProtocolSet {
440- if ! strings .Contains (r , kafkautils .KafkaConfigSecurityInterBrokerProtocol + "=" ) {
430+ for _ , broker := range kcs .Brokers {
431+ b := broker .ReadOnlyConfig
432+ if ! strings .Contains (b , kafkautils .KafkaConfigSecurityInterBrokerProtocol + "=" ) {
441433 if err := config .Set (kafkautils .KafkaConfigInterBrokerListenerName , interBrokerListenerName ); err != nil {
442- log .Error (err , fmt .Sprintf ("setting '%s' parameter in broker configuration resulted an error" , kafkautils .KafkaConfigInterBrokerListenerName ))
434+ log .Error (err , fmt .Sprintf ("setting '%s' parameter in broker configuration resulted in an error" , kafkautils .KafkaConfigInterBrokerListenerName ))
443435 }
444436 }
445437 }
0 commit comments