@@ -202,7 +202,9 @@ public void testReportingMetrics() {

@Test
public void testUpdatingMetricsTopicConfig() throws InterruptedException, TimeoutException {
TopicDescription topicDescription = _cluster.waitForTopicMetadata(TOPIC, Duration.ofSeconds(30), td -> true);
Map<String, TopicDescription> topicDescriptions =
_cluster.waitForTopicMetadata(List.of(TOPIC), Duration.ofSeconds(5), Duration.ofSeconds(30), td -> true);
TopicDescription topicDescription = topicDescriptions.get(TOPIC);
assertEquals(1, topicDescription.partitions().size());

KafkaContainer broker = _cluster.getBrokers().get(0);
@@ -223,9 +225,9 @@ public void testUpdatingMetricsTopicConfig() throws InterruptedException, Timeou

// Wait for topic metadata configuration change to propagate
int oldPartitionCount = topicDescription.partitions().size();
TopicDescription newTopicDescription = _cluster.waitForTopicMetadata(TOPIC, Duration.ofSeconds(30),
td -> td.partitions().size() != oldPartitionCount);

Map<String, TopicDescription> newTopicDescriptions = _cluster.waitForTopicMetadata(
List.of(TOPIC), Duration.ofSeconds(5), Duration.ofSeconds(30), td -> td.partitions().size() != oldPartitionCount);
TopicDescription newTopicDescription = newTopicDescriptions.get(TOPIC);
assertEquals(2, newTopicDescription.partitions().size());
}

@@ -13,6 +13,7 @@
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
@@ -31,6 +32,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -273,6 +275,15 @@ public void overrideBrokerConfig(KafkaContainer kafkaContainer, Map<Object, Obje
}
}

/**
* Returns the {@link Admin} client used by this cluster instance.
*
* @return the {@link Admin} client associated with this cluster
*/
public Admin adminClient() {
return _adminClient;
}

/**
* Returns list of KafkaContainer broker objects within the TestContainer Kafka cluster.
*
@@ -338,36 +349,47 @@ private static <T> T waitUntil(Supplier<T> supplier, Predicate<T> condition, Dur
}

/**
* Waits until the metadata for a Kafka topic meets the specified condition,
* Waits until the metadata for the listed Kafka topics meets the specified condition,
* or until the given timeout period elapses.
*
* @param topicName the name of the topic whose metadata should be monitored
* @param timeout the maximum duration to wait for the condition to be satisfied
* @param topicNames the names of the topics whose metadata should be monitored
* @param fetchTimeout the timeout duration for the metadata retrieval call
* @param waitTimeout the maximum duration to wait for the condition to be satisfied
* @param condition a {@link Predicate} that tests the {@link TopicDescription} for readiness
* @return the {@link TopicDescription} once the condition evaluates to {@code true}
* @return the {@link Map}{@code <String, TopicDescription>} once the condition evaluates to {@code true} for every listed topic
*/
public TopicDescription waitForTopicMetadata(String topicName, Duration timeout, Predicate<TopicDescription> condition) {
public Map<String, TopicDescription> waitForTopicMetadata(List<String> topicNames,
Duration fetchTimeout,
Duration waitTimeout,
Predicate<TopicDescription> condition) {
return waitUntil(
() -> {
try {
return _adminClient.describeTopics(Collections.singleton(topicName))
.topicNameValues()
.get(topicName)
.get();
} catch (ExecutionException e) {
if (e.getCause() instanceof UnknownTopicOrPartitionException) {
// Topic doesn't exist yet, retry
return null;
}
throw new RuntimeException("Failed to describe topic: " + topicName, e);
Map<String, TopicDescription> descriptions =
_adminClient.describeTopics(topicNames)
.allTopicNames()
.get(fetchTimeout.toSeconds(), TimeUnit.SECONDS);

boolean topicMetadataReady = descriptions.values().stream().allMatch(td -> td != null && condition.test(td));

return topicMetadataReady ? descriptions : null;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while waiting for broker to become ready", e);
} catch (ExecutionException e) {
if (e.getCause() instanceof UnknownTopicOrPartitionException || e.getCause() instanceof RetriableException) {
// Topic doesn't exist yet or transient error, retry
return null;
}
throw new RuntimeException("Failed to describe topics: " + topicNames, e);
} catch (TimeoutException e) {
// Timeout fetching metadata, treat as transient and retry
return null;
}
},
td -> td != null && condition.test(td),
timeout,
String.format("Timeout waiting for topic %s metadata to be ready.", topicName)
Objects::nonNull,
waitTimeout,
String.format("Timeout waiting for topics %s metadata to be ready.", topicNames)
);
}
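
For reference, a minimal usage sketch of the new overload; the topic names, the three-partition predicate, and the `_cluster` handle are illustrative rather than taken from this change:

```java
// Wait up to 30s overall for both topics to report 3 partitions,
// capping each describeTopics() round trip at 5s.
Map<String, TopicDescription> ready = _cluster.waitForTopicMetadata(
    List.of("topic-a", "topic-b"),  // hypothetical topic names
    Duration.ofSeconds(5),          // fetchTimeout: per-fetch metadata limit
    Duration.ofSeconds(30),         // waitTimeout: overall deadline for the condition
    td -> td.partitions().size() == 3);
```

Splitting the two timeouts keeps a single slow describeTopics() call from consuming the entire wait budget.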

@@ -385,14 +407,22 @@ public void shutDownBroker(int brokerId) {
waitUntil(
() -> {
try {
return _adminClient.describeCluster().nodes().get().stream()
.map(node -> String.valueOf(node.id()))
.collect(Collectors.toSet());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
return _adminClient.describeCluster().nodes().get(10, TimeUnit.SECONDS)
.stream().noneMatch(node -> node.id() == brokerId);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while waiting for broker removal", e);
} catch (ExecutionException e) {
if (e.getCause() instanceof RetriableException) {
return false;
}
throw new RuntimeException("Failed to describe cluster: ", e);
} catch (TimeoutException e) {
// Timeout fetching metadata, treat as transient and retry
return false;
}
},
brokerIds -> !brokerIds.contains(String.valueOf(brokerId)),
Boolean::booleanValue,
timeout,
String.format("Broker %s did not shutdown properly.", brokerId)
);
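
Both wait loops delegate to the private `waitUntil` helper, whose body is collapsed in this diff; the sketch below shows the polling contract they assume, with the poll interval and anything beyond the visible signature being guesses:

```java
// Poll the supplier until the condition accepts its value, or fail with the
// timeout message once the deadline passes. Sketch only; details beyond the
// visible signature are assumptions.
private static <T> T waitUntil(Supplier<T> supplier, Predicate<T> condition,
                               Duration timeout, String timeoutMessage) {
  long deadlineMs = System.currentTimeMillis() + timeout.toMillis();
  while (System.currentTimeMillis() < deadlineMs) {
    T value = supplier.get();
    if (condition.test(value)) {
      return value;
    }
    try {
      Thread.sleep(100); // illustrative poll interval
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException("Interrupted while polling", e);
    }
  }
  throw new IllegalStateException(timeoutMessage);
}
```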
@@ -0,0 +1,82 @@
/*
* Copyright 2017 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
*/

package com.linkedin.kafka.cruisecontrol.common;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Client for fetching Kafka cluster metadata using Kafka Admin APIs.
*
* This replaces {@link MetadataClient}, which relies on internal Kafka APIs.
*/
public class MetadataAdminClient {
private static final Logger LOG = LoggerFactory.getLogger(MetadataAdminClient.class);

private final Admin _adminClient;

/**
* Creates a new MetadataAdminClient.
*
* @param adminClient The AdminClient to use for fetching cluster metadata.
*/
public MetadataAdminClient(Admin adminClient) {
_adminClient = adminClient;
}

/**
* Fetches the current metadata for the Kafka cluster.
*
* @return a {@link Cluster} containing the cluster ID, broker nodes, and partition information for all topics
*/
public Cluster cluster() {
try {
Set<String> topicNames = _adminClient.listTopics().names().get();

Map<String, TopicDescription> topicDescriptions = _adminClient.describeTopics(topicNames).allTopicNames().get();

DescribeClusterResult describeResult = _adminClient.describeCluster();
Collection<Node> nodes = describeResult.nodes().get();
String clusterId = describeResult.clusterId().get();

List<PartitionInfo> partitionInfos = new ArrayList<>();

for (TopicDescription desc : topicDescriptions.values()) {
for (TopicPartitionInfo partInfo : desc.partitions()) {

Node leader = partInfo.leader();
Node[] replicas = partInfo.replicas().toArray(Node[]::new);
Node[] isr = partInfo.isr().toArray(Node[]::new);

partitionInfos.add(new PartitionInfo(desc.name(), partInfo.partition(), leader, replicas, isr));
}
}

return new Cluster(clusterId, nodes, partitionInfos, Collections.emptySet(), Collections.emptySet());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Interrupted while fetching cluster metadata", e);
throw new RuntimeException("Interrupted while fetching cluster metadata", e);

} catch (ExecutionException e) {
LOG.error("ExecutionException while fetching cluster metadata", e);
throw new RuntimeException("Failed to fetch cluster metadata", e);
}
}
}
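
A hedged usage sketch for the new class; the bootstrap address and the demo wrapper are illustrative, and MetadataAdminClient is assumed to be on the classpath:

```java
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Cluster;

public class MetadataAdminClientDemo {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative address
    try (Admin admin = Admin.create(props)) {
      // Each cluster() call fetches a fresh snapshot via the Admin API;
      // there is no cached/refreshed metadata as with the old MetadataClient.
      Cluster snapshot = new MetadataAdminClient(admin).cluster();
      System.out.printf("cluster %s: %d brokers, %d topics%n",
          snapshot.clusterResource().clusterId(),
          snapshot.nodes().size(),
          snapshot.topics().size());
    }
  }
}
```

Note that the caller owns the `Admin` client's lifecycle: MetadataAdminClient never closes it, which is why the Executor's shutdown path below drops `_metadataClient.close()` and only closes the admin client.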
@@ -26,7 +26,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Legacy client for fetching Kafka cluster metadata.
*
* @deprecated Superseded by {@link MetadataAdminClient}, which uses public Kafka APIs; this class will be removed
* in a future release.
*/
@Deprecated
public class MetadataClient {
private static final Logger LOG = LoggerFactory.getLogger(MetadataClient.class);
private static final LogContext LOG_CONTEXT = new LogContext();
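
For call sites, the migration is mechanical, as the Executor changes below show: the explicit refresh step disappears and a snapshot is fetched on demand.

```java
// Before (removed in this change): internal Metadata API; refresh, then read,
// and close() the metadata client on shutdown.
Cluster before = _metadataClient.refreshMetadata().cluster();

// After: public Admin API; every call is a fresh fetch, and only the
// underlying Admin client needs closing.
Cluster after = _metadataClient.cluster();
```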
@@ -9,10 +9,10 @@
import com.codahale.metrics.Timer;
import com.google.common.util.concurrent.AtomicDouble;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.common.MetadataAdminClient;
import com.linkedin.kafka.cruisecontrol.common.TopicMinIsrCache;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory;
import com.linkedin.kafka.cruisecontrol.common.MetadataClient;
import com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig;
import com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorManager;
import com.linkedin.kafka.cruisecontrol.exception.OngoingExecutionException;
@@ -42,18 +42,14 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.ElectLeadersResult;
import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,7 +83,7 @@ public class Executor {
private static final long EXECUTION_PROGRESS_CHECK_INTERVAL_ADJUSTING_MS = 1000;
// The execution progress is controlled by the ExecutionTaskManager.
private final ExecutionTaskManager _executionTaskManager;
private final MetadataClient _metadataClient;
private final MetadataAdminClient _metadataClient;
private volatile long _executionProgressCheckIntervalMs;
private final long _defaultExecutionProgressCheckIntervalMs;
private Long _requestedExecutionProgressCheckIntervalMs;
@@ -156,7 +152,7 @@ public Executor(KafkaCruiseControlConfig config,
Time time,
MetricRegistry dropwizardMetricRegistry,
AnomalyDetectorManager anomalyDetectorManager) {
this(config, time, dropwizardMetricRegistry, null, null, anomalyDetectorManager);
this(config, time, dropwizardMetricRegistry, null, null, null, anomalyDetectorManager);
}

/**
@@ -168,7 +164,8 @@ public Executor(KafkaCruiseControlConfig config,
Executor(KafkaCruiseControlConfig config,
Time time,
MetricRegistry dropwizardMetricRegistry,
MetadataClient metadataClient,
AdminClient adminClient,
MetadataAdminClient metadataClient,
ExecutorNotifier executorNotifier,
AnomalyDetectorManager anomalyDetectorManager) {
_numExecutionStopped = new AtomicInteger(0);
@@ -185,19 +182,12 @@ public Executor(KafkaCruiseControlConfig config,
_config = config;

_time = time;
_adminClient = KafkaCruiseControlUtils.createAdminClient(KafkaCruiseControlUtils.parseAdminClientConfigs(config));
_adminClient = adminClient != null ? adminClient
: KafkaCruiseControlUtils.createAdminClient(KafkaCruiseControlUtils.parseAdminClientConfigs(config));
_executionTaskManager = new ExecutionTaskManager(_adminClient, dropwizardMetricRegistry, time, config);
// Register gauge sensors.
registerGaugeSensors(dropwizardMetricRegistry);
_metadataClient = metadataClient != null ? metadataClient
: new MetadataClient(config,
new Metadata(ExecutionUtils.METADATA_REFRESH_BACKOFF,
ExecutionUtils.METADATA_REFRESH_BACKOFF_MAX,
ExecutionUtils.METADATA_EXPIRY_MS,
new LogContext(),
new ClusterResourceListeners()),
-1L,
time);
_metadataClient = metadataClient != null ? metadataClient : new MetadataAdminClient(_adminClient);
_defaultExecutionProgressCheckIntervalMs = config.getLong(ExecutorConfig.EXECUTION_PROGRESS_CHECK_INTERVAL_MS_CONFIG);
_executionProgressCheckIntervalMs = _defaultExecutionProgressCheckIntervalMs;
_leaderMovementTimeoutMs = config.getLong(ExecutorConfig.LEADER_MOVEMENT_TIMEOUT_MS_CONFIG);
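
The new nullable `adminClient` parameter mirrors the existing `metadataClient` fallback and exists so tests can inject a stub instead of having the constructor open a real connection. A hedged wiring sketch, with the mocking framework and fixture names being illustrative:

```java
// Test-side sketch: inject a mock AdminClient; pass null for the
// collaborators that should fall back to their config-driven defaults.
AdminClient mockAdminClient = EasyMock.mock(AdminClient.class);
Executor executor = new Executor(config, new SystemTime(), metricRegistry,
    mockAdminClient,  // used directly instead of createAdminClient(config)
    null,             // metadataClient: defaults to new MetadataAdminClient(mockAdminClient)
    null,             // executorNotifier
    anomalyDetectorManager);
```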
@@ -875,7 +865,7 @@ private synchronized void initProposalExecution(Collection<ExecutionProposal> pr
recentlyRemovedBrokers(), isTriggeredByUserRequest);
_executionTaskManager.setExecutionModeForTaskTracker(_isKafkaAssignerMode);
// Get a snapshot of (1) cluster and (2) minIsr with time by topic name.
StrategyOptions strategyOptions = new StrategyOptions.Builder(_metadataClient.refreshMetadata().cluster())
StrategyOptions strategyOptions = new StrategyOptions.Builder(_metadataClient.cluster())
.minIsrWithTimeByTopic(_topicMinIsrCache.minIsrWithTimeByTopic()).build();
_executionTaskManager.addExecutionProposals(proposals, brokersToSkipConcurrencyCheck, strategyOptions, replicaMovementStrategy);
_concurrencyAdjuster.initAdjustment(loadMonitor,
@@ -1069,9 +1059,7 @@ private void sanityCheckOngoingMovement() throws OngoingExecutionException {
} else {
boolean hasOngoingIntraBrokerReplicaMovement;
try {
hasOngoingIntraBrokerReplicaMovement =
hasOngoingIntraBrokerReplicaMovement(_metadataClient.cluster().nodes().stream().mapToInt(Node::id).boxed()
.collect(Collectors.toSet()), _adminClient, _config);
hasOngoingIntraBrokerReplicaMovement = hasOngoingIntraBrokerReplicaMovement(_adminClient, _config);
} catch (TimeoutException | InterruptedException | ExecutionException e) {
// This may indicate transient (e.g. network) issues.
throw new IllegalStateException("Failed to retrieve if there are already ongoing intra-broker replica reassignments.", e);
@@ -1205,7 +1193,6 @@ public synchronized void shutdown() {
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for anomaly detector to shutdown.");
}
_metadataClient.close();
KafkaCruiseControlUtils.closeAdminClientWithTimeout(_adminClient);
_executionHistoryScannerExecutor.shutdownNow();
_concurrencyAdjusterExecutor.shutdownNow();
@@ -1799,7 +1786,7 @@ private Cluster getClusterForExecutionProgressCheck() {
if (LOG.isDebugEnabled()) {
LOG.debug("Tasks in execution: {}", inExecutionTasks());
}
return _metadataClient.refreshMetadata().cluster();
return _metadataClient.cluster();
}

/**
@@ -13,11 +13,13 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.LogDirDescription;
import org.apache.kafka.clients.admin.ReplicaInfo;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.LogDirNotFoundException;
@@ -99,14 +101,14 @@ static void executeIntraBrokerReplicaMovements(List<ExecutionTask> tasksToExecut

/**
* Check whether there is ongoing intra-broker replica movement.
* @param brokersToCheck List of broker to check.
* @param adminClient The adminClient to send describeLogDirs request.
* @param config The config object that holds all the Cruise Control related configs
* @return {@code true} if there is ongoing intra-broker replica movement.
*/
static boolean hasOngoingIntraBrokerReplicaMovement(Collection<Integer> brokersToCheck, AdminClient adminClient,
static boolean hasOngoingIntraBrokerReplicaMovement(AdminClient adminClient,
KafkaCruiseControlConfig config)
throws InterruptedException, ExecutionException, TimeoutException {
Collection<Integer> brokersToCheck = adminClient.describeCluster().nodes().get().stream().map(Node::id).collect(Collectors.toSet());
Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> logDirsByBrokerId = adminClient.describeLogDirs(brokersToCheck).descriptions();
for (Map.Entry<Integer, KafkaFuture<Map<String, LogDirDescription>>> entry : logDirsByBrokerId.entrySet()) {
Map<String, LogDirDescription> logInfos = entry.getValue().get(config.getLong(LOGDIR_RESPONSE_TIMEOUT_MS_CONFIG), TimeUnit.MILLISECONDS);
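
The rest of the loop is collapsed above. For context, an in-flight intra-broker movement surfaces as a "future" replica in a broker's log-dir listing; below is a sketch of what the hidden remainder plausibly checks (the actual body may differ):

```java
// Sketch: a broker has an ongoing disk-to-disk movement iff any of its
// log dirs hosts a "future" replica that is still being populated.
for (LogDirDescription logDir : logInfos.values()) {
  for (ReplicaInfo replicaInfo : logDir.replicaInfos().values()) {
    if (replicaInfo.isFuture()) {
      return true;
    }
  }
}
```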