Skip to content

Commit c96e846

Browse files
authored
Ensure externallisteners are listed first (#97)
1 parent 1b56507 commit c96e846

File tree

3 files changed

+22
-22
lines changed

3 files changed

+22
-22
lines changed

controllers/tests/kafkacluster_controller_externallistenerbindings_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,20 +81,20 @@ func expectDefaultBrokerSettingsForExternalListenerBinding(ctx context.Context,
8181
if kafkaCluster.Spec.KRaftMode {
8282
switch broker.Id {
8383
case 0:
84-
expectedListeners = "INTERNAL://:29092,TEST://:9094"
84+
expectedListeners = "TEST://:9094,INTERNAL://:29092"
8585
case 1:
8686
expectedListeners = "CONTROLLER://:29093"
8787
case 2:
88-
expectedListeners = "INTERNAL://:29092,CONTROLLER://:29093,TEST://:9094"
88+
expectedListeners = "TEST://:9094,INTERNAL://:29092,CONTROLLER://:29093"
8989
}
9090
} else {
91-
expectedListeners = "INTERNAL://:29092,CONTROLLER://:29093,TEST://:9094"
91+
expectedListeners = "TEST://:9094,INTERNAL://:29092,CONTROLLER://:29093"
9292
}
9393
Expect(listeners.Value()).To(Equal(expectedListeners))
9494

9595
listenerSecMap, found := brokerConfig.Get(kafkautils.KafkaConfigListenerSecurityProtocolMap)
9696
Expect(found).To(BeTrue())
97-
Expect(listenerSecMap.Value()).To(Equal("INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,TEST:PLAINTEXT"))
97+
Expect(listenerSecMap.Value()).To(Equal("TEST:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT"))
9898
// check service
9999
service := corev1.Service{}
100100
Eventually(ctx, func() error {

controllers/tests/kafkacluster_controller_kafka_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -248,8 +248,8 @@ controller.quorum.voters=1@%s-1.%s.svc.cluster.local:29093,2@%s-2.%s.svc.cluster
248248
cruise.control.metrics.reporter.bootstrap.servers=%s-all-broker.%s.svc.cluster.local:29092
249249
cruise.control.metrics.reporter.kubernetes.mode=true
250250
inter.broker.listener.name=INTERNAL
251-
listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,TEST:PLAINTEXT
252-
listeners=INTERNAL://:29092,TEST://:9094
251+
listener.security.protocol.map=TEST:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
252+
listeners=TEST://:9094,INTERNAL://:29092
253253
log.dirs=/kafka-logs/kafka,/ephemeral-dir1/kafka
254254
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
255255
node.id=%d
@@ -265,7 +265,7 @@ controller.quorum.voters=1@%s-1.%s.svc.cluster.local:29093,2@%s-2.%s.svc.cluster
265265
cruise.control.metrics.reporter.bootstrap.servers=%s-all-broker.%s.svc.cluster.local:29092
266266
cruise.control.metrics.reporter.kubernetes.mode=true
267267
inter.broker.listener.name=INTERNAL
268-
listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,TEST:PLAINTEXT
268+
listener.security.protocol.map=TEST:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
269269
listeners=CONTROLLER://:29093
270270
log.dirs=/kafka-logs/kafka,/ephemeral-dir1/kafka
271271
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
@@ -283,8 +283,8 @@ controller.quorum.voters=1@%s-1.%s.svc.cluster.local:29093,2@%s-2.%s.svc.cluster
283283
cruise.control.metrics.reporter.bootstrap.servers=%s-all-broker.%s.svc.cluster.local:29092
284284
cruise.control.metrics.reporter.kubernetes.mode=true
285285
inter.broker.listener.name=INTERNAL
286-
listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,TEST:PLAINTEXT
287-
listeners=INTERNAL://:29092,CONTROLLER://:29093,TEST://:9094
286+
listener.security.protocol.map=TEST:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
287+
listeners=TEST://:9094,INTERNAL://:29092,CONTROLLER://:29093
288288
log.dirs=/kafka-logs/kafka,/ephemeral-dir1/kafka
289289
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
290290
node.id=%d
@@ -299,8 +299,8 @@ control.plane.listener.name=CONTROLLER
299299
cruise.control.metrics.reporter.bootstrap.servers=kafkacluster-1-all-broker.kafka-1.svc.cluster.local:29092
300300
cruise.control.metrics.reporter.kubernetes.mode=true
301301
inter.broker.listener.name=INTERNAL
302-
listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,TEST:PLAINTEXT
303-
listeners=INTERNAL://:29092,CONTROLLER://:29093,TEST://:9094
302+
listener.security.protocol.map=TEST:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
303+
listeners=TEST://:9094,INTERNAL://:29092,CONTROLLER://:29093
304304
log.dirs=/kafka-logs/kafka,/ephemeral-dir1/kafka
305305
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
306306
zookeeper.connect=/

pkg/resources/kafka/configmap.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,17 @@ func getListenerSpecificConfig(l *v1beta1.ListenersConfig, serverPasses map[stri
417417
internalListenerSSLConfig = make(map[string]string)
418418
externalListenerSSLConfig = make(map[string]string)
419419

420+
for _, eListener := range l.ExternalListeners {
421+
upperedListenerType := eListener.Type.ToUpperString()
422+
upperedListenerName := strings.ToUpper(eListener.Name)
423+
securityProtocolMapConfig = append(securityProtocolMapConfig, fmt.Sprintf("%s:%s", upperedListenerName, upperedListenerType))
424+
listenerConfig = append(listenerConfig, fmt.Sprintf("%s://:%d", upperedListenerName, eListener.ContainerPort))
425+
// Add external listeners SSL configuration
426+
if eListener.Type == v1beta1.SecurityProtocolSSL {
427+
maps.Copy(externalListenerSSLConfig, generateListenerSSLConfig(eListener.Name, eListener.SSLClientAuth, serverPasses[eListener.Name]))
428+
}
429+
}
430+
420431
for _, iListener := range l.InternalListeners {
421432
if iListener.UsedForInnerBrokerCommunication {
422433
if interBrokerListenerName == "" {
@@ -436,17 +447,6 @@ func getListenerSpecificConfig(l *v1beta1.ListenersConfig, serverPasses map[stri
436447
}
437448
}
438449

439-
for _, eListener := range l.ExternalListeners {
440-
upperedListenerType := eListener.Type.ToUpperString()
441-
upperedListenerName := strings.ToUpper(eListener.Name)
442-
securityProtocolMapConfig = append(securityProtocolMapConfig, fmt.Sprintf("%s:%s", upperedListenerName, upperedListenerType))
443-
listenerConfig = append(listenerConfig, fmt.Sprintf("%s://:%d", upperedListenerName, eListener.ContainerPort))
444-
// Add external listeners SSL configuration
445-
if eListener.Type == v1beta1.SecurityProtocolSSL {
446-
maps.Copy(externalListenerSSLConfig, generateListenerSSLConfig(eListener.Name, eListener.SSLClientAuth, serverPasses[eListener.Name]))
447-
}
448-
}
449-
450450
return interBrokerListenerName, securityProtocolMapConfig, listenerConfig, internalListenerSSLConfig, externalListenerSSLConfig
451451
}
452452

0 commit comments

Comments
 (0)