@@ -457,7 +457,7 @@ public void testOpaAuthorizationWithTls() {
457457 @ ParallelTest
458458 public void testNullUserConfiguration () {
459459 String configuration = new KafkaBrokerConfigurationBuilder (Reconciliation .DUMMY_RECONCILIATION , NODE_REF )
460- .withUserConfiguration (null , false )
460+ .withUserConfiguration (null , false , false )
461461 .build ();
462462
463463 assertThat (configuration , isEquivalent ("node.id=2" ,
@@ -473,7 +473,7 @@ public void testNullUserConfiguration() {
473473 @ ParallelTest
474474 public void testNullUserConfigurationAndCCReporter () {
475475 String configuration = new KafkaBrokerConfigurationBuilder (Reconciliation .DUMMY_RECONCILIATION , NODE_REF )
476- .withUserConfiguration (null , true )
476+ .withUserConfiguration (null , true , false )
477477 .build ();
478478
479479 assertThat (configuration , isEquivalent ("node.id=2" ,
@@ -487,13 +487,30 @@ public void testNullUserConfigurationAndCCReporter() {
487487 "metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter" ));
488488 }
489489
490+ @ ParallelTest
491+ public void testNullUserConfigurationAndMetricsEnabled () {
492+ String configuration = new KafkaBrokerConfigurationBuilder (Reconciliation .DUMMY_RECONCILIATION , NODE_REF )
493+ .withUserConfiguration (null , false , true )
494+ .build ();
495+
496+ assertThat (configuration , isEquivalent ("node.id=2" ,
497+ "config.providers=strimzienv,strimzifile,strimzidir" ,
498+ "config.providers.strimzienv.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider" ,
499+ "config.providers.strimzienv.param.allowlist.pattern=.*" ,
500+ "config.providers.strimzifile.class=org.apache.kafka.common.config.provider.FileConfigProvider" ,
501+ "config.providers.strimzifile.param.allowed.paths=/opt/kafka" ,
502+ "config.providers.strimzidir.class=org.apache.kafka.common.config.provider.DirectoryConfigProvider" ,
503+ "config.providers.strimzidir.param.allowed.paths=/opt/kafka" ,
504+ "metric.reporters=org.apache.kafka.common.metrics.JmxReporter" ));
505+ }
506+
490507 @ ParallelTest
491508 public void testEmptyUserConfiguration () {
492509 Map <String , Object > userConfiguration = new HashMap <>();
493510 KafkaConfiguration kafkaConfiguration = new KafkaConfiguration (Reconciliation .DUMMY_RECONCILIATION , userConfiguration .entrySet ());
494511
495512 String configuration = new KafkaBrokerConfigurationBuilder (Reconciliation .DUMMY_RECONCILIATION , NODE_REF )
496- .withUserConfiguration (kafkaConfiguration , false )
513+ .withUserConfiguration (kafkaConfiguration , false , false )
497514 .build ();
498515
499516 assertThat (configuration , isEquivalent ("node.id=2" ,
@@ -517,7 +534,7 @@ public void testUserConfiguration() {
517534 KafkaConfiguration kafkaConfiguration = new KafkaConfiguration (Reconciliation .DUMMY_RECONCILIATION , userConfiguration .entrySet ());
518535
519536 String configuration = new KafkaBrokerConfigurationBuilder (Reconciliation .DUMMY_RECONCILIATION , NODE_REF )
520- .withUserConfiguration (kafkaConfiguration , false )
537+ .withUserConfiguration (kafkaConfiguration , false , false )
521538 .build ();
522539
523540 assertThat (configuration , isEquivalent ("node.id=2" ,
@@ -544,7 +561,7 @@ public void testUserConfigurationWithConfigProviders() {
544561
545562 // Broker
546563 String configuration = new KafkaBrokerConfigurationBuilder (Reconciliation .DUMMY_RECONCILIATION , NODE_REF )
547- .withUserConfiguration (kafkaConfiguration , false )
564+ .withUserConfiguration (kafkaConfiguration , false , false )
548565 .build ();
549566
550567 assertThat (configuration , isEquivalent ("node.id=2" ,
@@ -559,7 +576,7 @@ public void testUserConfigurationWithConfigProviders() {
559576
560577 // Controller
561578 configuration = new KafkaBrokerConfigurationBuilder (Reconciliation .DUMMY_RECONCILIATION , new NodeRef ("my-cluster-kafka-3" , 3 , "kafka" , true , false ))
562- .withUserConfiguration (kafkaConfiguration , false )
579+ .withUserConfiguration (kafkaConfiguration , false , false )
563580 .build ();
564581
565582 assertThat (configuration , isEquivalent ("node.id=3" ,
@@ -580,7 +597,7 @@ public void testUserConfigurationWithCCMetricsReporter() {
580597 KafkaConfiguration kafkaConfiguration = new KafkaConfiguration (Reconciliation .DUMMY_RECONCILIATION , userConfiguration .entrySet ());
581598
582599 String configuration = new KafkaBrokerConfigurationBuilder (Reconciliation .DUMMY_RECONCILIATION , NODE_REF )
583- .withUserConfiguration (kafkaConfiguration , true )
600+ .withUserConfiguration (kafkaConfiguration , true , false )
584601 .build ();
585602
586603 assertThat (configuration , isEquivalent ("node.id=2" ,
@@ -598,6 +615,35 @@ public void testUserConfigurationWithCCMetricsReporter() {
598615 "metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter" ));
599616 }
600617
618+ @ ParallelTest
619+ public void testUserConfigurationWithCCMetricsReporterAndMetricsEnabled () {
620+ Map <String , Object > userConfiguration = new HashMap <>();
621+ userConfiguration .put ("auto.create.topics.enable" , "false" );
622+ userConfiguration .put ("offsets.topic.replication.factor" , 3 );
623+ userConfiguration .put ("transaction.state.log.replication.factor" , 3 );
624+ userConfiguration .put ("transaction.state.log.min.isr" , 2 );
625+
626+ KafkaConfiguration kafkaConfiguration = new KafkaConfiguration (Reconciliation .DUMMY_RECONCILIATION , userConfiguration .entrySet ());
627+
628+ String configuration = new KafkaBrokerConfigurationBuilder (Reconciliation .DUMMY_RECONCILIATION , NODE_REF )
629+ .withUserConfiguration (kafkaConfiguration , true , true )
630+ .build ();
631+
632+ assertThat (configuration , isEquivalent ("node.id=2" ,
633+ "config.providers=strimzienv,strimzifile,strimzidir" ,
634+ "config.providers.strimzienv.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider" ,
635+ "config.providers.strimzienv.param.allowlist.pattern=.*" ,
636+ "config.providers.strimzifile.class=org.apache.kafka.common.config.provider.FileConfigProvider" ,
637+ "config.providers.strimzifile.param.allowed.paths=/opt/kafka" ,
638+ "config.providers.strimzidir.class=org.apache.kafka.common.config.provider.DirectoryConfigProvider" ,
639+ "config.providers.strimzidir.param.allowed.paths=/opt/kafka" ,
640+ "auto.create.topics.enable=false" ,
641+ "offsets.topic.replication.factor=3" ,
642+ "transaction.state.log.replication.factor=3" ,
643+ "transaction.state.log.min.isr=2" ,
644+ "metric.reporters=org.apache.kafka.common.metrics.JmxReporter,com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter" ));
645+ }
646+
601647 @ ParallelTest
602648 public void testUserConfigurationWithCCMetricsReporterAndOtherMetricReporters () {
603649 Map <String , Object > userConfiguration = new HashMap <>();
@@ -606,7 +652,7 @@ public void testUserConfigurationWithCCMetricsReporterAndOtherMetricReporters()
606652 KafkaConfiguration kafkaConfiguration = new KafkaConfiguration (Reconciliation .DUMMY_RECONCILIATION , userConfiguration .entrySet ());
607653
608654 String configuration = new KafkaBrokerConfigurationBuilder (Reconciliation .DUMMY_RECONCILIATION , NODE_REF )
609- .withUserConfiguration (kafkaConfiguration , true )
655+ .withUserConfiguration (kafkaConfiguration , true , false )
610656 .build ();
611657
612658 assertThat (configuration , isEquivalent ("node.id=2" ,
@@ -620,6 +666,28 @@ public void testUserConfigurationWithCCMetricsReporterAndOtherMetricReporters()
620666 "metric.reporters=my.domain.CustomMetricReporter,com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter" ));
621667 }
622668
669+ @ ParallelTest
670+ public void testUserConfigurationWithCCMetricsReporterAndMetricsEnabledAndOtherMetricReporters () {
671+ Map <String , Object > userConfiguration = new HashMap <>();
672+ userConfiguration .put ("metric.reporters" , "my.domain.CustomMetricReporter" );
673+
674+ KafkaConfiguration kafkaConfiguration = new KafkaConfiguration (Reconciliation .DUMMY_RECONCILIATION , userConfiguration .entrySet ());
675+
676+ String configuration = new KafkaBrokerConfigurationBuilder (Reconciliation .DUMMY_RECONCILIATION , NODE_REF )
677+ .withUserConfiguration (kafkaConfiguration , true , true )
678+ .build ();
679+
680+ assertThat (configuration , isEquivalent ("node.id=2" ,
681+ "config.providers=strimzienv,strimzifile,strimzidir" ,
682+ "config.providers.strimzienv.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider" ,
683+ "config.providers.strimzienv.param.allowlist.pattern=.*" ,
684+ "config.providers.strimzifile.class=org.apache.kafka.common.config.provider.FileConfigProvider" ,
685+ "config.providers.strimzifile.param.allowed.paths=/opt/kafka" ,
686+ "config.providers.strimzidir.class=org.apache.kafka.common.config.provider.DirectoryConfigProvider" ,
687+ "config.providers.strimzidir.param.allowed.paths=/opt/kafka" ,
688+ "metric.reporters=my.domain.CustomMetricReporter,org.apache.kafka.common.metrics.JmxReporter,com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter" ));
689+ }
690+
623691 @ ParallelTest
624692 public void testEphemeralStorageLogDirs () {
625693 Storage storage = new EphemeralStorageBuilder ()
0 commit comments