77import com .linkedin .kafka .cruisecontrol .metricsreporter .exception .KafkaTopicDescriptionException ;
88import com .linkedin .kafka .cruisecontrol .metricsreporter .metric .CruiseControlMetric ;
99import com .linkedin .kafka .cruisecontrol .metricsreporter .metric .MetricSerde ;
10- import com .linkedin .kafka .cruisecontrol .metricsreporter .utils .CCEmbeddedBroker ;
10+ import com .linkedin .kafka .cruisecontrol .metricsreporter .utils .CCContainerizedKraftCluster ;
1111import com .linkedin .kafka .cruisecontrol .metricsreporter .utils .CCKafkaClientsIntegrationTestHarness ;
1212import java .time .Duration ;
1313import java .util .Arrays ;
1414import java .util .Collections ;
1515import java .util .HashMap ;
1616import java .util .HashSet ;
17+ import java .util .List ;
1718import java .util .Map ;
1819import java .util .Properties ;
1920import java .util .Set ;
20- import java .util .concurrent .ExecutionException ;
21+ import java .util .concurrent .TimeoutException ;
2122import java .util .concurrent .atomic .AtomicInteger ;
23+ import java .util .function .Predicate ;
2224import java .util .regex .Pattern ;
23- import com .linkedin .kafka .cruisecontrol .metricsreporter .utils .CCKafkaTestUtils ;
2425import org .apache .kafka .clients .CommonClientConfigs ;
26+ import org .apache .kafka .clients .admin .Admin ;
2527import org .apache .kafka .clients .admin .AdminClient ;
2628import org .apache .kafka .clients .admin .TopicDescription ;
2729import org .apache .kafka .clients .consumer .Consumer ;
3436import org .apache .kafka .clients .producer .ProducerConfig ;
3537import org .apache .kafka .clients .producer .ProducerRecord ;
3638import org .apache .kafka .clients .producer .RecordMetadata ;
39+ import org .apache .kafka .common .errors .UnknownTopicOrPartitionException ;
3740import org .apache .kafka .common .serialization .StringDeserializer ;
38- import org .apache .kafka .coordinator .group .GroupCoordinatorConfig ;
39- import org .apache .kafka .network .SocketServerConfigs ;
40- import org .apache .kafka .server .config .ReplicationConfigs ;
41- import org .apache .kafka .server .config .ServerLogConfigs ;
4241import org .junit .After ;
4342import org .junit .Before ;
4443import org .junit .Test ;
44+ import org .testcontainers .kafka .KafkaContainer ;
4545
4646import static com .linkedin .kafka .cruisecontrol .metricsreporter .CruiseControlMetricsReporter .DEFAULT_BOOTSTRAP_SERVERS_HOST ;
4747import static com .linkedin .kafka .cruisecontrol .metricsreporter .CruiseControlMetricsReporter .DEFAULT_BOOTSTRAP_SERVERS_PORT ;
4848import static com .linkedin .kafka .cruisecontrol .metricsreporter .CruiseControlMetricsReporter .getTopicDescription ;
49+ import static com .linkedin .kafka .cruisecontrol .metricsreporter .CruiseControlMetricsReporterConfig .CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG ;
4950import static com .linkedin .kafka .cruisecontrol .metricsreporter .CruiseControlMetricsReporterConfig .CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_CONFIG ;
51+ import static com .linkedin .kafka .cruisecontrol .metricsreporter .CruiseControlMetricsReporterConfig .CRUISE_CONTROL_METRICS_TOPIC_CONFIG ;
5052import static com .linkedin .kafka .cruisecontrol .metricsreporter .CruiseControlMetricsReporterConfig .CRUISE_CONTROL_METRICS_TOPIC_NUM_PARTITIONS_CONFIG ;
5153import static com .linkedin .kafka .cruisecontrol .metricsreporter .CruiseControlMetricsReporterConfig .CRUISE_CONTROL_METRICS_TOPIC_REPLICATION_FACTOR_CONFIG ;
5254import static com .linkedin .kafka .cruisecontrol .metricsreporter .metric .RawMetricType .*;
5355import static org .junit .Assert .assertEquals ;
5456import static org .junit .Assert .assertTrue ;
55- import static org .junit .Assert .fail ;
5657
5758public class CruiseControlMetricsReporterTest extends CCKafkaClientsIntegrationTestHarness {
59+ private static final int NUM_OF_BROKERS = 2 ;
5860 protected static final String TOPIC = "CruiseControlMetricsReporterTest" ;
59- private static final String HOST = "127.0.0.1" ;
61+ protected static final String HOST = "127.0.0.1" ;
62+ protected CCContainerizedKraftCluster _cluster ;
63+ protected List <Map <Object , Object >> _brokerConfigs ;
6064
6165 /**
6266 * Setup the unit test.
6367 */
6468 @ Before
6569 public void setUp () {
66- super .setUp ();
70+ Properties adminClientProps = new Properties ();
71+ setSecurityConfigs (adminClientProps , "admin" );
72+
73+ _brokerConfigs = buildBrokerConfigs ();
74+ _cluster = new CCContainerizedKraftCluster (NUM_OF_BROKERS , _brokerConfigs , adminClientProps );
75+ _cluster .start ();
76+ _bootstrapUrl = _cluster .getExternalBootstrapAddress ();
77+
6778 Properties props = new Properties ();
6879 props .setProperty (ProducerConfig .ACKS_CONFIG , "-1" );
6980 AtomicInteger failed = new AtomicInteger (0 );
@@ -82,23 +93,31 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) {
8293 assertEquals (0 , failed .get ());
8394 }
8495
96+ /**
97+ * Tear down the unit test.
98+ */
8599 @ After
86100 public void tearDown () {
87- super .tearDown ();
101+ if (_cluster != null ) {
102+ _cluster .close ();
103+ }
88104 }
89105
90106 @ Override
91107 public Properties overridingProps () {
92108 Properties props = new Properties ();
93- int port = CCKafkaTestUtils .findLocalPort ();
94109 props .setProperty (CommonClientConfigs .METRIC_REPORTER_CLASSES_CONFIG , CruiseControlMetricsReporter .class .getName ());
95- props .setProperty (SocketServerConfigs .LISTENERS_CONFIG , "PLAINTEXT://" + HOST + ":" + port );
96- props .setProperty (CruiseControlMetricsReporterConfig .config (CommonClientConfigs .BOOTSTRAP_SERVERS_CONFIG ), HOST + ":" + port );
97- props .setProperty (CruiseControlMetricsReporterConfig .CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG , "100" );
98- props .setProperty (CruiseControlMetricsReporterConfig .CRUISE_CONTROL_METRICS_TOPIC_CONFIG , TOPIC );
99- props .setProperty (ServerLogConfigs .LOG_FLUSH_INTERVAL_MESSAGES_CONFIG , "1" );
100- props .setProperty (GroupCoordinatorConfig .OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG , "1" );
101- props .setProperty (ReplicationConfigs .DEFAULT_REPLICATION_FACTOR_CONFIG , "2" );
110+ props .setProperty (CruiseControlMetricsReporterConfig .config (CommonClientConfigs .BOOTSTRAP_SERVERS_CONFIG ),
111+ HOST + ":" + CCContainerizedKraftCluster .CONTAINER_INTERNAL_LISTENER_PORT );
112+ props .put ("listener.security.protocol.map" , String .join ("," ,
113+ CCContainerizedKraftCluster .CONTROLLER_LISTENER_NAME + ":PLAINTEXT" ,
114+ CCContainerizedKraftCluster .INTERNAL_LISTENER_NAME + ":PLAINTEXT" ,
115+ CCContainerizedKraftCluster .EXTERNAL_LISTENER_NAME + ":PLAINTEXT" ));
116+ props .setProperty (CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG , "100" );
117+ props .setProperty (CRUISE_CONTROL_METRICS_TOPIC_CONFIG , TOPIC );
118+ props .setProperty ("log.flush.interval.messages" , "1" );
119+ props .setProperty ("offsets.topic.replication.factor" , "1" );
120+ props .setProperty ("default.replication.factor" , "2" );
102121 return props ;
103122 }
104123
@@ -187,73 +206,89 @@ public void testReportingMetrics() {
187206 assertEquals ("Expected " + expectedMetricTypes + ", but saw " + metricTypes , expectedMetricTypes , metricTypes );
188207 }
189208
209+ private TopicDescription waitForTopicMetadata (Admin adminClient ,
210+ Duration timeout ,
211+ Predicate <TopicDescription > condition )
212+ throws InterruptedException , TimeoutException {
213+
214+ long deadline = System .currentTimeMillis () + timeout .toMillis ();
215+
216+ while (System .currentTimeMillis () < deadline ) {
217+ try {
218+ TopicDescription topicDescription = getTopicDescription ((AdminClient ) adminClient , TOPIC );
219+
220+ if (condition .test (topicDescription )) {
221+ return topicDescription ;
222+ }
223+ } catch (KafkaTopicDescriptionException e ) {
224+ if (!(e .getCause () instanceof UnknownTopicOrPartitionException )) {
225+ throw new RuntimeException ("Failed to describe topic: " + TOPIC , e );
226+ }
227+ // else ignore and retry
228+ }
229+
230+ Thread .sleep (500 );
231+ }
232+
233+ throw new TimeoutException ("Timeout waiting for topic metadata condition to be met: " + TOPIC );
234+ }
235+
190236 @ Test
191- public void testUpdatingMetricsTopicConfig () throws ExecutionException , InterruptedException {
237+ public void testUpdatingMetricsTopicConfig () throws InterruptedException , TimeoutException {
192238 Properties props = new Properties ();
193239 setSecurityConfigs (props , "admin" );
194- props .setProperty (ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG , bootstrapServers ());
195- AdminClient adminClient = AdminClient .create (props );
240+ props .setProperty (CommonClientConfigs .BOOTSTRAP_SERVERS_CONFIG , bootstrapServers ());
241+ Admin adminClient = Admin .create (props );
196242
197- // For compatibility with Kafka 4.0 and beyond we must use new API methods.
198- TopicDescription topicDescription ;
199- try {
200- topicDescription = getTopicDescription (adminClient , TOPIC );
201- } catch (KafkaTopicDescriptionException e ) {
202- throw new RuntimeException (e );
203- }
243+ TopicDescription topicDescription = waitForTopicMetadata (adminClient , Duration .ofSeconds (30 ), td -> true );
204244 assertEquals (1 , topicDescription .partitions ().size ());
245+
246+ KafkaContainer broker = _cluster .getBrokers ().get (0 );
247+
205248 // Shutdown broker
206- _brokers .get (0 ).shutdown ();
249+ broker .stop ();
250+
207251 // Change broker config
208- Map <Object , Object > brokerConfig = buildBrokerConfigs () .get (0 );
252+ Map <Object , Object > brokerConfig = _brokerConfigs .get (0 );
209253 brokerConfig .put (CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_CONFIG , "true" );
210254 brokerConfig .put (CRUISE_CONTROL_METRICS_TOPIC_NUM_PARTITIONS_CONFIG , "2" );
211255 brokerConfig .put (CRUISE_CONTROL_METRICS_TOPIC_REPLICATION_FACTOR_CONFIG , "1" );
212- try (CCEmbeddedBroker broker = new CCEmbeddedBroker (brokerConfig )) {
213- // Restart broker
214- broker .startup ();
215- // Check whether the topic config is updated
216- long startTime = System .currentTimeMillis ();
217- boolean isTopicConfigChanged = false ;
218- while (!isTopicConfigChanged ) {
219- if (System .currentTimeMillis () > startTime + 60000 ) {
220- fail ("Topic config was not updated" );
221- }
222256
223- TopicDescription description = adminClient .describeTopics (Collections .singleton (TOPIC )).topicNameValues ().get (TOPIC ).get ();
224- isTopicConfigChanged = 2 == description .partitions ().size ();
257+ _cluster .overrideBrokerConfig (broker , brokerConfig );
225258
226- try {
227- Thread .sleep (5000 );
228- } catch (InterruptedException ignored ) {
229- }
230- }
231- }
259+ // Restart broker
260+ broker .start ();
261+
262+ // Wait for topic metadata configuration change to propagate
263+ int oldPartitionCount = topicDescription .partitions ().size ();
264+ TopicDescription newTopicDescription = waitForTopicMetadata (adminClient , Duration .ofSeconds (30 ),
265+ td -> td .partitions ().size () != oldPartitionCount );
266+
267+ assertEquals (2 , newTopicDescription .partitions ().size ());
232268 }
233269
234270 @ Test
235271 public void testGetKafkaBootstrapServersConfigure () {
236272 // Test with a "listeners" config with a host
237273 Map <Object , Object > brokerConfig = buildBrokerConfigs ().get (0 );
238- Map <String , Object > listenersMap = Collections .singletonMap (
239- SocketServerConfigs .LISTENERS_CONFIG , brokerConfig .get (SocketServerConfigs .LISTENERS_CONFIG ));
274+ Map <String , Object > listenersMap = Collections .singletonMap ("listeners" , brokerConfig .get ("listeners" ));
240275 String bootstrapServers = CruiseControlMetricsReporter .getBootstrapServers (listenersMap );
241276 String urlParse = "\\ [?([0-9a-zA-Z\\ -%._:]*)]?:(-?[0-9]+)" ;
242277 Pattern urlParsePattern = Pattern .compile (urlParse );
243278 assertTrue (urlParsePattern .matcher (bootstrapServers ).matches ());
244- assertEquals (HOST , bootstrapServers .split (":" )[0 ]);
279+ assertEquals ("localhost" , bootstrapServers .split (":" )[0 ]);
245280
246281 // Test with a "listeners" config without a host in the first listener.
247282 String listeners = "SSL://:1234,PLAINTEXT://myhost:4321" ;
248- listenersMap = Collections .singletonMap (SocketServerConfigs . LISTENERS_CONFIG , listeners );
283+ listenersMap = Collections .singletonMap ("listeners" , listeners );
249284 bootstrapServers = CruiseControlMetricsReporter .getBootstrapServers (listenersMap );
250285 assertTrue (urlParsePattern .matcher (bootstrapServers ).matches ());
251286 assertEquals (DEFAULT_BOOTSTRAP_SERVERS_HOST , bootstrapServers .split (":" )[0 ]);
252287 assertEquals ("1234" , bootstrapServers .split (":" )[1 ]);
253288
254289 // Test with "listeners" and "port" config together.
255290 listenersMap = new HashMap <>();
256- listenersMap .put (SocketServerConfigs . LISTENERS_CONFIG , listeners );
291+ listenersMap .put ("listeners" , listeners );
257292 listenersMap .put ("port" , "43" );
258293 bootstrapServers = CruiseControlMetricsReporter .getBootstrapServers (listenersMap );
259294 assertTrue (urlParsePattern .matcher (bootstrapServers ).matches ());
0 commit comments