Skip to content

Commit 780e238

Browse files
committed
[server] Support generate and execute reblance plan
1 parent 98b0dc4 commit 780e238

File tree

74 files changed

+5509
-155
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

74 files changed

+5509
-155
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@
2222
import org.apache.fluss.client.metadata.KvSnapshots;
2323
import org.apache.fluss.client.metadata.LakeSnapshot;
2424
import org.apache.fluss.cluster.ServerNode;
25-
import org.apache.fluss.cluster.maintencance.GoalType;
26-
import org.apache.fluss.cluster.maintencance.RebalancePlanForBucket;
27-
import org.apache.fluss.cluster.maintencance.ServerTag;
25+
import org.apache.fluss.cluster.rebalance.GoalType;
26+
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
27+
import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
28+
import org.apache.fluss.cluster.rebalance.ServerTag;
2829
import org.apache.fluss.config.ConfigOptions;
2930
import org.apache.fluss.exception.AuthorizationException;
3031
import org.apache.fluss.exception.DatabaseAlreadyExistException;
@@ -62,7 +63,6 @@
6263
import org.apache.fluss.metadata.TablePath;
6364
import org.apache.fluss.security.acl.AclBinding;
6465
import org.apache.fluss.security.acl.AclBindingFilter;
65-
import org.apache.fluss.shaded.netty4.io.netty.util.concurrent.CompleteFuture;
6666

6767
import java.util.Collection;
6868
import java.util.List;
@@ -523,7 +523,7 @@ ListOffsetsResult listOffsets(
523523
* it.
524524
* @return the generated rebalance plan for all the tableBuckets which need to do rebalance.
525525
*/
526-
CompleteFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
526+
CompletableFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
527527
List<GoalType> priorityGoals, boolean dryRun);
528528

529529
/**
@@ -537,7 +537,7 @@ CompleteFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
537537
*
538538
* @return the rebalance process for all the tableBuckets doing rebalance.
539539
*/
540-
CompleteFuture<Map<TableBucket, RebalanceResultForBucket>> listRebalanceProcess();
540+
CompletableFuture<Map<TableBucket, RebalanceResultForBucket>> listRebalanceProcess();
541541

542542
/**
543543
* Cannel the rebalance task.

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,10 @@
2424
import org.apache.fluss.client.utils.ClientRpcMessageUtils;
2525
import org.apache.fluss.cluster.Cluster;
2626
import org.apache.fluss.cluster.ServerNode;
27-
import org.apache.fluss.cluster.maintencance.GoalType;
28-
import org.apache.fluss.cluster.maintencance.RebalancePlanForBucket;
29-
import org.apache.fluss.cluster.maintencance.ServerTag;
27+
import org.apache.fluss.cluster.rebalance.GoalType;
28+
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
29+
import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
30+
import org.apache.fluss.cluster.rebalance.ServerTag;
3031
import org.apache.fluss.exception.LeaderNotAvailableException;
3132
import org.apache.fluss.metadata.DatabaseDescriptor;
3233
import org.apache.fluss.metadata.DatabaseInfo;
@@ -45,6 +46,7 @@
4546
import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
4647
import org.apache.fluss.rpc.gateway.TabletServerGateway;
4748
import org.apache.fluss.rpc.messages.AddServerTagRequest;
49+
import org.apache.fluss.rpc.messages.CancelRebalanceRequest;
4850
import org.apache.fluss.rpc.messages.CreateAclsRequest;
4951
import org.apache.fluss.rpc.messages.CreateDatabaseRequest;
5052
import org.apache.fluss.rpc.messages.CreateTableRequest;
@@ -69,13 +71,13 @@
6971
import org.apache.fluss.rpc.messages.PbListOffsetsRespForBucket;
7072
import org.apache.fluss.rpc.messages.PbPartitionSpec;
7173
import org.apache.fluss.rpc.messages.PbTablePath;
74+
import org.apache.fluss.rpc.messages.RebalanceRequest;
7275
import org.apache.fluss.rpc.messages.RemoveServerTagRequest;
7376
import org.apache.fluss.rpc.messages.TableExistsRequest;
7477
import org.apache.fluss.rpc.messages.TableExistsResponse;
7578
import org.apache.fluss.rpc.protocol.ApiError;
7679
import org.apache.fluss.security.acl.AclBinding;
7780
import org.apache.fluss.security.acl.AclBindingFilter;
78-
import org.apache.fluss.shaded.netty4.io.netty.util.concurrent.CompleteFuture;
7981
import org.apache.fluss.utils.MapUtils;
8082

8183
import javax.annotation.Nullable;
@@ -486,19 +488,22 @@ public CompletableFuture<Void> removeServerTag(
486488
}
487489

488490
@Override
489-
public CompleteFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
491+
public CompletableFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
490492
List<GoalType> priorityGoals, boolean dryRun) {
491-
throw new UnsupportedOperationException("Support soon");
493+
RebalanceRequest request = new RebalanceRequest().setDryRun(dryRun);
494+
priorityGoals.forEach(goal -> request.addGoal(goal.value));
495+
return gateway.rebalance(request).thenApply(ClientRpcMessageUtils::toRebalancePlan);
492496
}
493497

494498
@Override
495-
public CompleteFuture<Map<TableBucket, RebalanceResultForBucket>> listRebalanceProcess() {
499+
public CompletableFuture<Map<TableBucket, RebalanceResultForBucket>> listRebalanceProcess() {
496500
throw new UnsupportedOperationException("Support soon");
497501
}
498502

499503
@Override
500504
public CompletableFuture<Void> cancelRebalance() {
501-
throw new UnsupportedOperationException("Support soon");
505+
CancelRebalanceRequest request = new CancelRebalanceRequest();
506+
return gateway.cancelRebalance(request).thenApply(r -> null);
502507
}
503508

504509
@Override

fluss-client/src/main/java/org/apache/fluss/client/admin/RebalanceResultForBucket.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.fluss.client.admin;
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
21-
import org.apache.fluss.cluster.maintencance.RebalanceStatusForBucket;
21+
import org.apache.fluss.cluster.rebalance.RebalanceStatusForBucket;
2222

2323
import java.util.List;
2424

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.fluss.client.metadata.LakeSnapshot;
2626
import org.apache.fluss.client.write.KvWriteBatch;
2727
import org.apache.fluss.client.write.ReadyWriteBatch;
28+
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
2829
import org.apache.fluss.fs.FsPath;
2930
import org.apache.fluss.fs.FsPathAndFileName;
3031
import org.apache.fluss.fs.token.ObtainedSecurityToken;
@@ -51,10 +52,14 @@
5152
import org.apache.fluss.rpc.messages.PbPrefixLookupReqForBucket;
5253
import org.apache.fluss.rpc.messages.PbProduceLogReqForBucket;
5354
import org.apache.fluss.rpc.messages.PbPutKvReqForBucket;
55+
import org.apache.fluss.rpc.messages.PbRebalancePlanForBucket;
56+
import org.apache.fluss.rpc.messages.PbRebalancePlanForPartition;
57+
import org.apache.fluss.rpc.messages.PbRebalancePlanForTable;
5458
import org.apache.fluss.rpc.messages.PbRemotePathAndLocalFile;
5559
import org.apache.fluss.rpc.messages.PrefixLookupRequest;
5660
import org.apache.fluss.rpc.messages.ProduceLogRequest;
5761
import org.apache.fluss.rpc.messages.PutKvRequest;
62+
import org.apache.fluss.rpc.messages.RebalanceResponse;
5863

5964
import javax.annotation.Nullable;
6065

@@ -323,6 +328,48 @@ public static DropPartitionRequest makeDropPartitionRequest(
323328
return dropPartitionRequest;
324329
}
325330

331+
public static Map<TableBucket, RebalancePlanForBucket> toRebalancePlan(
332+
RebalanceResponse response) {
333+
Map<TableBucket, RebalancePlanForBucket> rebalancePlan = new HashMap<>();
334+
for (PbRebalancePlanForTable pbTable : response.getPlanForTablesList()) {
335+
long tableId = pbTable.getTableId();
336+
if (pbTable.getPartitionsPlansCount() == 0) {
337+
// none-partition table.
338+
for (PbRebalancePlanForBucket pbBucket : pbTable.getBucketsPlansList()) {
339+
int bucketId = pbBucket.getBucketId();
340+
rebalancePlan.put(
341+
new TableBucket(tableId, null, bucketId),
342+
toRebalancePlanForBucket(tableId, null, bucketId, pbBucket));
343+
}
344+
} else {
345+
// partition table.
346+
for (PbRebalancePlanForPartition pbPartition : pbTable.getPartitionsPlansList()) {
347+
long partitionId = pbPartition.getPartitionId();
348+
for (PbRebalancePlanForBucket pbBucket : pbPartition.getBucketsPlansList()) {
349+
int bucketId = pbBucket.getBucketId();
350+
rebalancePlan.put(
351+
new TableBucket(tableId, partitionId, bucketId),
352+
toRebalancePlanForBucket(tableId, partitionId, bucketId, pbBucket));
353+
}
354+
}
355+
}
356+
}
357+
return rebalancePlan;
358+
}
359+
360+
private static RebalancePlanForBucket toRebalancePlanForBucket(
361+
long tableId,
362+
@Nullable Long partitionId,
363+
int bucketId,
364+
PbRebalancePlanForBucket pbBucket) {
365+
return new RebalancePlanForBucket(
366+
new TableBucket(tableId, partitionId, bucketId),
367+
pbBucket.getOriginalLeader(),
368+
pbBucket.getNewLeader(),
369+
Arrays.stream(pbBucket.getOriginalReplicas()).boxed().collect(Collectors.toList()),
370+
Arrays.stream(pbBucket.getNewReplicas()).boxed().collect(Collectors.toList()));
371+
}
372+
326373
public static List<PartitionInfo> toPartitionInfos(ListPartitionInfosResponse response) {
327374
return response.getPartitionsInfosList().stream()
328375
.map(

fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.apache.fluss.client.table.Table;
2525
import org.apache.fluss.client.table.writer.UpsertWriter;
2626
import org.apache.fluss.cluster.ServerNode;
27-
import org.apache.fluss.cluster.maintencance.ServerTag;
27+
import org.apache.fluss.cluster.rebalance.ServerTag;
2828
import org.apache.fluss.config.AutoPartitionTimeUnit;
2929
import org.apache.fluss.config.ConfigOptions;
3030
import org.apache.fluss.config.Configuration;

fluss-common/src/main/java/org/apache/fluss/cluster/maintencance/GoalType.java renamed to fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/GoalType.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.fluss.cluster.maintencance;
18+
package org.apache.fluss.cluster.rebalance;
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
2121

@@ -28,8 +28,20 @@
2828
*/
2929
@PublicEvolving
3030
public enum GoalType {
31+
/**
32+
* Goal to generate replica movement tasks to ensure that the number of replicas on each
33+
* tabletServer is near balanced.
34+
*/
3135
REPLICA_DISTRIBUTION_GOAL(0),
32-
PREFERRED_LEADER_GOAL(1);
36+
37+
/**
38+
* Goal to generate leadership movement and leader replica movement tasks to ensure that the
39+
* number of leader replicas on each tabletServer is near balanced.
40+
*/
41+
LEADER_REPLICA_DISTRIBUTION_GOAL(1),
42+
43+
/** Goal to move the leaders to the first replica of each tableBuckets. */
44+
PREFERRED_LEADER_GOAL(2);
3345

3446
public final int value;
3547

@@ -40,6 +52,8 @@ public enum GoalType {
4052
public static GoalType valueOf(int value) {
4153
if (value == REPLICA_DISTRIBUTION_GOAL.value) {
4254
return REPLICA_DISTRIBUTION_GOAL;
55+
} else if (value == LEADER_REPLICA_DISTRIBUTION_GOAL.value) {
56+
return LEADER_REPLICA_DISTRIBUTION_GOAL;
4357
} else if (value == PREFERRED_LEADER_GOAL.value) {
4458
return PREFERRED_LEADER_GOAL;
4559
} else {

fluss-common/src/main/java/org/apache/fluss/cluster/maintencance/RebalancePlanForBucket.java renamed to fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalancePlanForBucket.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,41 +15,46 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.fluss.cluster.maintencance;
18+
package org.apache.fluss.cluster.rebalance;
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.metadata.TableBucket;
2122

2223
import java.util.List;
2324
import java.util.Objects;
2425

2526
/**
26-
* a Generated rebalance plan for a tableBucket.
27+
* a generated rebalance plan for a tableBucket.
2728
*
2829
* @since 0.8
2930
*/
3031
@PublicEvolving
3132
public class RebalancePlanForBucket {
32-
private final int bucketId;
33+
private final TableBucket tableBucket;
3334
private final int originalLeader;
3435
private final int newLeader;
3536
private final List<Integer> originReplicas;
3637
private final List<Integer> newReplicas;
3738

3839
public RebalancePlanForBucket(
39-
int bucketId,
40+
TableBucket tableBucket,
4041
int originalLeader,
4142
int newLeader,
4243
List<Integer> originReplicas,
4344
List<Integer> newReplicas) {
44-
this.bucketId = bucketId;
45+
this.tableBucket = tableBucket;
4546
this.originalLeader = originalLeader;
4647
this.newLeader = newLeader;
4748
this.originReplicas = originReplicas;
4849
this.newReplicas = newReplicas;
4950
}
5051

52+
public TableBucket getTableBucket() {
53+
return tableBucket;
54+
}
55+
5156
public int getBucketId() {
52-
return bucketId;
57+
return tableBucket.getBucket();
5358
}
5459

5560
public Integer getOriginalLeader() {
@@ -68,11 +73,15 @@ public List<Integer> getNewReplicas() {
6873
return newReplicas;
6974
}
7075

76+
public boolean isLeaderAction() {
77+
return originalLeader != newLeader;
78+
}
79+
7180
@Override
7281
public String toString() {
7382
return "RebalancePlanForBucket{"
74-
+ "bucketId="
75-
+ bucketId
83+
+ "tableBucket="
84+
+ tableBucket
7685
+ ", originalLeader="
7786
+ originalLeader
7887
+ ", newLeader="
@@ -93,7 +102,7 @@ public boolean equals(Object o) {
93102
return false;
94103
}
95104
RebalancePlanForBucket that = (RebalancePlanForBucket) o;
96-
return bucketId == that.bucketId
105+
return Objects.equals(tableBucket, that.tableBucket)
97106
&& originalLeader == that.originalLeader
98107
&& newLeader == that.newLeader
99108
&& Objects.equals(originReplicas, that.originReplicas)
@@ -102,6 +111,6 @@ public boolean equals(Object o) {
102111

103112
@Override
104113
public int hashCode() {
105-
return Objects.hash(bucketId, originalLeader, newLeader, originReplicas, newReplicas);
114+
return Objects.hash(tableBucket, originalLeader, newLeader, originReplicas, newReplicas);
106115
}
107116
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.cluster.rebalance;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.metadata.TableBucket;
22+
23+
import java.util.List;
24+
25+
/**
26+
* Status of rebalance process for a tabletBucket.
27+
*
28+
* @since 0.8
29+
*/
30+
@PublicEvolving
31+
public class RebalanceResultForBucket {
32+
private final RebalancePlanForBucket rebalancePlanForBucket;
33+
private RebalanceStatusForBucket rebalanceStatusForBucket;
34+
35+
public RebalanceResultForBucket(
36+
RebalancePlanForBucket rebalancePlanForBucket,
37+
RebalanceStatusForBucket rebalanceStatusForBucket) {
38+
this.rebalancePlanForBucket = rebalancePlanForBucket;
39+
this.rebalanceStatusForBucket = rebalanceStatusForBucket;
40+
}
41+
42+
public TableBucket tableBucket() {
43+
return rebalancePlanForBucket.getTableBucket();
44+
}
45+
46+
public RebalancePlanForBucket planForBucket() {
47+
return rebalancePlanForBucket;
48+
}
49+
50+
public List<Integer> newReplicas() {
51+
return rebalancePlanForBucket.getNewReplicas();
52+
}
53+
54+
public RebalanceResultForBucket setNewStatus(RebalanceStatusForBucket status) {
55+
this.rebalanceStatusForBucket = status;
56+
return this;
57+
}
58+
59+
public RebalanceStatusForBucket status() {
60+
return rebalanceStatusForBucket;
61+
}
62+
63+
public static RebalanceResultForBucket of(
64+
RebalancePlanForBucket planForBucket, RebalanceStatusForBucket status) {
65+
return new RebalanceResultForBucket(planForBucket, status);
66+
}
67+
68+
@Override
69+
public String toString() {
70+
return "RebalanceResultForBucket{"
71+
+ "rebalancePlanForBucket="
72+
+ rebalancePlanForBucket
73+
+ ", rebalanceStatusForBucket="
74+
+ rebalanceStatusForBucket
75+
+ '}';
76+
}
77+
}

0 commit comments

Comments
 (0)