diff --git a/api/v1beta1/kafkacluster_types.go b/api/v1beta1/kafkacluster_types.go index f341c023a..c2bc8c63c 100644 --- a/api/v1beta1/kafkacluster_types.go +++ b/api/v1beta1/kafkacluster_types.go @@ -1102,13 +1102,18 @@ func (bConfig *BrokerConfig) GetBrokerAnnotations() map[string]string { // GetBrokerLabels returns the labels that are applied to broker pods func (bConfig *BrokerConfig) GetBrokerLabels(kafkaClusterName string, brokerId int32, kRaftMode bool) map[string]string { - kraftLabels := make(map[string]string, 0) + var kraftLabels map[string]string if kRaftMode { kraftLabels = map[string]string{ ProcessRolesKey: strings.Join(bConfig.Roles, "_"), IsControllerNodeKey: fmt.Sprintf("%t", bConfig.IsControllerNode()), IsBrokerNodeKey: fmt.Sprintf("%t", bConfig.IsBrokerNode()), } + } else { // in ZK mode -> new labels for backward compatibility for the headless service when going from ZK to KRaft + kraftLabels = map[string]string{ + IsControllerNodeKey: fmt.Sprintf("%t", false), + IsBrokerNodeKey: fmt.Sprintf("%t", true), + } } return util.MergeLabels( bConfig.BrokerLabels, diff --git a/api/v1beta1/kafkacluster_types_test.go b/api/v1beta1/kafkacluster_types_test.go index 45d4e66d6..7b917cbd3 100644 --- a/api/v1beta1/kafkacluster_types_test.go +++ b/api/v1beta1/kafkacluster_types_test.go @@ -445,10 +445,12 @@ func TestGetBrokerLabels(t *testing.T) { { testName: "Labels in zookeeper mode", expectedLabels: map[string]string{ - AppLabelKey: expectedDefaultLabelApp, - BrokerIdLabelKey: strconv.Itoa(expectedBrokerId), - KafkaCRLabelKey: expectedKafkaCRName, - "test_label_key": "test_label_value", + AppLabelKey: expectedDefaultLabelApp, + BrokerIdLabelKey: strconv.Itoa(expectedBrokerId), + KafkaCRLabelKey: expectedKafkaCRName, + IsBrokerNodeKey: "true", + IsControllerNodeKey: "false", + "test_label_key": "test_label_value", }, brokerConfig: &BrokerConfig{ Roles: nil, diff --git a/controllers/tests/kafkacluster_controller_kafka_test.go b/controllers/tests/kafkacluster_controller_kafka_test.go index 61eb55591..b102a07c2 100644 --- a/controllers/tests/kafkacluster_controller_kafka_test.go +++ b/controllers/tests/kafkacluster_controller_kafka_test.go @@ -432,6 +432,14 @@ func expectKafkaBrokerPod(ctx context.Context, kafkaCluster *v1beta1.KafkaCluste Value: "/kafka-logs,/ephemeral-dir1", }, )) + + // when CLUSTER_ID is set as an ENV, verify the status is not randomly generated + for _, env := range kafkaCluster.Spec.Envs { + if env.Name == "CLUSTER_ID" { + Expect(kafkaCluster.Status.ClusterID).To(Equal("test-cluster-id")) + break + } + } } else { Expect(container.Env).To(ConsistOf( // the exact value is not interesting diff --git a/controllers/tests/kafkacluster_controller_test.go b/controllers/tests/kafkacluster_controller_test.go index c2ee67373..008ccaf84 100644 --- a/controllers/tests/kafkacluster_controller_test.go +++ b/controllers/tests/kafkacluster_controller_test.go @@ -256,6 +256,23 @@ var _ = Describe("KafkaCluster", func() { expectCruiseControl(ctx, kafkaClusterKRaft) }) }) + When("configuring Kafka cluster in KRaft mode with CLUSTER_ID env var", func() { + BeforeEach(func() { + loadBalancerServiceName = fmt.Sprintf("envoy-loadbalancer-test-%s", kafkaCluster.Name) + externalListenerHostName = "test.host.com" + + loadBalancerServiceNameKRaft = fmt.Sprintf("envoy-loadbalancer-test-%s", kafkaClusterKRaft.Name) + externalListenerHostNameKRaft = "test.host.com" + kafkaClusterKRaft.Spec.Envs = append(kafkaClusterKRaft.Spec.Envs, corev1.EnvVar{ + Name: "CLUSTER_ID", + Value: "test-cluster-id", + }) + }) + + It("should reconciles objects properly", func(ctx SpecContext) { + expectKafka(ctx, kafkaClusterKRaft, count) + }) + }) When("configuring one ingress envoy controller config inside the external listener without bindings", func() { BeforeEach(func() { testExternalListener := kafkaCluster.Spec.ListenersConfig.ExternalListeners[0] diff --git a/pkg/resources/kafka/configmap.go b/pkg/resources/kafka/configmap.go index d1ae5b536..f5c2db1e3 100644 --- a/pkg/resources/kafka/configmap.go +++ b/pkg/resources/kafka/configmap.go @@ -57,9 +57,11 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, broker v // Cruise Control metrics reporter configuration r.configCCMetricsReporter(broker, config, clientPass, log) + brokerReadOnlyConfig := getBrokerReadOnlyConfig(broker, r.KafkaCluster, log) + // Kafka Broker configurations if r.KafkaCluster.Spec.KRaftMode { - configureBrokerKRaftMode(bConfig, broker.Id, r.KafkaCluster, config, quorumVoters, serverPasses, extListenerStatuses, intListenerStatuses, log) + configureBrokerKRaftMode(bConfig, broker.Id, r.KafkaCluster, config, quorumVoters, serverPasses, extListenerStatuses, intListenerStatuses, log, brokerReadOnlyConfig) } else { configureBrokerZKMode(broker.Id, r.KafkaCluster, config, serverPasses, extListenerStatuses, intListenerStatuses, controllerIntListenerStatuses, log) } @@ -148,23 +150,42 @@ func (r *Reconciler) configCCMetricsReporter(broker v1beta1.Broker, config *prop } func configureBrokerKRaftMode(bConfig *v1beta1.BrokerConfig, brokerID int32, kafkaCluster *v1beta1.KafkaCluster, config *properties.Properties, - quorumVoters []string, serverPasses map[string]string, extListenerStatuses, intListenerStatuses map[string]v1beta1.ListenerStatusList, log logr.Logger) { - if err := config.Set(kafkautils.KafkaConfigNodeID, brokerID); err != nil { - log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigNodeID)) - } + quorumVoters []string, serverPasses map[string]string, extListenerStatuses, intListenerStatuses map[string]v1beta1.ListenerStatusList, log logr.Logger, + brokerReadOnlyConfig *properties.Properties) { + controllerListenerName := generateControlPlaneListener(kafkaCluster.Spec.ListenersConfig.InternalListeners) - if err := config.Set(kafkautils.KafkaConfigProcessRoles, bConfig.Roles); err != nil { - log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigProcessRoles)) - } + // when kRaft is enabled for the cluster, brokers can still be configured to use zookeeper for metadata. + // this is to support the zk to kRaft migration where both zookeeper and kRaft controllers are running in parallel. + if shouldUseKRaftModeForBroker(brokerReadOnlyConfig) { + if err := config.Set(kafkautils.KafkaConfigNodeID, brokerID); err != nil { + log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigNodeID)) + } - if err := config.Set(kafkautils.KafkaConfigControllerQuorumVoters, quorumVoters); err != nil { - log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigControllerQuorumVoters)) + if err := config.Set(kafkautils.KafkaConfigProcessRoles, bConfig.Roles); err != nil { + log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigProcessRoles)) + } + } else { // use zk mode for broker. + // when in zk mode, "broker.id" and "zookeeper.connect" are configured so it will communicate with zookeeper + // control.plane.listener.name will not be set in zk mode. There for it will default to the interbroker listener. + if err := config.Set(kafkautils.KafkaConfigBrokerID, brokerID); err != nil { + log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigBrokerID)) + } + + if err := config.Set(kafkautils.KafkaConfigZooKeeperConnect, zookeeperutils.PrepareConnectionAddress( + kafkaCluster.Spec.ZKAddresses, kafkaCluster.Spec.GetZkPath())); err != nil { + log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigZooKeeperConnect)) + } } - controllerListenerName := generateControlPlaneListener(kafkaCluster.Spec.ListenersConfig.InternalListeners) - if controllerListenerName != "" { - if err := config.Set(kafkautils.KafkaConfigControllerListenerName, controllerListenerName); err != nil { - log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigControllerListenerName)) + if shouldConfigureControllerQuorumForBroker(brokerReadOnlyConfig) { + if err := config.Set(kafkautils.KafkaConfigControllerQuorumVoters, quorumVoters); err != nil { + log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigControllerQuorumVoters)) + } + + if controllerListenerName != "" { + if err := config.Set(kafkautils.KafkaConfigControllerListenerName, controllerListenerName); err != nil { + log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigControllerListenerName)) + } } } @@ -214,6 +235,20 @@ func configureBrokerKRaftMode(bConfig *v1beta1.BrokerConfig, brokerID int32, kaf } } +// Returns true by default (not in migration configured) OR when MigrationBrokerKRaftMode is set and 'true'. +// this is to support the zk to kRaft migration +func shouldUseKRaftModeForBroker(brokerReadOnlyConfig *properties.Properties) bool { + migrationBrokerKRaftMode, found := brokerReadOnlyConfig.Get(kafkautils.MigrationBrokerKRaftMode) + return !found || migrationBrokerKRaftMode.Value() == "true" +} + +// Returns true by default (not in migration) OR when MigrationBrokerControllerQuorumConfigEnabled is set and 'true'. +// this is to support the zk to kRaft migration +func shouldConfigureControllerQuorumForBroker(brokerReadOnlyConfig *properties.Properties) bool { + migrationBrokerControllerQuorumConfigEnabled, found := brokerReadOnlyConfig.Get(kafkautils.MigrationBrokerControllerQuorumConfigEnabled) + return !found || migrationBrokerControllerQuorumConfigEnabled.Value() == "true" +} + func configureBrokerZKMode(brokerID int32, kafkaCluster *v1beta1.KafkaCluster, config *properties.Properties, serverPasses map[string]string, extListenerStatuses, intListenerStatuses, controllerIntListenerStatuses map[string]v1beta1.ListenerStatusList, log logr.Logger) { @@ -547,6 +582,10 @@ func (r Reconciler) generateBrokerConfig(broker v1beta1.Broker, brokerConfig *v1 finalBrokerConfig.Merge(opGenConf) } + // Remove the migration broker configuration since its only used as flags to derive other configs + finalBrokerConfig.Delete(kafkautils.MigrationBrokerControllerQuorumConfigEnabled) + finalBrokerConfig.Delete(kafkautils.MigrationBrokerKRaftMode) + finalBrokerConfig.Sort() return finalBrokerConfig.String() diff --git a/pkg/resources/kafka/configmap_test.go b/pkg/resources/kafka/configmap_test.go index c44a97c22..e23f27e9a 100644 --- a/pkg/resources/kafka/configmap_test.go +++ b/pkg/resources/kafka/configmap_test.go @@ -722,13 +722,15 @@ zookeeper.connect=example.zk:2181/`, // TestGenerateBrokerConfigKRaftMode serves as an aggregated test on top of TestGenerateBrokerConfig to verify basic broker configurations under KRaft mode // Note: most of the test cases under TestGenerateBrokerConfig are not replicated here since running KRaft mode doesn't affect things like SSL and storage configurations -func TestGenerateBrokerConfigKRaftMode(t *testing.T) { +func TestGenerateBrokerConfigKRaftMode(t *testing.T) { //nolint funlen testCases := []struct { testName string brokers []v1beta1.Broker listenersConfig v1beta1.ListenersConfig internalListenerStatuses map[string]v1beta1.ListenerStatusList controllerListenerStatus map[string]v1beta1.ListenerStatusList + zkAddresses []string + zkPath string expectedBrokerConfigs []string }{ { @@ -999,6 +1001,210 @@ log.dirs=/test-kafka-logs/kafka,/test-kafka-logs-50/kafka,/test-kafka-logs-100/k metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter node.id=100 process.roles=broker,controller +`}, + }, + { + testName: "a Kafka cluster with a mix of broker-only, controller-only, and combined roles; and various migration configs set on each brokers", + brokers: []v1beta1.Broker{ + { + Id: 0, + BrokerConfig: &v1beta1.BrokerConfig{ + Roles: []string{"broker"}, + StorageConfigs: []v1beta1.StorageConfig{ + { + MountPath: "/test-kafka-logs", + }, + }, + }, + ReadOnlyConfig: "migration.broker.controllerQuorumConfigEnabled=true\nmigration.broker.kRaftMode=true", + }, + { + Id: 50, + BrokerConfig: &v1beta1.BrokerConfig{ + Roles: []string{"controller"}, + StorageConfigs: []v1beta1.StorageConfig{ + { + MountPath: "/test-kafka-logs", + }, + { + MountPath: "/test-kafka-logs-50", + }, + }, + }, + }, + { + Id: 100, + BrokerConfig: &v1beta1.BrokerConfig{ + Roles: []string{"broker", "controller"}, + StorageConfigs: []v1beta1.StorageConfig{ + { + MountPath: "/test-kafka-logs", + }, + { + MountPath: "/test-kafka-logs-50", + }, + { + MountPath: "/test-kafka-logs-100", + }, + }, + }, + ReadOnlyConfig: "migration.broker.controllerQuorumConfigEnabled=false\nmigration.broker.kRaftMode=false", + }, + { + Id: 200, + BrokerConfig: &v1beta1.BrokerConfig{ + Roles: []string{"broker"}, + StorageConfigs: []v1beta1.StorageConfig{ + { + MountPath: "/test-kafka-logs", + }, + }, + }, + ReadOnlyConfig: "migration.broker.controllerQuorumConfigEnabled=true\nmigration.broker.kRaftMode=false", + }, + { + Id: 300, + BrokerConfig: &v1beta1.BrokerConfig{ + Roles: []string{"broker"}, + StorageConfigs: []v1beta1.StorageConfig{ + { + MountPath: "/test-kafka-logs", + }, + }, + }, + ReadOnlyConfig: "migration.broker.controllerQuorumConfigEnabled=false\nmigration.broker.kRaftMode=true", + }, + }, + listenersConfig: v1beta1.ListenersConfig{ + InternalListeners: []v1beta1.InternalListenerConfig{ + { + CommonListenerSpec: v1beta1.CommonListenerSpec{ + Type: v1beta1.SecurityProtocol("PLAINTEXT"), + Name: "internal", + ContainerPort: 9092, + UsedForInnerBrokerCommunication: true, + }, + }, + { + CommonListenerSpec: v1beta1.CommonListenerSpec{ + Type: v1beta1.SecurityProtocol("PLAINTEXT"), + Name: "controller", + ContainerPort: 9093, + }, + UsedForControllerCommunication: true, + }, + }, + }, + internalListenerStatuses: map[string]v1beta1.ListenerStatusList{ + "internal": { + { + Name: "broker-0", + Address: "kafka-0.kafka.svc.cluster.local:9092", + }, + { + Name: "broker-50", + Address: "kafka-50.kafka.svc.cluster.local:9092", + }, + { + Name: "broker-100", + Address: "kafka-100.kafka.svc.cluster.local:9092", + }, + { + Name: "broker-200", + Address: "kafka-200.kafka.svc.cluster.local:9092", + }, + { + Name: "broker-300", + Address: "kafka-300.kafka.svc.cluster.local:9092", + }, + }, + }, + controllerListenerStatus: map[string]v1beta1.ListenerStatusList{ + "controller": { + { + Name: "broker-0", + Address: "kafka-0.kafka.svc.cluster.local:9093", + }, + { + Name: "broker-50", + Address: "kafka-50.kafka.svc.cluster.local:9093", + }, + { + Name: "broker-100", + Address: "kafka-100.kafka.svc.cluster.local:9093", + }, + { + Name: "broker-200", + Address: "kafka-200.kafka.svc.cluster.local:9093", + }, + { + Name: "broker-300", + Address: "kafka-300.kafka.svc.cluster.local:9093", + }, + }, + }, + zkAddresses: []string{"example.zk:2181"}, + zkPath: "/kafka", + expectedBrokerConfigs: []string{ + `advertised.listeners=INTERNAL://kafka-0.kafka.svc.cluster.local:9092 +controller.listener.names=CONTROLLER +controller.quorum.voters=50@kafka-50.kafka.svc.cluster.local:9093,100@kafka-100.kafka.svc.cluster.local:9093 +cruise.control.metrics.reporter.bootstrap.servers=kafka-all-broker.kafka.svc.cluster.local:9092 +cruise.control.metrics.reporter.kubernetes.mode=true +inter.broker.listener.name=INTERNAL +listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT +listeners=INTERNAL://:9092 +log.dirs=/test-kafka-logs/kafka +metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter +node.id=0 +process.roles=broker +`, + `controller.listener.names=CONTROLLER +controller.quorum.voters=50@kafka-50.kafka.svc.cluster.local:9093,100@kafka-100.kafka.svc.cluster.local:9093 +cruise.control.metrics.reporter.bootstrap.servers=kafka-all-broker.kafka.svc.cluster.local:9092 +cruise.control.metrics.reporter.kubernetes.mode=true +inter.broker.listener.name=INTERNAL +listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT +listeners=CONTROLLER://:9093 +log.dirs=/test-kafka-logs/kafka,/test-kafka-logs-50/kafka +metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter +node.id=50 +process.roles=controller +`, + `advertised.listeners=INTERNAL://kafka-100.kafka.svc.cluster.local:9092 +broker.id=100 +cruise.control.metrics.reporter.bootstrap.servers=kafka-all-broker.kafka.svc.cluster.local:9092 +cruise.control.metrics.reporter.kubernetes.mode=true +inter.broker.listener.name=INTERNAL +listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT +listeners=INTERNAL://:9092,CONTROLLER://:9093 +log.dirs=/test-kafka-logs/kafka,/test-kafka-logs-50/kafka,/test-kafka-logs-100/kafka +metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter +zookeeper.connect=example.zk:2181/kafka +`, + `advertised.listeners=INTERNAL://kafka-200.kafka.svc.cluster.local:9092 +broker.id=200 +controller.listener.names=CONTROLLER +controller.quorum.voters=50@kafka-50.kafka.svc.cluster.local:9093,100@kafka-100.kafka.svc.cluster.local:9093 +cruise.control.metrics.reporter.bootstrap.servers=kafka-all-broker.kafka.svc.cluster.local:9092 +cruise.control.metrics.reporter.kubernetes.mode=true +inter.broker.listener.name=INTERNAL +listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT +listeners=INTERNAL://:9092 +log.dirs=/test-kafka-logs/kafka +metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter +zookeeper.connect=example.zk:2181/kafka +`, + `advertised.listeners=INTERNAL://kafka-300.kafka.svc.cluster.local:9092 +cruise.control.metrics.reporter.bootstrap.servers=kafka-all-broker.kafka.svc.cluster.local:9092 +cruise.control.metrics.reporter.kubernetes.mode=true +inter.broker.listener.name=INTERNAL +listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT +listeners=INTERNAL://:9092 +log.dirs=/test-kafka-logs/kafka +metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter +node.id=300 +process.roles=broker `}, }, } @@ -1023,6 +1229,8 @@ process.roles=broker,controller KRaftMode: true, ListenersConfig: test.listenersConfig, Brokers: test.brokers, + ZKAddresses: test.zkAddresses, + ZKPath: test.zkPath, }, }, }, diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index 65694582b..46429ed8f 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -366,7 +366,18 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { if r.KafkaCluster.Spec.KRaftMode { // all broker nodes under the same Kafka cluster must use the same cluster UUID if r.KafkaCluster.Status.ClusterID == "" { - r.KafkaCluster.Status.ClusterID = generateRandomClusterID() + // CLUSTER_ID can be overridden with ENV (e.g for migration from ZK to KRaft so it matches the value for ZK cluster) + for _, env := range r.KafkaCluster.Spec.Envs { + if env.Name == "CLUSTER_ID" { + r.KafkaCluster.Status.ClusterID = env.Value + break + } + } + + if r.KafkaCluster.Status.ClusterID == "" { + r.KafkaCluster.Status.ClusterID = generateRandomClusterID() + } + err = r.Client.Status().Update(ctx, r.KafkaCluster) if apierrors.IsNotFound(err) { err = r.Client.Update(ctx, r.KafkaCluster) diff --git a/pkg/resources/kafka/pod.go b/pkg/resources/kafka/pod.go index cf1469a64..5b903e579 100644 --- a/pkg/resources/kafka/pod.go +++ b/pkg/resources/kafka/pod.go @@ -132,12 +132,8 @@ fi`}, for i, container := range pod.Spec.Containers { if container.Name == kafkaContainerName { // in KRaft mode, all broker nodes within the same Kafka cluster need to use the same cluster ID to format the storage - pod.Spec.Containers[i].Env = append(pod.Spec.Containers[i].Env, - corev1.EnvVar{ - Name: "CLUSTER_ID", - Value: r.KafkaCluster.Status.ClusterID, - }, - ) + addClusterIdEnv(r, pod, i) + // see how this env var is used in wait-for-envoy-sidecars.sh storageMountPaths := brokerConfig.GetStorageMountPaths() if storageMountPaths != "" { @@ -156,6 +152,22 @@ fi`}, return pod } +func addClusterIdEnv(r *Reconciler, pod *corev1.Pod, i int) { + // when cluster id env var already present from KafkaCluster, do not add it again + for _, envVar := range r.KafkaCluster.Spec.Envs { + if envVar.Name == "CLUSTER_ID" { + return + } + } + + pod.Spec.Containers[i].Env = append(pod.Spec.Containers[i].Env, + corev1.EnvVar{ + Name: "CLUSTER_ID", + Value: r.KafkaCluster.Status.ClusterID, + }, + ) +} + func (r *Reconciler) generateKafkaContainerPorts(log logr.Logger) []corev1.ContainerPort { var kafkaContainerPorts []corev1.ContainerPort diff --git a/pkg/util/kafka/const.go b/pkg/util/kafka/const.go index ffc06ec28..79d2e6f16 100644 --- a/pkg/util/kafka/const.go +++ b/pkg/util/kafka/const.go @@ -51,6 +51,12 @@ const ( KafkaConfigSSLKeyStorePassword = "ssl.keystore.password" ) +// used for zk to kraft migration +const ( + MigrationBrokerControllerQuorumConfigEnabled = "migration.broker.controllerQuorumConfigEnabled" + MigrationBrokerKRaftMode = "migration.broker.kRaftMode" +) + // used for Cruise Control configurations const ( CruiseControlConfigMetricsReporters = "metric.reporters"