Skip to content
This repository was archived by the owner on Mar 26, 2025. It is now read-only.

Commit 5a64fb0

Browse files
authored
Fix Kafka super.users override (#923)
1 parent 8b57f6e commit 5a64fb0

File tree

2 files changed

+187
-4
lines changed

2 files changed

+187
-4
lines changed

pkg/resources/kafka/configmap.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,44 @@ func generateListenerSSLConfig(config *properties.Properties, name string, sslCl
346346
}
347347
}
348348

349+
// mergeSuperUsersPropertyValue merges the target and source super.users property value, and returns it as string.
350+
// It returns empty string when there were no updates or any of the super.users property value was empty.
351+
func mergeSuperUsersPropertyValue(source *properties.Properties, target *properties.Properties) string {
352+
sourceVal, foundSource := source.Get("super.users")
353+
if !foundSource || sourceVal.IsEmpty() {
354+
return ""
355+
}
356+
targetVal, foundTarget := target.Get("super.users")
357+
if !foundTarget || targetVal.IsEmpty() {
358+
return ""
359+
}
360+
361+
sourceSuperUsers := strings.Split(sourceVal.Value(), ";")
362+
targetSuperUsers := strings.Split(targetVal.Value(), ";")
363+
364+
inserted := false
365+
for _, sourceSuperUser := range sourceSuperUsers {
366+
found := false
367+
for _, targetSuperUser := range targetSuperUsers {
368+
if sourceSuperUser == targetSuperUser {
369+
found = true
370+
break
371+
}
372+
}
373+
374+
if !found {
375+
inserted = true
376+
targetSuperUsers = append(targetSuperUsers, sourceSuperUser)
377+
}
378+
}
379+
380+
if inserted {
381+
return strings.Join(targetSuperUsers, ";")
382+
}
383+
384+
return ""
385+
}
386+
349387
func (r Reconciler) generateBrokerConfig(id int32, brokerConfig *v1beta1.BrokerConfig, extListenerStatuses,
350388
intListenerStatuses, controllerIntListenerStatuses map[string]v1beta1.ListenerStatusList,
351389
serverPasses map[string]string, clientPass string, superUsers []string, log logr.Logger) string {
@@ -356,6 +394,13 @@ func (r Reconciler) generateBrokerConfig(id int32, brokerConfig *v1beta1.BrokerC
356394

357395
// Merge operator generated configuration to the final one
358396
if opGenConf != nil {
397+
// When there is custom super.users configuration we merge its value with the Koperator generated one
398+
// to avoid overwrite that happens when the finalBrokerConfig.Merge(opGenConf) is called.
399+
if suMerged := mergeSuperUsersPropertyValue(finalBrokerConfig, opGenConf); suMerged != "" {
400+
// Setting string value for a property is not going to run into error, also we don't return error in this function
401+
//nolint:errcheck
402+
opGenConf.Set("super.users", suMerged)
403+
}
359404
finalBrokerConfig.Merge(opGenConf)
360405
}
361406

pkg/resources/kafka/configmap_test.go

Lines changed: 142 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,13 @@ package kafka
1616

1717
import (
1818
"reflect"
19+
"strings"
1920
"testing"
2021

2122
"github.com/go-logr/logr"
2223
v1 "k8s.io/api/core/v1"
2324
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2425

25-
"github.com/banzaicloud/koperator/pkg/util"
2626
kafkautils "github.com/banzaicloud/koperator/pkg/util/kafka"
2727

2828
"github.com/stretchr/testify/mock"
@@ -464,6 +464,146 @@ listener.security.protocol.map=INTERNAL:SSL
464464
listeners=INTERNAL://:9092
465465
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
466466
super.users=User:CN=kafka-headless.kafka.svc.cluster.local
467+
zookeeper.connect=example.zk:2181/`,
468+
},
469+
{
470+
testName: "configWithSSL_with_readOnly-superUsers1",
471+
readOnlyConfig: `super.users=User:CN=custom-superuser1;User:CN=custom-superuser2`,
472+
zkAddresses: []string{"example.zk:2181"},
473+
zkPath: ``,
474+
kubernetesClusterDomain: ``,
475+
clusterWideConfig: ``,
476+
perBrokerConfig: ``,
477+
perBrokerReadOnlyConfig: ``,
478+
advertisedListenerAddress: `kafka-0.kafka.svc.cluster.local:9092`,
479+
listenerType: "ssl",
480+
sslClientAuth: "none",
481+
expectedConfig: `advertised.listeners=INTERNAL://kafka-0.kafka.svc.cluster.local:9092
482+
broker.id=0
483+
cruise.control.metrics.reporter.bootstrap.servers=kafka-all-broker.kafka.svc.cluster.local:9092
484+
cruise.control.metrics.reporter.kubernetes.mode=true
485+
cruise.control.metrics.reporter.security.protocol=SSL
486+
cruise.control.metrics.reporter.ssl.keystore.location=/var/run/secrets/java.io/keystores/client/keystore.jks
487+
cruise.control.metrics.reporter.ssl.keystore.password=keystore_clientpassword123
488+
cruise.control.metrics.reporter.ssl.truststore.location=/var/run/secrets/java.io/keystores/client/truststore.jks
489+
cruise.control.metrics.reporter.ssl.truststore.password=keystore_clientpassword123
490+
inter.broker.listener.name=INTERNAL
491+
listener.name.internal.ssl.client.auth=none
492+
listener.name.internal.ssl.keystore.location=/var/run/secrets/java.io/keystores/server/internal/keystore.jks
493+
listener.name.internal.ssl.keystore.password=keystore_serverpassword123
494+
listener.name.internal.ssl.keystore.type=JKS
495+
listener.name.internal.ssl.truststore.location=/var/run/secrets/java.io/keystores/server/internal/truststore.jks
496+
listener.name.internal.ssl.truststore.password=keystore_serverpassword123
497+
listener.name.internal.ssl.truststore.type=JKS
498+
listener.security.protocol.map=INTERNAL:SSL
499+
listeners=INTERNAL://:9092
500+
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
501+
super.users=User:CN=kafka-headless.kafka.svc.cluster.local;User:CN=custom-superuser1;User:CN=custom-superuser2
502+
zookeeper.connect=example.zk:2181/`,
503+
},
504+
{
505+
testName: "configWithSSL_with_readOnly-superUsers2",
506+
readOnlyConfig: `super.users=User:CN=kafka-headless.kafka.svc.cluster.local`,
507+
zkAddresses: []string{"example.zk:2181"},
508+
zkPath: ``,
509+
kubernetesClusterDomain: ``,
510+
clusterWideConfig: ``,
511+
perBrokerConfig: ``,
512+
perBrokerReadOnlyConfig: ``,
513+
advertisedListenerAddress: `kafka-0.kafka.svc.cluster.local:9092`,
514+
listenerType: "ssl",
515+
sslClientAuth: "none",
516+
expectedConfig: `advertised.listeners=INTERNAL://kafka-0.kafka.svc.cluster.local:9092
517+
broker.id=0
518+
cruise.control.metrics.reporter.bootstrap.servers=kafka-all-broker.kafka.svc.cluster.local:9092
519+
cruise.control.metrics.reporter.kubernetes.mode=true
520+
cruise.control.metrics.reporter.security.protocol=SSL
521+
cruise.control.metrics.reporter.ssl.keystore.location=/var/run/secrets/java.io/keystores/client/keystore.jks
522+
cruise.control.metrics.reporter.ssl.keystore.password=keystore_clientpassword123
523+
cruise.control.metrics.reporter.ssl.truststore.location=/var/run/secrets/java.io/keystores/client/truststore.jks
524+
cruise.control.metrics.reporter.ssl.truststore.password=keystore_clientpassword123
525+
inter.broker.listener.name=INTERNAL
526+
listener.name.internal.ssl.client.auth=none
527+
listener.name.internal.ssl.keystore.location=/var/run/secrets/java.io/keystores/server/internal/keystore.jks
528+
listener.name.internal.ssl.keystore.password=keystore_serverpassword123
529+
listener.name.internal.ssl.keystore.type=JKS
530+
listener.name.internal.ssl.truststore.location=/var/run/secrets/java.io/keystores/server/internal/truststore.jks
531+
listener.name.internal.ssl.truststore.password=keystore_serverpassword123
532+
listener.name.internal.ssl.truststore.type=JKS
533+
listener.security.protocol.map=INTERNAL:SSL
534+
listeners=INTERNAL://:9092
535+
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
536+
super.users=User:CN=kafka-headless.kafka.svc.cluster.local
537+
zookeeper.connect=example.zk:2181/`,
538+
},
539+
{
540+
testName: "configWithSSL_with_readOnly-superUsers3",
541+
readOnlyConfig: `super.users=User:CN=custom-superuser1;User:CN=custom-superuser2;User:CN=kafka-headless.kafka.svc.cluster.local`,
542+
zkAddresses: []string{"example.zk:2181"},
543+
zkPath: ``,
544+
kubernetesClusterDomain: ``,
545+
clusterWideConfig: ``,
546+
perBrokerConfig: ``,
547+
perBrokerReadOnlyConfig: ``,
548+
advertisedListenerAddress: `kafka-0.kafka.svc.cluster.local:9092`,
549+
listenerType: "ssl",
550+
sslClientAuth: "none",
551+
expectedConfig: `advertised.listeners=INTERNAL://kafka-0.kafka.svc.cluster.local:9092
552+
broker.id=0
553+
cruise.control.metrics.reporter.bootstrap.servers=kafka-all-broker.kafka.svc.cluster.local:9092
554+
cruise.control.metrics.reporter.kubernetes.mode=true
555+
cruise.control.metrics.reporter.security.protocol=SSL
556+
cruise.control.metrics.reporter.ssl.keystore.location=/var/run/secrets/java.io/keystores/client/keystore.jks
557+
cruise.control.metrics.reporter.ssl.keystore.password=keystore_clientpassword123
558+
cruise.control.metrics.reporter.ssl.truststore.location=/var/run/secrets/java.io/keystores/client/truststore.jks
559+
cruise.control.metrics.reporter.ssl.truststore.password=keystore_clientpassword123
560+
inter.broker.listener.name=INTERNAL
561+
listener.name.internal.ssl.client.auth=none
562+
listener.name.internal.ssl.keystore.location=/var/run/secrets/java.io/keystores/server/internal/keystore.jks
563+
listener.name.internal.ssl.keystore.password=keystore_serverpassword123
564+
listener.name.internal.ssl.keystore.type=JKS
565+
listener.name.internal.ssl.truststore.location=/var/run/secrets/java.io/keystores/server/internal/truststore.jks
566+
listener.name.internal.ssl.truststore.password=keystore_serverpassword123
567+
listener.name.internal.ssl.truststore.type=JKS
568+
listener.security.protocol.map=INTERNAL:SSL
569+
listeners=INTERNAL://:9092
570+
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
571+
super.users=User:CN=kafka-headless.kafka.svc.cluster.local;User:CN=custom-superuser1;User:CN=custom-superuser2
572+
zookeeper.connect=example.zk:2181/`,
573+
},
574+
{
575+
testName: "configWithSSL_with_readOnly-superUsers4",
576+
readOnlyConfig: `super.users=`,
577+
zkAddresses: []string{"example.zk:2181"},
578+
zkPath: ``,
579+
kubernetesClusterDomain: ``,
580+
clusterWideConfig: ``,
581+
perBrokerConfig: ``,
582+
perBrokerReadOnlyConfig: ``,
583+
advertisedListenerAddress: `kafka-0.kafka.svc.cluster.local:9092`,
584+
listenerType: "ssl",
585+
sslClientAuth: "none",
586+
expectedConfig: `advertised.listeners=INTERNAL://kafka-0.kafka.svc.cluster.local:9092
587+
broker.id=0
588+
cruise.control.metrics.reporter.bootstrap.servers=kafka-all-broker.kafka.svc.cluster.local:9092
589+
cruise.control.metrics.reporter.kubernetes.mode=true
590+
cruise.control.metrics.reporter.security.protocol=SSL
591+
cruise.control.metrics.reporter.ssl.keystore.location=/var/run/secrets/java.io/keystores/client/keystore.jks
592+
cruise.control.metrics.reporter.ssl.keystore.password=keystore_clientpassword123
593+
cruise.control.metrics.reporter.ssl.truststore.location=/var/run/secrets/java.io/keystores/client/truststore.jks
594+
cruise.control.metrics.reporter.ssl.truststore.password=keystore_clientpassword123
595+
inter.broker.listener.name=INTERNAL
596+
listener.name.internal.ssl.client.auth=none
597+
listener.name.internal.ssl.keystore.location=/var/run/secrets/java.io/keystores/server/internal/keystore.jks
598+
listener.name.internal.ssl.keystore.password=keystore_serverpassword123
599+
listener.name.internal.ssl.keystore.type=JKS
600+
listener.name.internal.ssl.truststore.location=/var/run/secrets/java.io/keystores/server/internal/truststore.jks
601+
listener.name.internal.ssl.truststore.password=keystore_serverpassword123
602+
listener.name.internal.ssl.truststore.type=JKS
603+
listener.security.protocol.map=INTERNAL:SSL
604+
listeners=INTERNAL://:9092
605+
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
606+
super.users=User:CN=kafka-headless.kafka.svc.cluster.local
467607
zookeeper.connect=example.zk:2181/`,
468608
},
469609
}
@@ -537,9 +677,7 @@ zookeeper.connect=example.zk:2181/`,
537677
superUsers []string
538678
)
539679

540-
sslConfigTestNames := []string{"configWithSSL_SSLClientAuth_not_provided", "configWithSSL_SSLClientAuth_required", "configWithSSL_SSLClientAuth_requested",
541-
"configWithSSL_SSLClientAuth_none"}
542-
if util.StringSliceContains(sslConfigTestNames, test.testName) {
680+
if strings.Contains(test.testName, "configWithSSL") {
543681
serverPasses = map[string]string{"internal": "keystore_serverpassword123"}
544682
clientPass = "keystore_clientpassword123"
545683
superUsers = []string{"CN=kafka-headless.kafka.svc.cluster.local"}

0 commit comments

Comments
 (0)