diff --git a/build.gradle b/build.gradle index ad68058c1837f..90fccbaa033ed 100644 --- a/build.gradle +++ b/build.gradle @@ -2473,9 +2473,10 @@ project(':tools') { testImplementation libs.apachedsProtocolLdap testImplementation libs.apachedsLdifPartition testImplementation testLog4j2Libs + testImplementation libs.jqwik + testImplementation libs.hamcrest testRuntimeOnly runtimeTestLibs - testRuntimeOnly libs.hamcrest } javadoc { @@ -2491,6 +2492,12 @@ project(':tools') { duplicatesStrategy 'exclude' } + test { + useJUnitPlatform { + includeEngines 'jqwik', 'junit-jupiter' + } + } + jar { dependsOn 'copyDependantLibs' } diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java index 768d022ba6616..f66075657291f 100644 --- a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java @@ -16,6 +16,10 @@ */ package org.apache.kafka.tools.reassign; +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonMappingException; +import joptsimple.OptionSpec; import org.apache.kafka.admin.AdminUtils; import org.apache.kafka.admin.BrokerMetadata; import org.apache.kafka.clients.admin.Admin; @@ -47,10 +51,6 @@ import org.apache.kafka.tools.TerseException; import org.apache.kafka.tools.ToolsUtils; -import com.fasterxml.jackson.core.JsonParseException; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonMappingException; - import java.io.IOException; import java.util.AbstractMap.SimpleImmutableEntry; import java.util.ArrayList; @@ -72,8 +72,6 @@ import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; -import joptsimple.OptionSpec; - 
@SuppressWarnings("ClassDataAbstractionCoupling") public class ReassignPartitionsCommand { private static final String ANY_LOG_DIR = "any"; @@ -164,7 +162,8 @@ private static void handleAction(Admin adminClient, ReassignPartitionsCommandOpt generateAssignment(adminClient, Utils.readFileAsString(opts.options.valueOf(opts.topicsToMoveJsonFileOpt)), opts.options.valueOf(opts.brokerListOpt), - !opts.options.has(opts.disableRackAware)); + !opts.options.has(opts.disableRackAware), + opts.options.has(opts.sticky)); } else if (opts.options.has(opts.executeOpt)) { executeAssignment(adminClient, opts.options.has(opts.additionalOpt), @@ -549,7 +548,8 @@ private static void clearTopicLevelThrottles(Admin adminClient, Set topi public static Entry>, Map>> generateAssignment(Admin adminClient, String reassignmentJson, String brokerListString, - Boolean enableRackAwareness + Boolean enableRackAwareness, + Boolean sticky ) throws ExecutionException, InterruptedException, JsonProcessingException { Entry, List> t0 = parseGenerateAssignmentArgs(reassignmentJson, brokerListString); @@ -558,7 +558,13 @@ public static Entry>, Map> currentAssignments = getReplicaAssignmentForTopics(adminClient, topicsToReassign); List brokerMetadatas = getBrokerMetadata(adminClient, brokersToReassign, enableRackAwareness); - Map> proposedAssignments = calculateAssignment(currentAssignments, brokerMetadatas); + Map> proposedAssignments; + if (sticky) { + final StickyPartitionReassignor assignor = new StickyPartitionReassignor(currentAssignments, brokerMetadatas); + proposedAssignments = assignor.reassign(); + } else { + proposedAssignments = calculateAssignment(currentAssignments, brokerMetadatas); + } System.out.printf("Current partition replica assignment%n%s%n%n", formatAsReassignmentJson(currentAssignments, Collections.emptyMap())); System.out.printf("Proposed partition reassignment configuration%n%s%n", @@ -574,7 +580,7 @@ public static Entry>, Map> calculateAssignment(Map> currentAssignment, + public 
static Map> calculateAssignment(Map> currentAssignment, List brokerMetadatas) { Map>>> groupedByTopic = new HashMap<>(); for (Entry> e : currentAssignment.entrySet()) @@ -1409,7 +1415,7 @@ static ReassignPartitionsCommandOptions validateAndParseArgs(String[] args) { } OptionSpec action = allActions.get(0); - + if (opts.options.has(opts.bootstrapServerOpt) && opts.options.has(opts.bootstrapControllerOpt)) CommandLineUtils.printUsageAndExit(opts.parser, "Please don't specify both --bootstrap-server and --bootstrap-controller"); else if (!opts.options.has(opts.bootstrapServerOpt) && !opts.options.has(opts.bootstrapControllerOpt)) @@ -1448,7 +1454,8 @@ else if (!opts.options.has(opts.bootstrapServerOpt) && !opts.options.has(opts.bo opts.bootstrapServerOpt, opts.brokerListOpt, opts.commandConfigOpt, - opts.disableRackAware + opts.disableRackAware, + opts.sticky )); permittedArgs.put(opts.executeOpt, Arrays.asList( opts.additionalOpt, diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java index 2d31c5a902ab4..70d59b6179516 100644 --- a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java +++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java @@ -37,6 +37,7 @@ public class ReassignPartitionsCommandOptions extends CommandDefaultOptions { final OptionSpec brokerListOpt; final OptionSpec bootstrapControllerOpt; final OptionSpec disableRackAware; + final OptionSpec sticky; final OptionSpec interBrokerThrottleOpt; final OptionSpec replicaAlterLogDirsThrottleOpt; final OptionSpec timeoutOpt; @@ -92,6 +93,7 @@ public ReassignPartitionsCommandOptions(String[] args) { .ofType(String.class); disableRackAware = parser.accepts("disable-rack-aware", "Disable rack aware replica assignment"); + sticky = parser.accepts("sticky", "Use a sticky reassignment strategy to reduce 
the number of replica moves needed to balance out brokers"); interBrokerThrottleOpt = parser.accepts("throttle", "The movement of partitions between brokers will be throttled to this value (bytes/sec). " + "This option can be included with --execute when a reassignment is started, and it can be altered by resubmitting the current reassignment " + "along with the --additional flag. The throttle rate should be at least 1 KB/s.") diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/StickyPartitionReassignor.java b/tools/src/main/java/org/apache/kafka/tools/reassign/StickyPartitionReassignor.java new file mode 100644 index 0000000000000..41890d4f937d3 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/reassign/StickyPartitionReassignor.java @@ -0,0 +1,1012 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * (c) Copyright 2025, SAP SE and apache/kafka contributors + */ +package org.apache.kafka.tools.reassign; + +import org.apache.kafka.admin.BrokerMetadata; +import org.apache.kafka.common.TopicPartition; + +import java.io.Serializable; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.TreeSet; + + +public class StickyPartitionReassignor { + + private final Map brokerToAssignments; + private final Map brokerToLeaders; + + private final TreeSet sortedBrokersByAssignments; + + private final Map replicas; + private final Map unassignedReplicas; + private final Set partitions; + + private final Map>> possibleMoves; + private final Map> possibleInitialAssignments; + + private final Set> racks; + private final boolean rackAware; + + + public StickyPartitionReassignor(Map> currentAssignments, List brokerMetadatas) { + this.possibleMoves = new HashMap<>(); + this.possibleInitialAssignments = new HashMap<>(); + this.brokerToAssignments = new HashMap<>(); + this.brokerToLeaders = new HashMap<>(); + this.sortedBrokersByAssignments = new TreeSet<>(new BrokerReplicaCountComparator(this.brokerToAssignments)); + + this.racks = new HashSet<>(); + final Map brokers = new HashMap<>(); + + int rackAwareBrokers = 0; + int rackUnawareBrokers = 0; + + for (final BrokerMetadata brokerMetadata : brokerMetadatas) { + if (brokerMetadata.rack.isPresent()) { + rackAwareBrokers++; + } else { + rackUnawareBrokers++; + } + this.racks.add(brokerMetadata.rack); + brokers.put(brokerMetadata.id, brokerMetadata); + } + + if (rackAwareBrokers > 0 && rackUnawareBrokers > 0) + throw new RuntimeException("Not all brokers have rack information for replica rack aware assignment."); + this.rackAware = rackAwareBrokers > 0; + + this.replicas = 
new HashMap<>();
+        this.unassignedReplicas = new HashMap<>();
+        this.partitions = new HashSet<>();
+
+        this.initReplicas(currentAssignments, brokers);
+
+        this.sortedBrokersByAssignments.addAll(brokerMetadatas);
+
+        for (final Map.Entry entry : this.unassignedReplicas.entrySet()) {
+            final Replica replica = entry.getValue();
+            this.computeAndUpdatePossibleMovesForUnassignedReplica(replica);
+        }
+
+        for (final Map.Entry entry : this.replicas.entrySet()) {
+            final Replica replica = entry.getValue();
+
+            if (!this.unassignedReplicas.containsKey(entry.getKey())) this.computeAndUpdatePossibleMoves(replica);
+        }
+    }
+
+    /**
+     * Returns the first element of {@code toBrokerMoves} after checking that it really is a
+     * move from {@code fromBroker} to {@code toBroker}.
+     *
+     * @param toBrokerMoves moves registered for the (fromBroker, toBroker) pair
+     * @param fromBroker    broker the replica is expected to currently live on
+     * @param toBroker      broker the replica is expected to be moved to
+     * @return the first move of the set
+     * @throws RuntimeException if the set is null or empty, or if the first move does not
+     *                          match the given brokers
+     */
+    private static MoveFrom getBestMoveFrom(TreeSet toBrokerMoves, BrokerMetadata fromBroker, BrokerMetadata toBroker) {
+        // TreeSet.first() never returns null; on an empty set it throws NoSuchElementException.
+        // Guard up front so that the descriptive error below is actually reachable.
+        if (toBrokerMoves == null || toBrokerMoves.isEmpty())
+            throw new RuntimeException(String.format("No available moves from %s to %s", fromBroker, toBroker));
+        final MoveFrom moveFrom = toBrokerMoves.first();
+        if (!moveFrom.replica.currentBroker.equals(fromBroker))
+            throw new RuntimeException(String.format("Chosen from-broker %s do not match move %s", fromBroker, moveFrom));
+        if (!moveFrom.to.equals(toBroker))
+            throw new RuntimeException(String.format("Chosen to-broker %s do not match move %s", toBroker, moveFrom));
+        return moveFrom;
+    }
+
+    /**
+     * Returns the first element of {@code toBrokerMoves} after checking that it targets
+     * {@code toBroker}.
+     *
+     * @param toBroker      broker the unassigned replica is expected to be placed on
+     * @param toBrokerMoves initial assignments registered for {@code toBroker}
+     * @return the first initial assignment of the set
+     * @throws RuntimeException if the set is null or empty, or if the first entry does not
+     *                          target {@code toBroker}
+     */
+    private static MoveFrom getBestInitialAssignmentFrom(BrokerMetadata toBroker, TreeSet toBrokerMoves) {
+        // Same reasoning as in getBestMoveFrom: first() throws on an empty set instead of returning null
+        if (toBrokerMoves == null || toBrokerMoves.isEmpty())
+            throw new RuntimeException(String.format("No available moves for unassigned replicas to %s", toBroker));
+        final MoveFrom initialAssignment = toBrokerMoves.first();
+        if (!initialAssignment.to.equals(toBroker))
+            throw new RuntimeException(String.format("Chosen to-broker %s do not match move %s", toBroker, initialAssignment));
+
+        return initialAssignment;
+    }
+
+    private void initReplicas(Map> currentAssignments, Map brokers) {
+        for (final Map.Entry> entry : currentAssignments.entrySet()) {
+            final List replicaAssignments = entry.getValue();
+            final TopicPartition topicPartition = 
entry.getKey(); + + final Partition partition = new Partition(topicPartition); + this.partitions.add(partition); + + for (int i = 0; i < replicaAssignments.size(); i++) { + final Integer currentNodeId = replicaAssignments.get(i); + if (currentNodeId == null) + throw new RuntimeException(String.format("Replica assignment for %s %s was null", topicPartition, i)); + final BrokerMetadata currentBroker = brokers.get(currentNodeId); + + final boolean isLeader = i == 0; + + final ReplicaId replicaId = new ReplicaId(partition, i); + final Replica replica = new Replica(replicaId, currentBroker, isLeader); + + if (this.replicas.put(replicaId, replica) != null) + throw new RuntimeException(String.format("Replica %s was already present in allReplicas", replicaId)); + + if (!partition.replicas.add(replica)) + throw new RuntimeException(String.format("Replica %s was already added to partition %s", replica.id, partition)); + + if (currentBroker != null) { + this.brokerToAssignments.compute(currentNodeId, (key, value) -> value == null ? 1 : value + 1); + if (isLeader) + this.brokerToLeaders.compute(currentNodeId, (key, value) -> value == null ? 
1 : value + 1);
+
+                    partition.initPartitionCounts(replica);
+                } else {
+                    if (this.unassignedReplicas.put(replicaId, replica) != null)
+                        throw new RuntimeException(String.format("Replica %s was already present in unassignedReplicas", replicaId));
+                }
+
+            }
+        }
+    }
+
+    /**
+     * Sums the squared per-rack replica counts of the replica's partition over all known racks.
+     *
+     * @param replica replica whose partition is scored
+     * @return the sum of squares, or 0 when rack awareness is off
+     */
+    private long computeCurrentRackScore(Replica replica) {
+        if (!this.rackAware) return 0L;
+
+        // The count per rack can only grow as large as replicationFactor
+        // Using long arithmetic by squaring each count per rack should
+        // be safe
+        // Mathematically using just the square of the count is as good a
+        // metric for ranking the distribution scores as the squared
+        // residuals
+        long rackScore = 0;
+        for (final Optional rack : this.racks) {
+            final int replicasOnRack = replica.id.partition.replicaCountPerRack.getOrDefault(rack, 0);
+            // Exact integer square; no round trip through double as with Math.pow
+            rackScore += (long) replicasOnRack * replicasOnRack;
+        }
+        return rackScore;
+    }
+
+    /**
+     * Same score as {@link #computeCurrentRackScore}, computed as if {@code replica} had
+     * already been moved from its current broker's rack to the rack of {@code to}.
+     *
+     * @param replica replica that would be moved; its currentBroker is dereferenced
+     * @param to      candidate target broker
+     * @return the hypothetical sum of squares, or 0 when rack awareness is off
+     */
+    private long computeNewRackScore(Replica replica, BrokerMetadata to) {
+        if (!this.rackAware) return 0L;
+
+        final Optional currentRack = replica.currentBroker.rack;
+        final Optional newRack = to.rack;
+
+        // The count per rack can only grow as large as replicationFactor
+        // Using long arithmetic by squaring each count per rack should
+        // be safe
+        // Mathematically using just the square of the count is as good a
+        // metric for ranking the distribution scores as the squared
+        // residuals
+        long rackScore = 0;
+        for (final Optional rack : this.racks) {
+            int replicasOnRack = replica.id.partition.replicaCountPerRack.getOrDefault(rack, 0);
+            // Assume that the move was executed and remove a replica from the from-rack count
+            // and add a replica to the to-rack count
+            if (rack.equals(currentRack)) replicasOnRack--;
+            if (rack.equals(newRack)) replicasOnRack++;
+
+            // Exact integer square; no round trip through double as with Math.pow
+            rackScore += (long) replicasOnRack * replicasOnRack;
+        }
+        return rackScore;
+    }
+
+    private long computeNewRackScoreForUnassignedReplica(Replica replica, BrokerMetadata to) {
+        if (!this.rackAware) return 0L;
+
+        final 
Optional newRack = to.rack;
+
+        // The count per broker can only grow as large as replicationFactor
+        // Using long arithmetic by squaring each count per broker should
+        // be safe
+        // Mathematically using just the square of the count is as good a
+        // metric for ranking the distribution scores as the squared
+        // residuals
+        long rackScore = 0;
+        for (final Optional rack : this.racks) {
+            int replicasOnRack = replica.id.partition.replicaCountPerRack.getOrDefault(rack, 0);
+            // Assume that the move was executed and add a replica to the to-rack count
+            if (rack.equals(newRack)) replicasOnRack++;
+
+            rackScore += (long) Math.pow(replicasOnRack, 2);
+        }
+        return rackScore;
+    }
+
+    /**
+     * Builds the candidate placements for a replica that has no current broker.
+     * One MoveFrom is created per broker in sortedBrokersByAssignments that has no entry in
+     * the partition's replicaCountPerBroker map; each carries the rack score returned by
+     * computeNewRackScoreForUnassignedReplica for that broker.
+     *
+     * @param replica the unassigned replica
+     * @return a TreeSet of candidates ordered by MovableFromScoreComparator (may be empty)
+     */
+    private Set getInitialAssignmentsForReplica(Replica replica) {
+        final Set initialAssignments = new TreeSet<>(new MovableFromScoreComparator());
+
+        for (final BrokerMetadata broker : this.sortedBrokersByAssignments) {
+            // Cannot place a replica on a broker that already has a replica for the partition
+            if (replica.id.partition.replicaCountPerBroker.get(broker) != null) continue;
+
+            final long newRackScore = this.computeNewRackScoreForUnassignedReplica(replica, broker);
+
+            initialAssignments.add(new MoveFrom(replica, broker, newRackScore));
+        }
+
+        return initialAssignments;
+    }
+
+    private void updateInitialAssignments(Replica changedReplica, Set newReplicaPossibleMoves) {
+        // newReplicaPossibleMoves might contain the semantically same moves (same replica is moved to same broker)
+        // but with changed improvement scores. 
+ // Since the global initialAssignments structures are ordered by improvement scores first, we might not be able + // to find the old moves using the new moves + + // Construct an oldMoves map containing all previous possibleMoves of changedReplica using only the moves + // identity (replica id and to-broker identity) + final Map oldMoves = new HashMap<>(); + if (changedReplica.possibleMoves != null) { + for (final MoveFrom move : changedReplica.possibleMoves) { + oldMoves.put(move, move); + } + } + // Construct a newMoves map containing all new possibleMoves of changedReplica using only the moves + // identity (replica id and to-broker identity) + final Map newMoves = new HashMap<>(); + if (newReplicaPossibleMoves != null) { + for (final MoveFrom move : newReplicaPossibleMoves) { + newMoves.put(move, move); + } + } + + changedReplica.possibleMoves = newReplicaPossibleMoves; + + // Contains all old moves that should be removed from the global initialAssignments + final List toRemove = new ArrayList<>(); + // Contains all new moves that should be added to the global initialAssignments + final List toAdd = new ArrayList<>(); + + for (final Map.Entry entry : newMoves.entrySet()) { + final MoveFrom newMove = entry.getKey(); + final MoveFrom oldMove = oldMoves.get(newMove); // Get old move by identity + + // new move does not have a corresponding old move + if (oldMove == null) { + toAdd.add(entry.getValue()); + continue; + } + + oldMoves.remove(oldMove); + + // If the old move and new move are the same by identity, check whether at least one of the scores changed + // If so, make sure that the old move is first removed from the global possibleMoves and then the new move + // is added + if (oldMove.replica.moved != newMove.replica.moved || !Objects.equals(oldMove.rackImprovementScore, newMove.rackImprovementScore)) { + toRemove.add(oldMove); + toAdd.add(entry.getValue()); + } + } + + for (final Map.Entry entry : oldMoves.entrySet()) { + toRemove.add(entry.getValue()); 
+        }
+
+        this.adjustGlobalInitialAssignments(toRemove, toAdd);
+    }
+
+    /**
+     * Applies a batch of changes to the per-broker index of possible initial assignments
+     * (possibleInitialAssignments, keyed by the move's target broker).
+     *
+     * @param toRemove entries that must currently be present in the index and are removed
+     * @param toAdd    entries that are added after all removals have been applied
+     * @throws RuntimeException if an entry of {@code toRemove} is not present in the index
+     */
+    private void adjustGlobalInitialAssignments(List toRemove, List toAdd) {
+        // First remove all old possible moves from the global initial assignment moves
+        // Removing old moves first is required due to the invariants defined in
+        // updateInitialAssignments
+        for (final MoveFrom move : toRemove) {
+            final Set previousMoves = this.possibleInitialAssignments.get(move.to);
+            if (previousMoves == null)
+                throw new RuntimeException(String.format("New broker %s has no possible initial assignments", move.to));
+
+            if (!previousMoves.remove(move))
+                throw new RuntimeException(String.format("Initial assignment for replica %s was not contained in initial assignment set for %s", move.replica, move.to));
+
+            // Drop empty per-broker sets so that a lookup returning non-null implies at least one entry
+            if (previousMoves.isEmpty()) this.possibleInitialAssignments.remove(move.to);
+        }
+
+        for (final MoveFrom move : toAdd) {
+            final Set previousMoves = this.possibleInitialAssignments.computeIfAbsent(move.to, k -> new TreeSet<>(new MovableFromScoreComparator()));
+            previousMoves.add(move);
+        }
+    }
+
+    /**
+     * Recomputes the candidate placements of an unassigned replica and synchronises the
+     * replica and the global index with the result.
+     *
+     * @param replica the unassigned replica to refresh
+     */
+    private void computeAndUpdatePossibleMovesForUnassignedReplica(Replica replica) {
+        final Set newPossibleMoves = this.getInitialAssignmentsForReplica(replica);
+
+        this.updateInitialAssignments(replica, newPossibleMoves);
+    }
+
+    private void assign(Move move) {
+        // Remove broker nodes that take part in the move to update their assignment counts
+        this.sortedBrokersByAssignments.remove(move.to);
+
+        // Update assignment counts to state after move
+        this.brokerToAssignments.compute(move.to.id, (key, value) -> value == null ? 1 : value + 1);
+
+        if (move.replica.isLeader) {
+            this.brokerToLeaders.compute(move.to.id, (key, value) -> value == null ? 
1 : value + 1); + } + + // Re-add broker nodes + this.sortedBrokersByAssignments.add(move.to); + + // Remove all initial assignment moves from the replica and global state + this.updateInitialAssignments(move.replica, null); + + // Update replica to state after move + move.replica.moved = true; + move.replica.currentBroker = move.to; + + // Update internal partition maps to state after move + move.replica.id.partition.initPartitionCounts(move.replica); + + // Compute possible moves for newly assigned replica + this.computeAndUpdatePossibleMoves(move.replica); + + this.unassignedReplicas.remove(move.replica.id); + + // Re-evaluate possible moves for each sibling replica + // This is required since the partitions internal state has been changed, + // so the improvement scores for each current possible move for each + // sibling replica might have changed to + for (final Replica replica : move.replica.id.partition.replicas) { + if (replica.id.equals(move.replica.id)) continue; + if (replica.currentBroker == null) { + this.computeAndUpdatePossibleMovesForUnassignedReplica(replica); + } else { + this.computeAndUpdatePossibleMoves(replica); + } + } + } + + private Move getBestInitialAssignment() { + final Comparator comparator = new MovableScoreComparator(); + Move bestMove = null; + + // Iterate through all brokers in ascending order by replica assignment count + // (broker with the least assignments first, broker with the most assignments last) + for (BrokerMetadata toBroker : this.sortedBrokersByAssignments) { + final BigInteger globalBrokerScore = this.computeNewGlobalBrokerScoreForUnassignedReplica(toBroker); + BigInteger leaderScore = BigInteger.ZERO; + + final TreeSet toBrokerMoves = this.possibleInitialAssignments.get(toBroker); + if (toBrokerMoves == null) continue; + + // Only get the best move, since all other moves would have a lower priority in the global best + // moves set anyway + final MoveFrom bestInitialAssignmentForBroker = 
getBestInitialAssignmentFrom(toBroker, toBrokerMoves);
+
+            if (bestInitialAssignmentForBroker.replica.isLeader) {
+                leaderScore = this.computeNewLeaderScoreForUnassignedReplica(toBroker);
+            }
+
+            final Move bestInitialAssignment = new Move(bestInitialAssignmentForBroker.replica, null, toBroker, globalBrokerScore, bestInitialAssignmentForBroker.rackImprovementScore, leaderScore);
+
+            // Use this broker's best initial assignment if we do not have a best one yet
+            // or if it is better than the current best (there is no from-broker here)
+            if (bestMove == null || comparator.compare(bestInitialAssignment, bestMove) < 0)
+                bestMove = bestInitialAssignment;
+        }
+
+        return bestMove;
+    }
+
+    /**
+     * Builds the set of moves available to a replica that already sits on a broker.
+     * Candidate targets are the brokers in sortedBrokersByAssignments that have no entry in
+     * the partition's replicaCountPerBroker map. A candidate is kept only when its rack
+     * improvement score (new rack score minus current rack score) is not positive.
+     *
+     * @param replica an assigned replica
+     * @return a TreeSet of moves ordered by MovableFromScoreComparator (may be empty)
+     */
+    private Set getPossibleMovesForReplica(Replica replica) {
+        // NOTE: this local shadows the field of the same name
+        final Set possibleMoves = new TreeSet<>(new MovableFromScoreComparator());
+
+        final long currentRackScore = this.computeCurrentRackScore(replica);
+
+        for (final BrokerMetadata broker : this.sortedBrokersByAssignments) {
+            // Skip brokers that already have an entry for this partition
+            if (replica.id.partition.replicaCountPerBroker.get(broker) != null) continue;
+
+            final long newRackScore = this.computeNewRackScore(replica, broker);
+            final long rackImprovementScore = newRackScore - currentRackScore;
+
+            // Do not consider moves that make the rack distribution for the partition
+            // worse than before
+            if (rackImprovementScore > 0) continue;
+
+            possibleMoves.add(new MoveFrom(replica, broker, rackImprovementScore));
+        }
+
+        return possibleMoves;
+    }
+
+    private void adjustGlobalPossibleMoves(BrokerMetadata fromBroker, List toRemove, List toAdd) {
+        Map> fromBrokerMoves = this.possibleMoves.get(fromBroker);
+
+        // First remove all old possible moves from the global possible moves
+        // Removing old moves first is required due to the invariants defined in
+        // updatePossibleMoves
+        for (final MoveFrom move : toRemove) {
+            if (fromBrokerMoves == null)
+                throw new RuntimeException(String.format("Current broker %s has no available moves to another broker", fromBroker));
+            final Set 
previousMoves = fromBrokerMoves.get(move.to); + if (previousMoves == null) + throw new RuntimeException(String.format("New broker %s has no available moves from current broker %s", move.to, fromBroker)); + + if (!previousMoves.remove(move)) + throw new RuntimeException(String.format("Move for replica %s was not contained in move set from %s to %s", move.replica, fromBroker, move.to)); + + if (previousMoves.isEmpty()) fromBrokerMoves.remove(move.to); + } + + for (final MoveFrom move : toAdd) { + if (fromBrokerMoves == null) { + fromBrokerMoves = new HashMap<>(); + this.possibleMoves.put(fromBroker, fromBrokerMoves); + } + + final Set previousMoves = fromBrokerMoves.computeIfAbsent(move.to, k -> new TreeSet<>(new MovableFromScoreComparator())); + previousMoves.add(move); + } + + if (fromBrokerMoves != null && fromBrokerMoves.isEmpty()) this.possibleMoves.remove(fromBroker); + } + + private void updatePossibleMoves(Replica changedReplica, Set newReplicaPossibleMoves) { + // newReplicaPossibleMoves might contain the semantically same moves (same replica is moved to same broker) + // but with changed improvement scores. 
+ // Since the global possibleMoves structures are ordered by improvement scores first, we might not be able + // to find the old moves using the new moves + + // Construct an oldMoves map containing all previous possibleMoves of changedReplica using only the moves + // identity (replica id and to-broker identity) + final Map oldMoves = new HashMap<>(); + if (changedReplica.possibleMoves != null) { + for (final MoveFrom move : changedReplica.possibleMoves) { + oldMoves.put(move, move); + } + } + // Construct a newMoves map containing all new possibleMoves of changedReplica using only the moves + // identity (replica id and to-broker identity) + final Map newMoves = new HashMap<>(); + if (newReplicaPossibleMoves != null) { + for (final MoveFrom move : newReplicaPossibleMoves) { + newMoves.put(move, move); + } + } + + changedReplica.possibleMoves = newReplicaPossibleMoves; + + // Contains all old moves that should be removed from the global possibleMoves + final List toRemove = new ArrayList<>(); + // Contains all new moves that should be added to the global possibleMoves + final List toAdd = new ArrayList<>(); + + for (final Map.Entry entry : newMoves.entrySet()) { + final MoveFrom newMove = entry.getKey(); + final MoveFrom oldMove = oldMoves.get(newMove); // Get old move by identity + + // new move does not have a corresponding old move + if (oldMove == null) { + toAdd.add(entry.getValue()); + continue; + } + + oldMoves.remove(oldMove); + + // If the old move and new move are the same by identity, check whether at least one of the scores changed + // If so, make sure that the old move is first removed from the global possibleMoves and then the new move + // is added + if (oldMove.replica.moved != newMove.replica.moved || !Objects.equals(oldMove.rackImprovementScore, newMove.rackImprovementScore)) { + toRemove.add(oldMove); + toAdd.add(entry.getValue()); + } + } + + for (final Map.Entry entry : oldMoves.entrySet()) { + toRemove.add(entry.getValue()); + } + + 
this.adjustGlobalPossibleMoves(changedReplica.currentBroker, toRemove, toAdd);
+    }
+
+    /**
+     * Recomputes the moves available to an assigned replica and synchronises the replica
+     * and the global move index with the result.
+     *
+     * @param replica the assigned replica to refresh
+     */
+    private void computeAndUpdatePossibleMoves(Replica replica) {
+        final Set newPossibleMoves = this.getPossibleMovesForReplica(replica);
+
+        this.updatePossibleMoves(replica, newPossibleMoves);
+    }
+
+    /**
+     * Executes a move: updates the per-broker replica and leader counts, the partition's
+     * internal counts and the replica itself, then refreshes the possible moves of the moved
+     * replica and of every other replica of the same partition.
+     *
+     * @param move the move to apply; move.from and move.to are both dereferenced
+     */
+    private void move(Move move) {
+        // Remove broker nodes that take part in the move to update their assignment counts
+        // NOTE(review): presumably needed because the set's comparator reads brokerToAssignments,
+        // so an element must not change its sort key while it is inside the TreeSet - confirm
+        this.sortedBrokersByAssignments.remove(move.from);
+        this.sortedBrokersByAssignments.remove(move.to);
+
+        // Update assignment counts to state after move
+        // (a count that would drop to zero is removed from the map instead of being stored as 0)
+        this.brokerToAssignments.compute(move.from.id, (key, value) -> value == null || value <= 1 ? null : value - 1);
+        this.brokerToAssignments.compute(move.to.id, (key, value) -> value == null ? 1 : value + 1);
+
+        if (move.replica.isLeader) {
+            this.brokerToLeaders.compute(move.from.id, (key, value) -> value == null || value <= 1 ? null : value - 1);
+            this.brokerToLeaders.compute(move.to.id, (key, value) -> value == null ? 1 : value + 1);
+        }
+
+        // Re-add broker nodes
+        this.sortedBrokersByAssignments.add(move.from);
+        this.sortedBrokersByAssignments.add(move.to);
+
+        // Update internal partition maps to state after move
+        move.replica.id.partition.updatePartitionCounts(move);
+
+        // Drop all old moves of the replica while currentBroker still points at the from-broker:
+        // updatePossibleMoves uses changedReplica.currentBroker to locate the entries to remove
+        this.updatePossibleMoves(move.replica, null);
+
+        // Update replica to state after move
+        move.replica.moved = true;
+        move.replica.currentBroker = move.to;
+
+        this.computeAndUpdatePossibleMoves(move.replica);
+
+        // Re-evaluate possible moves for each sibling replica
+        // This is required since the partition's internal state has been changed,
+        // so the improvement scores for each current possible move for each
+        // sibling replica might have changed too
+        for (final Replica replica : move.replica.id.partition.replicas) {
+            if (replica.id.equals(move.replica.id)) continue;
+            this.computeAndUpdatePossibleMoves(replica);
+        }
+    }
+
+    private Move getBestMove() {
+        final Comparator comparator = new MovableScoreComparator();
+        Move bestMove = 
null; + + final BigInteger currentGlobalBrokerScore = this.computeCurrentGlobalBrokerScore(); + final BigInteger currentLeaderScore = this.computeCurrentLeaderScore(); + + // Iterate through all brokers in descending order by replica assignment count + // (broker with the most assignments first, broker with the least assignments last) + final Iterator fromBrokerIterator = this.sortedBrokersByAssignments.descendingIterator(); + + while (fromBrokerIterator.hasNext()) { + final BrokerMetadata fromBroker = fromBrokerIterator.next(); + final Map> fromBrokerMoves = this.possibleMoves.get(fromBroker); + + if (fromBrokerMoves == null) continue; + + // Iterate through all brokers in ascending order by replica assignment count + // (broker with the least assignments first, broker with the most assignments last) + final Iterator toBrokerIterator = this.sortedBrokersByAssignments.iterator(); + boolean globalBrokerDistributionCouldImprove = false; + + while (toBrokerIterator.hasNext()) { + final BrokerMetadata toBroker = toBrokerIterator.next(); + if (fromBroker.equals(toBroker)) continue; + + final BigInteger newGlobalBrokerScore = this.computeNewGlobalBrokerScore(fromBroker, toBroker); + final BigInteger globalBrokerImprovement = newGlobalBrokerScore.subtract(currentGlobalBrokerScore); + final int globalBrokerImprovementToZero = globalBrokerImprovement.compareTo(BigInteger.ZERO); + + final BigInteger newLeaderScore = this.computeNewLeaderScore(fromBroker, toBroker); + final BigInteger potentialLeaderImprovement = newLeaderScore.subtract(currentLeaderScore); + + // Do not consider moves to brokers that would make the global broker distribution score worse + if (globalBrokerImprovementToZero >= 0) break; + globalBrokerDistributionCouldImprove = true; + + final TreeSet toBrokerMoves = fromBrokerMoves.get(toBroker); + if (toBrokerMoves == null) continue; + + // Only get the best move, since all other moves would have a lower priority in the global best + // moves set anyway + 
final MoveFrom moveFrom = getBestMoveFrom(toBrokerMoves, fromBroker, toBroker);
+
+                BigInteger leaderImprovementScore = BigInteger.ZERO;
+                if (moveFrom.replica.isLeader) leaderImprovementScore = potentialLeaderImprovement;
+
+                final Move bestMoveForBrokerCombination = new Move(moveFrom.replica, fromBroker, toBroker, globalBrokerImprovement, moveFrom.rackImprovementScore, leaderImprovementScore);
+
+                // No move if no score changes
+                if (bestMoveForBrokerCombination.didImprove()) {
+                    // Use the best move that moves a replica from fromBroker to toBroker if we do not have a
+                    // best move yet or if the move is better than the current best move
+                    if (bestMove == null || comparator.compare(bestMoveForBrokerCombination, bestMove) < 0)
+                        bestMove = bestMoveForBrokerCombination;
+                }
+            }
+
+            // Since the fromBrokerIterator and toBrokerIterator are ordered (descending by replica count, ascending by replica count)
+            // we can be sure that we will not find another from-broker once we checked a from-broker for which there is no other broker
+            // for which moving a partition from the from-broker to the broker would improve the global broker distribution
+            if (!globalBrokerDistributionCouldImprove) break;
+        }
+
+        return bestMove;
+    }
+
+    /**
+     * Computes the proposed assignment. Replicas without a current broker are placed first;
+     * afterwards the best available move is applied repeatedly until getBestMove() returns
+     * null.
+     *
+     * @return for every partition, the broker ids of its replicas listed by replica index
+     * @throws RuntimeException if a replica could not be placed on any broker, or if the
+     *                          internal replica bookkeeping is inconsistent
+     */
+    public Map> reassign() {
+        // First assign unassigned replicas
+        do {
+            // Choose a move to execute for as long as there exists a move
+            final Move bestInitialAssignment = this.getBestInitialAssignment();
+            if (bestInitialAssignment == null) break;
+
+            this.assign(bestInitialAssignment);
+        } while (true);
+
+        // Make sure all unassigned replicas have been assigned.
+        // This must be a real check rather than an `assert`: assertions are disabled unless the
+        // JVM runs with -ea, and a replica left without a broker would otherwise only surface
+        // further down as a NullPointerException on replica.currentBroker
+        if (!this.unassignedReplicas.isEmpty())
+            throw new RuntimeException(String.format("Unable to assign replicas %s: no eligible broker left in the given broker list", this.unassignedReplicas.keySet()));
+
+        do {
+            // Choose a move to execute for as long as there exists a move that improves
+            // the replica assignment
+            final Move bestMove = this.getBestMove();
+            if (bestMove == null) break;
+
+            this.move(bestMove);
+        } while (true);
+
+        final Map> newState = new HashMap<>();
+
+        for (final Partition partition : this.partitions) {
+            final TopicPartition topicPartition = partition.id;
+            final List replicaAssignments = new ArrayList<>(partition.replicas.size());
+
+            if (newState.put(topicPartition, replicaAssignments) != null)
+                throw new RuntimeException(String.format("New state for partition %s already created", partition));
+
+            // Look replicas up by index so that the output keeps the original replica order
+            for (int i = 0; i < partition.replicas.size(); i++) {
+                final Replica replica = this.replicas.get(new ReplicaId(partition, i));
+                if (replica == null)
+                    throw new RuntimeException(String.format("Replica %s does not exist for partition %s", i, partition));
+                replicaAssignments.add(replica.currentBroker.id);
+            }
+        }
+
+        return newState;
+    }
+
+    /**
+     * Sums the squared replica counts (brokerToAssignments, 0 when absent) over all brokers
+     * in sortedBrokersByAssignments.
+     *
+     * @return the sum of squares as a BigInteger
+     */
+    private BigInteger computeCurrentGlobalBrokerScore() {
+        // The count per broker can grow as large as the number of replicas in
+        // the cluster.
+        // Using long arithmetic as for partitionBrokerScore or rackScore
+        // might not be safe for the globalBrokerScore.
+        // Therefore, BigIntegers are used instead
+        // Mathematically using just the square of the count is as good a
+        // metric for ranking the distribution scores as the squared
+        // residuals
+        BigInteger brokerScore = BigInteger.ZERO;
+        for (final BrokerMetadata broker : this.sortedBrokersByAssignments) {
+            final long replicasOnBroker = (long) this.brokerToAssignments.getOrDefault(broker.id, 0);
+            final BigInteger thisScore = BigInteger.valueOf(replicasOnBroker).pow(2);
+            brokerScore = brokerScore.add(thisScore);
+        }
+        return brokerScore;
+    }
+
+    private BigInteger computeNewGlobalBrokerScore(BrokerMetadata from, BrokerMetadata to) {
+        // The count per broker can grow as large as the number of replicas in
+        // the cluster.
+        // Using long arithmetic as for partitionBrokerScore or rackScore
+        // might not be safe for the globalBrokerScore. 
+ // Therefore, BigIntegers are used instead + // Mathematically using just the square of the count is as good a + // metric for ranking the distribution scores as the squared + // residuals + BigInteger brokerScore = BigInteger.ZERO; + for (final BrokerMetadata broker : this.sortedBrokersByAssignments) { + long replicasOnBroker = this.brokerToAssignments.getOrDefault(broker.id, 0); + // Assume that the move was executed and remove a replica from the from-broker count + // and add a replica to the to-broker count + if (broker.equals(from)) replicasOnBroker--; + if (broker.equals(to)) replicasOnBroker++; + + final BigInteger thisScore = BigInteger.valueOf(replicasOnBroker).pow(2); + brokerScore = brokerScore.add(thisScore); + } + return brokerScore; + } + + private BigInteger computeNewGlobalBrokerScoreForUnassignedReplica(BrokerMetadata to) { + // The count per broker can grow as large as the number of replicas in + // the cluster. + // Using long arithmetic as for partitionBrokerScore or rackScore + // might not be safe for the globalBrokerScore. + // Therefore, BigIntegers are used instead + // Mathematically using just the square of the count is as good a + // metric for ranking the distribution scores as the squared + // residuals + BigInteger brokerScore = BigInteger.ZERO; + for (final BrokerMetadata broker : this.sortedBrokersByAssignments) { + long replicasOnBroker = this.brokerToAssignments.getOrDefault(broker.id, 0); + // Assume that the assignment was executed and add a replica to the to-broker count + if (broker.equals(to)) replicasOnBroker++; + + final BigInteger thisScore = BigInteger.valueOf(replicasOnBroker).pow(2); + brokerScore = brokerScore.add(thisScore); + } + return brokerScore; + } + + private BigInteger computeCurrentLeaderScore() { + // The leader count per broker can grow as large as the number of + // partitions in the cluster. + // Using long arithmetic as for partitionBrokerScore or rackScore + // might not be safe for the globalBrokerLeaderScore.
+ // Therefore, BigIntegers are used instead + // Mathematically using just the square of the count is as good a + // metric for ranking the distribution scores as the squared + // residuals + BigInteger brokerScore = BigInteger.ZERO; + for (final BrokerMetadata broker : this.sortedBrokersByAssignments) { + final long replicasOnBroker = (long) this.brokerToLeaders.getOrDefault(broker.id, 0); + final BigInteger thisScore = BigInteger.valueOf(replicasOnBroker).pow(2); + brokerScore = brokerScore.add(thisScore); + } + return brokerScore; + } + + private BigInteger computeNewLeaderScore(BrokerMetadata fromBroker, BrokerMetadata to) { + // The leader count per broker can grow as large as the number of + // partitions in the cluster. + // Using long arithmetic as for partitionBrokerScore or rackScore + // might not be safe for the globalBrokerLeaderScore. + // Therefore, BigIntegers are used instead + // Mathematically using just the square of the count is as good a + // metric for ranking the distribution scores as the squared + // residuals + BigInteger brokerScore = BigInteger.ZERO; + for (final BrokerMetadata broker : this.sortedBrokersByAssignments) { + long replicasOnBroker = this.brokerToLeaders.getOrDefault(broker.id, 0); + // Assume that the move was executed and remove a leader from the from-broker count + // and add a leader to the to-broker count + if (broker.equals(fromBroker)) replicasOnBroker--; + if (broker.equals(to)) replicasOnBroker++; + + final BigInteger thisScore = BigInteger.valueOf(replicasOnBroker).pow(2); + brokerScore = brokerScore.add(thisScore); + } + return brokerScore; + } + + private BigInteger computeNewLeaderScoreForUnassignedReplica(BrokerMetadata to) { + // The leader count per broker can grow as large as the number of + // partitions in the cluster. + // Using long arithmetic as for partitionBrokerScore or rackScore + // might not be safe for the globalBrokerLeaderScore.
+ // Therefore, BigIntegers are used instead + // Mathematically using just the square of the count is as good a + // metric for ranking the distribution scores as the squared + // residuals + BigInteger brokerScore = BigInteger.ZERO; + for (final BrokerMetadata broker : this.sortedBrokersByAssignments) { + long replicasOnBroker = this.brokerToLeaders.getOrDefault(broker.id, 0); + // Assume that the assignment was executed: no count is decremented, + // only a leader is added to the to-broker count + if (broker.equals(to)) replicasOnBroker++; + + final BigInteger thisScore = BigInteger.valueOf(replicasOnBroker).pow(2); + brokerScore = brokerScore.add(thisScore); + } + return brokerScore; + } + + private record BrokerReplicaCountComparator( + Map assignments) implements Comparator, Serializable { + private static final long serialVersionUID = 1L; + + @Override + public int compare(BrokerMetadata o1, BrokerMetadata o2) { + int ret = this.assignments.getOrDefault(o1.id, 0).compareTo(this.assignments.getOrDefault(o2.id, 0)); + if (ret != 0) return ret; + return o1.id - o2.id; + } + } + + private static class MovableFromScoreComparator implements Comparator, Serializable { + private static final long serialVersionUID = 1L; + + @Override + public int compare(MoveFrom o1, MoveFrom o2) { + // Prefer already moved replicas + final int o1Moved = o1.replica.moved ? 0 : 1; + final int o2Moved = o2.replica.moved ?
0 : 1; + int ret = o1Moved - o2Moved; + if (ret != 0) return ret; + + ret = o1.rackImprovementScore.compareTo(o2.rackImprovementScore); + if (ret != 0) return ret; + + ret = o1.to.id - o2.to.id; + if (ret != 0) return ret; + + return o1.replica.compareTo(o2.replica); + } + } + + private static class MovableScoreComparator implements Comparator, Serializable { + private static final long serialVersionUID = 1L; + + private int compareImprovementScores(Move o1, Move o2) { + int ret = o1.globalBrokerImprovementScore.compareTo(o2.globalBrokerImprovementScore); + if (ret != 0) return ret; + ret = o1.rackImprovementScore.compareTo(o2.rackImprovementScore); + if (ret != 0) return ret; + return o1.leaderImprovementScore.compareTo(o2.leaderImprovementScore); + } + + @Override + public int compare(Move o1, Move o2) { + // Prefer already moved replicas that improve at least one score over unmoved replicas + final int o1Moved = o1.replica.moved ? 0 : 1; + final int o2Moved = o2.replica.moved ? 0 : 1; + int ret = o1Moved - o2Moved; + if (ret != 0) return ret; + + ret = this.compareImprovementScores(o1, o2); + if (ret != 0) return ret; + + ret = o1.to.id - o2.to.id; + if (ret != 0) return ret; + + return o1.replica.compareTo(o2.replica); + } + } + + private record ReplicaId(Partition partition, int id) implements Comparable { + @Override + public String toString() { + return "ReplicaId{" + "partition=" + partition + ", id=" + id + '}'; + } + + @Override + public int compareTo(ReplicaId o) { + int ret = this.partition.compareTo(o.partition); + if (ret != 0) return ret; + return this.id - o.id; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + ReplicaId replicaId = (ReplicaId) o; + return id == replicaId.id && Objects.equals(partition, replicaId.partition); + } + + @Override + public int hashCode() { + return Objects.hash(partition, id); + } + } + + private static class Partition implements Comparable { + final 
TopicPartition id; + + final Set replicas; + + final Map, Integer> replicaCountPerRack; + final Map replicaCountPerBroker; + + public Partition(TopicPartition id) { + this.id = id; + this.replicas = new HashSet<>(); + this.replicaCountPerRack = new HashMap<>(); + this.replicaCountPerBroker = new HashMap<>(); + } + + public void initPartitionCounts(Replica replica) { + if (!replica.id.partition.equals(this)) + throw new RuntimeException(String.format("Replica %s does not belong to partition %s", replica.id, this)); + if (replica.currentBroker == null) return; + + final BrokerMetadata newBroker = replica.currentBroker; + final Optional newRack = newBroker.rack; + + this.replicaCountPerRack.compute(newRack, (key, value) -> value == null ? 1 : value + 1); + this.replicaCountPerBroker.compute(newBroker, (key, value) -> value == null ? 1 : value + 1); + } + + public void updatePartitionCounts(Move move) { + if (!move.replica.id.partition.equals(this)) + throw new RuntimeException(String.format("Replica %s does not belong to partition %s", move.replica.id, this)); + + final BrokerMetadata previousBroker = move.from; + final Optional previousRack = previousBroker.rack; + final BrokerMetadata newBroker = move.to; + final Optional newRack = newBroker.rack; + + this.replicaCountPerRack.compute(previousRack, (key, value) -> value == null || value <= 1 ? null : value - 1); + this.replicaCountPerRack.compute(newRack, (key, value) -> value == null ? 1 : value + 1); + + this.replicaCountPerBroker.compute(previousBroker, (key, value) -> value == null || value <= 1 ? null : value - 1); + this.replicaCountPerBroker.compute(newBroker, (key, value) -> value == null ? 
1 : value + 1); + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + Partition partition = (Partition) o; + return Objects.equals(id, partition.id); + } + + @Override + public int hashCode() { + return Objects.hashCode(id); + } + + @Override + public int compareTo(Partition o) { + int ret = this.id.topic().compareTo(o.id.topic()); + if (ret != 0) return ret; + return this.id.partition() - o.id.partition(); + } + } + + private static class Replica implements Comparable { + + final ReplicaId id; + + BrokerMetadata currentBroker; + + Set possibleMoves; + boolean moved; + boolean isLeader; + + public Replica(ReplicaId id, BrokerMetadata currentBroker, boolean isLeader) { + this.id = id; + this.currentBroker = currentBroker; + this.moved = false; + this.isLeader = isLeader; + this.possibleMoves = null; + } + + @Override + public String toString() { + return "Replica{" + "id=" + id + ", currentBroker=" + currentBroker + '}'; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + Replica replica = (Replica) o; + return Objects.equals(id, replica.id); + } + + @Override + public int hashCode() { + return Objects.hashCode(id); + } + + @Override + public int compareTo(Replica o) { + return this.id.compareTo(o.id); + } + } + + private record MoveFrom(Replica replica, BrokerMetadata to, Long rackImprovementScore) { + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + MoveFrom moveFrom = (MoveFrom) o; + return Objects.equals(replica, moveFrom.replica) && Objects.equals(to, moveFrom.to); + } + + @Override + public int hashCode() { + return Objects.hash(replica, to); + } + + public boolean didImprove() { + return rackImprovementScore < 0; + } + } + + private record Move(Replica replica, BrokerMetadata from, BrokerMetadata to, + BigInteger globalBrokerImprovementScore, Long rackImprovementScore, + 
BigInteger leaderImprovementScore) { + + public boolean didImprove() { + return globalBrokerImprovementScore.compareTo(BigInteger.ZERO) < 0 || rackImprovementScore < 0 || leaderImprovementScore.compareTo(BigInteger.ZERO) < 0; + } + } +} diff --git a/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java b/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java index 0dedf567c49d9..fac250c9e12ed 100644 --- a/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java +++ b/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java @@ -241,7 +241,8 @@ public void run(ExperimentDef config, Journal journal, boolean displayChartsOnSc cluster.brokers().values().stream() .map(server -> String.valueOf(server.replicaManager().localBrokerId())) .collect(Collectors.joining(",")), - true + true, + false ).getKey(); System.out.println("Starting Reassignment"); diff --git a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java index 2c30cb38fb4aa..e84f367ed3e04 100644 --- a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java @@ -175,7 +175,7 @@ public void testGenerateAssignmentWithBootstrapServer() throws Exception { String assignment = "{\"version\":1,\"partitions\":" + "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[3,1,2],\"log_dirs\":[\"any\",\"any\",\"any\"]}" + "]}"; - generateAssignment(admin, assignment, "1,2,3", false); + generateAssignment(admin, assignment, "1,2,3", false, false); Map finalAssignment = singletonMap(foo0, new PartitionReassignmentState(asList(0, 1, 2), asList(3, 1, 2), true)); waitForVerifyAssignment(admin, assignment, false, diff --git 
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java index fb8fa19d6a8f5..9f53ef051bdbf 100644 --- a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java @@ -361,7 +361,7 @@ public void testGenerateAssignmentFailsWithoutEnoughReplicas() { addTopics(adminClient); assertStartsWith("Replication factor: 3 larger than available brokers: 2", assertThrows(InvalidReplicationFactorException.class, - () -> generateAssignment(adminClient, "{\"topics\":[{\"topic\":\"foo\"},{\"topic\":\"bar\"}]}", "0,1", false), + () -> generateAssignment(adminClient, "{\"topics\":[{\"topic\":\"foo\"},{\"topic\":\"bar\"}]}", "0,1", false, false), "Expected generateAssignment to fail").getMessage()); } } @@ -372,7 +372,7 @@ public void testGenerateAssignmentWithInvalidPartitionsFails() { addTopics(adminClient); assertStartsWith("Topic quux not found", assertThrows(ExecutionException.class, - () -> generateAssignment(adminClient, "{\"topics\":[{\"topic\":\"foo\"},{\"topic\":\"quux\"}]}", "0,1", false), + () -> generateAssignment(adminClient, "{\"topics\":[{\"topic\":\"foo\"},{\"topic\":\"quux\"}]}", "0,1", false, false), "Expected generateAssignment to fail").getCause().getMessage()); } } @@ -392,11 +392,11 @@ public void testGenerateAssignmentWithInconsistentRacks() throws Exception { addTopics(adminClient); assertStartsWith("Not all brokers have rack information.", assertThrows(AdminOperationException.class, - () -> generateAssignment(adminClient, "{\"topics\":[{\"topic\":\"foo\"}]}", "0,1,2,3", true), + () -> generateAssignment(adminClient, "{\"topics\":[{\"topic\":\"foo\"}]}", "0,1,2,3", true, false), "Expected generateAssignment to fail").getMessage()); // It should succeed when --disable-rack-aware is used. 
Entry>, Map>> - proposedCurrent = generateAssignment(adminClient, "{\"topics\":[{\"topic\":\"foo\"}]}", "0,1,2,3", false); + proposedCurrent = generateAssignment(adminClient, "{\"topics\":[{\"topic\":\"foo\"}]}", "0,1,2,3", false, false); Map> expCurrent = new HashMap<>(); @@ -416,7 +416,7 @@ public void testGenerateAssignmentWithFewerBrokers() throws Exception { Entry>, Map>> proposedCurrent = generateAssignment(adminClient, "{\"topics\":[{\"topic\":\"foo\"},{\"topic\":\"bar\"}]}", - goalBrokers.stream().map(Object::toString).collect(Collectors.joining(",")), false); + goalBrokers.stream().map(Object::toString).collect(Collectors.joining(",")), false, false); Map> expCurrent = new HashMap<>(); @@ -434,6 +434,33 @@ public void testGenerateAssignmentWithFewerBrokers() throws Exception { } } + @Test + public void testGenerateStickyAssignmentWithFewerBrokers() throws Exception { + try (MockAdminClient adminClient = new MockAdminClient.Builder().numBrokers(4).build()) { + addTopics(adminClient); + List goalBrokers = asList(0, 1, 3); + + Entry>, Map>> + proposedCurrent = generateAssignment(adminClient, + "{\"topics\":[{\"topic\":\"foo\"},{\"topic\":\"bar\"}]}", + goalBrokers.stream().map(Object::toString).collect(Collectors.joining(",")), false, true); + + Map> expCurrent = new HashMap<>(); + + expCurrent.put(new TopicPartition("foo", 0), asList(0, 1, 2)); + expCurrent.put(new TopicPartition("foo", 1), asList(1, 2, 3)); + expCurrent.put(new TopicPartition("bar", 0), asList(2, 3, 0)); + + assertEquals(expCurrent, proposedCurrent.getValue()); + + // The proposed assignment should only span the provided brokers + proposedCurrent.getKey().values().forEach(replicas -> + assertTrue(goalBrokers.containsAll(replicas), + "Proposed assignment " + proposedCurrent.getKey() + " puts replicas on brokers other than " + goalBrokers) + ); + } + } + @Test public void testCurrentPartitionReplicaAssignmentToString() throws Exception { Map> proposedParts = new HashMap<>(); diff --git 
a/tools/src/test/java/org/apache/kafka/tools/reassign/StickyPartitionReassignorUnitTest.java b/tools/src/test/java/org/apache/kafka/tools/reassign/StickyPartitionReassignorUnitTest.java new file mode 100644 index 0000000000000..6fa611c3459ec --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/tools/reassign/StickyPartitionReassignorUnitTest.java @@ -0,0 +1,451 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * (c) Copyright 2025, SAP SE and apache/kafka contributors + */ +package org.apache.kafka.tools.reassign; + +import net.jqwik.api.Arbitraries; +import net.jqwik.api.Arbitrary; +import net.jqwik.api.Combinators; +import net.jqwik.api.ForAll; +import net.jqwik.api.Property; +import net.jqwik.api.Provide; +import org.apache.kafka.admin.BrokerMetadata; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.Assertions; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.hamcrest.CoreMatchers.any; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.number.OrderingComparison.greaterThanOrEqualTo; +import static org.hamcrest.number.OrderingComparison.lessThan; +import static org.hamcrest.number.OrderingComparison.lessThanOrEqualTo; + + +public class StickyPartitionReassignorUnitTest { + + private static List brokersNoRackAwarenessFromBrokerLength(int brokersLength) { + final List brokers = new ArrayList<>(); + for (int j = 0; j < brokersLength; j++) { + brokers.add(new BrokerMetadata(j, Optional.empty())); + } + return brokers; + } + + private static List racksFromRacksLength(int racksLength) { + final List racks = new ArrayList<>(); + for (int i = 0; i < racksLength; i++) { + racks.add(i, Integer.toString(i)); + } + return racks; + } + + private static Arbitrary> assignBrokersToRacks(List racks, List brokers) { + if (racks.size() > brokers.size()) + throw new RuntimeException("Can not represent each rack with available brokers"); + // Fast track + if (racks.size() == brokers.size()) { + return Arbitraries.shuffle(racks).map(shuffledRacks -> { 
+ final List newBrokers = new ArrayList<>(); + for (int i = 0; i < brokers.size(); i++) { + final String rack = shuffledRacks.get(i); + final BrokerMetadata broker = brokers.get(i); + newBrokers.add(new BrokerMetadata(broker.id, Optional.of(rack))); + } + return newBrokers; + }); + } + + final Arbitrary rackArbitrary = Arbitraries.of(racks); + + return Arbitraries.shuffle(racks).flatMap(shuffledRacks -> { + final List> brokerArbitraries = new ArrayList<>(); + // Make sure that each rack is assigned to at least one broker + for (int i = 0; i < shuffledRacks.size(); i++) { + final String rack = shuffledRacks.get(i); + final BrokerMetadata broker = brokers.get(i); + brokerArbitraries.add(Arbitraries.just(new BrokerMetadata(broker.id, Optional.of(rack)))); + } + // Assign all other brokers a randomly selected rack + for (int i = shuffledRacks.size(); i < brokers.size(); i++) { + final BrokerMetadata broker = brokers.get(i); + brokerArbitraries.add(rackArbitrary.map(rack -> new BrokerMetadata(broker.id, Optional.of(rack)))); + } + + return Combinators.combine(brokerArbitraries).as(newBrokers -> newBrokers); + }); + } + + private static Arbitrary> brokerMetadataNoRackAwareness() { + return Arbitraries.integers().between(2, 20).map(StickyPartitionReassignorUnitTest::brokersNoRackAwarenessFromBrokerLength); + } + + private static Arbitrary> brokerMetadatas() { + return Arbitraries.integers().between(2, 15).map(StickyPartitionReassignorUnitTest::racksFromRacksLength).flatMap(racks -> Arbitraries.integers().between(racks.size(), 20).map(StickyPartitionReassignorUnitTest::brokersNoRackAwarenessFromBrokerLength).flatMap(brokers -> assignBrokersToRacks(racks, brokers))); + } + + private static Arbitrary> brokerMetadatasEquallyDistributedRacks() { + return Arbitraries.integers().between(2, 5).flatMap(racksLength -> Arbitraries.integers().between(1, 4).map(brokersPerRack -> { + final int brokersLength = racksLength * brokersPerRack; + final List brokers = new ArrayList<>(); + + for (int
i = 0; i < brokersLength; i++) { + final int rack = i % racksLength; + brokers.add(new BrokerMetadata(i, Optional.of(Integer.toString(rack)))); + } + + return brokers; + })); + } + + private static Arbitrary> replicasFromBrokers(List brokers, int replicationFactor) { + if (brokers.size() < replicationFactor) + throw new RuntimeException("Replication factor is larger than available brokers"); + // Fast track + if (brokers.size() == replicationFactor) { + final List replicas = new ArrayList<>(); + for (final BrokerMetadata broker : brokers) { + replicas.add(broker.id); + } + return Arbitraries.shuffle(replicas); + } + + return Arbitraries.shuffle(brokers).flatMap(shuffledBrokers -> { + final List replicas = new ArrayList<>(); + for (int i = 0; i < replicationFactor; i++) { + final BrokerMetadata broker = shuffledBrokers.get(i); + replicas.add(broker.id); + } + return Arbitraries.shuffle(replicas); + }); + } + + private static Arbitrary>> generatePartitionReplicasFixedReplicationFactor(List brokers, int replicationFactor) { + return replicasFromBrokers(brokers, replicationFactor).list().ofMinSize(1).ofMaxSize(16); + } + + private static Arbitrary>> generatePartitionReplicas(List brokers, int maxReplicationFactor) { + if (brokers.isEmpty()) throw new RuntimeException("No brokers to assign partitions to"); + return Arbitraries.integers().between(1, maxReplicationFactor).flatMap(replicationFactor -> generatePartitionReplicasFixedReplicationFactor(brokers, replicationFactor)); + } + + private static Arbitrary>> generateAssignments(List brokers, int maxReplicationFactor) { + return generatePartitionReplicas(brokers, maxReplicationFactor).list().ofMinSize(1).ofMaxSize(100).map(topics -> { + final Map> map = new HashMap<>(); + + for (int t = 0; t < topics.size(); t++) { + // Names do not matter to the reassignment, so we are just stringifying the index + final String topic = Integer.toString(t); + final List> partitions = topics.get(t); + + for (int p = 0; p < 
partitions.size(); p++) { + final TopicPartition topicPartition = new TopicPartition(topic, p); + final List replicas = new ArrayList<>(partitions.get(p)); + map.put(topicPartition, replicas); + } + } + + return map; + }); + } + + private static Arbitrary metadataForBrokers(Arbitrary> brokerArbitrary) { + return brokerArbitrary.flatMap(brokers -> generateAssignments(brokers, brokers.size()).map(assignments -> new ClusterState(brokers, assignments))); + } + + private static Arbitrary metadataForBrokersBrokersRemoved(Arbitrary> brokerArbitrary) { + return brokerArbitrary.filter(brokers -> brokers.size() > 1).flatMap(brokers -> Arbitraries.integers().between(1, brokers.size() - 1).flatMap(brokersRemoved -> { + final List brokersDuringRebalancing = new ArrayList<>(); + for (int i = 0; i < brokers.size(); i++) { + if (i >= brokersRemoved) brokersDuringRebalancing.add(brokers.get(i)); + } + + return generateAssignments(brokers, brokersDuringRebalancing.size()).map(assignments -> new ClusterState(brokersDuringRebalancing, assignments)); + })); + } + + private static Map brokerMap(List brokers) { + final Map brokerMap = new HashMap<>(); + for (final BrokerMetadata broker : brokers) { + brokerMap.put(broker.id, broker); + } + return brokerMap; + } + + private static Map replicaCountPerBroker(Map> assignments) { + final Map replicaCountPerBroker = new HashMap<>(); + for (final Map.Entry> entry : assignments.entrySet()) { + for (final Integer nodeId : entry.getValue()) { + replicaCountPerBroker.compute(nodeId, (key, value) -> value == null ? 
1 : value + 1); + } + } + return replicaCountPerBroker; + } + + private static Set> racks(List brokers) { + final Set> racks = new HashSet<>(); + for (final BrokerMetadata broker : brokers) { + racks.add(broker.rack); + } + return racks; + } + + private static long topicPartitionRackScore(Map brokers, Set> racks, List assignments) { + final Map, Integer> countPerRacks = new HashMap<>(); + for (final Integer nodeId : assignments) { + final BrokerMetadata broker = brokers.get(nodeId); + countPerRacks.compute(broker.rack, (key, value) -> value == null ? 1 : value + 1); + } + + long score = 0; + for (final Optional rack : racks) { + final Integer countForRack = countPerRacks.getOrDefault(rack, 0); + score += (long) Math.pow(countForRack, 2); + } + return score; + } + + private static long topicPartitionBrokerScore(Map brokers, List assignments) { + final Map countPerBroker = new HashMap<>(); + for (final Integer nodeId : assignments) { + countPerBroker.compute(nodeId, (key, value) -> value == null ? 
1 : value + 1); + } + + long score = 0; + for (final Map.Entry entry : brokers.entrySet()) { + final Integer countForBroker = countPerBroker.getOrDefault(entry.getKey(), 0); + score += (long) Math.pow(countForBroker, 2); + } + return score; + } + + /** Cluster-wide balance score: for every broker in {@code brokers}, squares the replica count reported by replicaCountPerBroker(assignments) (0 when the broker has no entry) and sums the squares as a BigInteger. The properties in this class assert that the reassigned score is less than or equal to the reference score. */ private static BigInteger globalBrokerScore(Map brokers, Map> assignments) { + final Map countPerBroker = replicaCountPerBroker(assignments); + + BigInteger score = BigInteger.ZERO; + for (final Map.Entry entry : brokers.entrySet()) { + final long countForBroker = (long) countPerBroker.getOrDefault(entry.getKey(), 0); + final BigInteger brokerScore = BigInteger.valueOf(countForBroker).pow(2); + score = score.add(brokerScore); + } + return score; + } + + /** Counts the replica positions whose broker id differs between {@code previous} and {@code current}, comparing the two replica lists of each partition index by index. Throws RuntimeException when a partition of {@code previous} is missing from {@code current}, when the two replica lists differ in length, or when either list holds a null id. NOTE(review): both exceptions are thrown without a message. */ private static int countMoves(Map> previous, Map> current) { + int moves = 0; + for (final Map.Entry> previousEntry : previous.entrySet()) { + final List previousReplicas = previousEntry.getValue(); + final List currentReplicas = current.get(previousEntry.getKey()); + if (currentReplicas == null || previousReplicas.size() != currentReplicas.size()) + throw new RuntimeException(); + + for (int i = 0; i < previousReplicas.size(); i++) { + final Integer previousNodeId = previousReplicas.get(i); + final Integer currentNodeId = currentReplicas.get(i); + if (previousNodeId == null || currentNodeId == null) throw new RuntimeException(); + if (!previousNodeId.equals(currentNodeId)) moves++; + } + } + return moves; + } + + /** Provider "metadataRackAware": metadataForBrokers applied to brokerMetadatas(). */ @Provide + private Arbitrary metadataRackAware() { + return metadataForBrokers(brokerMetadatas()); + } + + /** Provider "metadataRackAwareBrokersRemoved": metadataForBrokersBrokersRemoved applied to brokerMetadatas(). */ @Provide + private Arbitrary metadataRackAwareBrokersRemoved() { + return metadataForBrokersBrokersRemoved(brokerMetadatas()); + } + + /** Provider "metadataRackAwareBrokersEquallyDistributed": metadataForBrokers applied to brokerMetadatasEquallyDistributedRacks(). NOTE(review): no property visible in this part of the file references this provider by name - confirm it is still needed. */ @Provide + private Arbitrary metadataRackAwareBrokersEquallyDistributed() { + return metadataForBrokers(brokerMetadatasEquallyDistributedRacks()); + } + + /** Provider "metadataNoRackAwareness": metadataForBrokers applied to brokerMetadataNoRackAwareness(). */ @Provide + private Arbitrary metadataNoRackAwareness() { + return metadataForBrokers(brokerMetadataNoRackAwareness()); + } + + /** Provider "metadataNoRackAwarenessBrokersRemoved": metadataForBrokersBrokersRemoved applied to brokerMetadataNoRackAwareness(). */ @Provide + private Arbitrary
metadataNoRackAwarenessBrokersRemoved() { + return metadataForBrokersBrokersRemoved(brokerMetadataNoRackAwareness()); + } + + /** Provider "anyMetadata": combines metadataNoRackAwareness() and metadataRackAware() with Arbitraries.oneOf. */ @Provide + private Arbitrary anyMetadata() { + return Arbitraries.oneOf(metadataNoRackAwareness(), metadataRackAware()); + } + + /** Provider "anyMetadataBrokersRemoved": combines the two BrokersRemoved providers with Arbitraries.oneOf. */ @Provide + private Arbitrary anyMetadataBrokersRemoved() { + return Arbitraries.oneOf(metadataNoRackAwarenessBrokersRemoved(), metadataRackAwareBrokersRemoved()); + } + + + /** Property (inputs from "metadataRackAware"): after reassign() the number of partitions and each partition's replica count are unchanged, and for every partition topicPartitionRackScore of the new replicas is less than or equal to that of the current replicas. NOTE(review): this property asserts through JUnit Assertions while the other properties in this class use Hamcrest assertThat. */ @Property + void topicPartitionRackDistributionIsNotWorseThanBefore(@ForAll("metadataRackAware") ClusterState metadata) { + final StickyPartitionReassignor assignor = new StickyPartitionReassignor(metadata.assignments, metadata.brokers); + final Map> newPartitionAssignments = assignor.reassign(); + + final Map brokers = brokerMap(metadata.brokers); + final Set> racks = racks(metadata.brokers); + + Assertions.assertEquals(metadata.assignments.size(), newPartitionAssignments.size()); + + for (final Map.Entry> currentEntry : metadata.assignments.entrySet()) { + final TopicPartition topicPartition = currentEntry.getKey(); + final List currentAssignments = currentEntry.getValue(); + final List newAssignments = newPartitionAssignments.get(topicPartition); + + Assertions.assertNotNull(newAssignments); + Assertions.assertEquals(currentAssignments.size(), newAssignments.size()); + + final long currentRackScore = topicPartitionRackScore(brokers, racks, currentAssignments); + final long newRackScore = topicPartitionRackScore(brokers, racks, newAssignments); + + Assertions.assertTrue(newRackScore <= currentRackScore); + } + } + + /** Property (inputs from "anyMetadata"): after reassign() the number of partitions and each partition's replica count are unchanged, and for every partition topicPartitionBrokerScore of the new replicas is less than or equal to that of the current replicas. */ @Property + void topicPartitionBrokerDistributionIsNotWorseThanBefore(@ForAll("anyMetadata") ClusterState metadata) { + final StickyPartitionReassignor assignor = new StickyPartitionReassignor(metadata.assignments, metadata.brokers); + final Map> newPartitionAssignments = assignor.reassign(); + + final Map brokers = brokerMap(metadata.brokers); + + assertThat(newPartitionAssignments.size(),
is(equalTo(metadata.assignments.size()))); + + for (final Map.Entry> currentEntry : metadata.assignments.entrySet()) { + final TopicPartition topicPartition = currentEntry.getKey(); + final List currentAssignments = currentEntry.getValue(); + final List newAssignments = newPartitionAssignments.get(topicPartition); + + assertThat(newAssignments, is(notNullValue())); + assertThat(newAssignments.size(), is(equalTo(currentAssignments.size()))); + + final long currentBrokerScore = topicPartitionBrokerScore(brokers, currentAssignments); + final long newBrokerScore = topicPartitionBrokerScore(brokers, newAssignments); + + assertThat(newBrokerScore, is(lessThanOrEqualTo(currentBrokerScore))); + } + } + + /** Property (inputs from "anyMetadata"): the globalBrokerScore of the input assignments is computed first, then reassign() runs; the result must keep the partition count and its globalBrokerScore must be less than or equal to the input's score. */ @Property + void globalBrokerDistributionIsNotWorseThanBefore(@ForAll("anyMetadata") ClusterState metadata) { + final Map brokers = brokerMap(metadata.brokers); + final BigInteger currentDistributionScore = globalBrokerScore(brokers, metadata.assignments); + + final StickyPartitionReassignor assignor = new StickyPartitionReassignor(metadata.assignments, metadata.brokers); + final Map> newPartitionAssignments = assignor.reassign(); + + assertThat(newPartitionAssignments.size(), is(equalTo(metadata.assignments.size()))); + + final BigInteger newDistributionScore = globalBrokerScore(brokers, newPartitionAssignments); + + assertThat(newDistributionScore, is(lessThanOrEqualTo(currentDistributionScore))); + } + + + /** Property (inputs from "metadataNoRackAwareness"): after reassign() the smallest and largest per-broker replica counts over metadata.brokers differ by at most one; a broker without an entry in replicaCountPerBroker counts as 0. The min and max start as null and are asserted non-null after the loop. */ @Property + void reachesEquilibriumIfRacksAreNotSet(@ForAll("metadataNoRackAwareness") ClusterState metadata) { + final StickyPartitionReassignor assignor = new StickyPartitionReassignor(metadata.assignments, metadata.brokers); + final Map> newPartitionAssignments = assignor.reassign(); + + final Map newCountPerBroker = replicaCountPerBroker(newPartitionAssignments); + Integer newMinCount = null; + Integer newMaxCount = null; + for (final BrokerMetadata broker : metadata.brokers) { + final int brokerReplicaCount = newCountPerBroker.getOrDefault(broker.id, 0); + if
(newMinCount == null || newMinCount > brokerReplicaCount) newMinCount = brokerReplicaCount; + if (newMaxCount == null || newMaxCount < brokerReplicaCount) newMaxCount = brokerReplicaCount; + } + + assertThat(newPartitionAssignments.size(), is(equalTo(metadata.assignments.size()))); + + assertThat(newMaxCount, is(notNullValue())); + assertThat(newMinCount, is(notNullValue())); + + assertThat(newMinCount + 1, is(greaterThanOrEqualTo(newMaxCount))); + } + + /** Property (inputs from "anyMetadata"): runs ReassignPartitionsCommand.calculateAssignment and StickyPartitionReassignor.reassign() on the same input and compares them. Either the sticky result has a strictly lower globalBrokerScore, or the two scores are equal and the sticky result needs no more moves (per countMoves against the input) than the calculateAssignment result. */ @Property + void reachesBetterGlobalBrokerDistributionOrRequiresAtLeastSameMoveCountForSameGlobalBrokerDistribution(@ForAll("anyMetadata") ClusterState metadata) { + final Map brokers = brokerMap(metadata.brokers); + + final Map> oldPartitionAssignments = ReassignPartitionsCommand.calculateAssignment(metadata.assignments, metadata.brokers); + final int oldMovesCount = countMoves(metadata.assignments, oldPartitionAssignments); + final BigInteger oldDistributionScore = globalBrokerScore(brokers, oldPartitionAssignments); + + final StickyPartitionReassignor assignor = new StickyPartitionReassignor(metadata.assignments, metadata.brokers); + final Map> newPartitionAssignments = assignor.reassign(); + final int newMovesCount = countMoves(metadata.assignments, newPartitionAssignments); + final BigInteger newDistributionScore = globalBrokerScore(brokers, newPartitionAssignments); + + assertThat(newPartitionAssignments.size(), is(equalTo(metadata.assignments.size()))); + + final int globalBrokerScoreComparison = newDistributionScore.compareTo(oldDistributionScore); + + if (globalBrokerScoreComparison == 0) { + // If the distributions are the same, make sure that at most as many moves are used as with the old algorithm + assertThat(newMovesCount, is(lessThanOrEqualTo(oldMovesCount))); + } else { + assertThat(globalBrokerScoreComparison, is(lessThan(0))); + } + } + + /** Property (inputs from "anyMetadataBrokersRemoved"): after reassign() the number of partitions and each partition's replica count are unchanged, and every broker id in the new assignments is a key of brokerMap(metadata.brokers). */ @Property + void willMoveEveryReplicaAssignedToRemovedBroker(@ForAll("anyMetadataBrokersRemoved") ClusterState metadata) { + final StickyPartitionReassignor
assignor = new StickyPartitionReassignor(metadata.assignments, metadata.brokers); + final Map> newPartitionAssignments = assignor.reassign(); + + final Map brokers = brokerMap(metadata.brokers); + + assertThat(newPartitionAssignments.size(), is(equalTo(metadata.assignments.size()))); + + for (final Map.Entry> currentEntry : metadata.assignments.entrySet()) { + final TopicPartition topicPartition = currentEntry.getKey(); + final List currentAssignments = currentEntry.getValue(); + final List newAssignments = newPartitionAssignments.get(topicPartition); + + assertThat(newAssignments, is(notNullValue())); + assertThat(newAssignments.size(), is(equalTo(currentAssignments.size()))); + + for (final int nodeId : newAssignments) { + assertThat(brokers, hasEntry(is(equalTo(nodeId)), is(any(BrokerMetadata.class)))); + } + } + } + + /** Generated test input handed to every property: the broker list together with the current replica assignments that are passed to StickyPartitionReassignor. */ private record ClusterState(List brokers, Map> assignments) { + } +}