Skip to content

Commit 4abe756

Browse files
committed
Migrate Executor off private Kafka APIs
Signed-off-by: Kyle Liberti <kliberti.us@gmail.com>
1 parent 2c9e746 commit 4abe756

8 files changed

Lines changed: 133 additions & 58 deletions

File tree

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ project(':cruise-control') {
339339
testImplementation 'com.jayway.jsonpath:json-path:2.7.0'
340340
testImplementation 'org.powermock:powermock-module-junit4:2.0.9'
341341
testImplementation 'org.powermock:powermock-api-easymock:2.0.9'
342-
testImplementation "org.testcontainers:kafka:$testcontainersVersion"
342+
testImplementation "org.testcontainers:testcontainers-kafka:$testcontainersVersion"
343343
}
344344

345345
publishing {
@@ -484,7 +484,7 @@ project(':cruise-control-metrics-reporter') {
484484
testImplementation "org.apache.kafka:kafka-raft:$kafkaVersion"
485485
testImplementation "org.apache.kafka:kafka-storage:$kafkaVersion"
486486
testImplementation 'commons-io:commons-io:2.11.0'
487-
testImplementation "org.testcontainers:kafka:$testcontainersVersion"
487+
testImplementation "org.testcontainers:testcontainers-kafka:$testcontainersVersion"
488488
testOutput sourceSets.test.output
489489
}
490490

cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCContainerizedKraftCluster.java

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -396,25 +396,6 @@ public void shutDownBroker(int brokerId) {
396396
timeout,
397397
String.format("Broker %s did not shutdown properly.", brokerId)
398398
);
399-
400-
// Wait until describeLogDirs fails
401-
waitUntil(
402-
() -> {
403-
try {
404-
_adminClient.describeLogDirs(Collections.singletonList(brokerId))
405-
.allDescriptions()
406-
.get(5, TimeUnit.SECONDS);
407-
return false;
408-
} catch (InterruptedException ie) {
409-
throw new RuntimeException(ie);
410-
} catch (Exception e) {
411-
return true;
412-
}
413-
},
414-
result -> result,
415-
timeout,
416-
String.format("Broker %s did not fully shut down (logDirs RPC still succeeds).", brokerId)
417-
);
418399
}
419400

420401
public static class BrokerWaitStrategy extends AbstractWaitStrategy {
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright 2017 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
3+
*/
4+
5+
package com.linkedin.kafka.cruisecontrol.common;
6+
7+
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
23+
24+
/**
25+
* Client for fetching Kafka cluster metadata using Kafka Admin APIs.
26+
*
27+
* This replaces the use of MetadataClient which relies on internal Kafka APIs.
28+
*/
29+
public class MetadataAdminClient {
30+
private static final Logger LOG = LoggerFactory.getLogger(MetadataAdminClient.class);
31+
32+
private final Admin _adminClient;
33+
34+
/**
35+
* Creates a new MetadataAdminClient.
36+
*
37+
* @param adminClient The AdminClient to use for fetching cluster metadata.
38+
*/
39+
public MetadataAdminClient(Admin adminClient) {
40+
_adminClient = adminClient;
41+
}
42+
43+
/**
44+
* Close adminClient
45+
*/
46+
public void close() {
47+
if (_adminClient != null) {
48+
try {
49+
_adminClient.close();
50+
} catch (Exception e) {
51+
LOG.warn("Failed to close AdminClient", e);
52+
}
53+
}
54+
}
55+
56+
/**
57+
* Get cluster metadata.
58+
*
59+
* @return Cluster containing the cluster metadata.
60+
*/
61+
public Cluster cluster() {
62+
try {
63+
DescribeClusterResult describeResult = _adminClient.describeCluster();
64+
Collection<Node> nodes = describeResult.nodes().get();
65+
String clusterId = describeResult.clusterId().get();
66+
67+
Set<String> topicNames = _adminClient.listTopics()
68+
.names().get();
69+
70+
Map<String, TopicDescription> topicDescriptions = _adminClient.describeTopics(topicNames)
71+
.allTopicNames().get();
72+
73+
List<PartitionInfo> partitionInfos = new ArrayList<>();
74+
for (TopicDescription desc : topicDescriptions.values()) {
75+
for (TopicPartitionInfo partInfo : desc.partitions()) {
76+
Node leader = partInfo.leader();
77+
Node[] replicas = partInfo.replicas().toArray(Node[]::new);
78+
Node[] isr = partInfo.isr().toArray(Node[]::new);
79+
partitionInfos.add(new PartitionInfo(desc.name(), partInfo.partition(), leader, replicas, isr));
80+
}
81+
}
82+
83+
return new Cluster(clusterId, nodes, partitionInfos, Collections.emptySet(), Collections.emptySet());
84+
} catch (ExecutionException | InterruptedException e) {
85+
LOG.error("Failed to fetch cluster metadata", e);
86+
throw new RuntimeException("Failed to fetch cluster metadata", e);
87+
}
88+
}
89+
}

cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/common/MetadataClient.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,13 @@
2626
import org.slf4j.Logger;
2727
import org.slf4j.LoggerFactory;
2828

29-
29+
/**
30+
* Legacy client for fetching Kafka cluster metadata.
31+
*
32+
* @deprecated This class is deprecated and will be removed in a future release in favor of {@link MetadataAdminClient}
33+
* which uses public Kafka APIs.
34+
*/
35+
@Deprecated
3036
public class MetadataClient {
3137
private static final Logger LOG = LoggerFactory.getLogger(MetadataClient.class);
3238
private static final LogContext LOG_CONTEXT = new LogContext();

cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionUtils.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.concurrent.TimeoutException;
3030
import javax.annotation.Nullable;
3131
import org.apache.kafka.clients.CommonClientConfigs;
32+
import org.apache.kafka.clients.admin.Admin;
3233
import org.apache.kafka.clients.admin.AdminClient;
3334
import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult;
3435
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult;
@@ -347,7 +348,7 @@ public static void populateMinIsrState(Cluster cluster,
347348
* @param adminClient The adminClient to ask for ongoing partition reassignments.
348349
* @return The set of {@link TopicPartition partitions} that are being reassigned.
349350
*/
350-
public static Set<TopicPartition> partitionsBeingReassigned(AdminClient adminClient)
351+
public static Set<TopicPartition> partitionsBeingReassigned(Admin adminClient)
351352
throws InterruptedException, ExecutionException, TimeoutException {
352353
return ongoingPartitionReassignments(adminClient).keySet();
353354
}
@@ -362,7 +363,7 @@ public static Set<TopicPartition> partitionsBeingReassigned(AdminClient adminCli
362363
* @param adminClient The adminClient to ask for ongoing partition reassignments.
363364
* @return The map of {@link PartitionReassignment reassignment} by {@link TopicPartition partitions}.
364365
*/
365-
public static Map<TopicPartition, PartitionReassignment> ongoingPartitionReassignments(AdminClient adminClient)
366+
public static Map<TopicPartition, PartitionReassignment> ongoingPartitionReassignments(Admin adminClient)
366367
throws InterruptedException, ExecutionException, TimeoutException {
367368
Map<TopicPartition, PartitionReassignment> partitionReassignments = null;
368369
int attempts = 0;
@@ -404,7 +405,7 @@ private static Optional<NewPartitionReassignment> reassignmentValue(List<Integer
404405
* @param tasks Preferred leader election tasks to execute.
405406
* @return The {@link ElectLeadersResult result} of preferred leader election request -- cannot be {@code null}.
406407
*/
407-
public static ElectLeadersResult submitPreferredLeaderElection(AdminClient adminClient, List<ExecutionTask> tasks) {
408+
public static ElectLeadersResult submitPreferredLeaderElection(Admin adminClient, List<ExecutionTask> tasks) {
408409
if (validateNotNull(tasks, "Tasks to execute cannot be null.").isEmpty()) {
409410
throw new IllegalArgumentException("Tasks to execute cannot be empty.");
410411
}
@@ -442,7 +443,7 @@ public static ElectLeadersResult submitPreferredLeaderElection(AdminClient admin
442443
* @param tasks Inter-broker replica reassignment tasks to execute.
443444
* @return The {@link AlterPartitionReassignmentsResult result} of reassignment request -- cannot be {@code null}.
444445
*/
445-
public static AlterPartitionReassignmentsResult submitReplicaReassignmentTasks(AdminClient adminClient, List<ExecutionTask> tasks) {
446+
public static AlterPartitionReassignmentsResult submitReplicaReassignmentTasks(Admin adminClient, List<ExecutionTask> tasks) {
446447
if (validateNotNull(tasks, "Tasks to execute cannot be null.").isEmpty()) {
447448
throw new IllegalArgumentException("Tasks to execute cannot be empty.");
448449
}

cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@
99
import com.codahale.metrics.Timer;
1010
import com.google.common.util.concurrent.AtomicDouble;
1111
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
12+
import com.linkedin.kafka.cruisecontrol.common.MetadataAdminClient;
1213
import com.linkedin.kafka.cruisecontrol.common.TopicMinIsrCache;
1314
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
1415
import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory;
15-
import com.linkedin.kafka.cruisecontrol.common.MetadataClient;
1616
import com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig;
1717
import com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorManager;
1818
import com.linkedin.kafka.cruisecontrol.exception.OngoingExecutionException;
@@ -42,7 +42,6 @@
4242
import java.util.concurrent.atomic.AtomicInteger;
4343
import java.util.function.Supplier;
4444
import java.util.stream.Collectors;
45-
import org.apache.kafka.clients.Metadata;
4645
import org.apache.kafka.clients.admin.AdminClient;
4746
import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult;
4847
import org.apache.kafka.clients.admin.DescribeConfigsResult;
@@ -52,8 +51,6 @@
5251
import org.apache.kafka.common.Node;
5352
import org.apache.kafka.common.TopicPartition;
5453
import org.apache.kafka.common.config.ConfigResource;
55-
import org.apache.kafka.common.internals.ClusterResourceListeners;
56-
import org.apache.kafka.common.utils.LogContext;
5754
import org.apache.kafka.common.utils.Time;
5855
import org.slf4j.Logger;
5956
import org.slf4j.LoggerFactory;
@@ -87,7 +84,7 @@ public class Executor {
8784
private static final long EXECUTION_PROGRESS_CHECK_INTERVAL_ADJUSTING_MS = 1000;
8885
// The execution progress is controlled by the ExecutionTaskManager.
8986
private final ExecutionTaskManager _executionTaskManager;
90-
private final MetadataClient _metadataClient;
87+
private final MetadataAdminClient _metadataClient;
9188
private volatile long _executionProgressCheckIntervalMs;
9289
private final long _defaultExecutionProgressCheckIntervalMs;
9390
private Long _requestedExecutionProgressCheckIntervalMs;
@@ -168,7 +165,7 @@ public Executor(KafkaCruiseControlConfig config,
168165
Executor(KafkaCruiseControlConfig config,
169166
Time time,
170167
MetricRegistry dropwizardMetricRegistry,
171-
MetadataClient metadataClient,
168+
MetadataAdminClient metadataClient,
172169
ExecutorNotifier executorNotifier,
173170
AnomalyDetectorManager anomalyDetectorManager) {
174171
_numExecutionStopped = new AtomicInteger(0);
@@ -189,15 +186,7 @@ public Executor(KafkaCruiseControlConfig config,
189186
_executionTaskManager = new ExecutionTaskManager(_adminClient, dropwizardMetricRegistry, time, config);
190187
// Register gauge sensors.
191188
registerGaugeSensors(dropwizardMetricRegistry);
192-
_metadataClient = metadataClient != null ? metadataClient
193-
: new MetadataClient(config,
194-
new Metadata(ExecutionUtils.METADATA_REFRESH_BACKOFF,
195-
ExecutionUtils.METADATA_REFRESH_BACKOFF_MAX,
196-
ExecutionUtils.METADATA_EXPIRY_MS,
197-
new LogContext(),
198-
new ClusterResourceListeners()),
199-
-1L,
200-
time);
189+
_metadataClient = metadataClient != null ? metadataClient : new MetadataAdminClient(_adminClient);
201190
_defaultExecutionProgressCheckIntervalMs = config.getLong(ExecutorConfig.EXECUTION_PROGRESS_CHECK_INTERVAL_MS_CONFIG);
202191
_executionProgressCheckIntervalMs = _defaultExecutionProgressCheckIntervalMs;
203192
_leaderMovementTimeoutMs = config.getLong(ExecutorConfig.LEADER_MOVEMENT_TIMEOUT_MS_CONFIG);
@@ -875,7 +864,7 @@ private synchronized void initProposalExecution(Collection<ExecutionProposal> pr
875864
recentlyRemovedBrokers(), isTriggeredByUserRequest);
876865
_executionTaskManager.setExecutionModeForTaskTracker(_isKafkaAssignerMode);
877866
// Get a snapshot of (1) cluster and (2) minIsr with time by topic name.
878-
StrategyOptions strategyOptions = new StrategyOptions.Builder(_metadataClient.refreshMetadata().cluster())
867+
StrategyOptions strategyOptions = new StrategyOptions.Builder(_metadataClient.cluster())
879868
.minIsrWithTimeByTopic(_topicMinIsrCache.minIsrWithTimeByTopic()).build();
880869
_executionTaskManager.addExecutionProposals(proposals, brokersToSkipConcurrencyCheck, strategyOptions, replicaMovementStrategy);
881870
_concurrencyAdjuster.initAdjustment(loadMonitor,
@@ -1799,7 +1788,7 @@ private Cluster getClusterForExecutionProgressCheck() {
17991788
if (LOG.isDebugEnabled()) {
18001789
LOG.debug("Tasks in execution: {}", inExecutionTasks());
18011790
}
1802-
return _metadataClient.refreshMetadata().cluster();
1791+
return _metadataClient.cluster();
18031792
}
18041793

18051794
/**

0 commit comments

Comments
 (0)