diff --git a/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterTest.java b/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterTest.java index 4cc364e710..6218a141c0 100644 --- a/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterTest.java +++ b/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterTest.java @@ -202,7 +202,9 @@ public void testReportingMetrics() { @Test public void testUpdatingMetricsTopicConfig() throws InterruptedException, TimeoutException { - TopicDescription topicDescription = _cluster.waitForTopicMetadata(TOPIC, Duration.ofSeconds(30), td -> true); + Map topicDescriptions = + _cluster.waitForTopicMetadata(List.of(TOPIC), Duration.ofSeconds(5), Duration.ofSeconds(30), td -> true); + TopicDescription topicDescription = topicDescriptions.get(TOPIC); assertEquals(1, topicDescription.partitions().size()); KafkaContainer broker = _cluster.getBrokers().get(0); @@ -223,9 +225,9 @@ public void testUpdatingMetricsTopicConfig() throws InterruptedException, Timeou // Wait for topic metadata configuration change to propagate int oldPartitionCount = topicDescription.partitions().size(); - TopicDescription newTopicDescription = _cluster.waitForTopicMetadata(TOPIC, Duration.ofSeconds(30), - td -> td.partitions().size() != oldPartitionCount); - + Map newTopicDescriptions = _cluster.waitForTopicMetadata( + List.of(TOPIC), Duration.ofSeconds(5), Duration.ofSeconds(30), td -> td.partitions().size() != oldPartitionCount); + TopicDescription newTopicDescription = newTopicDescriptions.get(TOPIC); assertEquals(2, newTopicDescription.partitions().size()); } diff --git a/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCContainerizedKraftCluster.java b/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCContainerizedKraftCluster.java index 707415dd94..9f0a4061ee 100644 --- a/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCContainerizedKraftCluster.java +++ b/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCContainerizedKraftCluster.java @@ -13,6 +13,7 @@ import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.Network; @@ -31,6 +32,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -273,6 +275,15 @@ public void overrideBrokerConfig(KafkaContainer kafkaContainer, Map T waitUntil(Supplier supplier, Predicate condition, Dur } /** - * Waits until the metadata for a Kafka topic meets the specified condition, + * Waits until the metadata for the listed Kafka topics meet the specified condition, * or until the given timeout period elapses. * - * @param topicName the name of the topic whose metadata should be monitored - * @param timeout the maximum duration to wait for the condition to be satisfied + * @param topicNames the names of the topics whose metadata should be monitored + * @param fetchTimeout the timeout duration for the metadata retrieval call + * @param waitTimeout the maximum duration to wait for the condition to be satisfied * @param condition a {@link Predicate} that tests the {@link TopicDescription} for readiness - * @return the {@link TopicDescription} once the condition evaluates to {@code true} + * @return the {@link Map}{@code } once the condition evaluates to {@code true} */ - public TopicDescription waitForTopicMetadata(String topicName, Duration timeout, Predicate condition) { + public Map waitForTopicMetadata(List topicNames, + Duration fetchTimeout, + Duration waitTimeout, + Predicate condition) { return waitUntil( () -> { try { - return _adminClient.describeTopics(Collections.singleton(topicName)) - .topicNameValues() - .get(topicName) - .get(); - } catch (ExecutionException e) { - if (e.getCause() instanceof UnknownTopicOrPartitionException) { - // Topic doesn't exist yet, retry - return null; - } - throw new RuntimeException("Failed to describe topic: " + topicName, e); + Map descriptions = + _adminClient.describeTopics(topicNames) + .allTopicNames() + .get(fetchTimeout.toSeconds(), TimeUnit.SECONDS); + + boolean topicMetadataReady = descriptions.values().stream().allMatch(td -> td != null && condition.test(td)); + + return topicMetadataReady ? descriptions : null; } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("Interrupted while waiting for broker to become ready", e); + } catch (ExecutionException e) { + if (e.getCause() instanceof UnknownTopicOrPartitionException || e.getCause() instanceof RetriableException) { + // Topic doesn't exist yet or transient error, retry + return null; + } + throw new RuntimeException("Failed to describe topics: " + topicNames, e); + } catch (TimeoutException e) { + // Timeout fetching metadata, treat as transient and retry + return null; } }, - td -> td != null && condition.test(td), - timeout, - String.format("Timeout waiting for topic %s metadata to be ready.", topicName) + Objects::nonNull, + waitTimeout, + String.format("Timeout waiting for topics %s metadata to be ready.", topicNames) ); } @@ -385,14 +407,22 @@ public void shutDownBroker(int brokerId) { waitUntil( () -> { try { - return _adminClient.describeCluster().nodes().get().stream() - .map(node -> String.valueOf(node.id())) - .collect(Collectors.toSet()); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); + return _adminClient.describeCluster().nodes().get(10, TimeUnit.SECONDS) + .stream().noneMatch(node -> node.id() == brokerId); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting for broker removal", e); + } catch (ExecutionException e) { + if (e.getCause() instanceof RetriableException) { + return false; + } + throw new RuntimeException("Failed to describe cluster: ", e); + } catch (TimeoutException e) { + // Timeout fetching metadata, treat as transient and retry + return false; } }, - brokerIds -> !brokerIds.contains(String.valueOf(brokerId)), + Boolean::booleanValue, timeout, String.format("Broker %s did not shutdown properly.", brokerId) ); diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/common/MetadataAdminClient.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/common/MetadataAdminClient.java new file mode 100644 index 0000000000..e5484a8f5e --- /dev/null +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/common/MetadataAdminClient.java @@ -0,0 +1,82 @@ +/* + * Copyright 2017 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. + */ + +package com.linkedin.kafka.cruisecontrol.common; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.DescribeClusterResult; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartitionInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Client for fetching Kafka cluster metadata using Kafka Admin APIs. + * + * This replaces the use of MetadataClient which relies on internal Kafka APIs. + */ +public class MetadataAdminClient { + private static final Logger LOG = LoggerFactory.getLogger(MetadataAdminClient.class); + + private final Admin _adminClient; + + /** + * Creates a new MetadataAdminClient. + * + * @param adminClient The AdminClient to use for fetching cluster metadata. + */ + public MetadataAdminClient(Admin adminClient) { + _adminClient = adminClient; + } + + /** + * Fetches the current metadata for the Kafka cluster. + * + * @return a {@link Cluster} containing the cluster ID, broker nodes, and partition information for all topics + */ + public Cluster cluster() { + try { + Set topicNames = _adminClient.listTopics().names().get(); + + Map topicDescriptions = _adminClient.describeTopics(topicNames).allTopicNames().get(); + + DescribeClusterResult describeResult = _adminClient.describeCluster(); + Collection nodes = describeResult.nodes().get(); + String clusterId = describeResult.clusterId().get(); + + List partitionInfos = new ArrayList<>(); + + for (TopicDescription desc : topicDescriptions.values()) { + for (TopicPartitionInfo partInfo : desc.partitions()) { + + Node leader = partInfo.leader(); + Node[] replicas = partInfo.replicas().toArray(Node[]::new); + Node[] isr = partInfo.isr().toArray(Node[]::new); + + partitionInfos.add(new PartitionInfo(desc.name(), partInfo.partition(), leader, replicas, isr)); + } + } + + return new Cluster(clusterId, nodes, partitionInfos, Collections.emptySet(), Collections.emptySet()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Interrupted while fetching cluster metadata", e); + throw new RuntimeException("Interrupted while fetching cluster metadata", e); + + } catch (ExecutionException e) { + LOG.error("ExecutionException while fetching cluster metadata", e); + throw new RuntimeException("Failed to fetch cluster metadata", e); + } + } +} diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/common/MetadataClient.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/common/MetadataClient.java index b3be9868ef..1c8651889c 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/common/MetadataClient.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/common/MetadataClient.java @@ -26,7 +26,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +/** + * Legacy client for fetching Kafka cluster metadata. + * + * @deprecated This class is deprecated and will be removed in a future release in favor of {@link MetadataAdminClient} + * which uses public Kafka APIs. + */ +@Deprecated public class MetadataClient { private static final Logger LOG = LoggerFactory.getLogger(MetadataClient.class); private static final LogContext LOG_CONTEXT = new LogContext(); diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java index 22f210f0a5..df1f1c6574 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java @@ -9,10 +9,10 @@ import com.codahale.metrics.Timer; import com.google.common.util.concurrent.AtomicDouble; import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils; +import com.linkedin.kafka.cruisecontrol.common.MetadataAdminClient; import com.linkedin.kafka.cruisecontrol.common.TopicMinIsrCache; import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig; import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory; -import com.linkedin.kafka.cruisecontrol.common.MetadataClient; import com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig; import com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorManager; import com.linkedin.kafka.cruisecontrol.exception.OngoingExecutionException; @@ -42,18 +42,14 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import java.util.stream.Collectors; -import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult; import org.apache.kafka.clients.admin.DescribeConfigsResult; import org.apache.kafka.clients.admin.ElectLeadersResult; import org.apache.kafka.clients.admin.PartitionReassignment; import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigResource; -import org.apache.kafka.common.internals.ClusterResourceListeners; -import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,7 +83,7 @@ public class Executor { private static final long EXECUTION_PROGRESS_CHECK_INTERVAL_ADJUSTING_MS = 1000; // The execution progress is controlled by the ExecutionTaskManager. private final ExecutionTaskManager _executionTaskManager; - private final MetadataClient _metadataClient; + private final MetadataAdminClient _metadataClient; private volatile long _executionProgressCheckIntervalMs; private final long _defaultExecutionProgressCheckIntervalMs; private Long _requestedExecutionProgressCheckIntervalMs; @@ -156,7 +152,7 @@ public Executor(KafkaCruiseControlConfig config, Time time, MetricRegistry dropwizardMetricRegistry, AnomalyDetectorManager anomalyDetectorManager) { - this(config, time, dropwizardMetricRegistry, null, null, anomalyDetectorManager); + this(config, time, dropwizardMetricRegistry, null, null, null, anomalyDetectorManager); } /** @@ -168,7 +164,8 @@ public Executor(KafkaCruiseControlConfig config, Executor(KafkaCruiseControlConfig config, Time time, MetricRegistry dropwizardMetricRegistry, - MetadataClient metadataClient, + AdminClient adminClient, + MetadataAdminClient metadataClient, ExecutorNotifier executorNotifier, AnomalyDetectorManager anomalyDetectorManager) { _numExecutionStopped = new AtomicInteger(0); @@ -185,19 +182,12 @@ public Executor(KafkaCruiseControlConfig config, _config = config; _time = time; - _adminClient = KafkaCruiseControlUtils.createAdminClient(KafkaCruiseControlUtils.parseAdminClientConfigs(config)); + _adminClient = adminClient != null ? adminClient + : KafkaCruiseControlUtils.createAdminClient(KafkaCruiseControlUtils.parseAdminClientConfigs(config)); _executionTaskManager = new ExecutionTaskManager(_adminClient, dropwizardMetricRegistry, time, config); // Register gauge sensors. registerGaugeSensors(dropwizardMetricRegistry); - _metadataClient = metadataClient != null ? metadataClient - : new MetadataClient(config, - new Metadata(ExecutionUtils.METADATA_REFRESH_BACKOFF, - ExecutionUtils.METADATA_REFRESH_BACKOFF_MAX, - ExecutionUtils.METADATA_EXPIRY_MS, - new LogContext(), - new ClusterResourceListeners()), - -1L, - time); + _metadataClient = metadataClient != null ? metadataClient : new MetadataAdminClient(_adminClient); _defaultExecutionProgressCheckIntervalMs = config.getLong(ExecutorConfig.EXECUTION_PROGRESS_CHECK_INTERVAL_MS_CONFIG); _executionProgressCheckIntervalMs = _defaultExecutionProgressCheckIntervalMs; _leaderMovementTimeoutMs = config.getLong(ExecutorConfig.LEADER_MOVEMENT_TIMEOUT_MS_CONFIG); @@ -875,7 +865,7 @@ private synchronized void initProposalExecution(Collection pr recentlyRemovedBrokers(), isTriggeredByUserRequest); _executionTaskManager.setExecutionModeForTaskTracker(_isKafkaAssignerMode); // Get a snapshot of (1) cluster and (2) minIsr with time by topic name. - StrategyOptions strategyOptions = new StrategyOptions.Builder(_metadataClient.refreshMetadata().cluster()) + StrategyOptions strategyOptions = new StrategyOptions.Builder(_metadataClient.cluster()) .minIsrWithTimeByTopic(_topicMinIsrCache.minIsrWithTimeByTopic()).build(); _executionTaskManager.addExecutionProposals(proposals, brokersToSkipConcurrencyCheck, strategyOptions, replicaMovementStrategy); _concurrencyAdjuster.initAdjustment(loadMonitor, @@ -1069,9 +1059,7 @@ private void sanityCheckOngoingMovement() throws OngoingExecutionException { } else { boolean hasOngoingIntraBrokerReplicaMovement; try { - hasOngoingIntraBrokerReplicaMovement = - hasOngoingIntraBrokerReplicaMovement(_metadataClient.cluster().nodes().stream().mapToInt(Node::id).boxed() - .collect(Collectors.toSet()), _adminClient, _config); + hasOngoingIntraBrokerReplicaMovement = hasOngoingIntraBrokerReplicaMovement(_adminClient, _config); } catch (TimeoutException | InterruptedException | ExecutionException e) { // This may indicate transient (e.g. network) issues. throw new IllegalStateException("Failed to retrieve if there are already ongoing intra-broker replica reassignments.", e); @@ -1205,7 +1193,6 @@ public synchronized void shutdown() { } catch (InterruptedException e) { LOG.warn("Interrupted while waiting for anomaly detector to shutdown."); } - _metadataClient.close(); KafkaCruiseControlUtils.closeAdminClientWithTimeout(_adminClient); _executionHistoryScannerExecutor.shutdownNow(); _concurrencyAdjusterExecutor.shutdownNow(); @@ -1799,7 +1786,7 @@ private Cluster getClusterForExecutionProgressCheck() { if (LOG.isDebugEnabled()) { LOG.debug("Tasks in execution: {}", inExecutionTasks()); } - return _metadataClient.refreshMetadata().cluster(); + return _metadataClient.cluster(); } /** diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorAdminUtils.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorAdminUtils.java index d1a468540f..c8f1751e9e 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorAdminUtils.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorAdminUtils.java @@ -13,11 +13,13 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.LogDirDescription; import org.apache.kafka.clients.admin.ReplicaInfo; import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartitionReplica; import org.apache.kafka.common.errors.KafkaStorageException; import org.apache.kafka.common.errors.LogDirNotFoundException; @@ -99,14 +101,14 @@ static void executeIntraBrokerReplicaMovements(List tasksToExecut /** * Check whether there is ongoing intra-broker replica movement. - * @param brokersToCheck List of broker to check. * @param adminClient The adminClient to send describeLogDirs request. * @param config The config object that holds all the Cruise Control related configs * @return {@code true} if there is ongoing intra-broker replica movement. */ - static boolean hasOngoingIntraBrokerReplicaMovement(Collection brokersToCheck, AdminClient adminClient, + static boolean hasOngoingIntraBrokerReplicaMovement(AdminClient adminClient, KafkaCruiseControlConfig config) throws InterruptedException, ExecutionException, TimeoutException { + Collection brokersToCheck = adminClient.describeCluster().nodes().get().stream().map(Node::id).collect(Collectors.toSet()); Map>> logDirsByBrokerId = adminClient.describeLogDirs(brokersToCheck).descriptions(); for (Map.Entry>> entry : logDirsByBrokerId.entrySet()) { Map logInfos = entry.getValue().get(config.getLong(LOGDIR_RESPONSE_TIMEOUT_MS_CONFIG), TimeUnit.MILLISECONDS); diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java index 42ec5fa84f..ed933a4b0b 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java @@ -6,7 +6,7 @@ import com.codahale.metrics.MetricRegistry; import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils; -import com.linkedin.kafka.cruisecontrol.common.MetadataClient; +import com.linkedin.kafka.cruisecontrol.common.MetadataAdminClient; import com.linkedin.kafka.cruisecontrol.common.TestConstants; import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityConfigFileResolver; import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig; @@ -40,6 +40,7 @@ import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult; @@ -53,6 +54,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; @@ -84,6 +86,7 @@ public class ExecutorTest extends CCKafkaClientsIntegrationTestHarness { private static final long MOCK_CURRENT_TIME = 1596842708000L; private CCContainerizedKraftCluster _cluster; + private Admin _adminClient; private List _brokerAddressList; @Override @@ -103,12 +106,14 @@ public int clusterSize() { @Before public void setUp() { Properties adminClientProps = new Properties(); + adminClientProps.setProperty(AdminClientConfig.METADATA_MAX_AGE_CONFIG, "0"); setSecurityConfigs(adminClientProps, "admin"); _cluster = new CCContainerizedKraftCluster(clusterSize(), buildBrokerConfigs(), adminClientProps); _cluster.start(); _brokerAddressList = _cluster.getBrokerAddressList(); _bootstrapUrl = _cluster.getExternalBootstrapAddress(); + _adminClient = _cluster.adminClient(); } /** @@ -133,19 +138,10 @@ public Properties overridingProps() { @Test public void testReplicaReassignment() throws InterruptedException, OngoingExecutionException { - KafkaCruiseControlConfig kafkaCruiseControlConfig = new KafkaCruiseControlConfig(getExecutorProperties()); - Map adminClientConfigs = KafkaCruiseControlUtils.parseAdminClientConfigs(kafkaCruiseControlConfig); - adminClientConfigs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()); - AdminClient adminClient = KafkaCruiseControlUtils.createAdminClient(adminClientConfigs); - - try { - List proposalsToExecute = new ArrayList<>(); - List proposalsToCheck = new ArrayList<>(); - populateProposals(proposalsToExecute, proposalsToCheck, 0); - executeAndVerifyProposals(adminClient, proposalsToExecute, proposalsToCheck, false, null, false, true); - } finally { - KafkaCruiseControlUtils.closeAdminClientWithTimeout(adminClient); - } + List proposalsToExecute = new ArrayList<>(); + List proposalsToCheck = new ArrayList<>(); + populateProposals(proposalsToExecute, proposalsToCheck, 0); + executeAndVerifyProposals(_adminClient, proposalsToExecute, proposalsToCheck, false, null, false, true); } @Test @@ -168,43 +164,34 @@ public void testReplicaReassignmentProgressWithThrottle() throws InterruptedExce @Test public void testBrokerDiesBeforeMovingPartition() throws Exception { - KafkaCruiseControlConfig kafkaCruiseControlConfig = new KafkaCruiseControlConfig(getExecutorProperties()); - Map adminClientConfigs = KafkaCruiseControlUtils.parseAdminClientConfigs(kafkaCruiseControlConfig); - adminClientConfigs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()); - AdminClient adminClient = KafkaCruiseControlUtils.createAdminClient(adminClientConfigs); + Map topicDescriptions = createTopics((int) PRODUCE_SIZE_IN_BYTES); + // initialLeader0 will be alive after killing a broker in cluster. + int initialLeader0 = topicDescriptions.get(TOPIC0).partitions().get(0).leader().id(); + int initialLeader1 = topicDescriptions.get(TOPIC1).partitions().get(0).leader().id(); + // Kill broker before starting the reassignment. + int brokerId = initialLeader0 == 0 ? 1 : 0; - try { - Map topicDescriptions = createTopics((int) PRODUCE_SIZE_IN_BYTES); - // initialLeader0 will be alive after killing a broker in cluster. - int initialLeader0 = topicDescriptions.get(TOPIC0).partitions().get(0).leader().id(); - int initialLeader1 = topicDescriptions.get(TOPIC1).partitions().get(0).leader().id(); - // Kill broker before starting the reassignment. - int brokerId = initialLeader0 == 0 ? 1 : 0; - - _cluster.shutDownBroker(brokerId); - - ExecutionProposal proposal0 = - new ExecutionProposal(TP0, PRODUCE_SIZE_IN_BYTES, new ReplicaPlacementInfo(initialLeader0), - Collections.singletonList(new ReplicaPlacementInfo(initialLeader0)), - Collections.singletonList(new ReplicaPlacementInfo(initialLeader0 == 0 ? 1 : 0))); - ExecutionProposal proposal1 = - new ExecutionProposal(TP1, 0, new ReplicaPlacementInfo(initialLeader1), - Arrays.asList(new ReplicaPlacementInfo(initialLeader1), - new ReplicaPlacementInfo(initialLeader1 == 0 ? 1 : 0)), - Arrays.asList(new ReplicaPlacementInfo(initialLeader1 == 0 ? 1 : 0), - new ReplicaPlacementInfo(initialLeader1))); - - Collection proposalsToExecute = Arrays.asList(proposal0, proposal1); - executeAndVerifyProposals(adminClient, proposalsToExecute, Collections.emptyList(), true, null, false, false); - - // We are doing the rollback. -- The leadership should be on the alive broker. - TopicDescription topic0 = adminClient.describeTopics(Collections.singleton(TOPIC0)).topicNameValues().get(TOPIC0).get(); - TopicDescription topic1 = adminClient.describeTopics(Collections.singleton(TOPIC1)).topicNameValues().get(TOPIC1).get(); - assertEquals(initialLeader0, topic0.partitions().get(PARTITION).leader().id()); - assertEquals(initialLeader0, topic1.partitions().get(PARTITION).leader().id()); - } finally { - KafkaCruiseControlUtils.closeAdminClientWithTimeout(adminClient); - } + _cluster.shutDownBroker(brokerId); + + ExecutionProposal proposal0 = + new ExecutionProposal(TP0, PRODUCE_SIZE_IN_BYTES, new ReplicaPlacementInfo(initialLeader0), + Collections.singletonList(new ReplicaPlacementInfo(initialLeader0)), + Collections.singletonList(new ReplicaPlacementInfo(initialLeader0 == 0 ? 1 : 0))); + ExecutionProposal proposal1 = + new ExecutionProposal(TP1, 0, new ReplicaPlacementInfo(initialLeader1), + Arrays.asList(new ReplicaPlacementInfo(initialLeader1), + new ReplicaPlacementInfo(initialLeader1 == 0 ? 1 : 0)), + Arrays.asList(new ReplicaPlacementInfo(initialLeader1 == 0 ? 1 : 0), + new ReplicaPlacementInfo(initialLeader1))); + + Collection proposalsToExecute = Arrays.asList(proposal0, proposal1); + executeAndVerifyProposals(_adminClient, proposalsToExecute, Collections.emptyList(), true, null, false, false); + + // We are doing the rollback. -- The leadership should be on the alive broker. + TopicDescription topic0 = _adminClient.describeTopics(Collections.singleton(TOPIC0)).topicNameValues().get(TOPIC0).get(); + TopicDescription topic1 = _adminClient.describeTopics(Collections.singleton(TOPIC1)).topicNameValues().get(TOPIC1).get(); + assertEquals(initialLeader0, topic0.partitions().get(PARTITION).leader().id()); + assertEquals(initialLeader0, topic1.partitions().get(PARTITION).leader().id()); } @Test @@ -405,7 +392,7 @@ private static boolean verifyFutureError(Future future, Class new Executor(config, null, new MetricRegistry(), EasyMock.mock(MetadataClient.class), null, null)); - Executor executor = new Executor(config, null, new MetricRegistry(), EasyMock.mock(MetadataClient.class), + () -> new Executor(config, null, new MetricRegistry(), null, EasyMock.mock(MetadataAdminClient.class), null, null)); + Executor executor = new Executor(config, null, new MetricRegistry(), null, EasyMock.mock(MetadataAdminClient.class), null, EasyMock.mock(AnomalyDetectorManager.class)); // Verify correctness of add/drop recently removed/demoted brokers. @@ -502,7 +489,7 @@ public void testTimeoutAndExecutionStop() throws InterruptedException, OngoingEx KafkaCruiseControlConfig configs = new KafkaCruiseControlConfig(getExecutorProperties()); Time time = new MockTime(); - MetadataClient mockMetadataClient = EasyMock.mock(MetadataClient.class); + MetadataAdminClient mockMetadataClient = EasyMock.mock(MetadataAdminClient.class); // Fake the metadata to never change so the leader movement will timeout. Node node0 = new Node(BROKER_ID_0, "host0", 100); Node node1 = new Node(BROKER_ID_1, "host1", 100); @@ -512,9 +499,7 @@ public void testTimeoutAndExecutionStop() throws InterruptedException, OngoingEx PartitionInfo partitionInfo = new PartitionInfo(TP1.topic(), TP1.partition(), node1, replicas, replicas); Cluster cluster = new Cluster("id", Arrays.asList(node0, node1), Collections.singleton(partitionInfo), Collections.emptySet(), Collections.emptySet()); - MetadataClient.ClusterAndGeneration clusterAndGeneration = new MetadataClient.ClusterAndGeneration(cluster, 0); - EasyMock.expect(mockMetadataClient.refreshMetadata()).andReturn(clusterAndGeneration).anyTimes(); - EasyMock.expect(mockMetadataClient.cluster()).andReturn(clusterAndGeneration.cluster()).anyTimes(); + EasyMock.expect(mockMetadataClient.cluster()).andReturn(cluster).anyTimes(); LoadMonitor mockLoadMonitor = getMockLoadMonitor(); AnomalyDetectorManager mockAnomalyDetectorManager = getMockAnomalyDetector(RANDOM_UUID, false); UserTaskManager.UserTaskInfo mockUserTaskInfo = getMockUserTaskInfo(); @@ -523,7 +508,7 @@ public void testTimeoutAndExecutionStop() throws InterruptedException, OngoingEx EasyMock.replay(mockMetadataClient, mockLoadMonitor, mockAnomalyDetectorManager, mockUserTaskInfo, mockUserTaskManager); Collection proposalsToExecute = Collections.singletonList(proposal); - Executor executor = new Executor(configs, time, new MetricRegistry(), mockMetadataClient, null, + Executor executor = new Executor(configs, time, new MetricRegistry(), null, mockMetadataClient, null, mockAnomalyDetectorManager); executor.setUserTaskManager(mockUserTaskManager); @@ -659,45 +644,32 @@ private void populateProposals(List proposalToExecute, * {@link com.linkedin.kafka.cruisecontrol.common.TestConstants#TOPIC0 topic0}. * @return A map from topic names to their description. */ - private Map createTopics(int produceSizeInBytes) throws InterruptedException { - AdminClient adminClient = KafkaCruiseControlUtils.createAdminClient(Collections.singletonMap( - AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, _brokerAddressList.get(BROKER_ID_0))); - try { - adminClient.createTopics(Arrays.asList(new NewTopic(TOPIC0, Collections.singletonMap(0, Collections.singletonList(0))), - new NewTopic(TOPIC1, Collections.singletonMap(0, List.of(0, 1))))); - } finally { - KafkaCruiseControlUtils.closeAdminClientWithTimeout(adminClient); - } - - // We need to use the admin clients to query the metadata from two different brokers to make sure that - // both brokers have the latest metadata. Otherwise the Executor may get confused when it does not - // see expected topics in the metadata. - Map topicDescriptions0 = null; - Map topicDescriptions1 = null; - do { - AdminClient adminClient0 = KafkaCruiseControlUtils.createAdminClient(Collections.singletonMap( - AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, _brokerAddressList.get(BROKER_ID_0))); - AdminClient adminClient1 = KafkaCruiseControlUtils.createAdminClient(Collections.singletonMap( - AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, _brokerAddressList.get(BROKER_ID_1))); - try { - topicDescriptions0 = adminClient0.describeTopics(Arrays.asList(TOPIC0, TOPIC1)).allTopicNames().get(); - topicDescriptions1 = adminClient1.describeTopics(Arrays.asList(TOPIC0, TOPIC1)).allTopicNames().get(); - try { - Thread.sleep(100); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } catch (ExecutionException ee) { - // Let it go. - } finally { - KafkaCruiseControlUtils.closeAdminClientWithTimeout(adminClient0); - KafkaCruiseControlUtils.closeAdminClientWithTimeout(adminClient1); - } - } while (topicDescriptions0 == null || topicDescriptions0.size() < 2 - || topicDescriptions1 == null || topicDescriptions1.size() < 2); + private Map createTopics(int produceSizeInBytes) { + _adminClient.createTopics(Arrays.asList(new NewTopic(TOPIC0, Collections.singletonMap(0, Collections.singletonList(0))), + new NewTopic(TOPIC1, Collections.singletonMap(0, List.of(0, 1))))); + + // Wait for the metadata of newly created topics to be propagated to all the brokers by ensuring + // - Every partition has a leader assigned, + // - Every partition has replicas assigned, + // - The leader is included in the replicas list, + // - Every partition's in-sync replicas list is populated + // so subsequent Admin operations on these topics can safely assume the metadata is consistent. + Map topicDescriptions = _cluster.waitForTopicMetadata( + List.of(TOPIC0, TOPIC1), + Duration.ofSeconds(15), + Duration.ofSeconds(60), + topicDescription -> { + return topicDescription.partitions() != null + && !topicDescription.partitions().isEmpty() + && topicDescription.partitions().stream().allMatch(p -> + p.leader() != null + && !p.replicas().isEmpty() + && p.replicas().contains(p.leader()) + && !p.isr().isEmpty()); + }); produceRandomDataToTopic(TOPIC0, produceSizeInBytes); - return topicDescriptions0; + return topicDescriptions; } private void verifyOngoingPartitionReassignments(Set partitions) { @@ -784,7 +756,7 @@ private static UserTaskManager.UserTaskInfo getMockUserTaskInfo() { return mockUserTaskInfo; } - private void executeAndVerifyProposals(AdminClient adminClient, + private void executeAndVerifyProposals(Admin adminClient, Collection proposalsToExecute, Collection proposalsToCheck, boolean completeWithError, @@ -815,8 +787,8 @@ private void executeAndVerifyProposals(AdminClient adminClient, EasyMock.replay(mockUserTaskInfo, mockExecutorNotifier, mockLoadMonitor, mockAnomalyDetectorManager); } MetricRegistry metricRegistry = new MetricRegistry(); - Executor executor = new Executor(configs, Time.SYSTEM, metricRegistry, null, mockExecutorNotifier, - mockAnomalyDetectorManager); + Executor executor = new Executor(configs, Time.SYSTEM, metricRegistry, (AdminClient) _adminClient, + new MetadataAdminClient(_adminClient), mockExecutorNotifier, mockAnomalyDetectorManager); executor.setUserTaskManager(mockUserTaskManager); Map replicationFactors = new HashMap<>(); for (ExecutionProposal proposal : proposalsToCheck) { @@ -852,14 +824,25 @@ private void executeAndVerifyProposals(AdminClient adminClient, .collect(Collectors.toSet()); // Wait for topic metadata change to propagate. - TopicDescription topic = _cluster.waitForTopicMetadata(tp.topic(), Duration.ofSeconds(60), + Map topicDescriptions = _cluster.waitForTopicMetadata( + List.of(tp.topic()), + Duration.ofSeconds(15), + Duration.ofSeconds(60), topicDescription -> { - List replicas = topicDescription.partitions().get(tp.partition()).replicas(); + TopicPartitionInfo partitionInfo = topicDescription.partitions().get(tp.partition()); + + List replicas = partitionInfo.replicas(); Set actualBrokerIds = replicas.stream().map(Node::id).collect(Collectors.toSet()); - return actualBrokerIds.containsAll(expectedBrokerIds) - && actualBrokerIds.size() == expectedBrokerIds.size(); + boolean replicasMatch = actualBrokerIds.containsAll(expectedBrokerIds); + + Node leader = partitionInfo.leader(); + boolean leaderMatches = leader != null && leader.id() == proposal.newLeader().brokerId(); + + return replicasMatch && leaderMatches; }); + TopicDescription topic = topicDescriptions.get(proposal.topic()); + assertEquals("Replication factor for partition " + tp + " should be " + expectedReplicationFactor, expectedReplicationFactor, topic.partitions().get(tp.partition()).replicas().size()); diff --git a/gradle.properties b/gradle.properties index c867482120..3959e4411e 100644 --- a/gradle.properties +++ b/gradle.properties @@ -6,4 +6,4 @@ kafkaVersion=4.0.0 nettyVersion=4.1.118.Final jettyVersion=9.4.56.v20240826 vertxVersion=4.5.8 -testcontainersVersion=1.21.3 +testcontainersVersion=1.21.4