Skip to content

Commit c70310a

Browse files
committed
[server] Introduce new rebalance API
1 parent 54b515b commit c70310a

30 files changed

+1619
-2
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/admin/Admin.java

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
import com.alibaba.fluss.client.metadata.KvSnapshots;
2323
import com.alibaba.fluss.client.metadata.LakeSnapshot;
2424
import com.alibaba.fluss.cluster.ServerNode;
25+
import com.alibaba.fluss.cluster.ServerTag;
2526
import com.alibaba.fluss.config.ConfigOptions;
27+
import com.alibaba.fluss.exception.ClusterAuthorizationException;
2628
import com.alibaba.fluss.exception.DatabaseAlreadyExistException;
2729
import com.alibaba.fluss.exception.DatabaseNotEmptyException;
2830
import com.alibaba.fluss.exception.DatabaseNotExistException;
@@ -31,15 +33,20 @@
3133
import com.alibaba.fluss.exception.InvalidReplicationFactorException;
3234
import com.alibaba.fluss.exception.InvalidTableException;
3335
import com.alibaba.fluss.exception.KvSnapshotNotExistException;
36+
import com.alibaba.fluss.exception.NoRebalanceInProgressException;
3437
import com.alibaba.fluss.exception.NonPrimaryKeyTableException;
3538
import com.alibaba.fluss.exception.PartitionAlreadyExistsException;
3639
import com.alibaba.fluss.exception.PartitionNotExistException;
40+
import com.alibaba.fluss.exception.RebalanceFailureException;
3741
import com.alibaba.fluss.exception.SchemaNotExistException;
42+
import com.alibaba.fluss.exception.ServerNotExistException;
43+
import com.alibaba.fluss.exception.ServerTagAlreadyExistException;
3844
import com.alibaba.fluss.exception.TableAlreadyExistException;
3945
import com.alibaba.fluss.exception.TableNotExistException;
4046
import com.alibaba.fluss.exception.TableNotPartitionedException;
4147
import com.alibaba.fluss.exception.TooManyBucketsException;
4248
import com.alibaba.fluss.exception.TooManyPartitionsException;
49+
import com.alibaba.fluss.maintenance.GoalType;
4350
import com.alibaba.fluss.metadata.DatabaseDescriptor;
4451
import com.alibaba.fluss.metadata.DatabaseInfo;
4552
import com.alibaba.fluss.metadata.PartitionInfo;
@@ -450,4 +457,89 @@ ListOffsetsResult listOffsets(
450457
* @return A CompletableFuture indicating completion of the operation.
451458
*/
452459
DropAclsResult dropAcls(Collection<AclBindingFilter> filters);
460+
461+
/**
462+
* Add server tag to the specified tabletServers, one tabletServer can only have one serverTag.
463+
*
464+
* <p>If the operation fails, none of the tags will take effect.
465+
*
466+
* <ul>
467+
* <li>{@link ClusterAuthorizationException} If the authenticated user doesn't have reset
468+
* config access to the cluster.
469+
* <li>{@link ServerNotExistException} If the tabletServer in {@code tabletServers} does not
470+
* exist.
471+
* <li>{@link ServerTagAlreadyExistException} If the server tag already exists when {@code
472+
* overWriteIfExists} is false.
473+
* </ul>
474+
*
475+
* @param tabletServers the tabletServers we want to add server tags.
476+
* @param serverTag the server tag to be added.
477+
* @param overWriteIfExists If true, overwrite the existing server tag.
478+
*/
479+
CompletableFuture<Void> addServerTag(
480+
List<Integer> tabletServers, ServerTag serverTag, boolean overWriteIfExists);
481+
482+
/**
483+
* Remove server tag from the specified tabletServers.
484+
*
485+
* <p>If the operation fails, none of the tags will be removed.
486+
*
487+
* <ul>
488+
* <li>{@link ClusterAuthorizationException} If the authenticated user doesn't have reset
489+
* config access to the cluster.
490+
* <li>{@link ServerNotExistException} If the tabletServer in {@code tabletServers} does not
491+
* exist.
492+
* </ul>
493+
*
494+
* @param tabletServers the tabletServers we want to remove server tags.
495+
*/
496+
CompletableFuture<Void> removeServerTag(List<Integer> tabletServers);
497+
498+
/**
499+
* Based on the provided {@code priorityGoals}, Fluss performs load balancing on the cluster's
500+
* bucket load.
501+
*
502+
* <p>More details, Fluss collects the cluster's load information and optimizes to perform load
503+
* balancing according to the user-defined {@code priorityGoals}.
504+
*
505+
* <p>Currently, Fluss only supports one active rebalance task in the cluster. If an uncompleted
506+
* rebalance task exists, an {@link RebalanceFailureException} will be thrown.
507+
*
508+
* <ul>
509+
* <li>{@link ClusterAuthorizationException} If the authenticated user doesn't have reset
510+
* config access to the cluster.
511+
* <li>{@link RebalanceFailureException} If the rebalance failed. Such as there is an ongoing
512+
* execution.
513+
* </ul>
514+
*
515+
* @param priorityGoals the goals to be optimized.
516+
* @param dryRun Calculate and return the rebalance optimization proposal, but do not execute
517+
* it.
518+
* @return the generated rebalance plan.
519+
*/
520+
RebalancePlanResult rebalance(List<GoalType> priorityGoals, boolean dryRun);
521+
522+
/**
523+
* List the rebalance process.
524+
*
525+
* <ul>
526+
* <li>{@link ClusterAuthorizationException} If the authenticated user doesn't have reset
527+
* config access to the cluster.
528+
* <li>{@link NoRebalanceInProgressException} If there are no rebalance tasks in progress.
529+
* </ul>
530+
*
531+
* @return the rebalance process for all the tableBuckets which need to do rebalance.
532+
*/
533+
ListRebalanceResult listRebalanceProcess();
534+
535+
/**
536+
* Stop the rebalance process.
537+
*
538+
* <ul>
539+
* <li>{@link ClusterAuthorizationException} If the authenticated user doesn't have reset
540+
* config access to the cluster.
541+
* <li>{@link NoRebalanceInProgressException} If there are no rebalance tasks in progress.
542+
* </ul>
543+
*/
544+
CompletableFuture<Void> stopRebalance();
453545
}

fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import com.alibaba.fluss.client.utils.ClientRpcMessageUtils;
2525
import com.alibaba.fluss.cluster.Cluster;
2626
import com.alibaba.fluss.cluster.ServerNode;
27+
import com.alibaba.fluss.cluster.ServerTag;
28+
import com.alibaba.fluss.maintenance.GoalType;
2729
import com.alibaba.fluss.metadata.DatabaseDescriptor;
2830
import com.alibaba.fluss.metadata.DatabaseInfo;
2931
import com.alibaba.fluss.metadata.PartitionInfo;
@@ -465,6 +467,32 @@ public DropAclsResult dropAcls(Collection<AclBindingFilter> filters) {
465467
return result;
466468
}
467469

470+
@Override
471+
public CompletableFuture<Void> addServerTag(
472+
List<Integer> tabletServers, ServerTag serverTag, boolean overWriteIfExists) {
473+
throw new UnsupportedOperationException("Support soon");
474+
}
475+
476+
@Override
477+
public CompletableFuture<Void> removeServerTag(List<Integer> tabletServers) {
478+
throw new UnsupportedOperationException("Support soon");
479+
}
480+
481+
@Override
482+
public RebalancePlanResult rebalance(List<GoalType> priorityGoals, boolean dryRun) {
483+
throw new UnsupportedOperationException("Support soon");
484+
}
485+
486+
@Override
487+
public ListRebalanceResult listRebalanceProcess() {
488+
throw new UnsupportedOperationException("Support soon");
489+
}
490+
491+
@Override
492+
public CompletableFuture<Void> stopRebalance() {
493+
throw new UnsupportedOperationException("Support soon");
494+
}
495+
468496
@Override
469497
public void close() {
470498
// nothing to do yet
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.alibaba.fluss.client.admin;
19+
20+
import com.alibaba.fluss.annotation.PublicEvolving;
21+
import com.alibaba.fluss.metadata.TableBucket;
22+
23+
import java.util.Map;
24+
import java.util.concurrent.CompletableFuture;
25+
import java.util.stream.Collectors;
26+
27+
/**
28+
* The result of {@link Admin#listRebalanceProcess()}.
29+
*
30+
* @since 0.8
31+
*/
32+
@PublicEvolving
33+
public class ListRebalanceResult {
34+
private final Map<TableBucket, CompletableFuture<RebalanceResultForBucket>> futures;
35+
36+
public ListRebalanceResult(
37+
Map<TableBucket, CompletableFuture<RebalanceResultForBucket>> futures) {
38+
this.futures = futures;
39+
}
40+
41+
public CompletableFuture<RebalanceResultForBucket> bucketResult(TableBucket tableBucket) {
42+
return futures.get(tableBucket);
43+
}
44+
45+
public CompletableFuture<Map<TableBucket, RebalanceResultForBucket>> all() {
46+
return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0]))
47+
.thenApply(
48+
v ->
49+
futures.entrySet().stream()
50+
.collect(
51+
Collectors.toMap(
52+
Map.Entry::getKey,
53+
e -> e.getValue().join())));
54+
}
55+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.alibaba.fluss.client.admin;
19+
20+
import com.alibaba.fluss.annotation.PublicEvolving;
21+
import com.alibaba.fluss.maintenance.RebalancePlanForBucket;
22+
import com.alibaba.fluss.metadata.TableBucket;
23+
24+
import java.util.List;
25+
import java.util.Map;
26+
import java.util.concurrent.CompletableFuture;
27+
import java.util.stream.Collectors;
28+
29+
/**
30+
* The result of {@link Admin#rebalance(List, boolean)}.
31+
*
32+
* @since 0.8
33+
*/
34+
@PublicEvolving
35+
public class RebalancePlanResult {
36+
private final Map<TableBucket, CompletableFuture<RebalancePlanForBucket>> futures;
37+
38+
public RebalancePlanResult(
39+
Map<TableBucket, CompletableFuture<RebalancePlanForBucket>> futures) {
40+
this.futures = futures;
41+
}
42+
43+
public CompletableFuture<RebalancePlanForBucket> bucketResult(TableBucket tableBucket) {
44+
return futures.get(tableBucket);
45+
}
46+
47+
public CompletableFuture<Map<TableBucket, RebalancePlanForBucket>> all() {
48+
return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0]))
49+
.thenApply(
50+
v ->
51+
futures.entrySet().stream()
52+
.collect(
53+
Collectors.toMap(
54+
Map.Entry::getKey,
55+
e -> e.getValue().join())));
56+
}
57+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.alibaba.fluss.client.admin;
19+
20+
import com.alibaba.fluss.annotation.PublicEvolving;
21+
import com.alibaba.fluss.maintenance.RebalanceStatus;
22+
23+
import java.util.List;
24+
25+
/**
26+
* Result of rebalance process for a tabletBucket.
27+
*
28+
* @since 0.8
29+
*/
30+
@PublicEvolving
31+
public class RebalanceResultForBucket {
32+
33+
private final List<Integer> replicas;
34+
private final List<Integer> newReplicas;
35+
private final List<Integer> removingReplicas;
36+
private final RebalanceStatus rebalanceStatus;
37+
38+
public RebalanceResultForBucket(
39+
List<Integer> replicas,
40+
List<Integer> newReplicas,
41+
List<Integer> removingReplicas,
42+
RebalanceStatus rebalanceStatus) {
43+
this.replicas = replicas;
44+
this.newReplicas = newReplicas;
45+
this.removingReplicas = removingReplicas;
46+
this.rebalanceStatus = rebalanceStatus;
47+
}
48+
49+
public List<Integer> replicas() {
50+
return replicas;
51+
}
52+
53+
public List<Integer> newReplicas() {
54+
return newReplicas;
55+
}
56+
57+
public List<Integer> removingReplicas() {
58+
return removingReplicas;
59+
}
60+
61+
public RebalanceStatus rebalanceStatus() {
62+
return rebalanceStatus;
63+
}
64+
65+
@Override
66+
public String toString() {
67+
return "BucketReassignment{"
68+
+ "replicas="
69+
+ replicas
70+
+ ", newReplicas="
71+
+ newReplicas
72+
+ ", removingReplicas="
73+
+ removingReplicas
74+
+ ", rebalanceStatus="
75+
+ rebalanceStatus
76+
+ '}';
77+
}
78+
}

0 commit comments

Comments
 (0)