Skip to content

Commit ed46ce3

Browse files
committed
Refactor functions and additional test cases
1 parent 50c8c23 commit ed46ce3

File tree

2 files changed

+99
-13
lines changed

2 files changed

+99
-13
lines changed

pkg/resources/kafka/configmap.go

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -153,35 +153,32 @@ func configureBrokerKRaftMode(bConfig *v1beta1.BrokerConfig, brokerID int32, kaf
153153
quorumVoters []string, serverPasses map[string]string, extListenerStatuses, intListenerStatuses map[string]v1beta1.ListenerStatusList, log logr.Logger,
154154
brokerReadOnlyConfig *properties.Properties) {
155155

156-
// configure "node.id" and "process.roles" by default OR when MigrationBrokerKRaftMode is set and 'true'.
157-
// this is to support the zk to kRaft migration
158-
if migrationBrokerKRaftMode, found := brokerReadOnlyConfig.Get(kafkautils.MigrationBrokerKRaftMode); !found || migrationBrokerKRaftMode.Value() == "true" {
156+
controllerListenerName := generateControlPlaneListener(kafkaCluster.Spec.ListenersConfig.InternalListeners)
157+
158+
// when kRaft is enabled for the cluster, brokers can still be configured to use zookeeper for metadata.
159+
// this is to support the zk to kRaft migration where both zookeeper and kRaft controllers are running in parallel.
160+
if shouldUseKRaftModeForBroker(brokerReadOnlyConfig) {
159161
if err := config.Set(kafkautils.KafkaConfigNodeID, brokerID); err != nil {
160162
log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigNodeID))
161163
}
162164

163165
if err := config.Set(kafkautils.KafkaConfigProcessRoles, bConfig.Roles); err != nil {
164166
log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigProcessRoles))
165167
}
166-
} else { // migrationBrokerKRaftMode.Value() == "false" -> use zk mode for broker
167-
// when in zk mode, "broker.id" and "control.plane.listener.name" are set so it can communicate with zookeeper
168+
} else { // use zk mode for broker.
169+
// when in zk mode, "broker.id" and "control.plane.listener.name" are configured so it will communicate with zookeeper
168170
if err := config.Set(kafkautils.KafkaConfigBrokerID, brokerID); err != nil {
169171
log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigBrokerID))
170172
}
171173

172-
cclConf := generateControlPlaneListener(kafkaCluster.Spec.ListenersConfig.InternalListeners)
173-
if cclConf != "" {
174-
if err := config.Set(kafkautils.KafkaConfigControlPlaneListener, cclConf); err != nil {
174+
if controllerListenerName != "" {
175+
if err := config.Set(kafkautils.KafkaConfigControlPlaneListener, controllerListenerName); err != nil {
175176
log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigControlPlaneListener))
176177
}
177178
}
178179
}
179180

180-
controllerListenerName := generateControlPlaneListener(kafkaCluster.Spec.ListenersConfig.InternalListeners)
181-
182-
// configure "controller.quorum.voters" and "controller.listener.names" by default OR when MigrationBrokerControllerQuorumConfigEnabled is set and 'true'.
183-
// this is to support the zk to kRaft migration
184-
if migrationBrokerControllerQuorumConfigEnabled, found := brokerReadOnlyConfig.Get(kafkautils.MigrationBrokerControllerQuorumConfigEnabled); !found || migrationBrokerControllerQuorumConfigEnabled.Value() == "true" {
181+
if shouldConfigureControllerQuorumForBroker(brokerReadOnlyConfig) {
185182
if err := config.Set(kafkautils.KafkaConfigControllerQuorumVoters, quorumVoters); err != nil {
186183
log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigControllerQuorumVoters))
187184
}
@@ -239,6 +236,20 @@ func configureBrokerKRaftMode(bConfig *v1beta1.BrokerConfig, brokerID int32, kaf
239236
}
240237
}
241238

239+
// Returns true by default (not in migration configured) OR when MigrationBrokerKRaftMode is set and 'true'.
240+
// this is to support the zk to kRaft migration
241+
func shouldUseKRaftModeForBroker(brokerReadOnlyConfig *properties.Properties) bool {
242+
migrationBrokerKRaftMode, found := brokerReadOnlyConfig.Get(kafkautils.MigrationBrokerKRaftMode)
243+
return !found || migrationBrokerKRaftMode.Value() == "true"
244+
}
245+
246+
// Returns true by default (not in migration) OR when MigrationBrokerControllerQuorumConfigEnabled is set and 'true'.
247+
// this is to support the zk to kRaft migration
248+
func shouldConfigureControllerQuorumForBroker(brokerReadOnlyConfig *properties.Properties) bool {
249+
migrationBrokerControllerQuorumConfigEnabled, found := brokerReadOnlyConfig.Get(kafkautils.MigrationBrokerControllerQuorumConfigEnabled)
250+
return !found || migrationBrokerControllerQuorumConfigEnabled.Value() == "true"
251+
}
252+
242253
func configureBrokerZKMode(brokerID int32, kafkaCluster *v1beta1.KafkaCluster, config *properties.Properties,
243254
serverPasses map[string]string, extListenerStatuses, intListenerStatuses,
244255
controllerIntListenerStatuses map[string]v1beta1.ListenerStatusList, log logr.Logger) {

pkg/resources/kafka/configmap_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1048,6 +1048,42 @@ process.roles=broker,controller
10481048
},
10491049
ReadOnlyConfig: "migration.broker.controllerQuorumConfigEnabled=false\nmigration.broker.kRaftMode=false",
10501050
},
1051+
{
1052+
Id: 200,
1053+
BrokerConfig: &v1beta1.BrokerConfig{
1054+
Roles: []string{"broker"},
1055+
StorageConfigs: []v1beta1.StorageConfig{
1056+
{
1057+
MountPath: "/test-kafka-logs",
1058+
},
1059+
{
1060+
MountPath: "/test-kafka-logs-50",
1061+
},
1062+
{
1063+
MountPath: "/test-kafka-logs-100",
1064+
},
1065+
},
1066+
},
1067+
ReadOnlyConfig: "migration.broker.controllerQuorumConfigEnabled=true\nmigration.broker.kRaftMode=false",
1068+
},
1069+
{
1070+
Id: 300,
1071+
BrokerConfig: &v1beta1.BrokerConfig{
1072+
Roles: []string{"broker"},
1073+
StorageConfigs: []v1beta1.StorageConfig{
1074+
{
1075+
MountPath: "/test-kafka-logs",
1076+
},
1077+
{
1078+
MountPath: "/test-kafka-logs-50",
1079+
},
1080+
{
1081+
MountPath: "/test-kafka-logs-100",
1082+
},
1083+
},
1084+
},
1085+
ReadOnlyConfig: "migration.broker.controllerQuorumConfigEnabled=false\nmigration.broker.kRaftMode=true",
1086+
},
10511087
},
10521088
listenersConfig: v1beta1.ListenersConfig{
10531089
InternalListeners: []v1beta1.InternalListenerConfig{
@@ -1083,6 +1119,14 @@ process.roles=broker,controller
10831119
Name: "broker-100",
10841120
Address: "kafka-100.kafka.svc.cluster.local:9092",
10851121
},
1122+
{
1123+
Name: "broker-200",
1124+
Address: "kafka-100.kafka.svc.cluster.local:9092",
1125+
},
1126+
{
1127+
Name: "broker-300",
1128+
Address: "kafka-100.kafka.svc.cluster.local:9092",
1129+
},
10861130
},
10871131
},
10881132
controllerListenerStatus: map[string]v1beta1.ListenerStatusList{
@@ -1099,6 +1143,14 @@ process.roles=broker,controller
10991143
Name: "broker-100",
11001144
Address: "kafka-100.kafka.svc.cluster.local:9093",
11011145
},
1146+
{
1147+
Name: "broker-100",
1148+
Address: "kafka-100.kafka.svc.cluster.local:9093",
1149+
},
1150+
{
1151+
Name: "broker-100",
1152+
Address: "kafka-100.kafka.svc.cluster.local:9093",
1153+
},
11021154
},
11031155
},
11041156
expectedBrokerConfigs: []string{
@@ -1137,6 +1189,29 @@ listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
11371189
listeners=INTERNAL://:9092,CONTROLLER://:9093
11381190
log.dirs=/test-kafka-logs/kafka,/test-kafka-logs-50/kafka,/test-kafka-logs-100/kafka
11391191
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
1192+
`,
1193+
`advertised.listeners=INTERNAL://kafka-200.kafka.svc.cluster.local:9092
1194+
broker.id=200
1195+
cruise.control.metrics.reporter.bootstrap.servers=kafka-all-broker.kafka.svc.cluster.local:9092
1196+
cruise.control.metrics.reporter.kubernetes.mode=true
1197+
inter.broker.listener.name=INTERNAL
1198+
listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
1199+
listeners=INTERNAL://:9092
1200+
log.dirs=/test-kafka-logs/kafka
1201+
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
1202+
`,
1203+
`advertised.listeners=INTERNAL://kafka-300.kafka.svc.cluster.local:9092
1204+
controller.listener.names=CONTROLLER
1205+
1206+
cruise.control.metrics.reporter.bootstrap.servers=kafka-all-broker.kafka.svc.cluster.local:9092
1207+
cruise.control.metrics.reporter.kubernetes.mode=true
1208+
inter.broker.listener.name=INTERNAL
1209+
listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
1210+
listeners=INTERNAL://:9092
1211+
log.dirs=/test-kafka-logs/kafka
1212+
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
1213+
node.id=300
1214+
process.roles=broker
11401215
`},
11411216
},
11421217
}

0 commit comments

Comments
 (0)