@@ -374,6 +374,15 @@ func generateListenerSpecificConfig(kcs *v1beta1.KafkaClusterSpec, serverPasses
374374 l := kcs .ListenersConfig
375375 r := kcs .ReadOnlyConfig
376376
377+ securityProtocolSet := false
378+ for _ , broker := range kcs .Brokers {
379+ b := broker .ReadOnlyConfig
380+ if strings .Contains (b , kafkautils .KafkaConfigSecurityInterBrokerProtocol + "=" ) {
381+ securityProtocolSet = true
382+ break
383+ }
384+ }
385+
377386 interBrokerListenerName , securityProtocolMapConfig , listenerConfig , internalListenerSSLConfig , externalListenerSSLConfig := getListenerSpecificConfig (& l , serverPasses , log )
378387
379388 for k , v := range internalListenerSSLConfig {
@@ -392,9 +401,11 @@ func generateListenerSpecificConfig(kcs *v1beta1.KafkaClusterSpec, serverPasses
392401 log .Error (err , fmt .Sprintf ("setting '%s' parameter in broker configuration resulted an error" , kafkautils .KafkaConfigListenerSecurityProtocolMap ))
393402 }
394403
395- if ! strings .Contains (r , kafkautils .KafkaConfigSecurityInterBrokerProtocol + "=" ) {
396- if err := config .Set (kafkautils .KafkaConfigInterBrokerListenerName , interBrokerListenerName ); err != nil {
397- log .Error (err , fmt .Sprintf ("setting '%s' parameter in broker configuration resulted an error" , kafkautils .KafkaConfigInterBrokerListenerName ))
404+ if ! securityProtocolSet {
405+ if ! strings .Contains (r , kafkautils .KafkaConfigSecurityInterBrokerProtocol + "=" ) {
406+ if err := config .Set (kafkautils .KafkaConfigInterBrokerListenerName , interBrokerListenerName ); err != nil {
407+ log .Error (err , fmt .Sprintf ("setting '%s' parameter in broker configuration resulted an error" , kafkautils .KafkaConfigInterBrokerListenerName ))
408+ }
398409 }
399410 }
400411
0 commit comments