Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package org.apache.kafka.tools.reassign;

import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
Expand Down Expand Up @@ -178,6 +181,7 @@ private static void handleAction(Admin adminClient, ReassignPartitionsCommandOpt
opts.options.has(opts.additionalOpt),
Utils.readFileAsString(opts.options.valueOf(opts.reassignmentJsonFileOpt)),
opts.options.valueOf(opts.interBrokerThrottleOpt),
opts.options.valueOf(opts.brokerListWithoutThrottleOpt),
opts.options.valueOf(opts.replicaAlterLogDirsThrottleOpt),
opts.options.valueOf(opts.timeoutOpt),
Time.SYSTEM,
Expand Down Expand Up @@ -752,21 +756,24 @@ static Entry<List<Integer>, List<String>> parseGenerateAssignmentArgs(String rea
/**
* The entry point for the --execute and --execute-additional commands.
*
* @param adminClient The AdminClient to use.
* @param additional Whether --additional was passed.
* @param reassignmentJson The JSON string to use for the topics to reassign.
* @param interBrokerThrottle The inter-broker throttle to use, or a negative
* number to skip using a throttle.
* @param logDirThrottle The replica log directory throttle to use, or a
* negative number to skip using a throttle.
* @param timeoutMs The maximum time in ms to wait for log directory
* replica assignment to begin.
* @param time The Time object to use.
* @param adminClient The AdminClient to use.
* @param additional Whether --additional was passed.
* @param reassignmentJson The JSON string to use for the topics to reassign.
* @param interBrokerThrottle The inter-broker throttle to use, or a negative
* number to skip using a throttle.
* @param brokerListWithoutThrottleString The brokerIds string to skip broker-level throttle
* config updates for during reassignment.
* @param logDirThrottle The replica log directory throttle to use, or a
* negative number to skip using a throttle.
* @param timeoutMs The maximum time in ms to wait for log directory
* replica assignment to begin.
* @param time The Time object to use.
*/
public static void executeAssignment(Admin adminClient,
boolean additional,
String reassignmentJson,
long interBrokerThrottle,
String brokerListWithoutThrottleString,
long logDirThrottle,
long timeoutMs,
Time time,
Expand Down Expand Up @@ -795,14 +802,19 @@ public static void executeAssignment(Admin adminClient,
if (interBrokerThrottle >= 0 || logDirThrottle >= 0) {
System.out.println(YOU_MUST_RUN_VERIFY_PERIODICALLY_MESSAGE);

Set<Integer> brokerWithoutThrottleSet = new HashSet<>();
if (!brokerListWithoutThrottleString.isEmpty()) {
brokerWithoutThrottleSet = parseCommaSeparatedIntSet(brokerListWithoutThrottleString);
}

if (interBrokerThrottle >= 0) {
Map<String, Map<Integer, PartitionMove>> moveMap = calculateProposedMoveMap(currentReassignments, proposedParts, currentParts);
modifyReassignmentThrottle(adminClient, moveMap, interBrokerThrottle);
modifyReassignmentThrottle(adminClient, moveMap, interBrokerThrottle, brokerWithoutThrottleSet);
}

if (logDirThrottle >= 0) {
Set<Integer> movingBrokers = calculateMovingBrokers(proposedReplicas.keySet());
modifyLogDirThrottle(adminClient, movingBrokers, logDirThrottle);
modifyLogDirThrottle(adminClient, movingBrokers, logDirThrottle, brokerWithoutThrottleSet);
}
}

Expand All @@ -827,6 +839,15 @@ public static void executeAssignment(Admin adminClient,
}
}

private static Set<Integer> parseCommaSeparatedIntSet(String s) {
if (s == null || s.trim().isEmpty()) return Collections.emptySet();

return Arrays.stream(s.trim().split(","))
.filter(t -> !t.isEmpty())
.map(Integer::parseInt)
.collect(Collectors.toCollection(LinkedHashSet::new));
}

/**
* Execute some partition log directory movements.
*
Expand Down Expand Up @@ -1078,15 +1099,19 @@ static Map<String, Map<Integer, PartitionMove>> calculateProposedMoveMap(Map<Top
/**
* Calculate the leader throttle configurations to use.
*
* @param moveMap The movements.
* @return A map from topic names to leader throttle configurations.
* @param moveMap The movements.
* @param brokerWithoutThrottleSet The brokerIds set to skip broker-level throttle
* * config updates for during reassignment.
* @return A map from topic names to leader throttle configurations.
*/
static Map<String, String> calculateLeaderThrottles(Map<String, Map<Integer, PartitionMove>> moveMap) {
static Map<String, String> calculateLeaderThrottles(Map<String, Map<Integer, PartitionMove>> moveMap,
Set<Integer> brokerWithoutThrottleSet) {
Map<String, String> results = new HashMap<>();
moveMap.forEach((topicName, partMoveMap) -> {
Set<String> components = new TreeSet<>();
partMoveMap.forEach((partId, move) ->
move.sources.forEach(source -> components.add(String.format("%d:%d", partId, source))));
move.sources.stream().filter(source -> !brokerWithoutThrottleSet.contains(source))
.forEach(source -> components.add(String.format("%d:%d", partId, source))));
results.put(topicName, String.join(",", components));
});
return results;
Expand All @@ -1095,16 +1120,19 @@ static Map<String, String> calculateLeaderThrottles(Map<String, Map<Integer, Par
/**
* Calculate the follower throttle configurations to use.
*
* @param moveMap The movements.
* @return A map from topic names to follower throttle configurations.
* @param moveMap The movements.
* @param brokerWithoutThrottleSet The brokerIds set to skip broker-level throttle
* config updates for during reassignment.
* @return A map from topic names to follower throttle configurations.
*/
static Map<String, String> calculateFollowerThrottles(Map<String, Map<Integer, PartitionMove>> moveMap) {
static Map<String, String> calculateFollowerThrottles(Map<String, Map<Integer, PartitionMove>> moveMap,
Set<Integer> brokerWithoutThrottleSet) {
Map<String, String> results = new HashMap<>();
moveMap.forEach((topicName, partMoveMap) -> {
Set<String> components = new TreeSet<>();
partMoveMap.forEach((partId, move) ->
move.destinations.forEach(destination -> {
if (!move.sources.contains(destination)) {
if (!move.sources.contains(destination) && !brokerWithoutThrottleSet.contains(destination)) {
components.add(String.format("%d:%d", partId, destination));
}
})
Expand All @@ -1118,15 +1146,19 @@ static Map<String, String> calculateFollowerThrottles(Map<String, Map<Integer, P
/**
* Calculate all the brokers which are involved in the given partition reassignments.
*
* @param moveMap The partition movements.
* @return A set of all the brokers involved.
* @param moveMap The partition movements.
* @param brokerWithoutThrottleSet The brokerIds set to skip broker-level throttle
* config updates for during reassignment.
* @return A set of all the brokers involved.
*/
static Set<Integer> calculateReassigningBrokers(Map<String, Map<Integer, PartitionMove>> moveMap) {
static Set<Integer> calculateReassigningBrokers(Map<String, Map<Integer, PartitionMove>> moveMap,
Set<Integer> brokerWithoutThrottleSet) {
Set<Integer> reassigningBrokers = new TreeSet<>();
moveMap.values().forEach(partMoveMap -> partMoveMap.values().forEach(partMove -> {
reassigningBrokers.addAll(partMove.sources);
reassigningBrokers.addAll(partMove.destinations);
}));
reassigningBrokers.removeAll(brokerWithoutThrottleSet);
return reassigningBrokers;
}

Expand Down Expand Up @@ -1171,13 +1203,14 @@ static void modifyTopicThrottles(Admin adminClient,
private static void modifyReassignmentThrottle(
Admin admin,
Map<String, Map<Integer, PartitionMove>> moveMap,
long interBrokerThrottle
long interBrokerThrottle,
Set<Integer> brokerWithoutThrottleSet
) throws ExecutionException, InterruptedException {
Map<String, String> leaderThrottles = calculateLeaderThrottles(moveMap);
Map<String, String> followerThrottles = calculateFollowerThrottles(moveMap);
Map<String, String> leaderThrottles = calculateLeaderThrottles(moveMap, brokerWithoutThrottleSet);
Map<String, String> followerThrottles = calculateFollowerThrottles(moveMap, brokerWithoutThrottleSet);
modifyTopicThrottles(admin, leaderThrottles, followerThrottles);

Set<Integer> reassigningBrokers = calculateReassigningBrokers(moveMap);
Set<Integer> reassigningBrokers = calculateReassigningBrokers(moveMap, brokerWithoutThrottleSet);
modifyInterBrokerThrottle(admin, reassigningBrokers, interBrokerThrottle);
}

Expand Down Expand Up @@ -1215,7 +1248,8 @@ static void modifyInterBrokerThrottle(Admin adminClient,
*/
static void modifyLogDirThrottle(Admin admin,
Set<Integer> movingBrokers,
long logDirThrottle) throws ExecutionException, InterruptedException {
long logDirThrottle,
Set<Integer> brokerWithoutThrottleSet) throws ExecutionException, InterruptedException {
if (logDirThrottle >= 0) {
Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>();
movingBrokers.forEach(brokerId -> {
Expand Down Expand Up @@ -1495,6 +1529,7 @@ else if (!opts.options.has(opts.bootstrapServerOpt) && !opts.options.has(opts.bo
opts.bootstrapServerOpt,
opts.commandConfigOpt,
opts.interBrokerThrottleOpt,
opts.brokerListWithoutThrottleOpt,
opts.replicaAlterLogDirsThrottleOpt,
opts.timeoutOpt,
opts.disallowReplicationFactorChangeOpt
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class ReassignPartitionsCommandOptions extends CommandDefaultOptions {
final OptionSpec<String> reassignmentJsonFileOpt;
final OptionSpec<String> topicsToMoveJsonFileOpt;
final OptionSpec<String> brokerListOpt;
final OptionSpec<String> brokerListWithoutThrottleOpt;
final OptionSpec<String> bootstrapControllerOpt;
final OptionSpec<?> disableRackAware;
final OptionSpec<Long> interBrokerThrottleOpt;
Expand Down Expand Up @@ -86,6 +87,15 @@ public ReassignPartitionsCommandOptions(String[] args) {
.describedAs("brokerlist")
.ofType(String.class);

brokerListWithoutThrottleOpt = parser.accepts("broker-list-without-throttle", "Optional. Comma-separated broker ID list (e.g. 1,2) that " +
"should be excluded from broker-level throttle config updates during partition reassignment execution. " +
"When --execute and --throttle are used, it normally applies throttle configs on all brokers involved in the reassignment. " +
"If any of those brokers are known to be down or unreachable, adding them to --broker-list-without-throttle makes it " +
"skip the throttle-setting step for those brokers, avoiding retries/timeouts, while still throttling the remaining reachable brokers.")
.withRequiredArg()
.describedAs("broker list without throttle")
.ofType(String.class);
Comment on lines +90 to +97
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is adding a new param to a command line tool (public API), so it would need a KIP to discuss and approve with the community. You can find info here and take it from there https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals#KafkaImprovementProposals-Process
Thanks for looking into this!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lianetm Thank you very much for the information you provided! I have learned that adding new parameters to the command-line tool (public API) requires going through the KIP process, and it needs to be discussed and approved by the community. I will carefully read the link you provided to understand the specific steps of the KIP process and start preparing the proposal. Thank you again for your guidance!


bootstrapControllerOpt = parser.accepts("bootstrap-controller", "The controller to use for reassignment. " +
"By default, the tool will get the quorum controller. This option supports the actions --cancel and --list.")
.withOptionalArg()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ public void run(ExperimentDef config, Journal journal, boolean displayChartsOnSc

ReassignPartitionsCommand.executeAssignment(adminClient, false,
ReassignPartitionsCommand.formatAsReassignmentJson(newAssignment, Map.of()),
config.throttle, -1L, 10000L, Time.SYSTEM, false);
config.throttle, "", -1L, 10000L, Time.SYSTEM, false);

//Await completion
waitForReassignmentToComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ public void testDisallowReplicationFactorChange() {
"bar-0: The replication factor is changed from 3 to 1\n" +
"foo-0: The replication factor is changed from 3 to 2\n" +
"foo-1: The replication factor is changed from 3 to 4",
assertThrows(TerseException.class, () -> executeAssignment(admin, false, assignment, -1L, -1L, 10000L, Time.SYSTEM, true)).getMessage());
assertThrows(TerseException.class, () -> executeAssignment(admin, false, assignment, -1L, "", -1L, 10000L, Time.SYSTEM, true)).getMessage());
}
}

Expand Down Expand Up @@ -757,7 +757,7 @@ private void runExecuteAssignment(boolean additional,
Long replicaAlterLogDirsThrottle) throws RuntimeException {
try (Admin admin = Admin.create(Map.of(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) {
executeAssignment(admin, additional, reassignmentJson,
interBrokerThrottle, replicaAlterLogDirsThrottle, 10000L, Time.SYSTEM, false);
interBrokerThrottle, "", replicaAlterLogDirsThrottle, 10000L, Time.SYSTEM, false);
} catch (ExecutionException | InterruptedException | JsonProcessingException | TerseException e) {
throw new RuntimeException(e);
}
Expand Down
Loading