Skip to content

Commit 7be343a

Browse files
dobrerazvanpanyuenlauctrlaltlucamurarucristianpetrache
authored
Kraft (#74)
* Add koperator/api changes for KRaft support * Move processRoles under brokerConfig * Update comments for ZK-relevant configurations in kafkacluster_types.go * Rebase origin/kraft-api * Support running Kafka cluster in KRaft mode * brokerRoles -> processRoles to match upstream Kafka naming * Update pod start-up process so pods can be restarted during rolling upgrade triggered by controller addition/removal * Rebase from origin/kraft-api * Update exsiting integration tests and func signatures * Fix broker configurations; add unit tests for broker configurations under KRaft mode * Remove unnecessary method from koperator/api * Extend integration tests to cover KRaft mode * make lint-fix * Update static KafkaCluster yamls; add check for kraft mode before setting ClusterID in status * Rebase from origin/koperator-api * Use util functions that got moved to the koperator/api module * Remove unineteded changes during rebase * Do not take active controller identity into consideration when reorder the brokers * Update implementation to accomomdate the latest KafkaCluster API change * Make comments about ZK-relevant configurations more clear * Add ConcurrentBrokerRestartCountPerRack to RollingUpgradeConfig (banzaicloud#1002) * Add ConcurrentBrokerRestartCountPerRack to RollingUpgradeConfig * Small refactoring * Exclude controller-only nodes from all CC operations * Add processRoles label key for Kafka pods in KRaft mode; export consts * Allow concurrent broker restarts from same AZ (broker rack) (#62) * Fix flaky test by deleting nodeports explicitly (#67) * Allow dashes when parsing broker rack (#68) * Upgrade Kafka to 3.6.0 (#69) * Upgrade dependencies * Fix wrong port on expectEnvoyWithConfigAz2Tls test (#70) * Upgrade Kafka to 3.6.1 (#71) Co-authored-by: Petruț™ <[email protected]> * working kraft * Merge origin/master * Fixing go.mod * Fixing tests post merge * Fix tests * Fix tests * Fix unnecessary append * Fix go.mod * Update CRD * Update go.mod for e2e * Update go.mod * More fixes * Upgrade Kafka image to use Java v21 (#72) * Remove jbod cc tests * Merging Master to Kraft and Reverting Rack Removal (#89) * Fix flaky test by deleting nodeports explicitly (#67) * Upgrade Kafka to 3.6.0 (#69) * Upgrade dependencies * Fix wrong port on expectEnvoyWithConfigAz2Tls test (#70) * Upgrade Kafka to 3.6.1 (#71) Co-authored-by: Petruț™ <[email protected]> * Upgrade Kafka image to use Java v21 (#72) * Added arm64 to docker build platforms (#73) * Added arm64 to docker build platforms * Regenerated headers for 2024 * Upgrading Kafka to 3.7.0 (#77) * Update codeql-analysis.yml (#78) * [INTERNAL] Create uniq leader ID per operator deployment (#76) * [INTERNAL] Get watched namespaces from env variable (#75) (cherry picked from commit de6500b) * [CORE-106517] Fix outdated config in the sample (#83) * Cross-compile koperator for arm and intel. (#84) * Adding Contour Ingress support (#82) * Allow property security-inter-broker-protocol (#85) * adding the ability to use security-inter-broker-protocol in koperator * updating util.go to remove _ for generated names * adding replace all for external listener port name * fixing other places where externallistener name is used to not have _ * adding an alternative way to identify which port to use for kafka administration and cc connection * taking out comments for pr push * fixing kafka crd * setting omitempty so it will not be required * adding generated crds * adding comments with context for new flag UsedForKafkaAdminCommunication * Use getBrokerReadOnlyConfig function to get properties and update unit test - security_inter_broker_protocol_Set * Update crds to match generated manifest --------- Co-authored-by: Cameron Wright <[email protected]> Co-authored-by: Ha Van <[email protected]> * Revert "Allow concurrent broker restarts from same AZ (broker rack) (#62)" This reverts commit 514fa07. * Fixed build issues * Fix TestGenerateBrokerConfig * Added LoadBalancer for Kind E2E test cluster * Added LoadBalancer for Kind E2E test cluster * Added LoadBalancer for Kind E2E test cluster * Added LoadBalancer for Kind E2E test cluster * Added LoadBalancer for Kind E2E test cluster * Added LoadBalancer for Kind E2E test cluster * Added LoadBalancer for Kind E2E test cluster * Added LoadBalancer for Kind E2E test cluster * Added LoadBalancer for Kind E2E test cluster * Added LoadBalancer for Kind E2E test cluster * Added LoadBalancer for Kind E2E test cluster * Added LoadBalancer for Kind E2E test cluster * Added watch namesapces * Added tmate for debugging * Added tmate for debugging * Added tmate for debugging * Added enabled projectcontour helm install * Enabled cloud-provider-kind * Added ProjectContour cluster role * updated certificate name * updated certificate name * Run without SSL * Removing Project Contour * Adding cloud-provider-kind * Removing cloud-provider - manually adding during test * trigger test * Remove SnpshotClusterAndCompare * Increased log length for Snapshot and Compare * Re-Add Snapshot and compare * Increased log length for Snapshot and Compare * Increased log length even more * Add Uninstall Contour CRDs * Re-Add KafkaCluster_SSL Tests * Removing BanzaiCloud Helm Chart from list of repos * pushing up latest go.sum * Clean up Merge * Enabling Tmate to debug e2e Test * Revert Cert Changes * Revert "Revert Cert Changes" This reverts commit 5c5b19c. * Enable sslClientAuth * trigger test * WIP: Fix Listener Config * Clean up test case results - tc-1 * Clean up test case results - tc-2 * Updated Kraft Test Cases * Cleanup Linting Issues * Remove Tmate Debugger * Run Kraft CLuster E2E * Add kraft e2e test * Revert Test --------- Co-authored-by: ctrlaltluc <[email protected]> Co-authored-by: Adi Muraru <[email protected]> Co-authored-by: Razvan Dobre <[email protected]> Co-authored-by: Cristian-Petrut Petrache <[email protected]> Co-authored-by: Petruț™ <[email protected]> Co-authored-by: Adrian Muraru <[email protected]> Co-authored-by: Adrian <[email protected]> Co-authored-by: aguzovatii <[email protected]> Co-authored-by: cawright-rh <[email protected]> Co-authored-by: Cameron Wright <[email protected]> Co-authored-by: Ha Van <[email protected]> Co-authored-by: Daniel Vaseekaran <[email protected]> * Trigger E2E Test * End to end testing with KRaft cluster (#92) * Fix flaky test by deleting nodeports explicitly (#67) * Upgrade Kafka to 3.6.0 (#69) * Upgrade dependencies * Fix wrong port on expectEnvoyWithConfigAz2Tls test (#70) * Upgrade Kafka to 3.6.1 (#71) Co-authored-by: Petruț™ <[email protected]> * Upgrade Kafka image to use Java v21 (#72) * Added arm64 to docker build platforms (#73) * Added arm64 to docker build platforms * Regenerated headers for 2024 * Upgrading Kafka to 3.7.0 (#77) * Update codeql-analysis.yml (#78) * [INTERNAL] Create uniq leader ID per operator deployment (#76) * [INTERNAL] Get watched namespaces from env variable (#75) (cherry picked from commit de6500b) * [CORE-106517] Fix outdated config in the sample (#83) * Cross-compile koperator for arm and intel. (#84) * Adding Contour Ingress support (#82) * Allow property security-inter-broker-protocol (#85) * adding the ability to use security-inter-broker-protocol in koperator * updating util.go to remove _ for generated names * adding replace all for external listener port name * fixing other places where externallistener name is used to not have _ * adding an alternative way to identify which port to use for kafka administration and cc connection * taking out comments for pr push * fixing kafka crd * setting omitempty so it will not be required * adding generated crds * adding comments with context for new flag UsedForKafkaAdminCommunication * Use getBrokerReadOnlyConfig function to get properties and update unit test - security_inter_broker_protocol_Set * Update crds to match generated manifest --------- Co-authored-by: Cameron Wright <[email protected]> Co-authored-by: Ha Van <[email protected]> * Revert "Allow concurrent broker restarts from same AZ (broker rack) (#62)" This reverts commit 514fa07. * Fixed build issues * Fix TestGenerateBrokerConfig * Added LoadBalancer for Kind E2E test cluster * Added LoadBalancer for Kind E2E test cluster * Added LoadBalancer for Kind E2E test cluster * Added LoadBalancer for Kind E2E test cluster * Added LoadBalancer for Kind E2E test cluster * Added LoadBalancer for Kind E2E test cluster * Added LoadBalancer for Kind E2E test cluster * Added LoadBalancer for Kind E2E test cluster * Added LoadBalancer for Kind E2E test cluster * Added LoadBalancer for Kind E2E test cluster * Added LoadBalancer for Kind E2E test cluster * Added LoadBalancer for Kind E2E test cluster * Added watch namesapces * Added tmate for debugging * Added tmate for debugging * Added tmate for debugging * Added enabled projectcontour helm install * Enabled cloud-provider-kind * Added ProjectContour cluster role * updated certificate name * updated certificate name * Run without SSL * Removing Project Contour * Adding cloud-provider-kind * Removing cloud-provider - manually adding during test * trigger test * Remove SnpshotClusterAndCompare * Increased log length for Snapshot and Compare * Re-Add Snapshot and compare * Increased log length for Snapshot and Compare * Increased log length even more * Add Uninstall Contour CRDs * Re-Add KafkaCluster_SSL Tests * Removing BanzaiCloud Helm Chart from list of repos * pushing up latest go.sum * Clean up Merge * Enabling Tmate to debug e2e Test * Revert Cert Changes * Revert "Revert Cert Changes" This reverts commit 5c5b19c. * Enable sslClientAuth * trigger test * WIP: Fix Listener Config * Clean up test case results - tc-1 * Clean up test case results - tc-2 * Updated Kraft Test Cases * Cleanup Linting Issues * Remove Tmate Debugger * Run Kraft CLuster E2E * Increate Timeout to allow pod termination * Trigger Test * Added Debugger * Fix App Labels for Controllers * Revert image upate * Revert "Fix App Labels for Controllers" This reverts commit a3cf8a5. * Include Broker/Controller Labels for Headless SVC Selector * Logic for controller listener * add controller service * Added Headless-Controller-SVC Labels * Fix controller addresses and labels for brokers * Empty commit to trigger e2e * Set up kafka-3 as controller only for troubleshooting * Empty commit to trigger e2e * Use controller address for JMXTemplate * Update uninstall timeout to 600s * fix lint * fix lint * fix lint * Enable TMate Debugger * Trigger E2E * Updated BrokerIdLabelkey * Updated BrokerIdLabelkey * Check for Kraft mode when setting the controller listener * Check for Kraft mode when setting the controller listener * Disable tmate from e2e test * adding //nolint:unparam to testProduceConsumeInternal func now that it is used twice * moving the //nolint:unparam to the none-ssl version * Fixed BrokerLabel Test * Add additional test cases for TestGetBrokerLabels * Add additional test cases for TestGetBrokerLabels * commenting out broker-1 in test to fix kraft test * adding conditional to check if kraft mode is enabled before selecting which expected results in test --------- Co-authored-by: ctrlaltluc <[email protected]> Co-authored-by: Adi Muraru <[email protected]> Co-authored-by: Razvan Dobre <[email protected]> Co-authored-by: Cristian-Petrut Petrache <[email protected]> Co-authored-by: Petruț™ <[email protected]> Co-authored-by: Adrian Muraru <[email protected]> Co-authored-by: Adrian <[email protected]> Co-authored-by: aguzovatii <[email protected]> Co-authored-by: cawright-rh <[email protected]> Co-authored-by: Cameron Wright <[email protected]> Co-authored-by: Ha Van <[email protected]> Co-authored-by: Daniel Vaseekaran <[email protected]> Co-authored-by: Ha Van <[email protected]> * [CORE-119212] - Add/fix pdb for controllers (#94) * Ensure externallisteners are listed first (#97) * Exclude controllers from external ingress (e.g. envoy) (#95) * adding new check for broker level readOnlyConfig (#96) * Zk kraft migration (#100) * Allow setting CLUSTER_ID as env var for zk to kraft migration * Add migration properties to control if broker are in kraft mode or zk mode even when kraft is enabled * Refactor functions and additional test cases * Fix linting * Fix unit test * Add zkConnect property when broker is in zk mode * Remove control.plane.listener.name property from broker during migration * Add additional labels to brokers for backward compatibility while performing migration * adding fix for koperator to add interBrokerListenerName one at a time and check for security.inter.broker.protocol (#101) * adding fix for koperator to add interBrokerListenerName one at a time * updating test to include inter.broker.listener.name * fixing code that checks for inter.broker.listener.name * removing unecessary function calls * updating test expectations to be alphabetized * adding generateListenerSpecificConfig back * adding merges back in * reverting back to having getConfigProperties only call generateListenerSpecificConfig * removing unnecessary comments --------- Co-authored-by: Cameron Wright <[email protected]> * [CORE-122420] - Update pod name for Kraft Controllers (#102) * [CORE-122420] - Update pod name for Kraft Controllers * Triggering Integration Tests * Update Test Case --------- Co-authored-by: Daniel Vaseekaran <[email protected]> * Cleanup Merge * Cleanup Merge * Update .github/workflows/helm.yml Co-authored-by: Razvan Dobre <[email protected]> * Update api/go.mod Co-authored-by: Razvan Dobre <[email protected]> * Fixing controller reconcile logic to properly check for controller only nodes in kraft mode only (#104) * fixing controller reconcile logic that was stopping all reconciles * fixing lint issue * fixing filteredBrokerID changes * Updating spacing in cctask_controller_test * increasing timeout time during waitforclusterrunningstate * testing longer pull rate * increasing serviceIP range to try to increase test stability * reverting to 50ms * adding service counter to try and further debug * temporarily removing counter * removing commented section * dummy commit to kick of test * Adding if statement to check if kafkacluster is deleted first before deleting in test * adding additional checks and defering if an error arises to help with debugging * adding further debugging and using a helper function to clean up kafkacluster resources * adding new util to help with safer deleting, easier debugging, and an additional helper function for port management * taking out old tests * adding liscense text to new test_utils.go * taking out old comments from previous testing * fixing lint errors * handling errors * adding panic handling for problematic tests * adding defer to exact spot where panic is occuring * removing safekafkacleanup * removing cleanups * adding logging to help with debug * adding printf formatting directive * adding removal for kafkacluster kraft specific test * changing kfakcluster deletion * taking out manual kraft cleanup * removing manual cleanup of kafkacluster * refactoring externalnodeport test to be more stable * fixing license issue * removing logging * changing logging * getting rid of old comments * updating test to reserve all ports in a line * fixing lint issue * fixing lint issues * Update controllers/tests/suite_test.go Co-authored-by: hvan <[email protected]> * fixing lint errors * forcing ordering to further prevent nodeport collision --------- Co-authored-by: Cameron Wright <[email protected]> Co-authored-by: hvan <[email protected]> * Update api/v1beta1/kafkacluster_types.go Co-authored-by: Razvan Dobre <[email protected]> * Update api/v1beta1/kafkacluster_types.go Co-authored-by: Razvan Dobre <[email protected]> * Update config/samples/banzaicloud_v1beta1_kafkacluster.yaml Co-authored-by: Razvan Dobre <[email protected]> * [CORE-126251] - PR Comments Addressed * [CORE-126251] - PR Comments Addressed --------- Co-authored-by: Darren Lau <[email protected]> Co-authored-by: Lucian Ilie <[email protected]> Co-authored-by: Adi Muraru <[email protected]> Co-authored-by: Cristian-Petrut Petrache <[email protected]> Co-authored-by: Petruț™ <[email protected]> Co-authored-by: Adrian Muraru <[email protected]> Co-authored-by: dvaseekara <[email protected]> Co-authored-by: Adrian <[email protected]> Co-authored-by: aguzovatii <[email protected]> Co-authored-by: cawright-rh <[email protected]> Co-authored-by: Cameron Wright <[email protected]> Co-authored-by: Ha Van <[email protected]> Co-authored-by: Daniel Vaseekaran <[email protected]> Co-authored-by: Ha Van <[email protected]> Co-authored-by: hvan <[email protected]>
1 parent faa60bd commit 7be343a

File tree

70 files changed

+4655
-475
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+4655
-475
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ test: generate fmt vet bin/setup-envtest
102102
test-e2e:
103103
IMG_E2E=${IMG_E2E} go test github.com/banzaicloud/koperator/tests/e2e \
104104
-v \
105-
-timeout 20m \
105+
-timeout 45m \
106106
-tags e2e \
107107
--ginkgo.show-node-events \
108108
--ginkgo.trace \

api/go.mod

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ require (
1515
)
1616

1717
require (
18+
// github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
1819
github.com/go-logr/logr v1.3.0 // indirect
1920
github.com/gogo/protobuf v1.3.2 // indirect
2021
github.com/google/go-cmp v0.5.9 // indirect
@@ -23,16 +24,26 @@ require (
2324
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
2425
github.com/modern-go/reflect2 v1.0.2 // indirect
2526
github.com/pkg/errors v0.9.1 // indirect
27+
// github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
2628
go.uber.org/multierr v1.11.0 // indirect
2729
golang.org/x/net v0.18.0 // indirect
2830
golang.org/x/text v0.14.0 // indirect
2931
gopkg.in/inf.v0 v0.9.1 // indirect
3032
gopkg.in/yaml.v2 v2.4.0 // indirect
33+
// gopkg.in/yaml.v3 v3.0.1 // indirect
3134
k8s.io/klog/v2 v2.110.1 // indirect
3235
k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect
3336
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
3437
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
3538
)
3639

40+
require github.com/stretchr/testify v1.8.4
41+
42+
require (
43+
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
44+
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
45+
gopkg.in/yaml.v3 v3.0.1 // indirect
46+
)
47+
3748
// remove once https://github.com/cert-manager/cert-manager/issues/5953 is fixed
3849
replace github.com/Venafi/vcert/v4 => github.com/jetstack/vcert/v4 v4.9.6-0.20230127103832-3aa3dfd6613d

api/util/util.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,21 @@ func MergeLabels(l ...map[string]string) map[string]string {
3939
func LabelsForKafka(name string) map[string]string {
4040
return map[string]string{"app": "kafka", "kafka_cr": name}
4141
}
42+
43+
func LabelsForBroker(name string) map[string]string {
44+
return map[string]string{"isBrokerNode": "true", "app": "kafka", "kafka_cr": name}
45+
}
46+
47+
func LabelsForController(name string) map[string]string {
48+
return map[string]string{"isControllerNode": "true", "app": "kafka", "kafka_cr": name}
49+
}
50+
51+
// StringSliceContains returns true if list contains s
52+
func StringSliceContains(list []string, s string) bool {
53+
for _, v := range list {
54+
if v == s {
55+
return true
56+
}
57+
}
58+
return false
59+
}

api/util/util_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,13 @@ func TestMergeLabels(t *testing.T) {
3535
t.Error("Expected:", expected, "Got:", merged)
3636
}
3737
}
38+
39+
func TestStringSliceContains(t *testing.T) {
40+
slice := []string{"1", "2", "3"}
41+
if !StringSliceContains(slice, "1") {
42+
t.Error("Expected slice contains 1, got false")
43+
}
44+
if StringSliceContains(slice, "4") {
45+
t.Error("Expected slice not contains 4, got true")
46+
}
47+
}

api/v1beta1/kafkacluster_types.go

Lines changed: 94 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,25 @@ const (
4242
KafkaCRLabelKey = "kafka_cr"
4343
BrokerIdLabelKey = "brokerId"
4444

45+
// ProcessRolesKey is used to identify which process roles the Kafka pod has
46+
ProcessRolesKey = "processRoles"
47+
48+
// IsBrokerNodeKey is used to identify if the kafka pod is either a broker or a broker_controller
49+
IsBrokerNodeKey = "isBrokerNode"
50+
51+
// IsControllerNodeKey is used to identify if the kafka pod is a controller or broker_controller
52+
IsControllerNodeKey = "isControllerNode"
53+
54+
// DefaultCruiseControlImage is the default CC image used when users don't specify it in CruiseControlConfig.Image
55+
DefaultCruiseControlImage = "ghcr.io/banzaicloud/cruise-control:2.5.123"
56+
57+
// DefaultKafkaImage is the default Kafka image used when users don't specify it in KafkaClusterSpec.ClusterImage
58+
DefaultKafkaImage = "ghcr.io/banzaicloud/kafka:2.13-3.4.1"
59+
60+
// ControllerNodeProcessRole represents the node is a controller node
61+
ControllerNodeProcessRole = "controller"
62+
// BrokerNodeProcessRole represents the node is a broker node
63+
BrokerNodeProcessRole = "broker"
4564
// These are default values for API keys
4665

4766
/* General Config */
@@ -134,15 +153,25 @@ const (
134153

135154
// KafkaClusterSpec defines the desired state of KafkaCluster
136155
type KafkaClusterSpec struct {
156+
// kRaft is used to decide where the Kafka cluster is under KRaft mode or ZooKeeper mode.
157+
// This is default to be true; if set to false, the Kafka cluster is in ZooKeeper mode.
158+
// +kubebuilder:default=false
159+
// +optional
160+
KRaftMode bool `json:"kRaft"`
137161
HeadlessServiceEnabled bool `json:"headlessServiceEnabled"`
138162
ListenersConfig ListenersConfig `json:"listenersConfig"`
139163
// Custom ports to expose in the container. Example use case: a custom kafka distribution, that includes an integrated metrics api endpoint
140164
AdditionalPorts []corev1.ContainerPort `json:"additionalPorts,omitempty"`
141165
// ZKAddresses specifies the ZooKeeper connection string
142166
// in the form hostname:port where host and port are the host and port of a ZooKeeper server.
143-
ZKAddresses []string `json:"zkAddresses"`
167+
// Under ZooKeeper mode, this is a must-have configuration.
168+
// And if set under KRaft mode, Koperator ignores this configuration.
169+
// +optional
170+
ZKAddresses []string `json:"zkAddresses,omitempty"`
144171
// ZKPath specifies the ZooKeeper chroot path as part
145172
// of its ZooKeeper connection string which puts its data under some path in the global ZooKeeper namespace.
173+
// If set under KRaft mode, Koperator ignores this configuration.
174+
// +optional
146175
ZKPath string `json:"zkPath,omitempty"`
147176
RackAwareness *RackAwareness `json:"rackAwareness,omitempty"`
148177
ClusterImage string `json:"clusterImage,omitempty"`
@@ -197,6 +226,8 @@ type KafkaClusterStatus struct {
197226
RollingUpgrade RollingUpgradeStatus `json:"rollingUpgradeStatus,omitempty"`
198227
AlertCount int `json:"alertCount"`
199228
ListenerStatuses ListenerStatuses `json:"listenerStatuses,omitempty"`
229+
// ClusterID is a base64-encoded random UUID generated by Koperator to run the Kafka cluster in KRaft mode
230+
ClusterID string `json:"clusterID,omitempty"`
200231
}
201232

202233
// RollingUpgradeStatus defines status of rolling upgrade
@@ -244,11 +275,12 @@ type DisruptionBudgetWithStrategy struct {
244275
DisruptionBudget `json:",inline"`
245276
// The strategy to be used, either minAvailable or maxUnavailable
246277
// +kubebuilder:validation:Enum=minAvailable;maxUnavailable
247-
Stategy string `json:"strategy,omitempty"`
278+
Strategy string `json:"strategy,omitempty"`
248279
}
249280

250281
// Broker defines the broker basic configuration
251282
type Broker struct {
283+
// id maps to "node.id" configuration in KRaft mode, and it maps to "broker.id" configuration in ZooKeeper mode.
252284
// +kubebuilder:validation:Minimum=0
253285
// +kubebuilder:validation:Maximum=65535
254286
// +kubebuilder:validation:ExclusiveMaximum=true
@@ -260,6 +292,13 @@ type Broker struct {
260292

261293
// BrokerConfig defines the broker configuration
262294
type BrokerConfig struct {
295+
// processRoles defines the role(s) for this particular Kafka node: "broker", "controller", or both.
296+
// This must be set in KRaft mode. If set in ZooKeeper mode, Koperator ignores this configuration.
297+
// +kubebuilder:validation:MaxItems=2
298+
// +kubebuilder:validation:Items:Type=string
299+
// +kubebuilder:validation:Items:Enum=controller;broker
300+
// +optional
301+
Roles []string `json:"processRoles,omitempty"`
263302
Image string `json:"image,omitempty"`
264303
MetricsReporterImage string `json:"metricsReporterImage,omitempty"`
265304
Config string `json:"config,omitempty"`
@@ -991,6 +1030,19 @@ func (bConfig *BrokerConfig) GetTerminationGracePeriod() int64 {
9911030
return *bConfig.TerminationGracePeriod
9921031
}
9931032

1033+
// GetStorageMountPaths returns a string with comma-separated storage mount paths that the broker uses
1034+
func (bConfig *BrokerConfig) GetStorageMountPaths() string {
1035+
var mountPaths string
1036+
for i, sc := range bConfig.StorageConfigs {
1037+
if i != len(bConfig.StorageConfigs)-1 {
1038+
mountPaths += sc.MountPath + ","
1039+
} else {
1040+
mountPaths += sc.MountPath
1041+
}
1042+
}
1043+
return mountPaths
1044+
}
1045+
9941046
// GetNodeSelector returns the node selector for cruise control
9951047
func (cConfig *CruiseControlConfig) GetNodeSelector() map[string]string {
9961048
return cConfig.NodeSelector
@@ -1051,11 +1103,24 @@ func (bConfig *BrokerConfig) GetBrokerAnnotations() map[string]string {
10511103
}
10521104

10531105
// GetBrokerLabels returns the labels that are applied to broker pods
1054-
func (bConfig *BrokerConfig) GetBrokerLabels(kafkaClusterName string, brokerId int32) map[string]string {
1106+
func (bConfig *BrokerConfig) GetBrokerLabels(kafkaClusterName string, brokerId int32, kRaftMode bool) map[string]string {
1107+
var kraftLabels map[string]string
1108+
if kRaftMode {
1109+
kraftLabels = map[string]string{
1110+
ProcessRolesKey: strings.Join(bConfig.Roles, "_"),
1111+
IsControllerNodeKey: fmt.Sprintf("%t", bConfig.IsControllerNode()),
1112+
IsBrokerNodeKey: fmt.Sprintf("%t", bConfig.IsBrokerNode()),
1113+
}
1114+
} else { // in ZK mode -> new labels for backward compatibility for the headless service when going from ZK to KRaft
1115+
kraftLabels = map[string]string{
1116+
IsControllerNodeKey: fmt.Sprintf("%t", false),
1117+
IsBrokerNodeKey: fmt.Sprintf("%t", true),
1118+
}
1119+
}
10551120
return util.MergeLabels(
10561121
bConfig.BrokerLabels,
10571122
util.LabelsForKafka(kafkaClusterName),
1058-
map[string]string{BrokerIdLabelKey: fmt.Sprintf("%d", brokerId)},
1123+
kraftLabels, map[string]string{BrokerIdLabelKey: fmt.Sprintf("%d", brokerId)},
10591124
)
10601125
}
10611126

@@ -1124,6 +1189,31 @@ func (cConfig *CruiseControlConfig) GetResources() *corev1.ResourceRequirements
11241189
}
11251190
}
11261191

1192+
// IsBrokerNode returns true when the broker is a broker node
1193+
func (bConfig *BrokerConfig) IsBrokerNode() bool {
1194+
return util.StringSliceContains(bConfig.Roles, BrokerNodeProcessRole)
1195+
}
1196+
1197+
// IsControllerNode returns true when the broker is a controller node
1198+
func (bConfig *BrokerConfig) IsControllerNode() bool {
1199+
return util.StringSliceContains(bConfig.Roles, ControllerNodeProcessRole)
1200+
}
1201+
1202+
// IsBrokerOnlyNode returns true when the broker is a broker-only node
1203+
func (bConfig *BrokerConfig) IsBrokerOnlyNode() bool {
1204+
return bConfig.IsBrokerNode() && !bConfig.IsControllerNode()
1205+
}
1206+
1207+
// IsControllerOnlyNode returns true when the broker is a controller-only node
1208+
func (bConfig *BrokerConfig) IsControllerOnlyNode() bool {
1209+
return bConfig.IsControllerNode() && !bConfig.IsBrokerNode()
1210+
}
1211+
1212+
// IsCombinedNode returns true when the broker is a broker + controller node
1213+
func (bConfig *BrokerConfig) IsCombinedNode() bool {
1214+
return bConfig.IsBrokerNode() && bConfig.IsControllerNode()
1215+
}
1216+
11271217
// GetResources returns the broker specific Kubernetes resource
11281218
func (bConfig *BrokerConfig) GetResources() *corev1.ResourceRequirements {
11291219
if bConfig.Resources != nil {

0 commit comments

Comments
 (0)