diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java index 1933e4e3526bd..18c08e23e167a 100644 --- a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java @@ -16,6 +16,9 @@ */ package org.apache.kafka.tools.reassign; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashSet; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.AlterConfigOp; @@ -178,6 +181,7 @@ private static void handleAction(Admin adminClient, ReassignPartitionsCommandOpt opts.options.has(opts.additionalOpt), Utils.readFileAsString(opts.options.valueOf(opts.reassignmentJsonFileOpt)), opts.options.valueOf(opts.interBrokerThrottleOpt), + opts.options.valueOf(opts.brokerListWithoutThrottleOpt), opts.options.valueOf(opts.replicaAlterLogDirsThrottleOpt), opts.options.valueOf(opts.timeoutOpt), Time.SYSTEM, @@ -752,21 +756,24 @@ static Entry, List> parseGenerateAssignmentArgs(String rea /** * The entry point for the --execute and --execute-additional commands. * - * @param adminClient The AdminClient to use. - * @param additional Whether --additional was passed. - * @param reassignmentJson The JSON string to use for the topics to reassign. - * @param interBrokerThrottle The inter-broker throttle to use, or a negative - * number to skip using a throttle. - * @param logDirThrottle The replica log directory throttle to use, or a - * negative number to skip using a throttle. - * @param timeoutMs The maximum time in ms to wait for log directory - * replica assignment to begin. - * @param time The Time object to use. + * @param adminClient The AdminClient to use. + * @param additional Whether --additional was passed. + * @param reassignmentJson The JSON string to use for the topics to reassign. + * @param interBrokerThrottle The inter-broker throttle to use, or a negative + * number to skip using a throttle. + * @param brokerListWithoutThrottleString The brokerIds string to skip broker-level throttle + * config updates for during reassignment. + * @param logDirThrottle The replica log directory throttle to use, or a + * negative number to skip using a throttle. + * @param timeoutMs The maximum time in ms to wait for log directory + * replica assignment to begin. + * @param time The Time object to use. */ public static void executeAssignment(Admin adminClient, boolean additional, String reassignmentJson, long interBrokerThrottle, + String brokerListWithoutThrottleString, long logDirThrottle, long timeoutMs, Time time, @@ -795,14 +802,19 @@ public static void executeAssignment(Admin adminClient, if (interBrokerThrottle >= 0 || logDirThrottle >= 0) { System.out.println(YOU_MUST_RUN_VERIFY_PERIODICALLY_MESSAGE); + Set brokerWithoutThrottleSet = new HashSet<>(); + if (!brokerListWithoutThrottleString.isEmpty()) { + brokerWithoutThrottleSet = parseCommaSeparatedIntSet(brokerListWithoutThrottleString); + } + if (interBrokerThrottle >= 0) { Map> moveMap = calculateProposedMoveMap(currentReassignments, proposedParts, currentParts); - modifyReassignmentThrottle(adminClient, moveMap, interBrokerThrottle); + modifyReassignmentThrottle(adminClient, moveMap, interBrokerThrottle, brokerWithoutThrottleSet); } if (logDirThrottle >= 0) { Set movingBrokers = calculateMovingBrokers(proposedReplicas.keySet()); - modifyLogDirThrottle(adminClient, movingBrokers, logDirThrottle); + modifyLogDirThrottle(adminClient, movingBrokers, logDirThrottle, brokerWithoutThrottleSet); } } @@ -827,6 +839,15 @@ public static void executeAssignment(Admin adminClient, } } + private static Set parseCommaSeparatedIntSet(String s) { + if (s == null || s.trim().isEmpty()) return Collections.emptySet(); + + return Arrays.stream(s.trim().split(",")) + .filter(t -> !t.isEmpty()) + .map(Integer::parseInt) + .collect(Collectors.toCollection(LinkedHashSet::new)); + } + /** * Execute some partition log directory movements. * @@ -1078,15 +1099,19 @@ static Map> calculateProposedMoveMap(Map calculateLeaderThrottles(Map> moveMap) { + static Map calculateLeaderThrottles(Map> moveMap, + Set brokerWithoutThrottleSet) { Map results = new HashMap<>(); moveMap.forEach((topicName, partMoveMap) -> { Set components = new TreeSet<>(); partMoveMap.forEach((partId, move) -> - move.sources.forEach(source -> components.add(String.format("%d:%d", partId, source)))); + move.sources.stream().filter(source -> !brokerWithoutThrottleSet.contains(source)) + .forEach(source -> components.add(String.format("%d:%d", partId, source)))); results.put(topicName, String.join(",", components)); }); return results; @@ -1095,16 +1120,19 @@ static Map calculateLeaderThrottles(Map calculateFollowerThrottles(Map> moveMap) { + static Map calculateFollowerThrottles(Map> moveMap, + Set brokerWithoutThrottleSet) { Map results = new HashMap<>(); moveMap.forEach((topicName, partMoveMap) -> { Set components = new TreeSet<>(); partMoveMap.forEach((partId, move) -> move.destinations.forEach(destination -> { - if (!move.sources.contains(destination)) { + if (!move.sources.contains(destination) && !brokerWithoutThrottleSet.contains(destination)) { components.add(String.format("%d:%d", partId, destination)); } }) @@ -1118,15 +1146,19 @@ static Map calculateFollowerThrottles(Map calculateReassigningBrokers(Map> moveMap) { + static Set calculateReassigningBrokers(Map> moveMap, + Set brokerWithoutThrottleSet) { Set reassigningBrokers = new TreeSet<>(); moveMap.values().forEach(partMoveMap -> partMoveMap.values().forEach(partMove -> { reassigningBrokers.addAll(partMove.sources); reassigningBrokers.addAll(partMove.destinations); })); + reassigningBrokers.removeAll(brokerWithoutThrottleSet); return reassigningBrokers; } @@ -1171,13 +1203,14 @@ static void modifyTopicThrottles(Admin adminClient, private static void modifyReassignmentThrottle( Admin admin, Map> moveMap, - long interBrokerThrottle + long interBrokerThrottle, + Set brokerWithoutThrottleSet ) throws ExecutionException, InterruptedException { - Map leaderThrottles = calculateLeaderThrottles(moveMap); - Map followerThrottles = calculateFollowerThrottles(moveMap); + Map leaderThrottles = calculateLeaderThrottles(moveMap, brokerWithoutThrottleSet); + Map followerThrottles = calculateFollowerThrottles(moveMap, brokerWithoutThrottleSet); modifyTopicThrottles(admin, leaderThrottles, followerThrottles); - Set reassigningBrokers = calculateReassigningBrokers(moveMap); + Set reassigningBrokers = calculateReassigningBrokers(moveMap, brokerWithoutThrottleSet); modifyInterBrokerThrottle(admin, reassigningBrokers, interBrokerThrottle); } @@ -1215,7 +1248,8 @@ static void modifyInterBrokerThrottle(Admin adminClient, */ static void modifyLogDirThrottle(Admin admin, Set movingBrokers, - long logDirThrottle) throws ExecutionException, InterruptedException { + long logDirThrottle, + Set brokerWithoutThrottleSet) throws ExecutionException, InterruptedException { if (logDirThrottle >= 0) { Map> configs = new HashMap<>(); movingBrokers.forEach(brokerId -> { @@ -1495,6 +1529,7 @@ else if (!opts.options.has(opts.bootstrapServerOpt) && !opts.options.has(opts.bo opts.bootstrapServerOpt, opts.commandConfigOpt, opts.interBrokerThrottleOpt, + opts.brokerListWithoutThrottleOpt, opts.replicaAlterLogDirsThrottleOpt, opts.timeoutOpt, opts.disallowReplicationFactorChangeOpt diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java index 3954128871222..74999f73d7317 100644 --- a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java +++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java @@ -35,6 +35,7 @@ public class ReassignPartitionsCommandOptions extends CommandDefaultOptions { final OptionSpec reassignmentJsonFileOpt; final OptionSpec topicsToMoveJsonFileOpt; final OptionSpec brokerListOpt; + final OptionSpec brokerListWithoutThrottleOpt; final OptionSpec bootstrapControllerOpt; final OptionSpec disableRackAware; final OptionSpec interBrokerThrottleOpt; @@ -86,6 +87,15 @@ public ReassignPartitionsCommandOptions(String[] args) { .describedAs("brokerlist") .ofType(String.class); + brokerListWithoutThrottleOpt = parser.accepts("broker-list-without-throttle", "Optional. Comma-separated broker ID list (e.g. 1,2) that " + + "should be excluded from broker-level throttle config updates during partition reassignment execution. " + + "When --execute and --throttle are used, it normally applies throttle configs on all brokers involved in the reassignment. " + + "If any of those brokers are known to be down or unreachable, adding them to --broker-list-without-throttle makes it " + + "skip the throttle-setting step for those brokers, avoiding retries/timeouts, while still throttling the remaining reachable brokers.") + .withRequiredArg() + .describedAs("broker list without throttle") + .ofType(String.class); + bootstrapControllerOpt = parser.accepts("bootstrap-controller", "The controller to use for reassignment. " + "By default, the tool will get the quorum controller. This option supports the actions --cancel and --list.") .withOptionalArg() diff --git a/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java b/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java index 8aa6dd8bc46e9..677d60d7c3cfa 100644 --- a/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java +++ b/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java @@ -249,7 +249,7 @@ public void run(ExperimentDef config, Journal journal, boolean displayChartsOnSc ReassignPartitionsCommand.executeAssignment(adminClient, false, ReassignPartitionsCommand.formatAsReassignmentJson(newAssignment, Map.of()), - config.throttle, -1L, 10000L, Time.SYSTEM, false); + config.throttle, "", -1L, 10000L, Time.SYSTEM, false); //Await completion waitForReassignmentToComplete(); diff --git a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java index a71e4779a3e4e..c6de686a98f90 100644 --- a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java @@ -498,7 +498,7 @@ public void testDisallowReplicationFactorChange() { "bar-0: The replication factor is changed from 3 to 1\n" + "foo-0: The replication factor is changed from 3 to 2\n" + "foo-1: The replication factor is changed from 3 to 4", - assertThrows(TerseException.class, () -> executeAssignment(admin, false, assignment, -1L, -1L, 10000L, Time.SYSTEM, true)).getMessage()); + assertThrows(TerseException.class, () -> executeAssignment(admin, false, assignment, -1L, "", -1L, 10000L, Time.SYSTEM, true)).getMessage()); } } @@ -757,7 +757,7 @@ private void runExecuteAssignment(boolean additional, Long replicaAlterLogDirsThrottle) throws RuntimeException { try (Admin admin = Admin.create(Map.of(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) { executeAssignment(admin, additional, reassignmentJson, - interBrokerThrottle, replicaAlterLogDirsThrottle, 10000L, Time.SYSTEM, false); + interBrokerThrottle, "", replicaAlterLogDirsThrottle, 10000L, Time.SYSTEM, false); } catch (ExecutionException | InterruptedException | JsonProcessingException | TerseException e) { throw new RuntimeException(e); } diff --git a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java index d953ccf9ea74b..053291f3e4b94 100644 --- a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.tools.reassign; +import java.util.LinkedHashSet; import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.clients.admin.PartitionReassignment; @@ -566,16 +567,51 @@ public void testMoveMap() { expLeaderThrottle.put("foo", "0:1,0:2,0:3,1:4,1:5,1:6,2:1,2:2,3:1,3:2,4:1,4:2,5:1,5:2"); expLeaderThrottle.put("bar", "0:2,0:3,0:4"); - assertEquals(expLeaderThrottle, calculateLeaderThrottles(moveMap)); + assertEquals(expLeaderThrottle, calculateLeaderThrottles(moveMap, Collections.emptySet())); + + // Exclude a subset of brokers(1,2) + expLeaderThrottle.put("foo", "0:3,1:4,1:5,1:6"); + expLeaderThrottle.put("bar", "0:3,0:4"); + + Set brokerWithoutThrottleSet = new LinkedHashSet<>(Arrays.asList(1, 2)); + assertEquals(expLeaderThrottle, calculateLeaderThrottles(moveMap, brokerWithoutThrottleSet)); + + // Exclude a subset of brokers(2,3,4) + expLeaderThrottle.put("foo", "0:1,1:5,1:6,2:1,3:1,4:1,5:1"); + expLeaderThrottle.put("bar", ""); + + brokerWithoutThrottleSet = new LinkedHashSet<>(Arrays.asList(2, 3, 4)); + assertEquals(expLeaderThrottle, calculateLeaderThrottles(moveMap, brokerWithoutThrottleSet)); Map expFollowerThrottle = new HashMap<>(); expFollowerThrottle.put("foo", "0:5,1:7,1:8,2:3,2:4,3:5,3:6,4:3,5:3,5:4,5:5,5:6"); expFollowerThrottle.put("bar", "0:1"); - assertEquals(expFollowerThrottle, calculateFollowerThrottles(moveMap)); + assertEquals(expFollowerThrottle, calculateFollowerThrottles(moveMap, Collections.emptySet())); + + // Exclude a subset of brokers(3,5) + expFollowerThrottle.put("foo", "1:7,1:8,2:4,3:6,5:4,5:6"); + expFollowerThrottle.put("bar", "0:1"); + + brokerWithoutThrottleSet = new LinkedHashSet<>(Arrays.asList(3, 5)); + assertEquals(expFollowerThrottle, calculateFollowerThrottles(moveMap, brokerWithoutThrottleSet)); + + // Exclude a subset of brokers(1) + expFollowerThrottle.put("foo", "0:5,1:7,1:8,2:3,2:4,3:5,3:6,4:3,5:3,5:4,5:5,5:6"); + expFollowerThrottle.put("bar", ""); + + brokerWithoutThrottleSet = new LinkedHashSet<>(Collections.singletonList(1)); + assertEquals(expFollowerThrottle, calculateFollowerThrottles(moveMap, brokerWithoutThrottleSet)); + + assertEquals(Set.of(1, 2, 3, 4, 5, 6, 7, 8), calculateReassigningBrokers(moveMap, Collections.emptySet())); + // Exclude a subset of brokers + assertEquals(Set.of(2, 3, 4, 5, 6, 7), calculateReassigningBrokers(moveMap, Set.of(1, 8))); + // Exclude all brokers (result should be empty) + assertEquals(Collections.emptySet(), calculateReassigningBrokers(moveMap, Set.of(1, 2, 3, 4, 5, 6, 7, 8))); + // Exclude a broker that doesn’t exist in the reassignment (no change) + assertEquals(Set.of(1, 2, 3, 4, 5, 6, 7, 8), calculateReassigningBrokers(moveMap, Set.of(999))); - assertEquals(Set.of(1, 2, 3, 4, 5, 6, 7, 8), calculateReassigningBrokers(moveMap)); assertEquals(Set.of(0, 2), calculateMovingBrokers(Set.of( new TopicPartitionReplica("quux", 0, 0), new TopicPartitionReplica("quux", 1, 2)))); @@ -644,7 +680,7 @@ public void testExecuteWithInvalidPartitionsFails() { "{\"version\":1,\"partitions\":" + "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1],\"log_dirs\":[\"any\",\"any\"]}," + "{\"topic\":\"quux\",\"partition\":0,\"replicas\":[2,3,4],\"log_dirs\":[\"any\",\"any\",\"any\"]}" + - "]}", -1L, -1L, 10000L, Time.SYSTEM, false), "Expected reassignment with non-existent topic to fail").getCause().getMessage()); + "]}", -1L, "", -1L, 10000L, Time.SYSTEM, false), "Expected reassignment with non-existent topic to fail").getCause().getMessage()); } } @@ -657,7 +693,7 @@ public void testExecuteWithInvalidBrokerIdFails() { "{\"version\":1,\"partitions\":" + "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1],\"log_dirs\":[\"any\",\"any\"]}," + "{\"topic\":\"foo\",\"partition\":1,\"replicas\":[2,3,4],\"log_dirs\":[\"any\",\"any\",\"any\"]}" + - "]}", -1L, -1L, 10000L, Time.SYSTEM, false), "Expected reassignment with non-existent broker id to fail").getMessage()); + "]}", -1L, "", -1L, 10000L, Time.SYSTEM, false), "Expected reassignment with non-existent broker id to fail").getMessage()); } } @@ -800,7 +836,7 @@ public void testPropagateInvalidJsonError() { try (MockAdminClient adminClient = new MockAdminClient.Builder().numBrokers(4).build()) { addTopics(adminClient); assertStartsWith("Unexpected character", - assertThrows(AdminOperationException.class, () -> executeAssignment(adminClient, false, "{invalid_json", -1L, -1L, 10000L, Time.SYSTEM, false)).getMessage()); + assertThrows(AdminOperationException.class, () -> executeAssignment(adminClient, false, "{invalid_json", -1L, "", -1L, 10000L, Time.SYSTEM, false)).getMessage()); } }