Skip to content

Commit 672c2f8

Browse files
committed
Add zkConnect property when broker is in zk mode
1 parent 00322db commit 672c2f8

File tree

2 files changed

+13
-0
lines changed

2 files changed

+13
-0
lines changed

pkg/resources/kafka/configmap.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,11 @@ func configureBrokerKRaftMode(bConfig *v1beta1.BrokerConfig, brokerID int32, kaf
175175
log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigControlPlaneListener))
176176
}
177177
}
178+
179+
if err := config.Set(kafkautils.KafkaConfigZooKeeperConnect, zookeeperutils.PrepareConnectionAddress(
180+
kafkaCluster.Spec.ZKAddresses, kafkaCluster.Spec.GetZkPath())); err != nil {
181+
log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigZooKeeperConnect))
182+
}
178183
}
179184

180185
if shouldConfigureControllerQuorumForBroker(brokerReadOnlyConfig) {

pkg/resources/kafka/configmap_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -729,6 +729,8 @@ func TestGenerateBrokerConfigKRaftMode(t *testing.T) { //nolint funlen
729729
listenersConfig v1beta1.ListenersConfig
730730
internalListenerStatuses map[string]v1beta1.ListenerStatusList
731731
controllerListenerStatus map[string]v1beta1.ListenerStatusList
732+
zkAddresses []string
733+
zkPath string
732734
expectedBrokerConfigs []string
733735
}{
734736
{
@@ -1141,6 +1143,8 @@ process.roles=broker,controller
11411143
},
11421144
},
11431145
},
1146+
zkAddresses: []string{"example.zk:2181"},
1147+
zkPath: "/kafka",
11441148
expectedBrokerConfigs: []string{
11451149
`advertised.listeners=INTERNAL://kafka-0.kafka.svc.cluster.local:9092
11461150
controller.listener.names=CONTROLLER
@@ -1177,6 +1181,7 @@ listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
11771181
listeners=INTERNAL://:9092,CONTROLLER://:9093
11781182
log.dirs=/test-kafka-logs/kafka,/test-kafka-logs-50/kafka,/test-kafka-logs-100/kafka
11791183
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
1184+
zookeeper.connect=example.zk:2181/kafka
11801185
`,
11811186
`advertised.listeners=INTERNAL://kafka-200.kafka.svc.cluster.local:9092
11821187
broker.id=200
@@ -1190,6 +1195,7 @@ listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
11901195
listeners=INTERNAL://:9092
11911196
log.dirs=/test-kafka-logs/kafka
11921197
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
1198+
zookeeper.connect=example.zk:2181/kafka
11931199
`,
11941200
`advertised.listeners=INTERNAL://kafka-300.kafka.svc.cluster.local:9092
11951201
cruise.control.metrics.reporter.bootstrap.servers=kafka-all-broker.kafka.svc.cluster.local:9092
@@ -1225,6 +1231,8 @@ process.roles=broker
12251231
KRaftMode: true,
12261232
ListenersConfig: test.listenersConfig,
12271233
Brokers: test.brokers,
1234+
ZKAddresses: test.zkAddresses,
1235+
ZKPath: test.zkPath,
12281236
},
12291237
},
12301238
},

0 commit comments

Comments
 (0)