Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ project(':cruise-control') {
testImplementation 'com.jayway.jsonpath:json-path:2.7.0'
testImplementation 'org.powermock:powermock-module-junit4:2.0.9'
testImplementation 'org.powermock:powermock-api-easymock:2.0.9'
testImplementation "org.testcontainers:kafka:$testcontainersVersion"
testImplementation "org.testcontainers:testcontainers-kafka:$testcontainersVersion"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this related to fixing the Executor tests?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Due to breaking changes in recent Docker Engine updates we need to upgrade Testcontainers to a version that adds compatibility fixes, either 1.21.4 or 2.0.3.

That being said, we could avoid this specific artifact name change by sticking with the 1.x versions instead of jumping to 2.x versions as I have done as part of my commit.

Let me update the version to 1.21.4 in my next commit to avoid this artifact name change.

}

publishing {
Expand Down Expand Up @@ -484,7 +484,7 @@ project(':cruise-control-metrics-reporter') {
testImplementation "org.apache.kafka:kafka-raft:$kafkaVersion"
testImplementation "org.apache.kafka:kafka-storage:$kafkaVersion"
testImplementation 'commons-io:commons-io:2.11.0'
testImplementation "org.testcontainers:kafka:$testcontainersVersion"
testImplementation "org.testcontainers:testcontainers-kafka:$testcontainersVersion"
testOutput sourceSets.test.output
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,9 @@ public void testReportingMetrics() {

@Test
public void testUpdatingMetricsTopicConfig() throws InterruptedException, TimeoutException {
TopicDescription topicDescription = _cluster.waitForTopicMetadata(TOPIC, Duration.ofSeconds(30), td -> true);
Map<String, TopicDescription> topicDescriptions =
_cluster.waitForTopicMetadata(List.of(TOPIC), Duration.ofSeconds(5), Duration.ofSeconds(30), td -> true);
TopicDescription topicDescription = topicDescriptions.get(TOPIC);
assertEquals(1, topicDescription.partitions().size());

KafkaContainer broker = _cluster.getBrokers().get(0);
Expand All @@ -223,9 +225,9 @@ public void testUpdatingMetricsTopicConfig() throws InterruptedException, Timeou

// Wait for topic metadata configuration change to propagate
int oldPartitionCount = topicDescription.partitions().size();
TopicDescription newTopicDescription = _cluster.waitForTopicMetadata(TOPIC, Duration.ofSeconds(30),
td -> td.partitions().size() != oldPartitionCount);

Map<String, TopicDescription> newTopicDescriptions = _cluster.waitForTopicMetadata(
List.of(TOPIC), Duration.ofSeconds(5), Duration.ofSeconds(30), td -> td.partitions().size() != oldPartitionCount);
TopicDescription newTopicDescription = newTopicDescriptions.get(TOPIC);
assertEquals(2, newTopicDescription.partitions().size());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -273,6 +274,15 @@ public void overrideBrokerConfig(KafkaContainer kafkaContainer, Map<Object, Obje
}
}

/**
 * Returns the {@link Admin} client used by this cluster instance.
 *
 * <p>NOTE(review): the returned client is the cluster's shared {@code _adminClient}
 * field, so callers presumably should not close it themselves — confirm against the
 * cluster's shutdown/teardown code.
 *
 * @return the {@link Admin} client associated with this cluster
 */
public Admin adminClient() {
return _adminClient;
}

/**
* Returns list of KafkaContainer broker objects within the TestContainer Kafka cluster.
*
Expand Down Expand Up @@ -338,36 +348,47 @@ private static <T> T waitUntil(Supplier<T> supplier, Predicate<T> condition, Dur
}

/**
* Waits until the metadata for a Kafka topic meets the specified condition,
* Waits until the metadata for the listed Kafka topics meet the specified condition,
* or until the given timeout period elapses.
*
* @param topicName the name of the topic whose metadata should be monitored
* @param timeout the maximum duration to wait for the condition to be satisfied
* @param topicNames the names of the topics whose metadata should be monitored
* @param fetchTimeout the timeout duration for the metadata retrieval call
* @param waitTimeout the maximum duration to wait for the condition to be satisfied
* @param condition a {@link Predicate} that tests the {@link TopicDescription} for readiness
* @return the {@link TopicDescription} once the condition evaluates to {@code true}
* @return the {@link Map}{@code <String, TopicDescription>} once the condition evaluates to {@code true}
*/
public TopicDescription waitForTopicMetadata(String topicName, Duration timeout, Predicate<TopicDescription> condition) {
public Map<String, TopicDescription> waitForTopicMetadata(List<String> topicNames,
Duration fetchTimeout,
Duration waitTimeout,
Predicate<TopicDescription> condition) {
return waitUntil(
() -> {
try {
return _adminClient.describeTopics(Collections.singleton(topicName))
.topicNameValues()
.get(topicName)
.get();
Map<String, TopicDescription> descriptions =
_adminClient.describeTopics(topicNames)
.allTopicNames()
.get(fetchTimeout.toSeconds(), TimeUnit.SECONDS);

boolean topicMetadataReady = descriptions.values().stream().allMatch(td -> td != null && condition.test(td));

return topicMetadataReady ? descriptions : null;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while waiting for broker to become ready", e);
} catch (ExecutionException e) {
if (e.getCause() instanceof UnknownTopicOrPartitionException) {
// Topic doesn't exist yet, retry
return null;
}
throw new RuntimeException("Failed to describe topic: " + topicName, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while waiting for broker to become ready", e);
throw new RuntimeException("Failed to describe topics: " + topicNames, e);
} catch (TimeoutException e) {
// Timeout fetching metadata, treat as transient and retry
return null;
}
},
td -> td != null && condition.test(td),
timeout,
String.format("Timeout waiting for topic %s metadata to be ready.", topicName)
Objects::nonNull,
waitTimeout,
String.format("Timeout waiting for topics %s metadata to be ready.", topicNames)
);
}

Expand All @@ -385,14 +406,23 @@ public void shutDownBroker(int brokerId) {
waitUntil(
    () -> {
      try {
        // Broker counts as removed once it no longer appears in the cluster's node list.
        return _adminClient.describeCluster().nodes().get(10, TimeUnit.SECONDS)
            .stream().noneMatch(node -> node.id() == brokerId);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException("Interrupted while waiting for broker removal", e);
      } catch (ExecutionException e) {
        // Chain the original exception as the cause instead of flattening it into the
        // message ("+ e") and losing the stack trace. The previous retry branch for
        // UnknownTopicOrPartitionException was dead copy-paste code: describeCluster
        // does not involve topics.
        throw new RuntimeException("Failed to describe cluster", e);
      } catch (TimeoutException e) {
        // Timeout fetching metadata, treat as transient and retry
        return false;
      }
    },
    Boolean::booleanValue,
    timeout,
    String.format("Broker %s did not shutdown properly.", brokerId)
);
Expand Down
Comment thread
kyguy marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.linkedin.kafka.cruisecontrol.metricsreporter.config.EnvConfigProvider;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import com.linkedin.kafka.cruisecontrol.monitor.task.LoadMonitorTaskRunner;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
Expand Down Expand Up @@ -513,7 +514,7 @@ public static AdminClient createAdminClient(Map<String, Object> adminClientConfi
*
* @param adminClient AdminClient to be closed.
*/
public static void closeAdminClientWithTimeout(Admin adminClient) {
// Delegate to the timed overload using the default close timeout.
closeAdminClientWithTimeout(adminClient, ADMIN_CLIENT_CLOSE_TIMEOUT_MS);
}
}

Expand All @@ -523,7 +524,7 @@ public static void closeAdminClientWithTimeout(AdminClient adminClient) {
* @param adminClient AdminClient to be closed.
* @param timeoutMs the timeout.
*/
public static void closeAdminClientWithTimeout(AdminClient adminClient, long timeoutMs) {
public static void closeAdminClientWithTimeout(Admin adminClient, long timeoutMs) {
closeClientWithTimeout(() -> {
try {
((AutoCloseable) adminClient).close();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2017 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
*/

package com.linkedin.kafka.cruisecontrol.common;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Client for fetching Kafka cluster metadata using Kafka Admin APIs.
*
* This replaces the use of MetadataClient which relies on internal Kafka APIs.
*/
public class MetadataAdminClient {
private static final Logger LOG = LoggerFactory.getLogger(MetadataAdminClient.class);

private final Admin _adminClient;

/**
* Creates a new MetadataAdminClient.
*
* @param adminClient The AdminClient to use for fetching cluster metadata.
*/
public MetadataAdminClient(Admin adminClient) {
_adminClient = adminClient;
}

/**
* Close adminClient
*/
public void close() {
Comment thread
kyguy marked this conversation as resolved.
Outdated
if (_adminClient != null) {
try {
_adminClient.close();
} catch (Exception e) {
LOG.warn("Failed to close AdminClient", e);
}
}
}

/**
* Fetches the current metadata for the Kafka cluster.
*
* @return a {@link Cluster} containing the cluster ID, broker nodes, and partition information for all topics
*/
public Cluster cluster() {
try {
Set<String> topicNames = _adminClient.listTopics().names().get();

Map<String, TopicDescription> topicDescriptions = _adminClient.describeTopics(topicNames).allTopicNames().get();

DescribeClusterResult describeResult = _adminClient.describeCluster();
Collection<Node> nodes = describeResult.nodes().get();
String clusterId = describeResult.clusterId().get();

List<PartitionInfo> partitionInfos = new ArrayList<>();

for (TopicDescription desc : topicDescriptions.values()) {
for (TopicPartitionInfo partInfo : desc.partitions()) {

Node leader = partInfo.leader();
Node[] replicas = partInfo.replicas().toArray(Node[]::new);
Node[] isr = partInfo.isr().toArray(Node[]::new);

partitionInfos.add(new PartitionInfo(desc.name(), partInfo.partition(), leader, replicas, isr));
}
}

return new Cluster(clusterId, nodes, partitionInfos, Collections.emptySet(), Collections.emptySet());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Interrupted while fetching cluster metadata", e);
throw new RuntimeException("Interrupted while fetching cluster metadata", e);

} catch (ExecutionException e) {
LOG.error("ExecutionException while fetching cluster metadata", e);
// Could inspect e.getCause() and retry if RetriableException, but fail fast for now
throw new RuntimeException("Failed to fetch cluster metadata", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Legacy client for fetching Kafka cluster metadata.
*
* @deprecated This class is deprecated and will be removed in a future release in favor of {@link MetadataAdminClient}
* which uses public Kafka APIs.
*/
@Deprecated
public class MetadataClient {
private static final Logger LOG = LoggerFactory.getLogger(MetadataClient.class);
private static final LogContext LOG_CONTEXT = new LogContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
Expand Down Expand Up @@ -53,7 +53,7 @@ public class ExecutionTaskManager {
* @param time The time object to get the time.
* @param config config object that holds all Kafka Cruise control related configs
*/
public ExecutionTaskManager(AdminClient adminClient,
public ExecutionTaskManager(Admin adminClient,
MetricRegistry dropwizardMetricRegistry,
Time time,
KafkaCruiseControlConfig config) {
Expand Down
Comment thread
kyguy marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
Expand Down Expand Up @@ -75,7 +75,7 @@ public class ExecutionTaskPlanner {
private final Map<Long, ExecutionTask> _remainingLeadershipMovements;
private long _executionId;
private ReplicaMovementStrategy _defaultReplicaMovementTaskStrategy;
private final AdminClient _adminClient;
private final Admin _adminClient;
private final KafkaCruiseControlConfig _config;
private final long _taskExecutionAlertingThresholdMs;
private final double _interBrokerReplicaMovementRateAlertingThreshold;
Expand All @@ -89,7 +89,7 @@ public class ExecutionTaskPlanner {
* @param adminClient The adminClient to send describeReplicaLogDirs request.
* @param config The config object that holds all the Cruise Control related configs.
*/
public ExecutionTaskPlanner(AdminClient adminClient, KafkaCruiseControlConfig config) {
public ExecutionTaskPlanner(Admin adminClient, KafkaCruiseControlConfig config) {
_executionId = 0L;
_interPartMoveTasksByBrokerId = new HashMap<>();
_intraPartMoveTasksByBrokerId = new HashMap<>();
Expand Down
Comment thread
kyguy marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult;
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult;
import org.apache.kafka.clients.admin.ElectLeadersResult;
Expand Down Expand Up @@ -347,7 +347,7 @@ public static void populateMinIsrState(Cluster cluster,
* @param adminClient The adminClient to ask for ongoing partition reassignments.
* @return The set of {@link TopicPartition partitions} that are being reassigned.
*/
public static Set<TopicPartition> partitionsBeingReassigned(AdminClient adminClient)
public static Set<TopicPartition> partitionsBeingReassigned(Admin adminClient)
throws InterruptedException, ExecutionException, TimeoutException {
return ongoingPartitionReassignments(adminClient).keySet();
}
Expand All @@ -362,7 +362,7 @@ public static Set<TopicPartition> partitionsBeingReassigned(AdminClient adminCli
* @param adminClient The adminClient to ask for ongoing partition reassignments.
* @return The map of {@link PartitionReassignment reassignment} by {@link TopicPartition partitions}.
*/
public static Map<TopicPartition, PartitionReassignment> ongoingPartitionReassignments(AdminClient adminClient)
public static Map<TopicPartition, PartitionReassignment> ongoingPartitionReassignments(Admin adminClient)
throws InterruptedException, ExecutionException, TimeoutException {
Map<TopicPartition, PartitionReassignment> partitionReassignments = null;
int attempts = 0;
Expand Down Expand Up @@ -404,7 +404,7 @@ private static Optional<NewPartitionReassignment> reassignmentValue(List<Integer
* @param tasks Preferred leader election tasks to execute.
* @return The {@link ElectLeadersResult result} of preferred leader election request -- cannot be {@code null}.
*/
public static ElectLeadersResult submitPreferredLeaderElection(AdminClient adminClient, List<ExecutionTask> tasks) {
public static ElectLeadersResult submitPreferredLeaderElection(Admin adminClient, List<ExecutionTask> tasks) {
if (validateNotNull(tasks, "Tasks to execute cannot be null.").isEmpty()) {
throw new IllegalArgumentException("Tasks to execute cannot be empty.");
}
Expand Down Expand Up @@ -442,7 +442,7 @@ public static ElectLeadersResult submitPreferredLeaderElection(AdminClient admin
* @param tasks Inter-broker replica reassignment tasks to execute.
* @return The {@link AlterPartitionReassignmentsResult result} of reassignment request -- cannot be {@code null}.
*/
public static AlterPartitionReassignmentsResult submitReplicaReassignmentTasks(AdminClient adminClient, List<ExecutionTask> tasks) {
public static AlterPartitionReassignmentsResult submitReplicaReassignmentTasks(Admin adminClient, List<ExecutionTask> tasks) {
if (validateNotNull(tasks, "Tasks to execute cannot be null.").isEmpty()) {
throw new IllegalArgumentException("Tasks to execute cannot be empty.");
}
Expand Down Expand Up @@ -491,7 +491,7 @@ public static AlterPartitionReassignmentsResult submitReplicaReassignmentTasks(A
* @param adminClient The adminClient to stop the inter-broker replica reassignments.
* @return The {@link AlterPartitionReassignmentsResult result} of stop reassignment request, {@code null} if there isn't any reassignments.
*/
public static AlterPartitionReassignmentsResult maybeStopPartitionReassignment(AdminClient adminClient) {
public static AlterPartitionReassignmentsResult maybeStopPartitionReassignment(Admin adminClient) {
Set<TopicPartition> partitionsBeingReassigned;
try {
partitionsBeingReassigned = partitionsBeingReassigned(adminClient);
Expand Down
Loading
Loading