Skip to content

Commit ac2f7eb

Browse files
committed
[server] Introduce new rebalance API
1 parent 54b515b commit ac2f7eb

File tree

29 files changed

+1577
-2
lines changed

29 files changed

+1577
-2
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/admin/Admin.java

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@
2222
import com.alibaba.fluss.client.metadata.KvSnapshots;
2323
import com.alibaba.fluss.client.metadata.LakeSnapshot;
2424
import com.alibaba.fluss.cluster.ServerNode;
25+
import com.alibaba.fluss.cluster.maintencance.GoalType;
26+
import com.alibaba.fluss.cluster.maintencance.RebalancePlanForBucket;
27+
import com.alibaba.fluss.cluster.maintencance.ServerTag;
2528
import com.alibaba.fluss.config.ConfigOptions;
29+
import com.alibaba.fluss.exception.AuthorizationException;
2630
import com.alibaba.fluss.exception.DatabaseAlreadyExistException;
2731
import com.alibaba.fluss.exception.DatabaseNotEmptyException;
2832
import com.alibaba.fluss.exception.DatabaseNotExistException;
@@ -31,10 +35,15 @@
3135
import com.alibaba.fluss.exception.InvalidReplicationFactorException;
3236
import com.alibaba.fluss.exception.InvalidTableException;
3337
import com.alibaba.fluss.exception.KvSnapshotNotExistException;
38+
import com.alibaba.fluss.exception.NoRebalanceInProgressException;
3439
import com.alibaba.fluss.exception.NonPrimaryKeyTableException;
3540
import com.alibaba.fluss.exception.PartitionAlreadyExistsException;
3641
import com.alibaba.fluss.exception.PartitionNotExistException;
42+
import com.alibaba.fluss.exception.RebalanceFailureException;
3743
import com.alibaba.fluss.exception.SchemaNotExistException;
44+
import com.alibaba.fluss.exception.ServerNotExistException;
45+
import com.alibaba.fluss.exception.ServerTagAlreadyExistException;
46+
import com.alibaba.fluss.exception.ServerTagNotExistException;
3847
import com.alibaba.fluss.exception.TableAlreadyExistException;
3948
import com.alibaba.fluss.exception.TableNotExistException;
4049
import com.alibaba.fluss.exception.TableNotPartitionedException;
@@ -52,9 +61,11 @@
5261
import com.alibaba.fluss.metadata.TablePath;
5362
import com.alibaba.fluss.security.acl.AclBinding;
5463
import com.alibaba.fluss.security.acl.AclBindingFilter;
64+
import com.alibaba.fluss.shaded.netty4.io.netty.util.concurrent.CompleteFuture;
5565

5666
import java.util.Collection;
5767
import java.util.List;
68+
import java.util.Map;
5869
import java.util.concurrent.CompletableFuture;
5970

6071
/**
@@ -450,4 +461,90 @@ ListOffsetsResult listOffsets(
450461
* @return A CompletableFuture indicating completion of the operation.
451462
*/
452463
DropAclsResult dropAcls(Collection<AclBindingFilter> filters);
464+
465+
/**
466+
* Add server tag to the specified tabletServers, one tabletServer can only have one serverTag.
467+
*
468+
* <p>If one tabletServer failed adding tag, none of the tags will take effect.
469+
*
470+
* <ul>
471+
* <li>{@link AuthorizationException} If the authenticated user doesn't have reset config
472+
* access to the cluster.
473+
* <li>{@link ServerNotExistException} If the tabletServer in {@code tabletServers} does not
474+
* exist.
475+
* <li>{@link ServerTagAlreadyExistException} If the server tag already exists when {@code
476+
* overWriteIfExists} is false.
477+
* </ul>
478+
*
479+
* @param tabletServers the tabletServers we want to add server tags.
480+
* @param serverTag the server tag to be added.
481+
*/
482+
CompletableFuture<Void> addServerTag(List<Integer> tabletServers, ServerTag serverTag);
483+
484+
/**
485+
* Remove server tag from the specified tabletServers.
486+
*
487+
* <p>If one tabletServer failed removing tag, none of the tags will be removed.
488+
*
489+
* <ul>
490+
* <li>{@link AuthorizationException} If the authenticated user doesn't have reset config
491+
* access to the cluster.
492+
* <li>{@link ServerNotExistException} If the tabletServer in {@code tabletServers} does not
493+
* exist.
494+
* <li>{@link ServerTagNotExistException} If the server tag does not exist when {@code
495+
* overWriteIfExists} is false.
496+
* </ul>
497+
*
498+
* @param tabletServers the tabletServers we want to remove server tags.
499+
*/
500+
CompletableFuture<Void> removeServerTag(List<Integer> tabletServers, ServerTag serverTag);
501+
502+
/**
503+
* Based on the provided {@code priorityGoals}, Fluss performs load balancing on the cluster's
504+
* bucket load.
505+
*
506+
* <p>More details, Fluss collects the cluster's load information and optimizes to perform load
507+
* balancing according to the user-defined {@code priorityGoals}.
508+
*
509+
* <p>Currently, Fluss only supports one active rebalance task in the cluster. If an uncompleted
510+
* rebalance task exists, an {@link RebalanceFailureException} will be thrown.
511+
*
512+
* <ul>
513+
* <li>{@link AuthorizationException} If the authenticated user doesn't have reset config
514+
* access to the cluster.
515+
* <li>{@link RebalanceFailureException} If the rebalance failed. Such as there is an ongoing
516+
* execution.
517+
* </ul>
518+
*
519+
* @param priorityGoals the goals to be optimized.
520+
* @param dryRun Calculate and return the rebalance optimization proposal, but do not execute
521+
* it.
522+
* @return the generated rebalance plan for all the tableBuckets which need to do rebalance.
523+
*/
524+
CompleteFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
525+
List<GoalType> priorityGoals, boolean dryRun);
526+
527+
/**
528+
* List the rebalance process.
529+
*
530+
* <ul>
531+
* <li>{@link AuthorizationException} If the authenticated user doesn't have reset config
532+
* access to the cluster.
533+
* <li>{@link NoRebalanceInProgressException} If there are no rebalance tasks in progress.
534+
* </ul>
535+
*
536+
* @return the rebalance process for all the tableBuckets doing rebalance.
537+
*/
538+
CompleteFuture<Map<TableBucket, RebalanceResultForBucket>> listRebalanceProcess();
539+
540+
/**
541+
* Cannel the rebalance task.
542+
*
543+
* <ul>
544+
* <li>{@link AuthorizationException} If the authenticated user doesn't have reset config
545+
* access to the cluster.
546+
* <li>{@link NoRebalanceInProgressException} If there are no rebalance tasks in progress.
547+
* </ul>
548+
*/
549+
CompletableFuture<Void> cancelRebalance();
453550
}

fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import com.alibaba.fluss.client.utils.ClientRpcMessageUtils;
2525
import com.alibaba.fluss.cluster.Cluster;
2626
import com.alibaba.fluss.cluster.ServerNode;
27+
import com.alibaba.fluss.cluster.maintencance.GoalType;
28+
import com.alibaba.fluss.cluster.maintencance.RebalancePlanForBucket;
29+
import com.alibaba.fluss.cluster.maintencance.ServerTag;
2730
import com.alibaba.fluss.metadata.DatabaseDescriptor;
2831
import com.alibaba.fluss.metadata.DatabaseInfo;
2932
import com.alibaba.fluss.metadata.PartitionInfo;
@@ -69,6 +72,7 @@
6972
import com.alibaba.fluss.rpc.protocol.ApiError;
7073
import com.alibaba.fluss.security.acl.AclBinding;
7174
import com.alibaba.fluss.security.acl.AclBindingFilter;
75+
import com.alibaba.fluss.shaded.netty4.io.netty.util.concurrent.CompleteFuture;
7276
import com.alibaba.fluss.utils.MapUtils;
7377

7478
import javax.annotation.Nullable;
@@ -465,6 +469,33 @@ public DropAclsResult dropAcls(Collection<AclBindingFilter> filters) {
465469
return result;
466470
}
467471

472+
@Override
473+
public CompletableFuture<Void> addServerTag(List<Integer> tabletServers, ServerTag serverTag) {
474+
throw new UnsupportedOperationException("Support soon");
475+
}
476+
477+
@Override
478+
public CompletableFuture<Void> removeServerTag(
479+
List<Integer> tabletServers, ServerTag serverTag) {
480+
throw new UnsupportedOperationException("Support soon");
481+
}
482+
483+
@Override
484+
public CompleteFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
485+
List<GoalType> priorityGoals, boolean dryRun) {
486+
throw new UnsupportedOperationException("Support soon");
487+
}
488+
489+
@Override
490+
public CompleteFuture<Map<TableBucket, RebalanceResultForBucket>> listRebalanceProcess() {
491+
throw new UnsupportedOperationException("Support soon");
492+
}
493+
494+
@Override
495+
public CompletableFuture<Void> cancelRebalance() {
496+
throw new UnsupportedOperationException("Support soon");
497+
}
498+
468499
@Override
469500
public void close() {
470501
// nothing to do yet
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.alibaba.fluss.client.admin;
19+
20+
import com.alibaba.fluss.annotation.PublicEvolving;
21+
import com.alibaba.fluss.cluster.maintencance.RebalanceStatusForBucket;
22+
23+
import java.util.List;
24+
25+
/**
26+
* Result of rebalance process for a tabletBucket.
27+
*
28+
* @since 0.8
29+
*/
30+
@PublicEvolving
31+
public class RebalanceResultForBucket {
32+
33+
private final List<Integer> originReplicas;
34+
private final List<Integer> newReplicas;
35+
private final RebalanceStatusForBucket rebalanceStatusForBucket;
36+
37+
public RebalanceResultForBucket(
38+
List<Integer> originReplicas,
39+
List<Integer> newReplicas,
40+
RebalanceStatusForBucket rebalanceStatusForBucket) {
41+
this.originReplicas = originReplicas;
42+
this.newReplicas = newReplicas;
43+
this.rebalanceStatusForBucket = rebalanceStatusForBucket;
44+
}
45+
46+
public List<Integer> replicas() {
47+
return originReplicas;
48+
}
49+
50+
public List<Integer> newReplicas() {
51+
return newReplicas;
52+
}
53+
54+
public RebalanceStatusForBucket rebalanceStatus() {
55+
return rebalanceStatusForBucket;
56+
}
57+
58+
@Override
59+
public String toString() {
60+
return "BucketReassignment{"
61+
+ "replicas="
62+
+ originReplicas
63+
+ ", newReplicas="
64+
+ newReplicas
65+
+ ", rebalanceStatus="
66+
+ rebalanceStatusForBucket
67+
+ '}';
68+
}
69+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.alibaba.fluss.cluster.maintencance;
19+
20+
import com.alibaba.fluss.annotation.PublicEvolving;
21+
22+
import java.util.Arrays;
23+
24+
/**
25+
* The type of goal to optimize.
26+
*
27+
* @since 0.8
28+
*/
29+
@PublicEvolving
30+
public enum GoalType {
31+
REPLICA_DISTRIBUTION_GOAL(0),
32+
PREFERRED_LEADER_GOAL(1);
33+
34+
public final int value;
35+
36+
GoalType(int value) {
37+
this.value = value;
38+
}
39+
40+
public static GoalType valueOf(int value) {
41+
if (value == REPLICA_DISTRIBUTION_GOAL.value) {
42+
return REPLICA_DISTRIBUTION_GOAL;
43+
} else if (value == PREFERRED_LEADER_GOAL.value) {
44+
return PREFERRED_LEADER_GOAL;
45+
} else {
46+
throw new IllegalArgumentException(
47+
String.format(
48+
"Value %s must be one of %s", value, Arrays.asList(GoalType.values())));
49+
}
50+
}
51+
}

0 commit comments

Comments
 (0)