Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
107 commits
Select commit Hold shift + click to select a range
355e140
Fix flaky test by deleting nodeports explicitly (#67)
ctrlaltluc Jun 13, 2023
bcef0f1
Upgrade Kafka to 3.6.0 (#69)
ctrlaltluc Oct 11, 2023
dab6cf4
Upgrade dependencies
amuraru Dec 12, 2023
78fd5be
Fix wrong port on expectEnvoyWithConfigAz2Tls test (#70)
dobrerazvan Dec 19, 2023
676e733
Upgrade Kafka to 3.6.1 (#71)
cristianpetrache Dec 22, 2023
7c39a63
Upgrade Kafka image to use Java v21 (#72)
amuraru Feb 16, 2024
14c6c9e
Added arm64 to docker build platforms (#73)
azun Feb 28, 2024
509931e
Upgrading Kafka to 3.7.0 (#77)
azun Mar 26, 2024
1dad051
Update codeql-analysis.yml (#78)
amuraru Apr 8, 2024
254cc1c
[INTERNAL] Create uniq leader ID per operator deployment (#76)
dobrerazvan Apr 16, 2024
7f2632a
[INTERNAL] Get watched namespaces from env variable (#75)
dobrerazvan Jun 3, 2024
b37a977
[CORE-106517] Fix outdated config in the sample (#83)
aguzovatii Jun 4, 2024
11b5e9a
Cross-compile koperator for arm and intel. (#84)
dobrerazvan Jun 25, 2024
b1e9544
Adding Contour Ingress support (#82)
dobrerazvan Jun 27, 2024
1f0d47f
Allow property security-inter-broker-protocol (#85)
cawright-rh Aug 14, 2024
bad9200
Revert "Allow concurrent broker restarts from same AZ (broker rack) (…
Nov 14, 2024
9791142
Merge branch 'master' into merge-kraft-2
Nov 14, 2024
c1c0f02
Fixed build issues
Nov 14, 2024
4e5f972
Fix TestGenerateBrokerConfig
Nov 14, 2024
0179d93
Added LoadBalancer for Kind E2E test cluster
Nov 20, 2024
f05f1ee
Added LoadBalancer for Kind E2E test cluster
Nov 20, 2024
3a8e73f
Added LoadBalancer for Kind E2E test cluster
Nov 20, 2024
8f15bec
Added LoadBalancer for Kind E2E test cluster
Nov 20, 2024
2da7c29
Added LoadBalancer for Kind E2E test cluster
Nov 20, 2024
a174dfb
Added LoadBalancer for Kind E2E test cluster
Nov 20, 2024
69634e1
Added LoadBalancer for Kind E2E test cluster
Nov 20, 2024
5faf114
Added LoadBalancer for Kind E2E test cluster
Nov 20, 2024
bd1ccb5
Added LoadBalancer for Kind E2E test cluster
Nov 20, 2024
deded52
Added LoadBalancer for Kind E2E test cluster
Nov 20, 2024
4093c68
Added LoadBalancer for Kind E2E test cluster
Nov 20, 2024
bec67ce
Added LoadBalancer for Kind E2E test cluster
Nov 20, 2024
3d8ed45
Added watch namesapces
Nov 21, 2024
6d4c370
Added tmate for debugging
Nov 21, 2024
d68510a
Added tmate for debugging
Nov 21, 2024
656df6d
Added tmate for debugging
Nov 21, 2024
095a77b
Added enabled projectcontour helm install
Nov 21, 2024
cb84940
Enabled cloud-provider-kind
Nov 21, 2024
20577d4
Added ProjectContour cluster role
Nov 21, 2024
5ecf22e
updated certificate name
Nov 22, 2024
2a5eb1e
updated certificate name
Nov 22, 2024
bd22523
Run without SSL
Nov 22, 2024
697dcb3
Removing Project Contour
Nov 25, 2024
f373314
Adding cloud-provider-kind
Nov 25, 2024
d073326
Removing cloud-provider - manually adding during test
Nov 25, 2024
11beda1
trigger test
Nov 25, 2024
44ecbaa
Remove SnpshotClusterAndCompare
Nov 25, 2024
88710c7
Increased log length for Snapshot and Compare
Nov 25, 2024
5f26efc
Re-Add Snapshot and compare
Nov 25, 2024
c04b3fa
Increased log length for Snapshot and Compare
Nov 25, 2024
3f99b3e
Increased log length even more
Nov 25, 2024
c13ddf0
Add Uninstall Contour CRDs
Nov 25, 2024
4a94850
Re-Add KafkaCluster_SSL Tests
Nov 25, 2024
9e0ea26
Removing BanzaiCloud Helm Chart from list of repos
Nov 26, 2024
b428487
pushing up latest go.sum
Nov 26, 2024
48690ca
Merge pull request #2 from dvaseekara/contour-e2e
dvaseekara Nov 26, 2024
1a07e63
Merge branch 'master' of https://github.com/dvaseekara/koperator into…
Nov 26, 2024
7143005
Merge branch 'merge-kraft-2' of https://github.com/dvaseekara/koperat…
Nov 26, 2024
38cf906
Clean up Merge
Nov 26, 2024
e17bb79
Enabling Tmate to debug e2e Test
Nov 26, 2024
5c5b19c
Revert Cert Changes
Nov 26, 2024
8f5aa5d
Revert "Revert Cert Changes"
Nov 26, 2024
116c89e
Enable sslClientAuth
Nov 26, 2024
3b699c9
trigger test
Nov 26, 2024
615f46c
WIP: Fix Listener Config
Nov 27, 2024
7fc9ebf
Clean up test case results - tc-1
Dec 2, 2024
251679a
Clean up test case results - tc-2
Dec 2, 2024
0943500
Updated Kraft Test Cases
Dec 2, 2024
d691828
Cleanup Linting Issues
Dec 3, 2024
c99efe2
Remove Tmate Debugger
Dec 3, 2024
8fc0f46
Run Kraft CLuster E2E
Dec 4, 2024
3f62f8e
Increate Timeout to allow pod termination
Dec 4, 2024
2cb57d8
Trigger Test
Dec 4, 2024
5aa37f3
Added Debugger
Dec 4, 2024
a3cf8a5
Fix App Labels for Controllers
Dec 5, 2024
fcef62f
Revert image upate
Dec 5, 2024
8315b04
Revert "Fix App Labels for Controllers"
Dec 5, 2024
7548d5e
Include Broker/Controller Labels for Headless SVC Selector
Dec 5, 2024
b3129c6
Logic for controller listener
Dec 5, 2024
7af19c0
gMerge branch 'headless-service' of https://github.com/dvaseekara/kop…
Dec 5, 2024
824ad63
add controller service
Dec 5, 2024
eb867ac
Added Headless-Controller-SVC Labels
Dec 5, 2024
a1095ad
Fix controller addresses and labels for brokers
Dec 5, 2024
1777a1a
Merge pull request #6 from dvaseekara/fix
musubi7726 Dec 5, 2024
9b7c667
Empty commit to trigger e2e
Dec 6, 2024
69eabbf
Set up kafka-3 as controller only for troubleshooting
Dec 6, 2024
1cb149b
Empty commit to trigger e2e
Dec 9, 2024
cfe1481
Use controller address for JMXTemplate
Dec 9, 2024
b36dbef
Update uninstall timeout to 600s
Dec 9, 2024
5eba833
fix lint
Dec 9, 2024
5056200
fix lint
Dec 9, 2024
d1eeae8
fix lint
Dec 9, 2024
a5f552f
Merge pull request #7 from adobe/fix
musubi7726 Dec 9, 2024
c1b1cb2
Merge branch 'kraft' into e2e-kraft
dvaseekara Dec 9, 2024
d92f00f
Enable TMate Debugger
Dec 9, 2024
4c55ec2
Trigger E2E
Dec 9, 2024
c91a238
Updated BrokerIdLabelkey
Dec 10, 2024
4309f81
Updated BrokerIdLabelkey
Dec 10, 2024
ab46d31
Check for Kraft mode when setting the controller listener
Dec 10, 2024
4fba963
Check for Kraft mode when setting the controller listener
Dec 10, 2024
f266705
Disable tmate from e2e test
Dec 10, 2024
8846984
adding //nolint:unparam to testProduceConsumeInternal func now that i…
Dec 10, 2024
2974655
moving the //nolint:unparam to the none-ssl version
Dec 10, 2024
0c26cce
Fixed BrokerLabel Test
Dec 11, 2024
f2a87f1
Add additional test cases for TestGetBrokerLabels
Dec 11, 2024
a28ee79
Add additional test cases for TestGetBrokerLabels
Dec 11, 2024
40c49d8
commenting out broker-1 in test to fix kraft test
Dec 11, 2024
6e187e5
adding conditional to check if kraft mode is enabled before selecting…
Dec 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ test: generate fmt vet bin/setup-envtest
test-e2e:
IMG_E2E=${IMG_E2E} go test github.com/banzaicloud/koperator/tests/e2e \
-v \
-timeout 20m \
-timeout 45m \
-tags e2e \
--ginkgo.show-node-events \
--ginkgo.trace \
Expand Down
8 changes: 8 additions & 0 deletions api/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ func LabelsForKafka(name string) map[string]string {
return map[string]string{"app": "kafka", "kafka_cr": name}
}

func LabelsForBroker(name string) map[string]string {
return map[string]string{"isBrokerNode": "true", "app": "kafka", "kafka_cr": name}
}

func LabelsForController(name string) map[string]string {
return map[string]string{"isControllerNode": "true", "app": "kafka", "kafka_cr": name}
}

// StringSliceContains returns true if list contains s
func StringSliceContains(list []string, s string) bool {
for _, v := range list {
Expand Down
21 changes: 16 additions & 5 deletions api/v1beta1/kafkacluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ const (
// ProcessRolesKey is used to identify which process roles the Kafka pod has
ProcessRolesKey = "processRoles"

// IsBrokerNodeKey is used to identify if the kafka pod is either a broker or a broker_controller
IsBrokerNodeKey = "isBrokerNode"

// IsControllerNodeKey is used to identify if the kafka pod is a controller or broker_controller
IsControllerNodeKey = "isControllerNode"

// DefaultCruiseControlImage is the default CC image used when users don't specify it in CruiseControlConfig.Image
DefaultCruiseControlImage = "ghcr.io/banzaicloud/cruise-control:2.5.123"

Expand Down Expand Up @@ -1095,14 +1101,19 @@ func (bConfig *BrokerConfig) GetBrokerAnnotations() map[string]string {
}

// GetBrokerLabels returns the labels that are applied to broker pods
func (bConfig *BrokerConfig) GetBrokerLabels(kafkaClusterName string, brokerId int32) map[string]string {
func (bConfig *BrokerConfig) GetBrokerLabels(kafkaClusterName string, brokerId int32, kRaftMode bool) map[string]string {
kraftLabels := make(map[string]string, 0)
if kRaftMode {
kraftLabels = map[string]string{
ProcessRolesKey: strings.Join(bConfig.Roles, "_"),
IsControllerNodeKey: fmt.Sprintf("%t", bConfig.IsControllerNode()),
IsBrokerNodeKey: fmt.Sprintf("%t", bConfig.IsBrokerNode()),
}
}
return util.MergeLabels(
bConfig.BrokerLabels,
util.LabelsForKafka(kafkaClusterName),
map[string]string{
BrokerIdLabelKey: fmt.Sprintf("%d", brokerId),
ProcessRolesKey: strings.Join(bConfig.Roles, "_"),
},
kraftLabels, map[string]string{BrokerIdLabelKey: fmt.Sprintf("%d", brokerId)},
)
}

Expand Down
124 changes: 118 additions & 6 deletions api/v1beta1/kafkacluster_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,13 +436,125 @@ func TestGetBrokerLabels(t *testing.T) {

expectedBrokerId = 0
)
testCases := []struct {
testName string
brokerConfig *BrokerConfig
expectedLabels map[string]string
kRaftMode bool
}{
{
testName: "Labels in zookeeper mode",
expectedLabels: map[string]string{
AppLabelKey: expectedDefaultLabelApp,
BrokerIdLabelKey: strconv.Itoa(expectedBrokerId),
KafkaCRLabelKey: expectedKafkaCRName,
"test_label_key": "test_label_value",
},
brokerConfig: &BrokerConfig{
Roles: nil,
BrokerLabels: map[string]string{
AppLabelKey: "test_app",
BrokerIdLabelKey: "test_id",
KafkaCRLabelKey: "test_cr_name",
"test_label_key": "test_label_value",
},
},
kRaftMode: false,
},
{
testName: "Labels for broker in kraft mode",
expectedLabels: map[string]string{
AppLabelKey: expectedDefaultLabelApp,
BrokerIdLabelKey: strconv.Itoa(expectedBrokerId),
KafkaCRLabelKey: expectedKafkaCRName,
"test_label_key": "test_label_value",
ProcessRolesKey: "broker",
IsBrokerNodeKey: "true",
IsControllerNodeKey: "false",
},
brokerConfig: &BrokerConfig{
Roles: []string{"broker"},
BrokerLabels: map[string]string{
AppLabelKey: "test_app",
BrokerIdLabelKey: "test_id",
KafkaCRLabelKey: "test_cr_name",
"test_label_key": "test_label_value",
},
},
kRaftMode: true,
},
{
testName: "Labels for controller in kraft mode",
expectedLabels: map[string]string{
AppLabelKey: expectedDefaultLabelApp,
BrokerIdLabelKey: strconv.Itoa(expectedBrokerId),
KafkaCRLabelKey: expectedKafkaCRName,
"test_label_key": "test_label_value",
ProcessRolesKey: "controller",
IsBrokerNodeKey: "false",
IsControllerNodeKey: "true",
},
brokerConfig: &BrokerConfig{
Roles: []string{"controller"},
BrokerLabels: map[string]string{
AppLabelKey: "test_app",
BrokerIdLabelKey: "test_id",
KafkaCRLabelKey: "test_cr_name",
"test_label_key": "test_label_value",
},
},
kRaftMode: true,
},
{
testName: "Labels for controller/broker in kraft mode",
expectedLabels: map[string]string{
AppLabelKey: expectedDefaultLabelApp,
BrokerIdLabelKey: strconv.Itoa(expectedBrokerId),
KafkaCRLabelKey: expectedKafkaCRName,
"test_label_key": "test_label_value",
ProcessRolesKey: "controller_broker",
IsBrokerNodeKey: "true",
IsControllerNodeKey: "true",
},
brokerConfig: &BrokerConfig{
Roles: []string{"controller", "broker"},
BrokerLabels: map[string]string{
AppLabelKey: "test_app",
BrokerIdLabelKey: "test_id",
KafkaCRLabelKey: "test_cr_name",
"test_label_key": "test_label_value",
},
},
kRaftMode: true,
},
}

for _, test := range testCases {
t.Run(test.testName, func(t *testing.T) {
result := test.brokerConfig.GetBrokerLabels(expectedKafkaCRName, expectedBrokerId, test.kRaftMode)
if !reflect.DeepEqual(result, test.expectedLabels) {
t.Error("Expected:", test.expectedLabels, "Got:", result)
}
})
}
}

func TestGetBrokerLabelKraft(t *testing.T) {
const (
expectedDefaultLabelApp = "kafka"
expectedKafkaCRName = "kafka"

expectedBrokerId = 0
)

expected := map[string]string{
AppLabelKey: expectedDefaultLabelApp,
BrokerIdLabelKey: strconv.Itoa(expectedBrokerId),
KafkaCRLabelKey: expectedKafkaCRName,
"test_label_key": "test_label_value",
ProcessRolesKey: "broker",
AppLabelKey: expectedDefaultLabelApp,
BrokerIdLabelKey: strconv.Itoa(expectedBrokerId),
KafkaCRLabelKey: expectedKafkaCRName,
"test_label_key": "test_label_value",
ProcessRolesKey: "broker",
IsBrokerNodeKey: "true",
IsControllerNodeKey: "false",
}

brokerConfig := &BrokerConfig{
Expand All @@ -455,7 +567,7 @@ func TestGetBrokerLabels(t *testing.T) {
},
}

result := brokerConfig.GetBrokerLabels(expectedKafkaCRName, expectedBrokerId)
result := brokerConfig.GetBrokerLabels(expectedKafkaCRName, expectedBrokerId, true)

if !reflect.DeepEqual(result, expected) {
t.Error("Expected:", expected, "Got:", result)
Expand Down
2 changes: 1 addition & 1 deletion config/samples/kraft/simplekafkacluster_kraft.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ spec:
brokerConfig:
processRoles:
- controller
- broker
# - broker
- id: 4
brokerConfigGroup: "default"
brokerConfig:
Expand Down
114 changes: 78 additions & 36 deletions controllers/tests/kafkacluster_controller_kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,48 +525,90 @@ func expectKafkaCRStatus(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster
Expect(kafkaCluster.Status.State).To(Equal(v1beta1.KafkaClusterRunning))
Expect(kafkaCluster.Status.AlertCount).To(Equal(0))

Expect(kafkaCluster.Status.ListenerStatuses).To(Equal(v1beta1.ListenerStatuses{
InternalListeners: map[string]v1beta1.ListenerStatusList{
"internal": {
{
Name: "any-broker",
Address: fmt.Sprintf("%s-all-broker.%s.svc.cluster.local:29092", kafkaCluster.Name, kafkaCluster.Namespace),
},
{
Name: "broker-0",
Address: fmt.Sprintf("%s-0.%s.svc.cluster.local:29092", kafkaCluster.Name, kafkaCluster.Namespace),
},
{
Name: "broker-1",
Address: fmt.Sprintf("%s-1.%s.svc.cluster.local:29092", kafkaCluster.Name, kafkaCluster.Namespace),
},
{
Name: "broker-2",
Address: fmt.Sprintf("%s-2.%s.svc.cluster.local:29092", kafkaCluster.Name, kafkaCluster.Namespace),
if kafkaCluster.Spec.KRaftMode == false {
Expect(kafkaCluster.Status.ListenerStatuses).To(Equal(v1beta1.ListenerStatuses{
InternalListeners: map[string]v1beta1.ListenerStatusList{
"internal": {
{
Name: "any-broker",
Address: fmt.Sprintf("%s-all-broker.%s.svc.cluster.local:29092", kafkaCluster.Name, kafkaCluster.Namespace),
},
{
Name: "broker-0",
Address: fmt.Sprintf("%s-0.%s.svc.cluster.local:29092", kafkaCluster.Name, kafkaCluster.Namespace),
},
{
Name: "broker-1",
Address: fmt.Sprintf("%s-1.%s.svc.cluster.local:29092", kafkaCluster.Name, kafkaCluster.Namespace),
},
{
Name: "broker-2",
Address: fmt.Sprintf("%s-2.%s.svc.cluster.local:29092", kafkaCluster.Name, kafkaCluster.Namespace),
},
},
},
},
ExternalListeners: map[string]v1beta1.ListenerStatusList{
"test": {
{
Name: "any-broker",
Address: "test.host.com:29092",
},
{
Name: "broker-0",
Address: "test.host.com:19090",
ExternalListeners: map[string]v1beta1.ListenerStatusList{
"test": {
{
Name: "any-broker",
Address: "test.host.com:29092",
},
{
Name: "broker-0",
Address: "test.host.com:19090",
},
{
Name: "broker-1",
Address: "test.host.com:19091",
},
{
Name: "broker-2",
Address: "test.host.com:19092",
},
},
{
Name: "broker-1",
Address: "test.host.com:19091",
},
}))
}
if kafkaCluster.Spec.KRaftMode == true {
Expect(kafkaCluster.Status.ListenerStatuses).To(Equal(v1beta1.ListenerStatuses{
InternalListeners: map[string]v1beta1.ListenerStatusList{
"internal": {
{
Name: "any-broker",
Address: fmt.Sprintf("%s-all-broker.%s.svc.cluster.local:29092", kafkaCluster.Name, kafkaCluster.Namespace),
},
{
Name: "broker-0",
Address: fmt.Sprintf("%s-0.%s.svc.cluster.local:29092", kafkaCluster.Name, kafkaCluster.Namespace),
},
{
Name: "broker-2",
Address: fmt.Sprintf("%s-2.%s.svc.cluster.local:29092", kafkaCluster.Name, kafkaCluster.Namespace),
},
},
{
Name: "broker-2",
Address: "test.host.com:19092",
},
ExternalListeners: map[string]v1beta1.ListenerStatusList{
"test": {
{
Name: "any-broker",
Address: "test.host.com:29092",
},
{
Name: "broker-0",
Address: "test.host.com:19090",
},
{
Name: "broker-1",
Address: "test.host.com:19091",
},
{
Name: "broker-2",
Address: "test.host.com:19092",
},
},
},
},
}))
}))
}
for _, brokerState := range kafkaCluster.Status.BrokersState {
Expect(brokerState.Version).To(Equal("3.4.1"))
Expect(brokerState.Image).To(Equal(kafkaCluster.Spec.GetClusterImage()))
Expand Down
14 changes: 10 additions & 4 deletions pkg/jmxextractor/extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ import (
)

const (
headlessServiceJMXTemplate = "http://%s-%d." + kafka.HeadlessServiceTemplate + ".%s.svc.%s:%d"
serviceJMXTemplate = "http://%s-%d.%s.svc.%s:%d"
versionRegexGroup = "version"
headlessServiceJMXTemplate = "http://%s-%d." + kafka.HeadlessServiceTemplate + ".%s.svc.%s:%d"
headlessControllerServiceJMXTemplate = "http://%s-%d." + kafka.HeadlessControllerServiceTemplate + ".%s.svc.%s:%d"
serviceJMXTemplate = "http://%s-%d.%s.svc.%s:%d"
versionRegexGroup = "version"
)

var newJMXExtractor = createNewJMXExtractor
Expand Down Expand Up @@ -74,9 +75,14 @@ func NewMockJMXExtractor() {
func (exp *jmxExtractor) ExtractDockerImageAndVersion(brokerId int32, brokerConfig *v1beta1.BrokerConfig,
clusterImage string, headlessServiceEnabled bool) (*v1beta1.KafkaVersion, error) {
var requestURL string

if headlessServiceEnabled {
var jmxTemplate = headlessServiceJMXTemplate
if brokerConfig.IsControllerNode() {
jmxTemplate = headlessControllerServiceJMXTemplate
}
requestURL =
fmt.Sprintf(headlessServiceJMXTemplate,
fmt.Sprintf(jmxTemplate,
exp.clusterName, brokerId, exp.clusterName, exp.clusterNamespace,
exp.kubernetesClusterDomain, 9020)
} else {
Expand Down
Loading
Loading