Skip to content

Commit bacdb71

Browse files
committed
WIP
1 parent 9bae99e commit bacdb71

File tree

6 files changed

+147
-2
lines changed

6 files changed

+147
-2
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import org.apache.fluss.rpc.messages.ListDatabasesResponse;
6767
import org.apache.fluss.rpc.messages.ListOffsetsRequest;
6868
import org.apache.fluss.rpc.messages.ListPartitionInfosRequest;
69+
import org.apache.fluss.rpc.messages.ListRebalanceProcessRequest;
6970
import org.apache.fluss.rpc.messages.ListTablesRequest;
7071
import org.apache.fluss.rpc.messages.ListTablesResponse;
7172
import org.apache.fluss.rpc.messages.PbListOffsetsRespForBucket;
@@ -497,7 +498,8 @@ public CompletableFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
497498

498499
@Override
499500
public CompletableFuture<Map<TableBucket, RebalanceResultForBucket>> listRebalanceProcess() {
500-
throw new UnsupportedOperationException("Support soon");
501+
return gateway.listRebalanceProcess(new ListRebalanceProcessRequest())
502+
.thenApply(ClientRpcMessageUtils::toRebalanceProcess);
501503
}
502504

503505
@Override

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.apache.fluss.client.write.KvWriteBatch;
2727
import org.apache.fluss.client.write.ReadyWriteBatch;
2828
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
29+
import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
30+
import org.apache.fluss.cluster.rebalance.RebalanceStatusForBucket;
2931
import org.apache.fluss.fs.FsPath;
3032
import org.apache.fluss.fs.FsPathAndFileName;
3133
import org.apache.fluss.fs.token.ObtainedSecurityToken;
@@ -42,6 +44,7 @@
4244
import org.apache.fluss.rpc.messages.GetLatestLakeSnapshotResponse;
4345
import org.apache.fluss.rpc.messages.ListOffsetsRequest;
4446
import org.apache.fluss.rpc.messages.ListPartitionInfosResponse;
47+
import org.apache.fluss.rpc.messages.ListRebalanceProcessResponse;
4548
import org.apache.fluss.rpc.messages.LookupRequest;
4649
import org.apache.fluss.rpc.messages.MetadataRequest;
4750
import org.apache.fluss.rpc.messages.PbKeyValue;
@@ -55,6 +58,9 @@
5558
import org.apache.fluss.rpc.messages.PbRebalancePlanForBucket;
5659
import org.apache.fluss.rpc.messages.PbRebalancePlanForPartition;
5760
import org.apache.fluss.rpc.messages.PbRebalancePlanForTable;
61+
import org.apache.fluss.rpc.messages.PbRebalanceProcessForBucket;
62+
import org.apache.fluss.rpc.messages.PbRebalanceProcessForPartition;
63+
import org.apache.fluss.rpc.messages.PbRebalanceProcessForTable;
5864
import org.apache.fluss.rpc.messages.PbRemotePathAndLocalFile;
5965
import org.apache.fluss.rpc.messages.PrefixLookupRequest;
6066
import org.apache.fluss.rpc.messages.ProduceLogRequest;
@@ -370,6 +376,53 @@ private static RebalancePlanForBucket toRebalancePlanForBucket(
370376
Arrays.stream(pbBucket.getNewReplicas()).boxed().collect(Collectors.toList()));
371377
}
372378

379+
public static Map<TableBucket, RebalanceResultForBucket> toRebalanceProcess(
380+
ListRebalanceProcessResponse response) {
381+
Map<TableBucket, RebalanceResultForBucket> rebalanceProcess = new HashMap<>();
382+
383+
for (PbRebalanceProcessForTable pbRebalanceProcessForTable :
384+
response.getProcessForTablesList()) {
385+
long tableId = pbRebalanceProcessForTable.getTableId();
386+
387+
for (PbRebalanceProcessForPartition pbRebalanceProcessForPartition :
388+
pbRebalanceProcessForTable.getPartitionsProcessesList()) {
389+
long partitionId = pbRebalanceProcessForPartition.getPartitionId();
390+
391+
for (PbRebalanceProcessForBucket pbRebalanceProcessForBucket :
392+
pbRebalanceProcessForPartition.getBucketsProcessesList()) {
393+
int bucketId = pbRebalanceProcessForBucket.getBucketId();
394+
rebalanceProcess.put(
395+
new TableBucket(tableId, partitionId, bucketId),
396+
toRebalanceResultForBucket(pbRebalanceProcessForBucket));
397+
}
398+
}
399+
400+
for (PbRebalanceProcessForBucket pbRebalanceProcessForBucket :
401+
pbRebalanceProcessForTable.getBucketsProcessesList()) {
402+
int bucketId = pbRebalanceProcessForBucket.getBucketId();
403+
rebalanceProcess.put(
404+
new TableBucket(tableId, null, bucketId),
405+
toRebalanceResultForBucket(pbRebalanceProcessForBucket));
406+
}
407+
}
408+
409+
return rebalanceProcess;
410+
}
411+
412+
private static RebalanceResultForBucket toRebalanceResultForBucket(
413+
PbRebalanceProcessForBucket pbRebalanceProcessForBucket) {
414+
return new RebalanceResultForBucket(
415+
pbRebalanceProcessForBucket.getOriginalReplicas()[0],
416+
pbRebalanceProcessForBucket.getNewReplicas()[0],
417+
Arrays.stream(pbRebalanceProcessForBucket.getOriginalReplicas())
418+
.boxed()
419+
.collect(Collectors.toList()),
420+
Arrays.stream(pbRebalanceProcessForBucket.getNewReplicas())
421+
.boxed()
422+
.collect(Collectors.toList()),
423+
RebalanceStatusForBucket.of(pbRebalanceProcessForBucket.getRebalanceStatus()));
424+
}
425+
373426
public static List<PartitionInfo> toPartitionInfos(ListPartitionInfosResponse response) {
374427
return response.getPartitionsInfosList().stream()
375428
.map(

fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceResultForBucket.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ public List<Integer> targetReplicas() {
6666
return targetReplicas;
6767
}
6868

69+
public RebalanceStatusForBucket status() {
70+
return rebalanceStatusForBucket;
71+
}
72+
6973
public RebalanceResultForBucket markFailed() {
7074
this.rebalanceStatusForBucket = FAILED;
7175
return this;

fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceStatusForBucket.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ public enum RebalanceStatusForBucket {
3737
this.code = code;
3838
}
3939

40+
public int getCode() {
41+
return code;
42+
}
43+
4044
public static RebalanceStatusForBucket of(int code) {
4145
for (RebalanceStatusForBucket status : RebalanceStatusForBucket.values()) {
4246
if (status.code == code) {

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@
149149
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getPartitionSpec;
150150
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeCreateAclsResponse;
151151
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeDropAclsResponse;
152+
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeListRebalanceProcessResponse;
152153
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeRebalanceRespose;
153154
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toTablePath;
154155
import static org.apache.fluss.server.utils.TableAssignmentUtils.generateAssignment;
@@ -663,7 +664,17 @@ public CompletableFuture<RebalanceResponse> rebalance(RebalanceRequest request)
663664
@Override
664665
public CompletableFuture<ListRebalanceProcessResponse> listRebalanceProcess(
665666
ListRebalanceProcessRequest request) {
666-
throw new UnsupportedOperationException("Support soon!");
667+
if (authorizer != null) {
668+
authorizer.authorize(currentSession(), OperationType.DESCRIBE, Resource.cluster());
669+
}
670+
671+
AccessContextEvent<ListRebalanceProcessResponse> accessContextEvent =
672+
new AccessContextEvent<>(
673+
ctx ->
674+
makeListRebalanceProcessResponse(
675+
ctx.getOngoingRebalanceTasks(),
676+
ctx.getFinishedRebalanceTasks()));
677+
return accessContextEvent.getResultFuture();
667678
}
668679

669680
@Override

fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.fluss.cluster.ServerNode;
2222
import org.apache.fluss.cluster.ServerType;
2323
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
24+
import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
2425
import org.apache.fluss.config.ConfigOptions;
2526
import org.apache.fluss.fs.FsPath;
2627
import org.apache.fluss.fs.token.ObtainedSecurityToken;
@@ -69,6 +70,7 @@
6970
import org.apache.fluss.rpc.messages.ListOffsetsRequest;
7071
import org.apache.fluss.rpc.messages.ListOffsetsResponse;
7172
import org.apache.fluss.rpc.messages.ListPartitionInfosResponse;
73+
import org.apache.fluss.rpc.messages.ListRebalanceProcessResponse;
7274
import org.apache.fluss.rpc.messages.LookupRequest;
7375
import org.apache.fluss.rpc.messages.LookupResponse;
7476
import org.apache.fluss.rpc.messages.MetadataResponse;
@@ -112,6 +114,8 @@
112114
import org.apache.fluss.rpc.messages.PbPutKvRespForBucket;
113115
import org.apache.fluss.rpc.messages.PbRebalancePlanForBucket;
114116
import org.apache.fluss.rpc.messages.PbRebalancePlanForTable;
117+
import org.apache.fluss.rpc.messages.PbRebalanceProcessForBucket;
118+
import org.apache.fluss.rpc.messages.PbRebalanceProcessForTable;
115119
import org.apache.fluss.rpc.messages.PbRemoteLogSegment;
116120
import org.apache.fluss.rpc.messages.PbRemotePathAndLocalFile;
117121
import org.apache.fluss.rpc.messages.PbServerNode;
@@ -176,6 +180,7 @@
176180
import java.util.Optional;
177181
import java.util.OptionalInt;
178182
import java.util.Set;
183+
import java.util.function.BiConsumer;
179184
import java.util.stream.Collectors;
180185
import java.util.stream.Stream;
181186

@@ -1721,6 +1726,72 @@ private static PbRebalancePlanForBucket toPbRebalancePlanForBucket(
17211726
return pbRebalancePlanForBucket;
17221727
}
17231728

1729+
public static ListRebalanceProcessResponse makeListRebalanceProcessResponse(
1730+
Map<TableBucket, RebalanceResultForBucket> ongoingRebalanceTasks,
1731+
Map<TableBucket, RebalanceResultForBucket> finishedRebalanceTasks) {
1732+
ListRebalanceProcessResponse response = new ListRebalanceProcessResponse();
1733+
1734+
Map<Long, List<PbRebalanceProcessForBucket>> processForTables = new HashMap<>();
1735+
Map<Long, Map<Long, List<PbRebalanceProcessForBucket>>> processForPartitions =
1736+
new HashMap<>();
1737+
1738+
BiConsumer<TableBucket, RebalanceResultForBucket> collectProcessResult =
1739+
(tableBucket, rebalanceResultForBucket) -> {
1740+
if (tableBucket.getPartitionId() == null) {
1741+
processForTables
1742+
.computeIfAbsent(tableBucket.getTableId(), k -> new ArrayList<>())
1743+
.add(
1744+
toPbRebalanceProcessForBucket(
1745+
tableBucket, rebalanceResultForBucket));
1746+
} else {
1747+
processForPartitions
1748+
.computeIfAbsent(tableBucket.getTableId(), k -> new HashMap<>())
1749+
.computeIfAbsent(
1750+
tableBucket.getPartitionId(), k -> new ArrayList<>())
1751+
.add(
1752+
toPbRebalanceProcessForBucket(
1753+
tableBucket, rebalanceResultForBucket));
1754+
}
1755+
};
1756+
1757+
ongoingRebalanceTasks.forEach(collectProcessResult);
1758+
finishedRebalanceTasks.forEach(collectProcessResult);
1759+
1760+
processForTables.forEach(
1761+
(tableId, processForBuckets) ->
1762+
response.addProcessForTable()
1763+
.setTableId(tableId)
1764+
.addAllBucketsProcesses(processForBuckets));
1765+
processForPartitions.forEach(
1766+
(tableId, processForPartition) -> {
1767+
PbRebalanceProcessForTable processForTable =
1768+
response.addProcessForTable().setTableId(tableId);
1769+
processForPartition.forEach(
1770+
(partitionId, processForBuckets) ->
1771+
processForTable
1772+
.addPartitionsProcess()
1773+
.setPartitionId(partitionId)
1774+
.addAllBucketsProcesses(processForBuckets));
1775+
});
1776+
1777+
return response;
1778+
}
1779+
1780+
private static PbRebalanceProcessForBucket toPbRebalanceProcessForBucket(
1781+
TableBucket tableBucket, RebalanceResultForBucket rebalanceResultForBucket) {
1782+
return new PbRebalanceProcessForBucket()
1783+
.setBucketId(tableBucket.getBucket())
1784+
.setOriginalReplicas(
1785+
rebalanceResultForBucket.replicas().stream()
1786+
.mapToInt(Integer::intValue)
1787+
.toArray())
1788+
.setNewReplicas(
1789+
rebalanceResultForBucket.targetReplicas().stream()
1790+
.mapToInt(Integer::intValue)
1791+
.toArray())
1792+
.setRebalanceStatus(rebalanceResultForBucket.status().getCode());
1793+
}
1794+
17241795
private static <T> Map<TableBucket, T> mergeResponse(
17251796
Map<TableBucket, T> response, Map<TableBucket, T> errors) {
17261797
if (errors.isEmpty()) {

0 commit comments

Comments
 (0)