Skip to content

KAFKA-1792: change behavior of --generate to produce assignment config with fair replica distribution and minimal number of reassignments #18903

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: trunk
Choose a base branch
from
Open
9 changes: 8 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
@@ -2473,9 +2473,10 @@ project(':tools') {
testImplementation libs.apachedsProtocolLdap
testImplementation libs.apachedsLdifPartition
testImplementation testLog4j2Libs
testImplementation libs.jqwik
testImplementation libs.hamcrest

testRuntimeOnly runtimeTestLibs
testRuntimeOnly libs.hamcrest
}

javadoc {
@@ -2491,6 +2492,12 @@ project(':tools') {
duplicatesStrategy 'exclude'
}

test {
useJUnitPlatform {
includeEngines 'jqwik', 'junit-jupiter'
}
}

jar {
dependsOn 'copyDependantLibs'
}
Original file line number Diff line number Diff line change
@@ -16,6 +16,10 @@
*/
package org.apache.kafka.tools.reassign;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import joptsimple.OptionSpec;
import org.apache.kafka.admin.AdminUtils;
import org.apache.kafka.admin.BrokerMetadata;
import org.apache.kafka.clients.admin.Admin;
@@ -47,10 +51,6 @@
import org.apache.kafka.tools.TerseException;
import org.apache.kafka.tools.ToolsUtils;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;

import java.io.IOException;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
@@ -72,8 +72,6 @@
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import joptsimple.OptionSpec;

@SuppressWarnings("ClassDataAbstractionCoupling")
public class ReassignPartitionsCommand {
private static final String ANY_LOG_DIR = "any";
@@ -164,7 +162,8 @@ private static void handleAction(Admin adminClient, ReassignPartitionsCommandOpt
generateAssignment(adminClient,
Utils.readFileAsString(opts.options.valueOf(opts.topicsToMoveJsonFileOpt)),
opts.options.valueOf(opts.brokerListOpt),
!opts.options.has(opts.disableRackAware));
!opts.options.has(opts.disableRackAware),
opts.options.has(opts.sticky));
} else if (opts.options.has(opts.executeOpt)) {
executeAssignment(adminClient,
opts.options.has(opts.additionalOpt),
@@ -549,7 +548,8 @@ private static void clearTopicLevelThrottles(Admin adminClient, Set<String> topi
public static Entry<Map<TopicPartition, List<Integer>>, Map<TopicPartition, List<Integer>>> generateAssignment(Admin adminClient,
String reassignmentJson,
String brokerListString,
Boolean enableRackAwareness
Boolean enableRackAwareness,
Boolean sticky
) throws ExecutionException, InterruptedException, JsonProcessingException {
Entry<List<Integer>, List<String>> t0 = parseGenerateAssignmentArgs(reassignmentJson, brokerListString);

@@ -558,7 +558,13 @@ public static Entry<Map<TopicPartition, List<Integer>>, Map<TopicPartition, List

Map<TopicPartition, List<Integer>> currentAssignments = getReplicaAssignmentForTopics(adminClient, topicsToReassign);
List<BrokerMetadata> brokerMetadatas = getBrokerMetadata(adminClient, brokersToReassign, enableRackAwareness);
Map<TopicPartition, List<Integer>> proposedAssignments = calculateAssignment(currentAssignments, brokerMetadatas);
Map<TopicPartition, List<Integer>> proposedAssignments;
if (sticky) {
final StickyPartitionReassignor assignor = new StickyPartitionReassignor(currentAssignments, brokerMetadatas);
proposedAssignments = assignor.reassign();
} else {
proposedAssignments = calculateAssignment(currentAssignments, brokerMetadatas);
}
System.out.printf("Current partition replica assignment%n%s%n%n",
formatAsReassignmentJson(currentAssignments, Collections.emptyMap()));
System.out.printf("Proposed partition reassignment configuration%n%s%n",
@@ -574,7 +580,7 @@ public static Entry<Map<TopicPartition, List<Integer>>, Map<TopicPartition, List
*
* @return A map from partitions to the proposed assignments for each.
*/
private static Map<TopicPartition, List<Integer>> calculateAssignment(Map<TopicPartition, List<Integer>> currentAssignment,
public static Map<TopicPartition, List<Integer>> calculateAssignment(Map<TopicPartition, List<Integer>> currentAssignment,
List<BrokerMetadata> brokerMetadatas) {
Map<String, List<Entry<TopicPartition, List<Integer>>>> groupedByTopic = new HashMap<>();
for (Entry<TopicPartition, List<Integer>> e : currentAssignment.entrySet())
@@ -1409,7 +1415,7 @@ static ReassignPartitionsCommandOptions validateAndParseArgs(String[] args) {
}

OptionSpec<?> action = allActions.get(0);

if (opts.options.has(opts.bootstrapServerOpt) && opts.options.has(opts.bootstrapControllerOpt))
CommandLineUtils.printUsageAndExit(opts.parser, "Please don't specify both --bootstrap-server and --bootstrap-controller");
else if (!opts.options.has(opts.bootstrapServerOpt) && !opts.options.has(opts.bootstrapControllerOpt))
@@ -1448,7 +1454,8 @@ else if (!opts.options.has(opts.bootstrapServerOpt) && !opts.options.has(opts.bo
opts.bootstrapServerOpt,
opts.brokerListOpt,
opts.commandConfigOpt,
opts.disableRackAware
opts.disableRackAware,
opts.sticky
));
permittedArgs.put(opts.executeOpt, Arrays.asList(
opts.additionalOpt,
Original file line number Diff line number Diff line change
@@ -37,6 +37,7 @@ public class ReassignPartitionsCommandOptions extends CommandDefaultOptions {
final OptionSpec<String> brokerListOpt;
final OptionSpec<String> bootstrapControllerOpt;
final OptionSpec<?> disableRackAware;
final OptionSpec<?> sticky;
final OptionSpec<Long> interBrokerThrottleOpt;
final OptionSpec<Long> replicaAlterLogDirsThrottleOpt;
final OptionSpec<Long> timeoutOpt;
@@ -92,6 +93,7 @@ public ReassignPartitionsCommandOptions(String[] args) {
.ofType(String.class);

disableRackAware = parser.accepts("disable-rack-aware", "Disable rack aware replica assignment");
sticky = parser.accepts("sticky", "Use a sticky reassignment strategy to reduce the number of replica moves needed to balance out brokers");
interBrokerThrottleOpt = parser.accepts("throttle", "The movement of partitions between brokers will be throttled to this value (bytes/sec). " +
"This option can be included with --execute when a reassignment is started, and it can be altered by resubmitting the current reassignment " +
"along with the --additional flag. The throttle rate should be at least 1 KB/s.")

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -241,7 +241,8 @@ public void run(ExperimentDef config, Journal journal, boolean displayChartsOnSc
cluster.brokers().values().stream()
.map(server -> String.valueOf(server.replicaManager().localBrokerId()))
.collect(Collectors.joining(",")),
true
true,
false
).getKey();

System.out.println("Starting Reassignment");
Original file line number Diff line number Diff line change
@@ -175,7 +175,7 @@ public void testGenerateAssignmentWithBootstrapServer() throws Exception {
String assignment = "{\"version\":1,\"partitions\":" +
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[3,1,2],\"log_dirs\":[\"any\",\"any\",\"any\"]}" +
"]}";
generateAssignment(admin, assignment, "1,2,3", false);
generateAssignment(admin, assignment, "1,2,3", false, false);
Map<TopicPartition, PartitionReassignmentState> finalAssignment = singletonMap(foo0,
new PartitionReassignmentState(asList(0, 1, 2), asList(3, 1, 2), true));
waitForVerifyAssignment(admin, assignment, false,
Original file line number Diff line number Diff line change
@@ -361,7 +361,7 @@ public void testGenerateAssignmentFailsWithoutEnoughReplicas() {
addTopics(adminClient);
assertStartsWith("Replication factor: 3 larger than available brokers: 2",
assertThrows(InvalidReplicationFactorException.class,
() -> generateAssignment(adminClient, "{\"topics\":[{\"topic\":\"foo\"},{\"topic\":\"bar\"}]}", "0,1", false),
() -> generateAssignment(adminClient, "{\"topics\":[{\"topic\":\"foo\"},{\"topic\":\"bar\"}]}", "0,1", false, false),
"Expected generateAssignment to fail").getMessage());
}
}
@@ -372,7 +372,7 @@ public void testGenerateAssignmentWithInvalidPartitionsFails() {
addTopics(adminClient);
assertStartsWith("Topic quux not found",
assertThrows(ExecutionException.class,
() -> generateAssignment(adminClient, "{\"topics\":[{\"topic\":\"foo\"},{\"topic\":\"quux\"}]}", "0,1", false),
() -> generateAssignment(adminClient, "{\"topics\":[{\"topic\":\"foo\"},{\"topic\":\"quux\"}]}", "0,1", false, false),
"Expected generateAssignment to fail").getCause().getMessage());
}
}
@@ -392,11 +392,11 @@ public void testGenerateAssignmentWithInconsistentRacks() throws Exception {
addTopics(adminClient);
assertStartsWith("Not all brokers have rack information.",
assertThrows(AdminOperationException.class,
() -> generateAssignment(adminClient, "{\"topics\":[{\"topic\":\"foo\"}]}", "0,1,2,3", true),
() -> generateAssignment(adminClient, "{\"topics\":[{\"topic\":\"foo\"}]}", "0,1,2,3", true, false),
"Expected generateAssignment to fail").getMessage());
// It should succeed when --disable-rack-aware is used.
Entry<Map<TopicPartition, List<Integer>>, Map<TopicPartition, List<Integer>>>
proposedCurrent = generateAssignment(adminClient, "{\"topics\":[{\"topic\":\"foo\"}]}", "0,1,2,3", false);
proposedCurrent = generateAssignment(adminClient, "{\"topics\":[{\"topic\":\"foo\"}]}", "0,1,2,3", false, false);

Map<TopicPartition, List<Integer>> expCurrent = new HashMap<>();

@@ -416,7 +416,7 @@ public void testGenerateAssignmentWithFewerBrokers() throws Exception {
Entry<Map<TopicPartition, List<Integer>>, Map<TopicPartition, List<Integer>>>
proposedCurrent = generateAssignment(adminClient,
"{\"topics\":[{\"topic\":\"foo\"},{\"topic\":\"bar\"}]}",
goalBrokers.stream().map(Object::toString).collect(Collectors.joining(",")), false);
goalBrokers.stream().map(Object::toString).collect(Collectors.joining(",")), false, false);

Map<TopicPartition, List<Integer>> expCurrent = new HashMap<>();

@@ -434,6 +434,33 @@ public void testGenerateAssignmentWithFewerBrokers() throws Exception {
}
}

@Test
public void testGenerateStickyAssignmentWithFewerBrokers() throws Exception {
try (MockAdminClient adminClient = new MockAdminClient.Builder().numBrokers(4).build()) {
addTopics(adminClient);
List<Integer> goalBrokers = asList(0, 1, 3);

Entry<Map<TopicPartition, List<Integer>>, Map<TopicPartition, List<Integer>>>
proposedCurrent = generateAssignment(adminClient,
"{\"topics\":[{\"topic\":\"foo\"},{\"topic\":\"bar\"}]}",
goalBrokers.stream().map(Object::toString).collect(Collectors.joining(",")), false, true);

Map<TopicPartition, List<Integer>> expCurrent = new HashMap<>();

expCurrent.put(new TopicPartition("foo", 0), asList(0, 1, 2));
expCurrent.put(new TopicPartition("foo", 1), asList(1, 2, 3));
expCurrent.put(new TopicPartition("bar", 0), asList(2, 3, 0));

assertEquals(expCurrent, proposedCurrent.getValue());

// The proposed assignment should only span the provided brokers
proposedCurrent.getKey().values().forEach(replicas ->
assertTrue(goalBrokers.containsAll(replicas),
"Proposed assignment " + proposedCurrent.getKey() + " puts replicas on brokers other than " + goalBrokers)
);
}
}

@Test
public void testCurrentPartitionReplicaAssignmentToString() throws Exception {
Map<TopicPartition, List<Integer>> proposedParts = new HashMap<>();

Large diffs are not rendered by default.