Skip to content

Commit edb52c5

Browse files
committed
[server] Introduce new rebalance API
1 parent 1f9de70 commit edb52c5

File tree

29 files changed

+1577
-2
lines changed

29 files changed

+1577
-2
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@
2222
import org.apache.fluss.client.metadata.KvSnapshots;
2323
import org.apache.fluss.client.metadata.LakeSnapshot;
2424
import org.apache.fluss.cluster.ServerNode;
25+
import org.apache.fluss.cluster.maintencance.GoalType;
26+
import org.apache.fluss.cluster.maintencance.RebalancePlanForBucket;
27+
import org.apache.fluss.cluster.maintencance.ServerTag;
2528
import org.apache.fluss.config.ConfigOptions;
29+
import org.apache.fluss.exception.AuthorizationException;
2630
import org.apache.fluss.exception.DatabaseAlreadyExistException;
2731
import org.apache.fluss.exception.DatabaseNotEmptyException;
2832
import org.apache.fluss.exception.DatabaseNotExistException;
@@ -32,10 +36,15 @@
3236
import org.apache.fluss.exception.InvalidTableException;
3337
import org.apache.fluss.exception.KvSnapshotNotExistException;
3438
import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
39+
import org.apache.fluss.exception.NoRebalanceInProgressException;
3540
import org.apache.fluss.exception.NonPrimaryKeyTableException;
3641
import org.apache.fluss.exception.PartitionAlreadyExistsException;
3742
import org.apache.fluss.exception.PartitionNotExistException;
43+
import org.apache.fluss.exception.RebalanceFailureException;
3844
import org.apache.fluss.exception.SchemaNotExistException;
45+
import org.apache.fluss.exception.ServerNotExistException;
46+
import org.apache.fluss.exception.ServerTagAlreadyExistException;
47+
import org.apache.fluss.exception.ServerTagNotExistException;
3948
import org.apache.fluss.exception.TableAlreadyExistException;
4049
import org.apache.fluss.exception.TableNotExistException;
4150
import org.apache.fluss.exception.TableNotPartitionedException;
@@ -53,9 +62,11 @@
5362
import org.apache.fluss.metadata.TablePath;
5463
import org.apache.fluss.security.acl.AclBinding;
5564
import org.apache.fluss.security.acl.AclBindingFilter;
65+
import org.apache.fluss.shaded.netty4.io.netty.util.concurrent.CompleteFuture;
5666

5767
import java.util.Collection;
5868
import java.util.List;
69+
import java.util.Map;
5970
import java.util.concurrent.CompletableFuture;
6071

6172
/**
@@ -452,4 +463,90 @@ ListOffsetsResult listOffsets(
452463
* @return A CompletableFuture indicating completion of the operation.
453464
*/
454465
DropAclsResult dropAcls(Collection<AclBindingFilter> filters);
466+
467+
/**
468+
* Add server tag to the specified tabletServers, one tabletServer can only have one serverTag.
469+
*
470+
* <p>If adding the tag fails for any tabletServer, none of the tags will take effect.
471+
*
472+
* <ul>
473+
* <li>{@link AuthorizationException} If the authenticated user doesn't have reset config
474+
* access to the cluster.
475+
* <li>{@link ServerNotExistException} If the tabletServer in {@code tabletServers} does not
476+
* exist.
477+
* <li>{@link ServerTagAlreadyExistException} If a tabletServer in {@code tabletServers}
478+
* already has a server tag.
479+
* </ul>
480+
*
481+
* @param tabletServers the tabletServers to which the server tag will be added.
482+
* @param serverTag the server tag to be added.
483+
*/
484+
CompletableFuture<Void> addServerTag(List<Integer> tabletServers, ServerTag serverTag);
485+
486+
/**
487+
* Remove server tag from the specified tabletServers.
488+
*
489+
* <p>If removing the tag fails for any tabletServer, none of the tags will be removed.
490+
*
491+
* <ul>
492+
* <li>{@link AuthorizationException} If the authenticated user doesn't have reset config
493+
* access to the cluster.
494+
* <li>{@link ServerNotExistException} If the tabletServer in {@code tabletServers} does not
495+
* exist.
496+
* <li>{@link ServerTagNotExistException} If the server tag does not exist on a tabletServer
497+
* in {@code tabletServers}.
498+
* </ul>
499+
*
500+
* @param tabletServers the tabletServers from which the server tag will be removed.
* @param serverTag the server tag to be removed.
501+
*/
502+
CompletableFuture<Void> removeServerTag(List<Integer> tabletServers, ServerTag serverTag);
503+
504+
/**
505+
* Based on the provided {@code priorityGoals}, Fluss performs load balancing on the cluster's
506+
* bucket load.
507+
*
508+
* <p>In more detail, Fluss collects the cluster's load information and optimizes to perform load
509+
* balancing according to the user-defined {@code priorityGoals}.
510+
*
511+
* <p>Currently, Fluss only supports one active rebalance task in the cluster. If an uncompleted
512+
* rebalance task exists, a {@link RebalanceFailureException} will be thrown.
513+
*
514+
* <ul>
515+
* <li>{@link AuthorizationException} If the authenticated user doesn't have reset config
516+
* access to the cluster.
517+
* <li>{@link RebalanceFailureException} If the rebalance failed, for example because there is
518+
* an ongoing execution.
519+
* </ul>
520+
*
521+
* @param priorityGoals the goals to be optimized.
522+
* @param dryRun Calculate and return the rebalance optimization proposal, but do not execute
523+
* it.
524+
* @return the generated rebalance plan for all the tableBuckets which need to do rebalance.
525+
*/
526+
// NOTE(review): the return type is CompleteFuture, which this file imports from the shaded Netty
// package (io.netty.util.concurrent), whereas the sibling admin methods return
// java.util.concurrent.CompletableFuture. This looks like an accidental auto-import; confirm and
// switch the interface and its implementation together if so.
CompleteFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
527+
List<GoalType> priorityGoals, boolean dryRun);
528+
529+
/**
530+
* List the rebalance process.
531+
*
532+
* <ul>
533+
* <li>{@link AuthorizationException} If the authenticated user doesn't have reset config
534+
* access to the cluster.
535+
* <li>{@link NoRebalanceInProgressException} If there are no rebalance tasks in progress.
536+
* </ul>
537+
*
538+
* @return the rebalance process for all the tableBuckets doing rebalance.
539+
*/
540+
// NOTE(review): the return type is CompleteFuture, which this file imports from the shaded Netty
// package (io.netty.util.concurrent), whereas the sibling admin methods return
// java.util.concurrent.CompletableFuture. This looks like an accidental auto-import; confirm and
// switch the interface and its implementation together if so.
CompleteFuture<Map<TableBucket, RebalanceResultForBucket>> listRebalanceProcess();
541+
542+
/**
543+
* Cancel the rebalance task.
544+
*
545+
* <ul>
546+
* <li>{@link AuthorizationException} If the authenticated user doesn't have reset config
547+
* access to the cluster.
548+
* <li>{@link NoRebalanceInProgressException} If there are no rebalance tasks in progress.
549+
* </ul>
550+
*/
551+
CompletableFuture<Void> cancelRebalance();
455552
}

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import org.apache.fluss.client.utils.ClientRpcMessageUtils;
2525
import org.apache.fluss.cluster.Cluster;
2626
import org.apache.fluss.cluster.ServerNode;
27+
import org.apache.fluss.cluster.maintencance.GoalType;
28+
import org.apache.fluss.cluster.maintencance.RebalancePlanForBucket;
29+
import org.apache.fluss.cluster.maintencance.ServerTag;
2730
import org.apache.fluss.exception.LeaderNotAvailableException;
2831
import org.apache.fluss.metadata.DatabaseDescriptor;
2932
import org.apache.fluss.metadata.DatabaseInfo;
@@ -70,6 +73,7 @@
7073
import org.apache.fluss.rpc.protocol.ApiError;
7174
import org.apache.fluss.security.acl.AclBinding;
7275
import org.apache.fluss.security.acl.AclBindingFilter;
76+
import org.apache.fluss.shaded.netty4.io.netty.util.concurrent.CompleteFuture;
7377
import org.apache.fluss.utils.MapUtils;
7478

7579
import javax.annotation.Nullable;
@@ -464,6 +468,33 @@ public DropAclsResult dropAcls(Collection<AclBindingFilter> filters) {
464468
return result;
465469
}
466470

471+
// Placeholder implementation: adding server tags is not wired up yet.
@Override
472+
public CompletableFuture<Void> addServerTag(List<Integer> tabletServers, ServerTag serverTag) {
473+
// Throws synchronously; no future is ever returned to the caller.
throw new UnsupportedOperationException("Support soon");
474+
}
475+
476+
// Placeholder implementation: removing server tags is not wired up yet.
@Override
477+
public CompletableFuture<Void> removeServerTag(
478+
List<Integer> tabletServers, ServerTag serverTag) {
479+
// Throws synchronously; no future is ever returned to the caller.
throw new UnsupportedOperationException("Support soon");
480+
}
481+
482+
// Placeholder implementation: rebalance is not wired up yet.
// NOTE(review): the return type is the shaded Netty CompleteFuture imported by this file, not
// java.util.concurrent.CompletableFuture as used by the neighbouring methods; confirm whether
// this is intended.
@Override
483+
public CompleteFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
484+
List<GoalType> priorityGoals, boolean dryRun) {
485+
// Throws synchronously; no future is ever returned to the caller.
throw new UnsupportedOperationException("Support soon");
486+
}
487+
488+
// Placeholder implementation: listing the rebalance process is not wired up yet.
// NOTE(review): the return type is the shaded Netty CompleteFuture imported by this file, not
// java.util.concurrent.CompletableFuture as used by the neighbouring methods; confirm whether
// this is intended.
@Override
489+
public CompleteFuture<Map<TableBucket, RebalanceResultForBucket>> listRebalanceProcess() {
490+
// Throws synchronously; no future is ever returned to the caller.
throw new UnsupportedOperationException("Support soon");
491+
}
492+
493+
// Placeholder implementation: cancelling a rebalance is not wired up yet.
@Override
494+
public CompletableFuture<Void> cancelRebalance() {
495+
// Throws synchronously; no future is ever returned to the caller.
throw new UnsupportedOperationException("Support soon");
496+
}
497+
467498
@Override
468499
public void close() {
469500
// nothing to do yet
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.admin;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.cluster.maintencance.RebalanceStatusForBucket;
22+
23+
import java.util.List;
24+
25+
/**
26+
* Result of rebalance process for a tableBucket.
27+
*
28+
* @since 0.8
29+
*/
30+
@PublicEvolving
31+
public class RebalanceResultForBucket {
32+
33+
private final List<Integer> originReplicas;
34+
private final List<Integer> newReplicas;
35+
private final RebalanceStatusForBucket rebalanceStatusForBucket;
36+
37+
public RebalanceResultForBucket(
38+
List<Integer> originReplicas,
39+
List<Integer> newReplicas,
40+
RebalanceStatusForBucket rebalanceStatusForBucket) {
41+
this.originReplicas = originReplicas;
42+
this.newReplicas = newReplicas;
43+
this.rebalanceStatusForBucket = rebalanceStatusForBucket;
44+
}
45+
46+
public List<Integer> replicas() {
47+
return originReplicas;
48+
}
49+
50+
public List<Integer> newReplicas() {
51+
return newReplicas;
52+
}
53+
54+
public RebalanceStatusForBucket rebalanceStatus() {
55+
return rebalanceStatusForBucket;
56+
}
57+
58+
@Override
59+
public String toString() {
60+
return "BucketReassignment{"
61+
+ "replicas="
62+
+ originReplicas
63+
+ ", newReplicas="
64+
+ newReplicas
65+
+ ", rebalanceStatus="
66+
+ rebalanceStatusForBucket
67+
+ '}';
68+
}
69+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.cluster.maintencance;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
22+
import java.util.Arrays;
23+
24+
/**
25+
* The type of goal to optimize.
26+
*
27+
* @since 0.8
28+
*/
29+
@PublicEvolving
30+
public enum GoalType {
    /** Goal type carrying the integer value {@code 0}. */
    REPLICA_DISTRIBUTION_GOAL(0),

    /** Goal type carrying the integer value {@code 1}. */
    PREFERRED_LEADER_GOAL(1);

    /** Integer value associated with this goal type; used by {@link #valueOf(int)}. */
    public final int value;

    GoalType(int value) {
        this.value = value;
    }

    /**
     * Returns the goal type whose {@link #value} equals the given integer.
     *
     * <p>The lookup scans {@link #values()}, so newly added constants are resolved without
     * touching this method.
     *
     * @param value the integer value to look up.
     * @return the goal type carrying {@code value}.
     * @throws IllegalArgumentException if no goal type carries {@code value}.
     */
    public static GoalType valueOf(int value) {
        for (GoalType goalType : values()) {
            if (goalType.value == value) {
                return goalType;
            }
        }
        throw new IllegalArgumentException(
                String.format(
                        "Value %s must be one of %s", value, Arrays.asList(GoalType.values())));
    }
}

0 commit comments

Comments
 (0)