Skip to content

Commit 967ccb0

Browse files
committed
[server] Support generate and execute reblance plan
1 parent 94c0fbd commit 967ccb0

File tree

58 files changed

+5326
-41
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+5326
-41
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
4747
import org.apache.fluss.rpc.gateway.TabletServerGateway;
4848
import org.apache.fluss.rpc.messages.AddServerTagRequest;
49+
import org.apache.fluss.rpc.messages.CancelRebalanceRequest;
4950
import org.apache.fluss.rpc.messages.CreateAclsRequest;
5051
import org.apache.fluss.rpc.messages.CreateDatabaseRequest;
5152
import org.apache.fluss.rpc.messages.CreateTableRequest;
@@ -70,6 +71,7 @@
7071
import org.apache.fluss.rpc.messages.PbListOffsetsRespForBucket;
7172
import org.apache.fluss.rpc.messages.PbPartitionSpec;
7273
import org.apache.fluss.rpc.messages.PbTablePath;
74+
import org.apache.fluss.rpc.messages.RebalanceRequest;
7375
import org.apache.fluss.rpc.messages.RemoveServerTagRequest;
7476
import org.apache.fluss.rpc.messages.TableExistsRequest;
7577
import org.apache.fluss.rpc.messages.TableExistsResponse;
@@ -488,7 +490,9 @@ public CompletableFuture<Void> removeServerTag(
488490
@Override
489491
public CompletableFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
490492
List<GoalType> priorityGoals, boolean dryRun) {
491-
throw new UnsupportedOperationException("Support soon");
493+
RebalanceRequest request = new RebalanceRequest().setDryRun(dryRun);
494+
priorityGoals.forEach(goal -> request.addGoal(goal.value));
495+
return gateway.rebalance(request).thenApply(ClientRpcMessageUtils::toRebalancePlan);
492496
}
493497

494498
@Override
@@ -498,7 +502,8 @@ public CompletableFuture<Map<TableBucket, RebalanceResultForBucket>> listRebalan
498502

499503
@Override
500504
public CompletableFuture<Void> cancelRebalance() {
501-
throw new UnsupportedOperationException("Support soon");
505+
CancelRebalanceRequest request = new CancelRebalanceRequest();
506+
return gateway.cancelRebalance(request).thenApply(r -> null);
502507
}
503508

504509
@Override

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.fluss.client.metadata.LakeSnapshot;
2626
import org.apache.fluss.client.write.KvWriteBatch;
2727
import org.apache.fluss.client.write.ReadyWriteBatch;
28+
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
2829
import org.apache.fluss.fs.FsPath;
2930
import org.apache.fluss.fs.FsPathAndFileName;
3031
import org.apache.fluss.fs.token.ObtainedSecurityToken;
@@ -51,10 +52,14 @@
5152
import org.apache.fluss.rpc.messages.PbPrefixLookupReqForBucket;
5253
import org.apache.fluss.rpc.messages.PbProduceLogReqForBucket;
5354
import org.apache.fluss.rpc.messages.PbPutKvReqForBucket;
55+
import org.apache.fluss.rpc.messages.PbRebalancePlanForBucket;
56+
import org.apache.fluss.rpc.messages.PbRebalancePlanForPartition;
57+
import org.apache.fluss.rpc.messages.PbRebalancePlanForTable;
5458
import org.apache.fluss.rpc.messages.PbRemotePathAndLocalFile;
5559
import org.apache.fluss.rpc.messages.PrefixLookupRequest;
5660
import org.apache.fluss.rpc.messages.ProduceLogRequest;
5761
import org.apache.fluss.rpc.messages.PutKvRequest;
62+
import org.apache.fluss.rpc.messages.RebalanceResponse;
5863

5964
import javax.annotation.Nullable;
6065

@@ -323,6 +328,48 @@ public static DropPartitionRequest makeDropPartitionRequest(
323328
return dropPartitionRequest;
324329
}
325330

331+
public static Map<TableBucket, RebalancePlanForBucket> toRebalancePlan(
332+
RebalanceResponse response) {
333+
Map<TableBucket, RebalancePlanForBucket> rebalancePlan = new HashMap<>();
334+
for (PbRebalancePlanForTable pbTable : response.getPlanForTablesList()) {
335+
long tableId = pbTable.getTableId();
336+
if (pbTable.getPartitionsPlansCount() == 0) {
337+
// none-partition table.
338+
for (PbRebalancePlanForBucket pbBucket : pbTable.getBucketsPlansList()) {
339+
int bucketId = pbBucket.getBucketId();
340+
rebalancePlan.put(
341+
new TableBucket(tableId, null, bucketId),
342+
toRebalancePlanForBucket(tableId, null, bucketId, pbBucket));
343+
}
344+
} else {
345+
// partition table.
346+
for (PbRebalancePlanForPartition pbPartition : pbTable.getPartitionsPlansList()) {
347+
long partitionId = pbPartition.getPartitionId();
348+
for (PbRebalancePlanForBucket pbBucket : pbPartition.getBucketsPlansList()) {
349+
int bucketId = pbBucket.getBucketId();
350+
rebalancePlan.put(
351+
new TableBucket(tableId, partitionId, bucketId),
352+
toRebalancePlanForBucket(tableId, partitionId, bucketId, pbBucket));
353+
}
354+
}
355+
}
356+
}
357+
return rebalancePlan;
358+
}
359+
360+
private static RebalancePlanForBucket toRebalancePlanForBucket(
361+
long tableId,
362+
@Nullable Long partitionId,
363+
int bucketId,
364+
PbRebalancePlanForBucket pbBucket) {
365+
return new RebalancePlanForBucket(
366+
new TableBucket(tableId, partitionId, bucketId),
367+
pbBucket.getOriginalLeader(),
368+
pbBucket.getNewLeader(),
369+
Arrays.stream(pbBucket.getOriginalReplicas()).boxed().collect(Collectors.toList()),
370+
Arrays.stream(pbBucket.getNewReplicas()).boxed().collect(Collectors.toList()));
371+
}
372+
326373
public static List<PartitionInfo> toPartitionInfos(ListPartitionInfosResponse response) {
327374
return response.getPartitionsInfosList().stream()
328375
.map(

fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalancePlanForBucket.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,10 @@ public List<Integer> getNewReplicas() {
7373
return newReplicas;
7474
}
7575

76+
public boolean isLeaderAction() {
77+
return originalLeader != newLeader;
78+
}
79+
7680
@Override
7781
public String toString() {
7882
return "RebalancePlanForBucket{"

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ public Map<Long, TablePath> allTables() {
165165
return tablePathById;
166166
}
167167

168-
public Set<TableBucket> allBuckets() {
168+
public Set<TableBucket> getAllBuckets() {
169169
Set<TableBucket> allBuckets = new HashSet<>();
170170
for (Map.Entry<Long, Map<Integer, List<Integer>>> tableAssign :
171171
tableAssignments.entrySet()) {

0 commit comments

Comments
 (0)