Skip to content

Commit 72579c8

Browse files
committed
[server] Introduce new rebalance API
1 parent b432b14 commit 72579c8

File tree

27 files changed

+1557
-2
lines changed

27 files changed

+1557
-2
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,14 @@
2222
import org.apache.fluss.client.metadata.KvSnapshots;
2323
import org.apache.fluss.client.metadata.LakeSnapshot;
2424
import org.apache.fluss.cluster.ServerNode;
25+
import org.apache.fluss.cluster.rebalance.GoalType;
26+
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
27+
import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
28+
import org.apache.fluss.cluster.rebalance.ServerTag;
2529
import org.apache.fluss.config.ConfigOptions;
2630
import org.apache.fluss.config.cluster.AlterConfig;
2731
import org.apache.fluss.config.cluster.ConfigEntry;
32+
import org.apache.fluss.exception.AuthorizationException;
2833
import org.apache.fluss.exception.DatabaseAlreadyExistException;
2934
import org.apache.fluss.exception.DatabaseNotEmptyException;
3035
import org.apache.fluss.exception.DatabaseNotExistException;
@@ -35,10 +40,15 @@
3540
import org.apache.fluss.exception.InvalidTableException;
3641
import org.apache.fluss.exception.KvSnapshotNotExistException;
3742
import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
43+
import org.apache.fluss.exception.NoRebalanceInProgressException;
3844
import org.apache.fluss.exception.NonPrimaryKeyTableException;
3945
import org.apache.fluss.exception.PartitionAlreadyExistsException;
4046
import org.apache.fluss.exception.PartitionNotExistException;
47+
import org.apache.fluss.exception.RebalanceFailureException;
4148
import org.apache.fluss.exception.SchemaNotExistException;
49+
import org.apache.fluss.exception.ServerNotExistException;
50+
import org.apache.fluss.exception.ServerTagAlreadyExistException;
51+
import org.apache.fluss.exception.ServerTagNotExistException;
4252
import org.apache.fluss.exception.TableAlreadyExistException;
4353
import org.apache.fluss.exception.TableNotExistException;
4454
import org.apache.fluss.exception.TableNotPartitionedException;
@@ -60,6 +70,7 @@
6070

6171
import java.util.Collection;
6272
import java.util.List;
73+
import java.util.Map;
6374
import java.util.concurrent.CompletableFuture;
6475

6576
/**
@@ -492,4 +503,90 @@ ListOffsetsResult listOffsets(
492503
* @return A CompletableFuture indicating completion of the operation.
493504
*/
494505
CompletableFuture<Void> alterClusterConfigs(Collection<AlterConfig> configs);
506+
507+
/**
 * Add a server tag to the specified tabletServers. One tabletServer can only have one serverTag.
 *
 * <p>If adding the tag fails for any tabletServer, none of the tags will take effect.
 *
 * <p>The following exceptions can be anticipated:
 *
 * <ul>
 *   <li>{@link AuthorizationException} If the authenticated user doesn't have cluster
 *       permissions.
 *   <li>{@link ServerNotExistException} If a tabletServer in {@code tabletServers} does not
 *       exist.
 *   <li>{@link ServerTagAlreadyExistException} If the server tag already exists for any one of
 *       the tabletServers.
 * </ul>
 *
 * @param tabletServers the tabletServers to add the server tag to.
 * @param serverTag the server tag to be added.
 */
524+
CompletableFuture<Void> addServerTag(List<Integer> tabletServers, ServerTag serverTag);
525+
526+
/**
 * Remove the server tag from the specified tabletServers.
 *
 * <p>If removing the tag fails for any tabletServer, none of the tags will be removed.
 *
 * <p>The following exceptions can be anticipated:
 *
 * <ul>
 *   <li>{@link AuthorizationException} If the authenticated user doesn't have cluster
 *       permissions.
 *   <li>{@link ServerNotExistException} If a tabletServer in {@code tabletServers} does not
 *       exist.
 *   <li>{@link ServerTagNotExistException} If the server tag does not exist for any one of the
 *       tabletServers.
 * </ul>
 *
 * @param tabletServers the tabletServers to remove the server tag from.
 * @param serverTag the server tag to be removed.
 */
542+
CompletableFuture<Void> removeServerTag(List<Integer> tabletServers, ServerTag serverTag);
543+
544+
/**
 * Based on the provided {@code priorityGoals}, Fluss performs load balancing on the cluster's
 * bucket load.
 *
 * <p>In more detail, Fluss collects the cluster's load information and optimizes it to perform
 * load balancing according to the user-defined {@code priorityGoals}.
 *
 * <p>Currently, Fluss only supports one active rebalance task in the cluster. If an uncompleted
 * rebalance task exists, a {@link RebalanceFailureException} will be thrown.
 *
 * <p>The following exceptions can be anticipated:
 *
 * <ul>
 *   <li>{@link AuthorizationException} If the authenticated user doesn't have cluster
 *       permissions.
 *   <li>{@link RebalanceFailureException} If the rebalance failed, for example because there is
 *       an ongoing execution.
 * </ul>
 *
 * @param priorityGoals the goals to be optimized.
 * @param dryRun if true, calculate and return the rebalance optimization proposal, but do not
 *     execute it.
 * @return the generated rebalance plan for all the tableBuckets which need to be rebalanced.
 */
566+
CompletableFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
567+
List<GoalType> priorityGoals, boolean dryRun);
568+
569+
/**
 * List the rebalance process.
 *
 * <p>The following exceptions can be anticipated:
 *
 * <ul>
 *   <li>{@link AuthorizationException} If the authenticated user doesn't have cluster
 *       permissions.
 *   <li>{@link NoRebalanceInProgressException} If there are no rebalance tasks in progress.
 * </ul>
 *
 * @return the rebalance process for all the tableBuckets that are being rebalanced.
 */
580+
CompletableFuture<Map<TableBucket, RebalanceResultForBucket>> listRebalanceProcess();
581+
582+
/**
 * Cancel the rebalance task.
 *
 * <p>The following exceptions can be anticipated:
 *
 * <ul>
 *   <li>{@link AuthorizationException} If the authenticated user doesn't have cluster
 *       permissions.
 *   <li>{@link NoRebalanceInProgressException} If there are no rebalance tasks in progress.
 * </ul>
 */
591+
CompletableFuture<Void> cancelRebalance();
495592
}

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@
2424
import org.apache.fluss.client.utils.ClientRpcMessageUtils;
2525
import org.apache.fluss.cluster.Cluster;
2626
import org.apache.fluss.cluster.ServerNode;
27+
import org.apache.fluss.cluster.rebalance.GoalType;
28+
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
29+
import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
30+
import org.apache.fluss.cluster.rebalance.ServerTag;
2731
import org.apache.fluss.config.cluster.AlterConfig;
2832
import org.apache.fluss.config.cluster.ConfigEntry;
2933
import org.apache.fluss.exception.LeaderNotAvailableException;
@@ -535,6 +539,33 @@ public CompletableFuture<Void> alterClusterConfigs(Collection<AlterConfig> confi
535539
return future;
536540
}
537541

542+
@Override
543+
public CompletableFuture<Void> addServerTag(List<Integer> tabletServers, ServerTag serverTag) {
544+
throw new UnsupportedOperationException("Support soon");
545+
}
546+
547+
@Override
548+
public CompletableFuture<Void> removeServerTag(
549+
List<Integer> tabletServers, ServerTag serverTag) {
550+
throw new UnsupportedOperationException("Support soon");
551+
}
552+
553+
@Override
554+
public CompletableFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
555+
List<GoalType> priorityGoals, boolean dryRun) {
556+
throw new UnsupportedOperationException("Support soon");
557+
}
558+
559+
@Override
560+
public CompletableFuture<Map<TableBucket, RebalanceResultForBucket>> listRebalanceProcess() {
561+
throw new UnsupportedOperationException("Support soon");
562+
}
563+
564+
@Override
565+
public CompletableFuture<Void> cancelRebalance() {
566+
throw new UnsupportedOperationException("Support soon");
567+
}
568+
538569
@Override
539570
public void close() {
540571
// nothing to do yet
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.cluster.rebalance;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
22+
import java.util.Arrays;
23+
24+
/**
25+
* The type of goal to optimize.
26+
*
27+
* @since 0.8
28+
*/
29+
@PublicEvolving
30+
public enum GoalType {
31+
/**
32+
* Goal to generate replica movement tasks to ensure that the number of replicas on each
33+
* tabletServer is near balanced.
34+
*/
35+
REPLICA_DISTRIBUTION_GOAL(0),
36+
37+
/**
38+
* Goal to generate leadership movement and leader replica movement tasks to ensure that the
39+
* number of leader replicas on each tabletServer is near balanced.
40+
*/
41+
LEADER_REPLICA_DISTRIBUTION_GOAL(1);
42+
43+
public final int value;
44+
45+
GoalType(int value) {
46+
this.value = value;
47+
}
48+
49+
public static GoalType valueOf(int value) {
50+
if (value == REPLICA_DISTRIBUTION_GOAL.value) {
51+
return REPLICA_DISTRIBUTION_GOAL;
52+
} else if (value == LEADER_REPLICA_DISTRIBUTION_GOAL.value) {
53+
return LEADER_REPLICA_DISTRIBUTION_GOAL;
54+
} else {
55+
throw new IllegalArgumentException(
56+
String.format(
57+
"Value %s must be one of %s", value, Arrays.asList(GoalType.values())));
58+
}
59+
}
60+
}
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.cluster.rebalance;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.metadata.TableBucket;
22+
23+
import java.util.List;
24+
import java.util.Objects;
25+
26+
/**
27+
* a generated rebalance plan for a tableBucket.
28+
*
29+
* @since 0.8
30+
*/
31+
@PublicEvolving
32+
public class RebalancePlanForBucket {
33+
private final TableBucket tableBucket;
34+
private final int originalLeader;
35+
private final int newLeader;
36+
private final List<Integer> originReplicas;
37+
private final List<Integer> newReplicas;
38+
39+
public RebalancePlanForBucket(
40+
TableBucket tableBucket,
41+
int originalLeader,
42+
int newLeader,
43+
List<Integer> originReplicas,
44+
List<Integer> newReplicas) {
45+
this.tableBucket = tableBucket;
46+
this.originalLeader = originalLeader;
47+
this.newLeader = newLeader;
48+
this.originReplicas = originReplicas;
49+
this.newReplicas = newReplicas;
50+
}
51+
52+
public TableBucket getTableBucket() {
53+
return tableBucket;
54+
}
55+
56+
public int getBucketId() {
57+
return tableBucket.getBucket();
58+
}
59+
60+
public Integer getOriginalLeader() {
61+
return originalLeader;
62+
}
63+
64+
public Integer getNewLeader() {
65+
return newLeader;
66+
}
67+
68+
public List<Integer> getOriginReplicas() {
69+
return originReplicas;
70+
}
71+
72+
public List<Integer> getNewReplicas() {
73+
return newReplicas;
74+
}
75+
76+
@Override
77+
public String toString() {
78+
return "RebalancePlanForBucket{"
79+
+ "tableBucket="
80+
+ tableBucket
81+
+ ", originalLeader="
82+
+ originalLeader
83+
+ ", newLeader="
84+
+ newLeader
85+
+ ", originReplicas="
86+
+ originReplicas
87+
+ ", newReplicas="
88+
+ newReplicas
89+
+ '}';
90+
}
91+
92+
@Override
93+
public boolean equals(Object o) {
94+
if (this == o) {
95+
return true;
96+
}
97+
if (o == null || getClass() != o.getClass()) {
98+
return false;
99+
}
100+
RebalancePlanForBucket that = (RebalancePlanForBucket) o;
101+
return Objects.equals(tableBucket, that.tableBucket)
102+
&& originalLeader == that.originalLeader
103+
&& newLeader == that.newLeader
104+
&& Objects.equals(originReplicas, that.originReplicas)
105+
&& Objects.equals(newReplicas, that.newReplicas);
106+
}
107+
108+
@Override
109+
public int hashCode() {
110+
return Objects.hash(tableBucket, originalLeader, newLeader, originReplicas, newReplicas);
111+
}
112+
}

0 commit comments

Comments
 (0)