Skip to content
Merged
Show file tree
Hide file tree
Changes from 63 commits
Commits
Show all changes
71 commits
Select commit Hold shift + click to select a range
dfc0824
Add koperator/api changes for KRaft support
panyuenlau Jul 25, 2023
722fd96
Merge branch 'master' into kraft-api
panyuenlau Jul 26, 2023
30427bf
Merge branch 'master' into kraft-api
panyuenlau Jul 27, 2023
54af705
Move processRoles under brokerConfig
panyuenlau Jul 27, 2023
dbf3200
Update comments for ZK-relevant configurations in kafkacluster_types.go
panyuenlau Jul 27, 2023
219d998
Rebase origin/kraft-api
panyuenlau Jul 20, 2023
314ea41
Support running Kafka cluster in KRaft mode
panyuenlau Jul 20, 2023
7613f06
brokerRoles -> processRoles to match upstream Kafka naming
panyuenlau Jul 21, 2023
aca2396
Update pod start-up process so pods can be restarted during rolling u…
panyuenlau Jul 21, 2023
c5a4819
Rebase from origin/kraft-api
panyuenlau Jul 23, 2023
5d1b777
Update exsiting integration tests and func signatures
panyuenlau Jul 23, 2023
aa4f5f5
Fix broker configurations; add unit tests for broker configurations u…
panyuenlau Jul 24, 2023
7431c93
Remove unnecessary method from koperator/api
panyuenlau Jul 25, 2023
2e3be06
Extend integration tests to cover KRaft mode
panyuenlau Jul 25, 2023
78e63ee
make lint-fix
panyuenlau Jul 25, 2023
3b5aff9
Update static KafkaCluster yamls; add check for kraft mode before set…
panyuenlau Jul 25, 2023
f9aedac
Rebase from origin/koperator-api
panyuenlau Jul 25, 2023
5804835
Use util functions that got moved to the koperator/api module
panyuenlau Jul 25, 2023
ca16422
Remove unineteded changes during rebase
panyuenlau Jul 25, 2023
afd567b
Do not take active controller identity into consideration when reorde…
panyuenlau Jul 25, 2023
bbc0307
Update implementation to accomomdate the latest KafkaCluster API change
panyuenlau Jul 27, 2023
455ef3f
Make comments about ZK-relevant configurations more clear
panyuenlau Jul 27, 2023
6b3d616
Add ConcurrentBrokerRestartCountPerRack to RollingUpgradeConfig (#1002)
ctrlaltluc Jul 27, 2023
0a90251
Small refactoring
panyuenlau Jul 28, 2023
63e15fa
Merge branch 'master' into kraft
panyuenlau Jul 28, 2023
aa859a1
Exclude controller-only nodes from all CC operations
panyuenlau Jul 30, 2023
28a4a75
Add processRoles label key for Kafka pods in KRaft mode; export consts
panyuenlau Jul 30, 2023
514fa07
Allow concurrent broker restarts from same AZ (broker rack) (#62)
amuraru Jun 10, 2023
16a9fc2
Fix flaky test by deleting nodeports explicitly (#67)
ctrlaltluc Jun 13, 2023
cdfb6b9
Allow dashes when parsing broker rack (#68)
ctrlaltluc Jun 28, 2023
18e7253
Upgrade Kafka to 3.6.0 (#69)
ctrlaltluc Oct 11, 2023
edb7ebf
Upgrade dependencies
amuraru Dec 12, 2023
5f78c06
Fix wrong port on expectEnvoyWithConfigAz2Tls test (#70)
dobrerazvan Dec 19, 2023
7383921
Upgrade Kafka to 3.6.1 (#71)
cristianpetrache Dec 22, 2023
2da23a0
working kraft
dobrerazvan Jan 23, 2024
57d22ec
Merge origin/master
dobrerazvan Jan 29, 2024
51fb7d6
Fixing go.mod
dobrerazvan Feb 7, 2024
bea02f8
Fixing tests post merge
dobrerazvan Feb 7, 2024
7b70fa6
Merge remote-tracking branch 'origin/master' into kraft
dobrerazvan Feb 7, 2024
46cf1a8
Fix tests
dobrerazvan Feb 7, 2024
168340e
Fix tests
dobrerazvan Feb 13, 2024
f205723
Fix unnecessary append
dobrerazvan Feb 13, 2024
2bb6c76
Fix go.mod
dobrerazvan Feb 13, 2024
58aa672
Update CRD
dobrerazvan Feb 13, 2024
112173c
Update go.mod for e2e
dobrerazvan Feb 13, 2024
c927063
Update go.mod
dobrerazvan Feb 13, 2024
b64f2ed
More fixes
dobrerazvan Feb 13, 2024
8172c6e
Upgrade Kafka image to use Java v21 (#72)
amuraru Feb 16, 2024
2fe3531
Merge branch 'master' into kraft
dobrerazvan Feb 19, 2024
20be375
Remove jbod cc tests
dobrerazvan Feb 20, 2024
50c2b07
Merging Master to Kraft and Reverting Rack Removal (#89)
dvaseekara Dec 9, 2024
15f6634
Trigger E2E Test
Dec 9, 2024
b55b1f7
End to end testing with KRaft cluster (#92)
dvaseekara Dec 11, 2024
1b56507
[CORE-119212] - Add/fix pdb for controllers (#94)
hvan Jan 10, 2025
c96e846
Ensure externallisteners are listed first (#97)
hvan Jan 23, 2025
55b625a
Exclude controllers from external ingress (e.g. envoy) (#95)
hvan Jan 24, 2025
5c5dbe7
adding new check for broker level readOnlyConfig (#96)
hvan Jan 27, 2025
9978dfa
Zk kraft migration (#100)
hvan Feb 21, 2025
8feced9
adding fix for koperator to add interBrokerListenerName one at a time…
cawright-rh Feb 27, 2025
4272b8d
[CORE-122420] - Update pod name for Kraft Controllers (#102)
dvaseekara Feb 28, 2025
de38d2e
Merge branch 'master' into kraft
dvaseekara May 16, 2025
2d7fa97
Cleanup Merge
May 16, 2025
08677fc
Cleanup Merge
May 16, 2025
ef14d15
Update .github/workflows/helm.yml
dvaseekara May 22, 2025
7fd54d3
Update api/go.mod
dvaseekara May 22, 2025
6797624
Fixing controller reconcile logic to properly check for controller on…
cawright-rh May 22, 2025
12ade6c
Update api/v1beta1/kafkacluster_types.go
dvaseekara Jun 4, 2025
703409d
Update api/v1beta1/kafkacluster_types.go
dvaseekara Jun 4, 2025
57b4760
Update config/samples/banzaicloud_v1beta1_kafkacluster.yaml
dvaseekara Jun 4, 2025
c306f06
[CORE-126251] - PR Comments Addressed
Jun 4, 2025
a3c7ae3
[CORE-126251] - PR Comments Addressed
Jun 4, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/helm.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ jobs:

- name: Add Helm repositories
run: |
# helm repo add banzaicloud-stable "https://kubernetes-charts.banzaicloud.com"
helm repo add incubator "https://charts.helm.sh/incubator"
helm repo add stable "https://charts.helm.sh/stable"

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ test: generate fmt vet bin/setup-envtest
test-e2e:
IMG_E2E=${IMG_E2E} go test github.com/banzaicloud/koperator/tests/e2e \
-v \
-timeout 20m \
-timeout 45m \
-tags e2e \
--ginkgo.show-node-events \
--ginkgo.trace \
Expand Down
12 changes: 12 additions & 0 deletions api/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
emperror.dev/errors v0.8.1
github.com/banzaicloud/istio-client-go v0.0.17
github.com/cert-manager/cert-manager v1.13.2
// github.com/stretchr/testify v1.8.4
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa
gotest.tools v2.2.0+incompatible
k8s.io/api v0.28.4
Expand All @@ -15,6 +16,7 @@ require (
)

require (
// github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/go-logr/logr v1.3.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/go-cmp v0.5.9 // indirect
Expand All @@ -23,16 +25,26 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
// github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
// gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/klog/v2 v2.110.1 // indirect
k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
)

require github.com/stretchr/testify v1.8.4

require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

// remove once https://github.com/cert-manager/cert-manager/issues/5953 is fixed
replace github.com/Venafi/vcert/v4 => github.com/jetstack/vcert/v4 v4.9.6-0.20230127103832-3aa3dfd6613d
18 changes: 18 additions & 0 deletions api/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,21 @@ func MergeLabels(l ...map[string]string) map[string]string {
func LabelsForKafka(name string) map[string]string {
return map[string]string{"app": "kafka", "kafka_cr": name}
}

func LabelsForBroker(name string) map[string]string {
return map[string]string{"isBrokerNode": "true", "app": "kafka", "kafka_cr": name}
}

func LabelsForController(name string) map[string]string {
return map[string]string{"isControllerNode": "true", "app": "kafka", "kafka_cr": name}
}

// StringSliceContains returns true if list contains s
func StringSliceContains(list []string, s string) bool {
for _, v := range list {
if v == s {
return true
}
}
return false
}
10 changes: 10 additions & 0 deletions api/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,13 @@ func TestMergeLabels(t *testing.T) {
t.Error("Expected:", expected, "Got:", merged)
}
}

func TestStringSliceContains(t *testing.T) {
slice := []string{"1", "2", "3"}
if !StringSliceContains(slice, "1") {
t.Error("Expected slice contains 1, got false")
}
if StringSliceContains(slice, "4") {
t.Error("Expected slice not contains 4, got true")
}
}
96 changes: 92 additions & 4 deletions api/v1beta1/kafkacluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,25 @@ const (
KafkaCRLabelKey = "kafka_cr"
BrokerIdLabelKey = "brokerId"

// ProcessRolesKey is used to identify which process roles the Kafka pod has
ProcessRolesKey = "processRoles"

// IsBrokerNodeKey is used to identify if the kafka pod is either a broker or a broker_controller
IsBrokerNodeKey = "isBrokerNode"

// IsControllerNodeKey is used to identify if the kafka pod is a controller or broker_controller
IsControllerNodeKey = "isControllerNode"

// DefaultCruiseControlImage is the default CC image used when users don't specify it in CruiseControlConfig.Image
DefaultCruiseControlImage = "ghcr.io/banzaicloud/cruise-control:2.5.123"

// DefaultKafkaImage is the default Kafka image used when users don't specify it in KafkaClusterSpec.ClusterImage
DefaultKafkaImage = "ghcr.io/banzaicloud/kafka:2.13-3.4.1"

// ControllerNodeProcessRole represents the node is a controller node
ControllerNodeProcessRole = "controller"
// BrokerNodeProcessRole represents the node is a broker node
BrokerNodeProcessRole = "broker"
// These are default values for API keys

/* General Config */
Expand Down Expand Up @@ -134,15 +153,25 @@ const (

// KafkaClusterSpec defines the desired state of KafkaCluster
type KafkaClusterSpec struct {
// kRaft is used to decide where the Kafka cluster is under KRaft mode or ZooKeeper mode.
// This is default to be true; if set to false, the Kafka cluster is in ZooKeeper mode.
// +kubebuilder:default=true
// +optional
KRaftMode bool `json:"kRaft"`
HeadlessServiceEnabled bool `json:"headlessServiceEnabled"`
ListenersConfig ListenersConfig `json:"listenersConfig"`
// Custom ports to expose in the container. Example use case: a custom kafka distribution, that includes an integrated metrics api endpoint
AdditionalPorts []corev1.ContainerPort `json:"additionalPorts,omitempty"`
// ZKAddresses specifies the ZooKeeper connection string
// in the form hostname:port where host and port are the host and port of a ZooKeeper server.
ZKAddresses []string `json:"zkAddresses"`
// Under ZooKeeper mode, this is a must-have configuration.
// And if set under KRaft mode, Koperator ignores this configuration.
// +optional
ZKAddresses []string `json:"zkAddresses,omitempty"`
// ZKPath specifies the ZooKeeper chroot path as part
// of its ZooKeeper connection string which puts its data under some path in the global ZooKeeper namespace.
// If set under KRaft mode, Koperator ignores this configuration.
// +optional
ZKPath string `json:"zkPath,omitempty"`
RackAwareness *RackAwareness `json:"rackAwareness,omitempty"`
ClusterImage string `json:"clusterImage,omitempty"`
Expand Down Expand Up @@ -197,6 +226,8 @@ type KafkaClusterStatus struct {
RollingUpgrade RollingUpgradeStatus `json:"rollingUpgradeStatus,omitempty"`
AlertCount int `json:"alertCount"`
ListenerStatuses ListenerStatuses `json:"listenerStatuses,omitempty"`
// ClusterID is a base64-encoded random UUID generated by Koperator to run the Kafka cluster in KRaft mode
ClusterID string `json:"clusterID,omitempty"`
}

// RollingUpgradeStatus defines status of rolling upgrade
Expand Down Expand Up @@ -244,11 +275,12 @@ type DisruptionBudgetWithStrategy struct {
DisruptionBudget `json:",inline"`
// The strategy to be used, either minAvailable or maxUnavailable
// +kubebuilder:validation:Enum=minAvailable;maxUnavailable
Stategy string `json:"strategy,omitempty"`
Strategy string `json:"strategy,omitempty"`
}

// Broker defines the broker basic configuration
type Broker struct {
// id maps to "node.id" configuration in KRaft mode, and it maps to "broker.id" configuration in ZooKeeper mode.
// +kubebuilder:validation:Minimum=0
// +kubebuilder:validation:Maximum=65535
// +kubebuilder:validation:ExclusiveMaximum=true
Expand All @@ -260,6 +292,11 @@ type Broker struct {

// BrokerConfig defines the broker configuration
type BrokerConfig struct {
// processRoles defines the role(s) for this particular Kafka node: "broker", "controller", or both.
// This must be set in KRaft mode. If set in ZooKeeper mode, Koperator ignores this configuration.
// +kubebuilder:validation:MaxItems=2
// +optional
Roles []string `json:"processRoles,omitempty"`
Image string `json:"image,omitempty"`
MetricsReporterImage string `json:"metricsReporterImage,omitempty"`
Config string `json:"config,omitempty"`
Expand Down Expand Up @@ -991,6 +1028,19 @@ func (bConfig *BrokerConfig) GetTerminationGracePeriod() int64 {
return *bConfig.TerminationGracePeriod
}

// GetStorageMountPaths returns a string with comma-separated storage mount paths that the broker uses
func (bConfig *BrokerConfig) GetStorageMountPaths() string {
var mountPaths string
for i, sc := range bConfig.StorageConfigs {
if i != len(bConfig.StorageConfigs)-1 {
mountPaths += sc.MountPath + ","
} else {
mountPaths += sc.MountPath
}
}
return mountPaths
}

// GetNodeSelector returns the node selector for cruise control
func (cConfig *CruiseControlConfig) GetNodeSelector() map[string]string {
return cConfig.NodeSelector
Expand Down Expand Up @@ -1051,11 +1101,24 @@ func (bConfig *BrokerConfig) GetBrokerAnnotations() map[string]string {
}

// GetBrokerLabels returns the labels that are applied to broker pods
func (bConfig *BrokerConfig) GetBrokerLabels(kafkaClusterName string, brokerId int32) map[string]string {
func (bConfig *BrokerConfig) GetBrokerLabels(kafkaClusterName string, brokerId int32, kRaftMode bool) map[string]string {
var kraftLabels map[string]string
if kRaftMode {
kraftLabels = map[string]string{
ProcessRolesKey: strings.Join(bConfig.Roles, "_"),
IsControllerNodeKey: fmt.Sprintf("%t", bConfig.IsControllerNode()),
IsBrokerNodeKey: fmt.Sprintf("%t", bConfig.IsBrokerNode()),
}
} else { // in ZK mode -> new labels for backward compatibility for the headless service when going from ZK to KRaft
kraftLabels = map[string]string{
IsControllerNodeKey: fmt.Sprintf("%t", false),
IsBrokerNodeKey: fmt.Sprintf("%t", true),
}
}
return util.MergeLabels(
bConfig.BrokerLabels,
util.LabelsForKafka(kafkaClusterName),
map[string]string{BrokerIdLabelKey: fmt.Sprintf("%d", brokerId)},
kraftLabels, map[string]string{BrokerIdLabelKey: fmt.Sprintf("%d", brokerId)},
)
}

Expand Down Expand Up @@ -1124,6 +1187,31 @@ func (cConfig *CruiseControlConfig) GetResources() *corev1.ResourceRequirements
}
}

// IsBrokerNode returns true when the broker is a broker node
func (bConfig *BrokerConfig) IsBrokerNode() bool {
return util.StringSliceContains(bConfig.Roles, BrokerNodeProcessRole)
}

// IsControllerNode returns true when the broker is a controller node
func (bConfig *BrokerConfig) IsControllerNode() bool {
return util.StringSliceContains(bConfig.Roles, ControllerNodeProcessRole)
}

// IsBrokerOnlyNode returns true when the broker is a broker-only node
func (bConfig *BrokerConfig) IsBrokerOnlyNode() bool {
return bConfig.IsBrokerNode() && !bConfig.IsControllerNode()
}

// IsControllerOnlyNode returns true when the broker is a controller-only node
func (bConfig *BrokerConfig) IsControllerOnlyNode() bool {
return bConfig.IsControllerNode() && !bConfig.IsBrokerNode()
}

// IsCombinedNode returns true when the broker is a broker + controller node
func (bConfig *BrokerConfig) IsCombinedNode() bool {
return bConfig.IsBrokerNode() && bConfig.IsControllerNode()
}

// GetResources returns the broker specific Kubernetes resource
func (bConfig *BrokerConfig) GetResources() *corev1.ResourceRequirements {
if bConfig.Resources != nil {
Expand Down
Loading
Loading