Skip to content

Commit cd24c1c

Browse files
author
Jialun Peng
committed
KAFKA-19048: Minimal Movement Replica Balancing algorithm
The new replica rebalancing strategy aims to achieve the following objectives: 1. Minimal Movement: Minimize the number of replica relocations during rebalancing. 2. Replica Balancing: Ensure that replicas are evenly distributed across brokers. 3. Anti-Affinity Support: Support rack-aware allocation when enabled. 4. Leader Balancing: Distribute leader replicas evenly across brokers. 5. ISR Order Optimization: Optimize adjacency relationships to prevent failover traffic concentration in case of broker failures.
1 parent 998f486 commit cd24c1c

File tree

4 files changed

+110
-88
lines changed

4 files changed

+110
-88
lines changed

metadata/src/main/java/org/apache/kafka/metadata/MinimalMovementReplicaBalancer.java

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
118
package org.apache.kafka.metadata;
219

3-
import org.apache.kafka.admin.BrokerMetadata;
420
import org.apache.kafka.common.errors.BrokerNotAvailableException;
521
import org.apache.kafka.common.errors.DuplicateBrokerRegistrationException;
622
import org.apache.kafka.common.errors.InvalidPartitionsException;
723
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
824
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
25+
import org.apache.kafka.metadata.placement.UsableBroker;
926

1027
import java.util.ArrayList;
1128
import java.util.Collections;
@@ -60,7 +77,7 @@
6077
*
6178
* <li><b>Node-Level Replica Distribution:</b>
6279
* <ul>
63-
<p>If the number of replicas assigned to a rack is not a multiple of the number of nodes in that rack, some nodes will host one additional replica compared to others.</p>
80+
* <p>If the number of replicas assigned to a rack is not a multiple of the number of nodes in that rack, some nodes will host one additional replica compared to others.</p>
6481
* <li><b>Rack Count = Replication Factor:</b>
6582
* <ul>
6683
* <li>If all racks contain an equal number of nodes, each node will have the same number of replicas.</li>
@@ -113,7 +130,7 @@ public class MinimalMovementReplicaBalancer {
113130
*/
114131
private final int replicationFactor;
115132

116-
private final List<BrokerMetadata> brokerMetadatas;
133+
private final List<UsableBroker> usableBrokers;
117134

118135
/**
119136
* Total number of replicas across all partitions (calculated as partitionCount * replicationFactor).
@@ -177,7 +194,7 @@ public class MinimalMovementReplicaBalancer {
177194
*/
178195
private final Map<String, Integer> rackToRemainderReplicaCount = new HashMap<>();
179196

180-
private final static String COMMON_REMAINDER_RACK = UUID.randomUUID().toString();
197+
private static final String COMMON_REMAINDER_RACK = UUID.randomUUID().toString();
181198

182199
/**
183200
* <p>Partition assignments per broker, tracking partition locations.</p>
@@ -205,20 +222,20 @@ public class MinimalMovementReplicaBalancer {
205222

206223
public static final String BROKER_ADJACENCY_SYMBOL = "->";
207224

208-
public MinimalMovementReplicaBalancer(Map<Integer, List<Integer>> assignment, List<Integer> targetBrokerIds, List<BrokerMetadata> brokerMetadatas, boolean enableRackAwareness) {
225+
public MinimalMovementReplicaBalancer(Map<Integer, List<Integer>> assignment, List<Integer> targetBrokerIds, List<UsableBroker> usableBrokers, boolean enableRackAwareness) {
209226
if (assignment == null || assignment.isEmpty()) {
210227
throw new InvalidReplicaAssignmentException("assignment is empty.");
211228
}
212229
if (targetBrokerIds == null) {
213230
throw new BrokerNotAvailableException("targetBrokerIds is null.");
214231
}
215232
// Nodes with unclear rack information in the proxy metadata are not considered as anti-affinity.
216-
if (brokerMetadatas == null) {
217-
this.brokerMetadatas = new ArrayList<>();
233+
if (usableBrokers == null) {
234+
this.usableBrokers = new ArrayList<>();
218235
} else {
219-
this.brokerMetadatas = new ArrayList<>(brokerMetadatas);
236+
this.usableBrokers = new ArrayList<>(usableBrokers);
220237
}
221-
if (this.brokerMetadatas.stream().map(broker -> broker.id).collect(Collectors.toSet()).size() < this.brokerMetadatas.size()) {
238+
if (this.usableBrokers.stream().map(UsableBroker::id).collect(Collectors.toSet()).size() < this.usableBrokers.size()) {
222239
throw new DuplicateBrokerRegistrationException("Duplicate broker ID found.");
223240
}
224241
this.assignment = assignment.entrySet().stream()
@@ -245,7 +262,7 @@ private void verifyingParameters() {
245262
.distinct()
246263
.count();
247264
if (replicationFactor > rackCount)
248-
throw new InvalidReplicationFactorException("Replication factor: " + replicationFactor + " larger than available brokers or rack: " + rackCount + ".");
265+
throw new InvalidReplicationFactorException("Replication factor: " + replicationFactor + " larger than available brokers or rack num: " + rackCount + ".");
249266
}
250267

251268
private void initializeClusterState() {
@@ -567,8 +584,8 @@ private boolean hasReplicaMoved(int brokerId, List<Integer> replicas, boolean ha
567584
*/
568585
private void balanceLeadersAcrossBrokers() {
569586
Map<Integer, Integer> brokerLeaderCount = new HashMap<>();
570-
for (Integer partition : assignment.keySet()) {
571-
List<Integer> replicas = assignment.get(partition);
587+
for (Map.Entry<Integer, List<Integer>> entry : assignment.entrySet()) {
588+
List<Integer> replicas = entry.getValue();
572589
int leaderId = replicas.get(0);
573590
Integer currentLeaderCount = brokerLeaderCount.getOrDefault(leaderId, 1);
574591
Integer idealLeaderCount = brokerIdealLeaderCount.get(leaderId);
@@ -772,16 +789,16 @@ private void identifyBrokerPartitionAssignments() {
772789
* This unifies the logic for both rack-aware and non-rack-aware scenarios.
773790
*/
774791
private void computeBrokerRackMap() {
775-
Set<Integer> existingBrokerIds = brokerMetadatas.stream()
776-
.map(b -> b.id)
792+
Set<Integer> existingBrokerIds = usableBrokers.stream()
793+
.map(UsableBroker::id)
777794
.collect(Collectors.toSet());
778-
involvedBrokerIds.stream().filter(brokerId -> !existingBrokerIds.contains(brokerId)).forEach(unknownBrokerId -> brokerMetadatas.add(new BrokerMetadata(unknownBrokerId, Optional.of(UUID.randomUUID().toString()))));
779-
brokerRackMapping = brokerMetadatas.stream()
780-
.filter(b -> involvedBrokerIds.contains(b.id))
795+
involvedBrokerIds.stream().filter(brokerId -> !existingBrokerIds.contains(brokerId)).forEach(unknownBrokerId -> usableBrokers.add(new UsableBroker(unknownBrokerId, Optional.of(UUID.randomUUID().toString()), false)));
796+
brokerRackMapping = usableBrokers.stream()
797+
.filter(b -> involvedBrokerIds.contains(b.id()))
781798
.collect(Collectors.toMap(
782-
broker -> broker.id,
799+
UsableBroker::id,
783800
broker -> enableRackAwareness
784-
? broker.rack.orElse(UUID.randomUUID().toString())
801+
? broker.rack().orElse(UUID.randomUUID().toString())
785802
: UUID.randomUUID().toString()
786803
));
787804
}

0 commit comments

Comments
 (0)