Skip to content

Commit d67fc39

Browse files
committed
[server] Support generate and execute reblance plan
1 parent 8d6fcac commit d67fc39

File tree

76 files changed

+5841
-152
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

76 files changed

+5841
-152
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@
2222
import org.apache.fluss.client.metadata.KvSnapshots;
2323
import org.apache.fluss.client.metadata.LakeSnapshot;
2424
import org.apache.fluss.cluster.ServerNode;
25-
import org.apache.fluss.cluster.maintencance.GoalType;
26-
import org.apache.fluss.cluster.maintencance.RebalancePlanForBucket;
27-
import org.apache.fluss.cluster.maintencance.ServerTag;
25+
import org.apache.fluss.cluster.rebalance.GoalType;
26+
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
27+
import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
28+
import org.apache.fluss.cluster.rebalance.ServerTag;
2829
import org.apache.fluss.config.ConfigOptions;
2930
import org.apache.fluss.exception.AuthorizationException;
3031
import org.apache.fluss.exception.DatabaseAlreadyExistException;
@@ -62,7 +63,6 @@
6263
import org.apache.fluss.metadata.TablePath;
6364
import org.apache.fluss.security.acl.AclBinding;
6465
import org.apache.fluss.security.acl.AclBindingFilter;
65-
import org.apache.fluss.shaded.netty4.io.netty.util.concurrent.CompleteFuture;
6666

6767
import java.util.Collection;
6868
import java.util.List;
@@ -523,7 +523,7 @@ ListOffsetsResult listOffsets(
523523
* it.
524524
* @return the generated rebalance plan for all the tableBuckets which need to do rebalance.
525525
*/
526-
CompleteFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
526+
CompletableFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
527527
List<GoalType> priorityGoals, boolean dryRun);
528528

529529
/**
@@ -537,7 +537,7 @@ CompleteFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
537537
*
538538
* @return the rebalance process for all the tableBuckets doing rebalance.
539539
*/
540-
CompleteFuture<Map<TableBucket, RebalanceResultForBucket>> listRebalanceProcess();
540+
CompletableFuture<Map<TableBucket, RebalanceResultForBucket>> listRebalanceProcess();
541541

542542
/**
543543
* Cannel the rebalance task.

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,10 @@
2424
import org.apache.fluss.client.utils.ClientRpcMessageUtils;
2525
import org.apache.fluss.cluster.Cluster;
2626
import org.apache.fluss.cluster.ServerNode;
27-
import org.apache.fluss.cluster.maintencance.GoalType;
28-
import org.apache.fluss.cluster.maintencance.RebalancePlanForBucket;
29-
import org.apache.fluss.cluster.maintencance.ServerTag;
27+
import org.apache.fluss.cluster.rebalance.GoalType;
28+
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
29+
import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
30+
import org.apache.fluss.cluster.rebalance.ServerTag;
3031
import org.apache.fluss.exception.LeaderNotAvailableException;
3132
import org.apache.fluss.metadata.DatabaseDescriptor;
3233
import org.apache.fluss.metadata.DatabaseInfo;
@@ -45,6 +46,7 @@
4546
import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
4647
import org.apache.fluss.rpc.gateway.TabletServerGateway;
4748
import org.apache.fluss.rpc.messages.AddServerTagRequest;
49+
import org.apache.fluss.rpc.messages.CancelRebalanceRequest;
4850
import org.apache.fluss.rpc.messages.CreateAclsRequest;
4951
import org.apache.fluss.rpc.messages.CreateDatabaseRequest;
5052
import org.apache.fluss.rpc.messages.CreateTableRequest;
@@ -69,13 +71,13 @@
6971
import org.apache.fluss.rpc.messages.PbListOffsetsRespForBucket;
7072
import org.apache.fluss.rpc.messages.PbPartitionSpec;
7173
import org.apache.fluss.rpc.messages.PbTablePath;
74+
import org.apache.fluss.rpc.messages.RebalanceRequest;
7275
import org.apache.fluss.rpc.messages.RemoveServerTagRequest;
7376
import org.apache.fluss.rpc.messages.TableExistsRequest;
7477
import org.apache.fluss.rpc.messages.TableExistsResponse;
7578
import org.apache.fluss.rpc.protocol.ApiError;
7679
import org.apache.fluss.security.acl.AclBinding;
7780
import org.apache.fluss.security.acl.AclBindingFilter;
78-
import org.apache.fluss.shaded.netty4.io.netty.util.concurrent.CompleteFuture;
7981
import org.apache.fluss.utils.MapUtils;
8082

8183
import javax.annotation.Nullable;
@@ -486,19 +488,22 @@ public CompletableFuture<Void> removeServerTag(
486488
}
487489

488490
@Override
489-
public CompleteFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
491+
public CompletableFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
490492
List<GoalType> priorityGoals, boolean dryRun) {
491-
throw new UnsupportedOperationException("Support soon");
493+
RebalanceRequest request = new RebalanceRequest().setDryRun(dryRun);
494+
priorityGoals.forEach(goal -> request.addGoal(goal.value));
495+
return gateway.rebalance(request).thenApply(ClientRpcMessageUtils::toRebalancePlan);
492496
}
493497

494498
@Override
495-
public CompleteFuture<Map<TableBucket, RebalanceResultForBucket>> listRebalanceProcess() {
499+
public CompletableFuture<Map<TableBucket, RebalanceResultForBucket>> listRebalanceProcess() {
496500
throw new UnsupportedOperationException("Support soon");
497501
}
498502

499503
@Override
500504
public CompletableFuture<Void> cancelRebalance() {
501-
throw new UnsupportedOperationException("Support soon");
505+
CancelRebalanceRequest request = new CancelRebalanceRequest();
506+
return gateway.cancelRebalance(request).thenApply(r -> null);
502507
}
503508

504509
@Override

fluss-client/src/main/java/org/apache/fluss/client/admin/RebalanceResultForBucket.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.fluss.client.admin;
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
21-
import org.apache.fluss.cluster.maintencance.RebalanceStatusForBucket;
21+
import org.apache.fluss.cluster.rebalance.RebalanceStatusForBucket;
2222

2323
import java.util.List;
2424

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.fluss.client.metadata.LakeSnapshot;
2626
import org.apache.fluss.client.write.KvWriteBatch;
2727
import org.apache.fluss.client.write.ReadyWriteBatch;
28+
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
2829
import org.apache.fluss.fs.FsPath;
2930
import org.apache.fluss.fs.FsPathAndFileName;
3031
import org.apache.fluss.fs.token.ObtainedSecurityToken;
@@ -51,10 +52,14 @@
5152
import org.apache.fluss.rpc.messages.PbPrefixLookupReqForBucket;
5253
import org.apache.fluss.rpc.messages.PbProduceLogReqForBucket;
5354
import org.apache.fluss.rpc.messages.PbPutKvReqForBucket;
55+
import org.apache.fluss.rpc.messages.PbRebalancePlanForBucket;
56+
import org.apache.fluss.rpc.messages.PbRebalancePlanForPartition;
57+
import org.apache.fluss.rpc.messages.PbRebalancePlanForTable;
5458
import org.apache.fluss.rpc.messages.PbRemotePathAndLocalFile;
5559
import org.apache.fluss.rpc.messages.PrefixLookupRequest;
5660
import org.apache.fluss.rpc.messages.ProduceLogRequest;
5761
import org.apache.fluss.rpc.messages.PutKvRequest;
62+
import org.apache.fluss.rpc.messages.RebalanceResponse;
5863

5964
import javax.annotation.Nullable;
6065

@@ -323,6 +328,48 @@ public static DropPartitionRequest makeDropPartitionRequest(
323328
return dropPartitionRequest;
324329
}
325330

331+
public static Map<TableBucket, RebalancePlanForBucket> toRebalancePlan(
332+
RebalanceResponse response) {
333+
Map<TableBucket, RebalancePlanForBucket> rebalancePlan = new HashMap<>();
334+
for (PbRebalancePlanForTable pbTable : response.getPlanForTablesList()) {
335+
long tableId = pbTable.getTableId();
336+
if (pbTable.getPartitionsPlansCount() == 0) {
337+
// none-partition table.
338+
for (PbRebalancePlanForBucket pbBucket : pbTable.getBucketsPlansList()) {
339+
int bucketId = pbBucket.getBucketId();
340+
rebalancePlan.put(
341+
new TableBucket(tableId, null, bucketId),
342+
toRebalancePlanForBucket(tableId, null, bucketId, pbBucket));
343+
}
344+
} else {
345+
// partition table.
346+
for (PbRebalancePlanForPartition pbPartition : pbTable.getPartitionsPlansList()) {
347+
long partitionId = pbPartition.getPartitionId();
348+
for (PbRebalancePlanForBucket pbBucket : pbPartition.getBucketsPlansList()) {
349+
int bucketId = pbBucket.getBucketId();
350+
rebalancePlan.put(
351+
new TableBucket(tableId, partitionId, bucketId),
352+
toRebalancePlanForBucket(tableId, partitionId, bucketId, pbBucket));
353+
}
354+
}
355+
}
356+
}
357+
return rebalancePlan;
358+
}
359+
360+
private static RebalancePlanForBucket toRebalancePlanForBucket(
361+
long tableId,
362+
@Nullable Long partitionId,
363+
int bucketId,
364+
PbRebalancePlanForBucket pbBucket) {
365+
return new RebalancePlanForBucket(
366+
new TableBucket(tableId, partitionId, bucketId),
367+
pbBucket.getOriginalLeader(),
368+
pbBucket.getNewLeader(),
369+
Arrays.stream(pbBucket.getOriginalReplicas()).boxed().collect(Collectors.toList()),
370+
Arrays.stream(pbBucket.getNewReplicas()).boxed().collect(Collectors.toList()));
371+
}
372+
326373
public static List<PartitionInfo> toPartitionInfos(ListPartitionInfosResponse response) {
327374
return response.getPartitionsInfosList().stream()
328375
.map(

fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.apache.fluss.client.table.Table;
2525
import org.apache.fluss.client.table.writer.UpsertWriter;
2626
import org.apache.fluss.cluster.ServerNode;
27-
import org.apache.fluss.cluster.maintencance.ServerTag;
27+
import org.apache.fluss.cluster.rebalance.ServerTag;
2828
import org.apache.fluss.config.AutoPartitionTimeUnit;
2929
import org.apache.fluss.config.ConfigOptions;
3030
import org.apache.fluss.config.Configuration;

fluss-common/src/main/java/org/apache/fluss/cluster/maintencance/GoalType.java renamed to fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/GoalType.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.fluss.cluster.maintencance;
18+
package org.apache.fluss.cluster.rebalance;
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
2121

@@ -28,8 +28,20 @@
2828
*/
2929
@PublicEvolving
3030
public enum GoalType {
31+
/**
32+
* Goal to generate replica movement tasks to ensure that the number of replicas on each
33+
* tabletServer is near balanced.
34+
*/
3135
REPLICA_DISTRIBUTION_GOAL(0),
32-
PREFERRED_LEADER_GOAL(1);
36+
37+
/**
38+
* Goal to generate leadership movement and leader replica movement tasks to ensure that the
39+
* number of leader replicas on each tabletServer is near balanced.
40+
*/
41+
LEADER_REPLICA_DISTRIBUTION_GOAL(1),
42+
43+
/** Goal to move the leaders to the first replica of each tableBuckets. */
44+
PREFERRED_LEADER_GOAL(2);
3345

3446
public final int value;
3547

@@ -40,6 +52,8 @@ public enum GoalType {
4052
public static GoalType valueOf(int value) {
4153
if (value == REPLICA_DISTRIBUTION_GOAL.value) {
4254
return REPLICA_DISTRIBUTION_GOAL;
55+
} else if (value == LEADER_REPLICA_DISTRIBUTION_GOAL.value) {
56+
return LEADER_REPLICA_DISTRIBUTION_GOAL;
4357
} else if (value == PREFERRED_LEADER_GOAL.value) {
4458
return PREFERRED_LEADER_GOAL;
4559
} else {

fluss-common/src/main/java/org/apache/fluss/cluster/maintencance/RebalancePlanForBucket.java renamed to fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalancePlanForBucket.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,41 +15,46 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.fluss.cluster.maintencance;
18+
package org.apache.fluss.cluster.rebalance;
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.metadata.TableBucket;
2122

2223
import java.util.List;
2324
import java.util.Objects;
2425

2526
/**
26-
* a Generated rebalance plan for a tableBucket.
27+
* a generated rebalance plan for a tableBucket.
2728
*
2829
* @since 0.8
2930
*/
3031
@PublicEvolving
3132
public class RebalancePlanForBucket {
32-
private final int bucketId;
33+
private final TableBucket tableBucket;
3334
private final int originalLeader;
3435
private final int newLeader;
3536
private final List<Integer> originReplicas;
3637
private final List<Integer> newReplicas;
3738

3839
public RebalancePlanForBucket(
39-
int bucketId,
40+
TableBucket tableBucket,
4041
int originalLeader,
4142
int newLeader,
4243
List<Integer> originReplicas,
4344
List<Integer> newReplicas) {
44-
this.bucketId = bucketId;
45+
this.tableBucket = tableBucket;
4546
this.originalLeader = originalLeader;
4647
this.newLeader = newLeader;
4748
this.originReplicas = originReplicas;
4849
this.newReplicas = newReplicas;
4950
}
5051

52+
public TableBucket getTableBucket() {
53+
return tableBucket;
54+
}
55+
5156
public int getBucketId() {
52-
return bucketId;
57+
return tableBucket.getBucket();
5358
}
5459

5560
public Integer getOriginalLeader() {
@@ -68,11 +73,15 @@ public List<Integer> getNewReplicas() {
6873
return newReplicas;
6974
}
7075

76+
public boolean isLeaderAction() {
77+
return originalLeader != newLeader;
78+
}
79+
7180
@Override
7281
public String toString() {
7382
return "RebalancePlanForBucket{"
74-
+ "bucketId="
75-
+ bucketId
83+
+ "tableBucket="
84+
+ tableBucket
7685
+ ", originalLeader="
7786
+ originalLeader
7887
+ ", newLeader="
@@ -93,7 +102,7 @@ public boolean equals(Object o) {
93102
return false;
94103
}
95104
RebalancePlanForBucket that = (RebalancePlanForBucket) o;
96-
return bucketId == that.bucketId
105+
return Objects.equals(tableBucket, that.tableBucket)
97106
&& originalLeader == that.originalLeader
98107
&& newLeader == that.newLeader
99108
&& Objects.equals(originReplicas, that.originReplicas)
@@ -102,6 +111,6 @@ public boolean equals(Object o) {
102111

103112
@Override
104113
public int hashCode() {
105-
return Objects.hash(bucketId, originalLeader, newLeader, originReplicas, newReplicas);
114+
return Objects.hash(tableBucket, originalLeader, newLeader, originReplicas, newReplicas);
106115
}
107116
}

0 commit comments

Comments
 (0)