Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,20 +81,20 @@ func expectDefaultBrokerSettingsForExternalListenerBinding(ctx context.Context,
if kafkaCluster.Spec.KRaftMode {
switch broker.Id {
case 0:
expectedListeners = "INTERNAL://:29092,TEST://:9094"
expectedListeners = "TEST://:9094,INTERNAL://:29092"
case 1:
expectedListeners = "CONTROLLER://:29093"
case 2:
expectedListeners = "INTERNAL://:29092,CONTROLLER://:29093,TEST://:9094"
expectedListeners = "TEST://:9094,INTERNAL://:29092,CONTROLLER://:29093"
}
} else {
expectedListeners = "INTERNAL://:29092,CONTROLLER://:29093,TEST://:9094"
expectedListeners = "TEST://:9094,INTERNAL://:29092,CONTROLLER://:29093"
}
Expect(listeners.Value()).To(Equal(expectedListeners))

listenerSecMap, found := brokerConfig.Get(kafkautils.KafkaConfigListenerSecurityProtocolMap)
Expect(found).To(BeTrue())
Expect(listenerSecMap.Value()).To(Equal("INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,TEST:PLAINTEXT"))
Expect(listenerSecMap.Value()).To(Equal("TEST:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT"))
// check service
service := corev1.Service{}
Eventually(ctx, func() error {
Expand Down
14 changes: 7 additions & 7 deletions controllers/tests/kafkacluster_controller_kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,8 @@ controller.quorum.voters=1@%s-1.%s.svc.cluster.local:29093,2@%s-2.%s.svc.cluster
cruise.control.metrics.reporter.bootstrap.servers=%s-all-broker.%s.svc.cluster.local:29092
cruise.control.metrics.reporter.kubernetes.mode=true
inter.broker.listener.name=INTERNAL
listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,TEST:PLAINTEXT
listeners=INTERNAL://:29092,TEST://:9094
listener.security.protocol.map=TEST:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
listeners=TEST://:9094,INTERNAL://:29092
log.dirs=/kafka-logs/kafka,/ephemeral-dir1/kafka
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
node.id=%d
Expand All @@ -265,7 +265,7 @@ controller.quorum.voters=1@%s-1.%s.svc.cluster.local:29093,2@%s-2.%s.svc.cluster
cruise.control.metrics.reporter.bootstrap.servers=%s-all-broker.%s.svc.cluster.local:29092
cruise.control.metrics.reporter.kubernetes.mode=true
inter.broker.listener.name=INTERNAL
listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,TEST:PLAINTEXT
listener.security.protocol.map=TEST:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
listeners=CONTROLLER://:29093
log.dirs=/kafka-logs/kafka,/ephemeral-dir1/kafka
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
Expand All @@ -283,8 +283,8 @@ controller.quorum.voters=1@%s-1.%s.svc.cluster.local:29093,2@%s-2.%s.svc.cluster
cruise.control.metrics.reporter.bootstrap.servers=%s-all-broker.%s.svc.cluster.local:29092
cruise.control.metrics.reporter.kubernetes.mode=true
inter.broker.listener.name=INTERNAL
listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,TEST:PLAINTEXT
listeners=INTERNAL://:29092,CONTROLLER://:29093,TEST://:9094
listener.security.protocol.map=TEST:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
listeners=TEST://:9094,INTERNAL://:29092,CONTROLLER://:29093
log.dirs=/kafka-logs/kafka,/ephemeral-dir1/kafka
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
node.id=%d
Expand All @@ -299,8 +299,8 @@ control.plane.listener.name=CONTROLLER
cruise.control.metrics.reporter.bootstrap.servers=kafkacluster-1-all-broker.kafka-1.svc.cluster.local:29092
cruise.control.metrics.reporter.kubernetes.mode=true
inter.broker.listener.name=INTERNAL
listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,TEST:PLAINTEXT
listeners=INTERNAL://:29092,CONTROLLER://:29093,TEST://:9094
listener.security.protocol.map=TEST:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
listeners=TEST://:9094,INTERNAL://:29092,CONTROLLER://:29093
log.dirs=/kafka-logs/kafka,/ephemeral-dir1/kafka
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
zookeeper.connect=/
Expand Down
22 changes: 11 additions & 11 deletions pkg/resources/kafka/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,17 @@ func getListenerSpecificConfig(l *v1beta1.ListenersConfig, serverPasses map[stri
internalListenerSSLConfig = make(map[string]string)
externalListenerSSLConfig = make(map[string]string)

for _, eListener := range l.ExternalListeners {
upperedListenerType := eListener.Type.ToUpperString()
upperedListenerName := strings.ToUpper(eListener.Name)
securityProtocolMapConfig = append(securityProtocolMapConfig, fmt.Sprintf("%s:%s", upperedListenerName, upperedListenerType))
listenerConfig = append(listenerConfig, fmt.Sprintf("%s://:%d", upperedListenerName, eListener.ContainerPort))
// Add external listeners SSL configuration
if eListener.Type == v1beta1.SecurityProtocolSSL {
maps.Copy(externalListenerSSLConfig, generateListenerSSLConfig(eListener.Name, eListener.SSLClientAuth, serverPasses[eListener.Name]))
}
}

for _, iListener := range l.InternalListeners {
if iListener.UsedForInnerBrokerCommunication {
if interBrokerListenerName == "" {
Expand All @@ -436,17 +447,6 @@ func getListenerSpecificConfig(l *v1beta1.ListenersConfig, serverPasses map[stri
}
}

for _, eListener := range l.ExternalListeners {
upperedListenerType := eListener.Type.ToUpperString()
upperedListenerName := strings.ToUpper(eListener.Name)
securityProtocolMapConfig = append(securityProtocolMapConfig, fmt.Sprintf("%s:%s", upperedListenerName, upperedListenerType))
listenerConfig = append(listenerConfig, fmt.Sprintf("%s://:%d", upperedListenerName, eListener.ContainerPort))
// Add external listeners SSL configuration
if eListener.Type == v1beta1.SecurityProtocolSSL {
maps.Copy(externalListenerSSLConfig, generateListenerSSLConfig(eListener.Name, eListener.SSLClientAuth, serverPasses[eListener.Name]))
}
}

return interBrokerListenerName, securityProtocolMapConfig, listenerConfig, internalListenerSSLConfig, externalListenerSSLConfig
}

Expand Down
Loading