Skip to content

Commit 93f4bc5

Browse files
committed
Addressing feedback - mm
Signed-off-by: Kyle Liberti <kliberti.us@gmail.com>
1 parent aeb8409 commit 93f4bc5

11 files changed

Lines changed: 32 additions & 48 deletions

File tree

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ project(':cruise-control') {
339339
testImplementation 'com.jayway.jsonpath:json-path:2.7.0'
340340
testImplementation 'org.powermock:powermock-module-junit4:2.0.9'
341341
testImplementation 'org.powermock:powermock-api-easymock:2.0.9'
342-
testImplementation "org.testcontainers:testcontainers-kafka:$testcontainersVersion"
342+
testImplementation "org.testcontainers:kafka:$testcontainersVersion"
343343
}
344344

345345
publishing {
@@ -484,7 +484,7 @@ project(':cruise-control-metrics-reporter') {
484484
testImplementation "org.apache.kafka:kafka-raft:$kafkaVersion"
485485
testImplementation "org.apache.kafka:kafka-storage:$kafkaVersion"
486486
testImplementation 'commons-io:commons-io:2.11.0'
487-
testImplementation "org.testcontainers:testcontainers-kafka:$testcontainersVersion"
487+
testImplementation "org.testcontainers:kafka:$testcontainersVersion"
488488
testOutput sourceSets.test.output
489489
}
490490

cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlUtils.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import com.linkedin.kafka.cruisecontrol.metricsreporter.config.EnvConfigProvider;
1818
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
1919
import com.linkedin.kafka.cruisecontrol.monitor.task.LoadMonitorTaskRunner;
20-
import org.apache.kafka.clients.admin.Admin;
2120
import org.apache.kafka.clients.admin.AdminClient;
2221
import org.apache.kafka.clients.admin.AdminClientConfig;
2322
import org.apache.kafka.clients.admin.AlterConfigOp;
@@ -514,7 +513,7 @@ public static AdminClient createAdminClient(Map<String, Object> adminClientConfi
514513
*
515514
* @param adminClient AdminClient to be closed.
516515
*/
517-
public static void closeAdminClientWithTimeout(Admin adminClient) {
516+
public static void closeAdminClientWithTimeout(AdminClient adminClient) {
518517
closeAdminClientWithTimeout(adminClient, ADMIN_CLIENT_CLOSE_TIMEOUT_MS);
519518
}
520519

@@ -524,7 +523,7 @@ public static void closeAdminClientWithTimeout(Admin adminClient) {
524523
* @param adminClient AdminClient to be closed.
525524
* @param timeoutMs the timeout.
526525
*/
527-
public static void closeAdminClientWithTimeout(Admin adminClient, long timeoutMs) {
526+
public static void closeAdminClientWithTimeout(AdminClient adminClient, long timeoutMs) {
528527
closeClientWithTimeout(() -> {
529528
try {
530529
((AutoCloseable) adminClient).close();

cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/common/MetadataAdminClient.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -40,19 +40,6 @@ public MetadataAdminClient(Admin adminClient) {
4040
_adminClient = adminClient;
4141
}
4242

43-
/**
44-
* Close adminClient
45-
*/
46-
public void close() {
47-
if (_adminClient != null) {
48-
try {
49-
_adminClient.close();
50-
} catch (Exception e) {
51-
LOG.warn("Failed to close AdminClient", e);
52-
}
53-
}
54-
}
55-
5643
/**
5744
* Fetches the current metadata for the Kafka cluster.
5845
*

cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import java.util.List;
1616
import java.util.Map;
1717
import java.util.Set;
18-
import org.apache.kafka.clients.admin.Admin;
18+
import org.apache.kafka.clients.admin.AdminClient;
1919
import org.apache.kafka.common.TopicPartition;
2020
import org.apache.kafka.common.utils.Time;
2121
import org.slf4j.Logger;
@@ -53,7 +53,7 @@ public class ExecutionTaskManager {
5353
* @param time The time object to get the time.
5454
* @param config config object that holds all Kafka Cruise control related configs
5555
*/
56-
public ExecutionTaskManager(Admin adminClient,
56+
public ExecutionTaskManager(AdminClient adminClient,
5757
MetricRegistry dropwizardMetricRegistry,
5858
Time time,
5959
KafkaCruiseControlConfig config) {

cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import java.util.concurrent.TimeUnit;
2929
import java.util.concurrent.TimeoutException;
3030
import java.util.stream.Collectors;
31-
import org.apache.kafka.clients.admin.Admin;
31+
import org.apache.kafka.clients.admin.AdminClient;
3232
import org.apache.kafka.common.Cluster;
3333
import org.apache.kafka.common.KafkaFuture;
3434
import org.apache.kafka.common.Node;
@@ -75,7 +75,7 @@ public class ExecutionTaskPlanner {
7575
private final Map<Long, ExecutionTask> _remainingLeadershipMovements;
7676
private long _executionId;
7777
private ReplicaMovementStrategy _defaultReplicaMovementTaskStrategy;
78-
private final Admin _adminClient;
78+
private final AdminClient _adminClient;
7979
private final KafkaCruiseControlConfig _config;
8080
private final long _taskExecutionAlertingThresholdMs;
8181
private final double _interBrokerReplicaMovementRateAlertingThreshold;
@@ -89,7 +89,7 @@ public class ExecutionTaskPlanner {
8989
* @param adminClient The adminClient to send describeReplicaLogDirs request.
9090
* @param config The config object that holds all the Cruise Control related configs.
9191
*/
92-
public ExecutionTaskPlanner(Admin adminClient, KafkaCruiseControlConfig config) {
92+
public ExecutionTaskPlanner(AdminClient adminClient, KafkaCruiseControlConfig config) {
9393
_executionId = 0L;
9494
_interPartMoveTasksByBrokerId = new HashMap<>();
9595
_intraPartMoveTasksByBrokerId = new HashMap<>();

cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionUtils.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import java.util.concurrent.TimeoutException;
3030
import javax.annotation.Nullable;
3131
import org.apache.kafka.clients.CommonClientConfigs;
32-
import org.apache.kafka.clients.admin.Admin;
32+
import org.apache.kafka.clients.admin.AdminClient;
3333
import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult;
3434
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult;
3535
import org.apache.kafka.clients.admin.ElectLeadersResult;
@@ -347,7 +347,7 @@ public static void populateMinIsrState(Cluster cluster,
347347
* @param adminClient The adminClient to ask for ongoing partition reassignments.
348348
* @return The set of {@link TopicPartition partitions} that are being reassigned.
349349
*/
350-
public static Set<TopicPartition> partitionsBeingReassigned(Admin adminClient)
350+
public static Set<TopicPartition> partitionsBeingReassigned(AdminClient adminClient)
351351
throws InterruptedException, ExecutionException, TimeoutException {
352352
return ongoingPartitionReassignments(adminClient).keySet();
353353
}
@@ -362,7 +362,7 @@ public static Set<TopicPartition> partitionsBeingReassigned(Admin adminClient)
362362
* @param adminClient The adminClient to ask for ongoing partition reassignments.
363363
* @return The map of {@link PartitionReassignment reassignment} by {@link TopicPartition partitions}.
364364
*/
365-
public static Map<TopicPartition, PartitionReassignment> ongoingPartitionReassignments(Admin adminClient)
365+
public static Map<TopicPartition, PartitionReassignment> ongoingPartitionReassignments(AdminClient adminClient)
366366
throws InterruptedException, ExecutionException, TimeoutException {
367367
Map<TopicPartition, PartitionReassignment> partitionReassignments = null;
368368
int attempts = 0;
@@ -404,7 +404,7 @@ private static Optional<NewPartitionReassignment> reassignmentValue(List<Integer
404404
* @param tasks Preferred leader election tasks to execute.
405405
* @return The {@link ElectLeadersResult result} of preferred leader election request -- cannot be {@code null}.
406406
*/
407-
public static ElectLeadersResult submitPreferredLeaderElection(Admin adminClient, List<ExecutionTask> tasks) {
407+
public static ElectLeadersResult submitPreferredLeaderElection(AdminClient adminClient, List<ExecutionTask> tasks) {
408408
if (validateNotNull(tasks, "Tasks to execute cannot be null.").isEmpty()) {
409409
throw new IllegalArgumentException("Tasks to execute cannot be empty.");
410410
}
@@ -442,7 +442,7 @@ public static ElectLeadersResult submitPreferredLeaderElection(Admin adminClient
442442
* @param tasks Inter-broker replica reassignment tasks to execute.
443443
* @return The {@link AlterPartitionReassignmentsResult result} of reassignment request -- cannot be {@code null}.
444444
*/
445-
public static AlterPartitionReassignmentsResult submitReplicaReassignmentTasks(Admin adminClient, List<ExecutionTask> tasks) {
445+
public static AlterPartitionReassignmentsResult submitReplicaReassignmentTasks(AdminClient adminClient, List<ExecutionTask> tasks) {
446446
if (validateNotNull(tasks, "Tasks to execute cannot be null.").isEmpty()) {
447447
throw new IllegalArgumentException("Tasks to execute cannot be empty.");
448448
}
@@ -491,7 +491,7 @@ public static AlterPartitionReassignmentsResult submitReplicaReassignmentTasks(A
491491
* @param adminClient The adminClient to stop the inter-broker replica reassignments.
492492
* @return The {@link AlterPartitionReassignmentsResult result} of stop reassignment request, {@code null} if there isn't any reassignments.
493493
*/
494-
public static AlterPartitionReassignmentsResult maybeStopPartitionReassignment(Admin adminClient) {
494+
public static AlterPartitionReassignmentsResult maybeStopPartitionReassignment(AdminClient adminClient) {
495495
Set<TopicPartition> partitionsBeingReassigned;
496496
try {
497497
partitionsBeingReassigned = partitionsBeingReassigned(adminClient);

cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
import java.util.concurrent.atomic.AtomicInteger;
4343
import java.util.function.Supplier;
4444
import java.util.stream.Collectors;
45-
import org.apache.kafka.clients.admin.Admin;
45+
import org.apache.kafka.clients.admin.AdminClient;
4646
import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult;
4747
import org.apache.kafka.clients.admin.DescribeConfigsResult;
4848
import org.apache.kafka.clients.admin.ElectLeadersResult;
@@ -88,7 +88,7 @@ public class Executor {
8888
private final long _defaultExecutionProgressCheckIntervalMs;
8989
private Long _requestedExecutionProgressCheckIntervalMs;
9090
private final ExecutorService _proposalExecutor;
91-
private final Admin _adminClient;
91+
private final AdminClient _adminClient;
9292
private final double _leaderMovementTimeoutMs;
9393

9494
private static final int NO_STOP_EXECUTION = 0;
@@ -164,7 +164,7 @@ public Executor(KafkaCruiseControlConfig config,
164164
Executor(KafkaCruiseControlConfig config,
165165
Time time,
166166
MetricRegistry dropwizardMetricRegistry,
167-
Admin adminClient,
167+
AdminClient adminClient,
168168
MetadataAdminClient metadataClient,
169169
ExecutorNotifier executorNotifier,
170170
AnomalyDetectorManager anomalyDetectorManager) {
@@ -1193,7 +1193,6 @@ public synchronized void shutdown() {
11931193
} catch (InterruptedException e) {
11941194
LOG.warn("Interrupted while waiting for anomaly detector to shutdown.");
11951195
}
1196-
_metadataClient.close();
11971196
KafkaCruiseControlUtils.closeAdminClientWithTimeout(_adminClient);
11981197
_executionHistoryScannerExecutor.shutdownNow();
11991198
_concurrencyAdjusterExecutor.shutdownNow();

cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorAdminUtils.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import java.util.concurrent.TimeoutException;
1616
import java.util.stream.Collectors;
1717
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
18-
import org.apache.kafka.clients.admin.Admin;
18+
import org.apache.kafka.clients.admin.AdminClient;
1919
import org.apache.kafka.clients.admin.LogDirDescription;
2020
import org.apache.kafka.clients.admin.ReplicaInfo;
2121
import org.apache.kafka.common.KafkaFuture;
@@ -46,7 +46,7 @@ private ExecutorAdminUtils() {
4646
* @return Replica logdir information by task.
4747
*/
4848
static Map<ExecutionTask, ReplicaLogDirInfo> getLogdirInfoForExecutionTask(Collection<ExecutionTask> tasks,
49-
Admin adminClient,
49+
AdminClient adminClient,
5050
KafkaCruiseControlConfig config) {
5151
Set<TopicPartitionReplica> replicasToCheck = new HashSet<>();
5252
Map<ExecutionTask, ReplicaLogDirInfo> logdirInfoByTask = new HashMap<>();
@@ -77,7 +77,7 @@ static Map<ExecutionTask, ReplicaLogDirInfo> getLogdirInfoForExecutionTask(Colle
7777
* @param config The config object that holds all the Cruise Control related configs
7878
*/
7979
static void executeIntraBrokerReplicaMovements(List<ExecutionTask> tasksToExecute,
80-
Admin adminClient,
80+
AdminClient adminClient,
8181
ExecutionTaskManager executionTaskManager,
8282
KafkaCruiseControlConfig config) {
8383
Map<TopicPartitionReplica, String> replicaAssignment = new HashMap<>();
@@ -105,7 +105,7 @@ static void executeIntraBrokerReplicaMovements(List<ExecutionTask> tasksToExecut
105105
* @param config The config object that holds all the Cruise Control related configs
106106
* @return {@code true} if there is ongoing intra-broker replica movement.
107107
*/
108-
static boolean hasOngoingIntraBrokerReplicaMovement(Admin adminClient,
108+
static boolean hasOngoingIntraBrokerReplicaMovement(AdminClient adminClient,
109109
KafkaCruiseControlConfig config)
110110
throws InterruptedException, ExecutionException, TimeoutException {
111111
Collection<Integer> brokersToCheck = adminClient.describeCluster().nodes().get().stream().map(Node::id).collect(Collectors.toSet());

cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
import com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsUtils;
88
import com.linkedin.kafka.cruisecontrol.model.ReplicaPlacementInfo;
9-
import org.apache.kafka.clients.admin.Admin;
9+
import org.apache.kafka.clients.admin.AdminClient;
1010
import org.apache.kafka.clients.admin.AlterConfigOp;
1111
import org.apache.kafka.clients.admin.Config;
1212
import org.apache.kafka.clients.admin.ConfigEntry;
@@ -43,28 +43,28 @@ class ReplicationThrottleHelper {
4343
public static final long CLIENT_REQUEST_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(30);
4444
static final int RETRIES = 30;
4545

46-
private final Admin _adminClient;
46+
private final AdminClient _adminClient;
4747
private final Long _throttleRate;
4848
private final int _retries;
4949
private final Set<Integer> _deadBrokers;
5050

51-
ReplicationThrottleHelper(Admin adminClient, Long throttleRate) {
51+
ReplicationThrottleHelper(AdminClient adminClient, Long throttleRate) {
5252
this(adminClient, throttleRate, RETRIES);
5353
}
5454

55-
ReplicationThrottleHelper(Admin adminClient, Long throttleRate, Set<Integer> deadBrokers) {
55+
ReplicationThrottleHelper(AdminClient adminClient, Long throttleRate, Set<Integer> deadBrokers) {
5656
this(adminClient, throttleRate, RETRIES, deadBrokers);
5757
}
5858

5959
// for testing
60-
ReplicationThrottleHelper(Admin adminClient, Long throttleRate, int retries) {
60+
ReplicationThrottleHelper(AdminClient adminClient, Long throttleRate, int retries) {
6161
this._adminClient = adminClient;
6262
this._throttleRate = throttleRate;
6363
this._retries = retries;
6464
this._deadBrokers = new HashSet<Integer>();
6565
}
6666

67-
ReplicationThrottleHelper(Admin adminClient, Long throttleRate, int retries, Set<Integer> deadBrokers) {
67+
ReplicationThrottleHelper(AdminClient adminClient, Long throttleRate, int retries, Set<Integer> deadBrokers) {
6868
this._adminClient = adminClient;
6969
this._throttleRate = throttleRate;
7070
this._retries = retries;

cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,6 @@ public int clusterSize() {
106106
@Before
107107
public void setUp() {
108108
Properties adminClientProps = new Properties();
109-
adminClientProps.setProperty(AdminClientConfig.METADATA_MAX_AGE_CONFIG, "0");
110109
setSecurityConfigs(adminClientProps, "admin");
111110

112111
_cluster = new CCContainerizedKraftCluster(clusterSize(), buildBrokerConfigs(), adminClientProps);
@@ -662,8 +661,8 @@ private Map<String, TopicDescription> createTopics(int produceSizeInBytes) {
662661
&& !topicDescription.partitions().isEmpty()
663662
&& topicDescription.partitions().stream().allMatch(p ->
664663
p.leader() != null
665-
&& p.replicas() != null && !p.replicas().isEmpty()
666-
&& p.isr() != null && !p.isr().isEmpty());
664+
&& !p.replicas().isEmpty()
665+
&& !p.isr().isEmpty());
667666
});
668667

669668
produceRandomDataToTopic(TOPIC0, produceSizeInBytes);
@@ -785,7 +784,7 @@ private void executeAndVerifyProposals(Admin adminClient,
785784
EasyMock.replay(mockUserTaskInfo, mockExecutorNotifier, mockLoadMonitor, mockAnomalyDetectorManager);
786785
}
787786
MetricRegistry metricRegistry = new MetricRegistry();
788-
Executor executor = new Executor(configs, Time.SYSTEM, metricRegistry, _adminClient,
787+
Executor executor = new Executor(configs, Time.SYSTEM, metricRegistry, (AdminClient) _adminClient,
789788
new MetadataAdminClient(_adminClient), mockExecutorNotifier, mockAnomalyDetectorManager);
790789
executor.setUserTaskManager(mockUserTaskManager);
791790
Map<TopicPartition, Integer> replicationFactors = new HashMap<>();

0 commit comments

Comments
 (0)