Skip to content

Commit 746ec3c

Browse files
committed
[server] Introduce new rebalance API
1 parent 434a4f4 commit 746ec3c

File tree

27 files changed

+1562
-2
lines changed

27 files changed

+1562
-2
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,12 @@
2222
import org.apache.fluss.client.metadata.KvSnapshots;
2323
import org.apache.fluss.client.metadata.LakeSnapshot;
2424
import org.apache.fluss.cluster.ServerNode;
25+
import org.apache.fluss.cluster.rebalance.GoalType;
26+
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
27+
import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
28+
import org.apache.fluss.cluster.rebalance.ServerTag;
2529
import org.apache.fluss.config.ConfigOptions;
30+
import org.apache.fluss.exception.AuthorizationException;
2631
import org.apache.fluss.exception.DatabaseAlreadyExistException;
2732
import org.apache.fluss.exception.DatabaseNotEmptyException;
2833
import org.apache.fluss.exception.DatabaseNotExistException;
@@ -32,10 +37,15 @@
3237
import org.apache.fluss.exception.InvalidTableException;
3338
import org.apache.fluss.exception.KvSnapshotNotExistException;
3439
import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
40+
import org.apache.fluss.exception.NoRebalanceInProgressException;
3541
import org.apache.fluss.exception.NonPrimaryKeyTableException;
3642
import org.apache.fluss.exception.PartitionAlreadyExistsException;
3743
import org.apache.fluss.exception.PartitionNotExistException;
44+
import org.apache.fluss.exception.RebalanceFailureException;
3845
import org.apache.fluss.exception.SchemaNotExistException;
46+
import org.apache.fluss.exception.ServerNotExistException;
47+
import org.apache.fluss.exception.ServerTagAlreadyExistException;
48+
import org.apache.fluss.exception.ServerTagNotExistException;
3949
import org.apache.fluss.exception.TableAlreadyExistException;
4050
import org.apache.fluss.exception.TableNotExistException;
4151
import org.apache.fluss.exception.TableNotPartitionedException;
@@ -56,6 +66,7 @@
5666

5767
import java.util.Collection;
5868
import java.util.List;
69+
import java.util.Map;
5970
import java.util.concurrent.CompletableFuture;
6071

6172
/**
@@ -452,4 +463,90 @@ ListOffsetsResult listOffsets(
452463
* @return A CompletableFuture indicating completion of the operation.
453464
*/
454465
DropAclsResult dropAcls(Collection<AclBindingFilter> filters);
466+
467+
/**
468+
* Add server tag to the specified tabletServers, one tabletServer can only have one serverTag.
469+
*
470+
* <p>If one tabletServer failed adding tag, none of the tags will take effect.
471+
*
472+
* <ul>
473+
* <li>{@link AuthorizationException} If the authenticated user doesn't have reset config
474+
* access to the cluster.
475+
* <li>{@link ServerNotExistException} If the tabletServer in {@code tabletServers} does not
476+
* exist.
477+
* <li>{@link ServerTagAlreadyExistException} If the server tag already exists when {@code
478+
* overWriteIfExists} is false.
479+
* </ul>
480+
*
481+
* @param tabletServers the tabletServers we want to add server tags.
482+
* @param serverTag the server tag to be added.
483+
*/
484+
CompletableFuture<Void> addServerTag(List<Integer> tabletServers, ServerTag serverTag);
485+
486+
/**
487+
* Remove server tag from the specified tabletServers.
488+
*
489+
* <p>If one tabletServer failed removing tag, none of the tags will be removed.
490+
*
491+
* <ul>
492+
* <li>{@link AuthorizationException} If the authenticated user doesn't have reset config
493+
* access to the cluster.
494+
* <li>{@link ServerNotExistException} If the tabletServer in {@code tabletServers} does not
495+
* exist.
496+
* <li>{@link ServerTagNotExistException} If the server tag does not exist when {@code
497+
* overWriteIfExists} is false.
498+
* </ul>
499+
*
500+
* @param tabletServers the tabletServers we want to remove server tags.
501+
*/
502+
CompletableFuture<Void> removeServerTag(List<Integer> tabletServers, ServerTag serverTag);
503+
504+
/**
505+
* Based on the provided {@code priorityGoals}, Fluss performs load balancing on the cluster's
506+
* bucket load.
507+
*
508+
* <p>More details, Fluss collects the cluster's load information and optimizes to perform load
509+
* balancing according to the user-defined {@code priorityGoals}.
510+
*
511+
* <p>Currently, Fluss only supports one active rebalance task in the cluster. If an uncompleted
512+
* rebalance task exists, an {@link RebalanceFailureException} will be thrown.
513+
*
514+
* <ul>
515+
* <li>{@link AuthorizationException} If the authenticated user doesn't have reset config
516+
* access to the cluster.
517+
* <li>{@link RebalanceFailureException} If the rebalance failed. Such as there is an ongoing
518+
* execution.
519+
* </ul>
520+
*
521+
* @param priorityGoals the goals to be optimized.
522+
* @param dryRun Calculate and return the rebalance optimization proposal, but do not execute
523+
* it.
524+
* @return the generated rebalance plan for all the tableBuckets which need to do rebalance.
525+
*/
526+
CompletableFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
527+
List<GoalType> priorityGoals, boolean dryRun);
528+
529+
/**
530+
* List the rebalance process.
531+
*
532+
* <ul>
533+
* <li>{@link AuthorizationException} If the authenticated user doesn't have reset config
534+
* access to the cluster.
535+
* <li>{@link NoRebalanceInProgressException} If there are no rebalance tasks in progress.
536+
* </ul>
537+
*
538+
* @return the rebalance process for all the tableBuckets doing rebalance.
539+
*/
540+
CompletableFuture<Map<TableBucket, RebalanceResultForBucket>> listRebalanceProcess();
541+
542+
/**
543+
* Cannel the rebalance task.
544+
*
545+
* <ul>
546+
* <li>{@link AuthorizationException} If the authenticated user doesn't have reset config
547+
* access to the cluster.
548+
* <li>{@link NoRebalanceInProgressException} If there are no rebalance tasks in progress.
549+
* </ul>
550+
*/
551+
CompletableFuture<Void> cancelRebalance();
455552
}

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@
2424
import org.apache.fluss.client.utils.ClientRpcMessageUtils;
2525
import org.apache.fluss.cluster.Cluster;
2626
import org.apache.fluss.cluster.ServerNode;
27+
import org.apache.fluss.cluster.rebalance.GoalType;
28+
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
29+
import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
30+
import org.apache.fluss.cluster.rebalance.ServerTag;
2731
import org.apache.fluss.exception.LeaderNotAvailableException;
2832
import org.apache.fluss.metadata.DatabaseDescriptor;
2933
import org.apache.fluss.metadata.DatabaseInfo;
@@ -464,6 +468,33 @@ public DropAclsResult dropAcls(Collection<AclBindingFilter> filters) {
464468
return result;
465469
}
466470

471+
@Override
472+
public CompletableFuture<Void> addServerTag(List<Integer> tabletServers, ServerTag serverTag) {
473+
throw new UnsupportedOperationException("Support soon");
474+
}
475+
476+
@Override
477+
public CompletableFuture<Void> removeServerTag(
478+
List<Integer> tabletServers, ServerTag serverTag) {
479+
throw new UnsupportedOperationException("Support soon");
480+
}
481+
482+
@Override
483+
public CompletableFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
484+
List<GoalType> priorityGoals, boolean dryRun) {
485+
throw new UnsupportedOperationException("Support soon");
486+
}
487+
488+
@Override
489+
public CompletableFuture<Map<TableBucket, RebalanceResultForBucket>> listRebalanceProcess() {
490+
throw new UnsupportedOperationException("Support soon");
491+
}
492+
493+
@Override
494+
public CompletableFuture<Void> cancelRebalance() {
495+
throw new UnsupportedOperationException("Support soon");
496+
}
497+
467498
@Override
468499
public void close() {
469500
// nothing to do yet
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.cluster.rebalance;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
22+
import java.util.Arrays;
23+
24+
/**
25+
* The type of goal to optimize.
26+
*
27+
* @since 0.8
28+
*/
29+
@PublicEvolving
30+
public enum GoalType {
31+
/**
32+
* Goal to generate replica movement tasks to ensure that the number of replicas on each
33+
* tabletServer is near balanced.
34+
*/
35+
REPLICA_DISTRIBUTION_GOAL(0),
36+
37+
/**
38+
* Goal to generate leadership movement and leader replica movement tasks to ensure that the
39+
* number of leader replicas on each tabletServer is near balanced.
40+
*/
41+
LEADER_REPLICA_DISTRIBUTION_GOAL(1),
42+
43+
/** Goal to move the leaders to the first replica of each tableBuckets. */
44+
PREFERRED_LEADER_GOAL(2);
45+
46+
public final int value;
47+
48+
GoalType(int value) {
49+
this.value = value;
50+
}
51+
52+
public static GoalType valueOf(int value) {
53+
if (value == REPLICA_DISTRIBUTION_GOAL.value) {
54+
return REPLICA_DISTRIBUTION_GOAL;
55+
} else if (value == LEADER_REPLICA_DISTRIBUTION_GOAL.value) {
56+
return LEADER_REPLICA_DISTRIBUTION_GOAL;
57+
} else if (value == PREFERRED_LEADER_GOAL.value) {
58+
return PREFERRED_LEADER_GOAL;
59+
} else {
60+
throw new IllegalArgumentException(
61+
String.format(
62+
"Value %s must be one of %s", value, Arrays.asList(GoalType.values())));
63+
}
64+
}
65+
}
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.cluster.rebalance;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.metadata.TableBucket;
22+
23+
import java.util.List;
24+
import java.util.Objects;
25+
26+
/**
27+
* a generated rebalance plan for a tableBucket.
28+
*
29+
* @since 0.8
30+
*/
31+
@PublicEvolving
32+
public class RebalancePlanForBucket {
33+
private final TableBucket tableBucket;
34+
private final int originalLeader;
35+
private final int newLeader;
36+
private final List<Integer> originReplicas;
37+
private final List<Integer> newReplicas;
38+
39+
public RebalancePlanForBucket(
40+
TableBucket tableBucket,
41+
int originalLeader,
42+
int newLeader,
43+
List<Integer> originReplicas,
44+
List<Integer> newReplicas) {
45+
this.tableBucket = tableBucket;
46+
this.originalLeader = originalLeader;
47+
this.newLeader = newLeader;
48+
this.originReplicas = originReplicas;
49+
this.newReplicas = newReplicas;
50+
}
51+
52+
public TableBucket getTableBucket() {
53+
return tableBucket;
54+
}
55+
56+
public int getBucketId() {
57+
return tableBucket.getBucket();
58+
}
59+
60+
public Integer getOriginalLeader() {
61+
return originalLeader;
62+
}
63+
64+
public Integer getNewLeader() {
65+
return newLeader;
66+
}
67+
68+
public List<Integer> getOriginReplicas() {
69+
return originReplicas;
70+
}
71+
72+
public List<Integer> getNewReplicas() {
73+
return newReplicas;
74+
}
75+
76+
@Override
77+
public String toString() {
78+
return "RebalancePlanForBucket{"
79+
+ "tableBucket="
80+
+ tableBucket
81+
+ ", originalLeader="
82+
+ originalLeader
83+
+ ", newLeader="
84+
+ newLeader
85+
+ ", originReplicas="
86+
+ originReplicas
87+
+ ", newReplicas="
88+
+ newReplicas
89+
+ '}';
90+
}
91+
92+
@Override
93+
public boolean equals(Object o) {
94+
if (this == o) {
95+
return true;
96+
}
97+
if (o == null || getClass() != o.getClass()) {
98+
return false;
99+
}
100+
RebalancePlanForBucket that = (RebalancePlanForBucket) o;
101+
return Objects.equals(tableBucket, that.tableBucket)
102+
&& originalLeader == that.originalLeader
103+
&& newLeader == that.newLeader
104+
&& Objects.equals(originReplicas, that.originReplicas)
105+
&& Objects.equals(newReplicas, that.newReplicas);
106+
}
107+
108+
@Override
109+
public int hashCode() {
110+
return Objects.hash(tableBucket, originalLeader, newLeader, originReplicas, newReplicas);
111+
}
112+
}

0 commit comments

Comments
 (0)