Skip to content

Commit 9ddb717

Browse files
committed
address jark's comments
1 parent 1ea02c2 commit 9ddb717

File tree

25 files changed

+245
-133
lines changed

25 files changed

+245
-133
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@
2323
import org.apache.fluss.client.metadata.LakeSnapshot;
2424
import org.apache.fluss.cluster.ServerNode;
2525
import org.apache.fluss.cluster.rebalance.GoalType;
26-
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
27-
import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
2826
import org.apache.fluss.cluster.rebalance.ServerTag;
2927
import org.apache.fluss.config.ConfigOptions;
3028
import org.apache.fluss.config.cluster.AlterConfig;
@@ -70,7 +68,6 @@
7068

7169
import java.util.Collection;
7270
import java.util.List;
73-
import java.util.Map;
7471
import java.util.concurrent.CompletableFuture;
7572

7673
/**
@@ -509,13 +506,16 @@ ListOffsetsResult listOffsets(
509506
*
510507
* <p>If one tabletServer failed adding tag, none of the tags will take effect.
511508
*
509+
* <p>If one tabletServer already has a serverTag, and the serverTag is same with the existing
510+
* one, this operation will be ignored.
511+
*
512512
* <ul>
513513
* <li>{@link AuthorizationException} If the authenticated user doesn't have cluster
514514
* permissions.
515515
* <li>{@link ServerNotExistException} If the tabletServer in {@code tabletServers} does not
516516
* exist.
517517
* <li>{@link ServerTagAlreadyExistException} If the server tag already exists for any one of
518-
* the tabletServers.
518+
* the tabletServers, and the server tag is different from the existing one.
519519
* </ul>
520520
*
521521
* @param tabletServers the tabletServers we want to add server tags.
@@ -528,6 +528,8 @@ ListOffsetsResult listOffsets(
528528
*
529529
* <p>If one tabletServer failed removing tag, none of the tags will be removed.
530530
*
531+
* <p>No exception will be thrown if the server already has no any server tag now.
532+
*
531533
* <ul>
532534
* <li>{@link AuthorizationException} If the authenticated user doesn't have cluster
533535
* permissions.
@@ -563,21 +565,20 @@ ListOffsetsResult listOffsets(
563565
* it.
564566
* @return the generated rebalance plan for all the tableBuckets which need to do rebalance.
565567
*/
566-
CompletableFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
567-
List<GoalType> priorityGoals, boolean dryRun);
568+
CompletableFuture<RebalancePlan> rebalance(List<GoalType> priorityGoals, boolean dryRun);
568569

569570
/**
570-
* List the rebalance process.
571+
* List the rebalance progress.
571572
*
572573
* <ul>
573574
* <li>{@link AuthorizationException} If the authenticated user doesn't have cluster
574575
* permissions.
575576
* <li>{@link NoRebalanceInProgressException} If there are no rebalance tasks in progress.
576577
* </ul>
577578
*
578-
* @return the rebalance process for all the tableBuckets doing rebalance.
579+
* @return the rebalance process.
579580
*/
580-
CompletableFuture<Map<TableBucket, RebalanceResultForBucket>> listRebalanceProcess();
581+
CompletableFuture<RebalanceProgress> listRebalanceProgress();
581582

582583
/**
583584
* Cannel the rebalance task.

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@
2525
import org.apache.fluss.cluster.Cluster;
2626
import org.apache.fluss.cluster.ServerNode;
2727
import org.apache.fluss.cluster.rebalance.GoalType;
28-
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
29-
import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
3028
import org.apache.fluss.cluster.rebalance.ServerTag;
3129
import org.apache.fluss.config.cluster.AlterConfig;
3230
import org.apache.fluss.config.cluster.ConfigEntry;
@@ -547,13 +545,13 @@ public CompletableFuture<Void> removeServerTag(
547545
}
548546

549547
@Override
550-
public CompletableFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
548+
public CompletableFuture<RebalancePlan> rebalance(
551549
List<GoalType> priorityGoals, boolean dryRun) {
552550
throw new UnsupportedOperationException("Support soon");
553551
}
554552

555553
@Override
556-
public CompletableFuture<Map<TableBucket, RebalanceResultForBucket>> listRebalanceProcess() {
554+
public CompletableFuture<RebalanceProgress> listRebalanceProgress() {
557555
throw new UnsupportedOperationException("Support soon");
558556
}
559557

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.admin;
19+
20+
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
21+
import org.apache.fluss.metadata.TableBucket;
22+
23+
import java.util.Map;
24+
25+
/**
26+
* The rebalance plan.
27+
*
28+
* @since 0.9
29+
*/
30+
public class RebalancePlan {
31+
32+
private final Map<TableBucket, RebalancePlanForBucket> planForBucketMap;
33+
34+
public RebalancePlan(Map<TableBucket, RebalancePlanForBucket> planForBucketMap) {
35+
this.planForBucketMap = planForBucketMap;
36+
}
37+
38+
public Map<TableBucket, RebalancePlanForBucket> getPlanForBucketMap() {
39+
return planForBucketMap;
40+
}
41+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.admin;
19+
20+
import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
21+
import org.apache.fluss.cluster.rebalance.RebalanceStatus;
22+
import org.apache.fluss.metadata.TableBucket;
23+
24+
import java.util.Map;
25+
26+
/**
27+
* The rebalance progress.
28+
*
29+
* @since 0.9
30+
*/
31+
public class RebalanceProgress {
32+
33+
/** The rebalance status for the overall rebalance. */
34+
private final RebalanceStatus rebalanceStatus;
35+
36+
/** The rebalance progress for the overall rebalance. Between 0.0d to 1.0d */
37+
private final double progress;
38+
39+
/** The rebalance progress for each tabletBucket. */
40+
private final Map<TableBucket, RebalanceResultForBucket> processForBucketMap;
41+
42+
public RebalanceProgress(
43+
RebalanceStatus rebalanceStatus,
44+
double progress,
45+
Map<TableBucket, RebalanceResultForBucket> processForBucketMap) {
46+
this.rebalanceStatus = rebalanceStatus;
47+
this.progress = progress;
48+
this.processForBucketMap = processForBucketMap;
49+
}
50+
51+
public RebalanceStatus status() {
52+
return rebalanceStatus;
53+
}
54+
55+
public double progress() {
56+
return progress;
57+
}
58+
59+
public Map<TableBucket, RebalanceResultForBucket> processForBucketMap() {
60+
return processForBucketMap;
61+
}
62+
}

fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/GoalType.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
/**
2525
* The type of goal to optimize.
2626
*
27-
* @since 0.8
27+
* @since 0.9
2828
*/
2929
@PublicEvolving
3030
public enum GoalType {
@@ -38,7 +38,7 @@ public enum GoalType {
3838
* Goal to generate leadership movement and leader replica movement tasks to ensure that the
3939
* number of leader replicas on each tabletServer is near balanced.
4040
*/
41-
LEADER_REPLICA_DISTRIBUTION_GOAL(1);
41+
LEADER_DISTRIBUTION_GOAL(1);
4242

4343
public final int value;
4444

@@ -49,8 +49,8 @@ public enum GoalType {
4949
public static GoalType valueOf(int value) {
5050
if (value == REPLICA_DISTRIBUTION_GOAL.value) {
5151
return REPLICA_DISTRIBUTION_GOAL;
52-
} else if (value == LEADER_REPLICA_DISTRIBUTION_GOAL.value) {
53-
return LEADER_REPLICA_DISTRIBUTION_GOAL;
52+
} else if (value == LEADER_DISTRIBUTION_GOAL.value) {
53+
return LEADER_DISTRIBUTION_GOAL;
5454
} else {
5555
throw new IllegalArgumentException(
5656
String.format(

fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalancePlanForBucket.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
/**
2727
* a generated rebalance plan for a tableBucket.
2828
*
29-
* @since 0.8
29+
* @since 0.9
3030
*/
3131
@PublicEvolving
3232
public class RebalancePlanForBucket {

fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceResultForBucket.java

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,43 +25,37 @@
2525
/**
2626
* Status of rebalance process for a tabletBucket.
2727
*
28-
* @since 0.8
28+
* @since 0.9
2929
*/
3030
@PublicEvolving
3131
public class RebalanceResultForBucket {
3232
private final RebalancePlanForBucket rebalancePlanForBucket;
33-
private RebalanceStatusForBucket rebalanceStatusForBucket;
33+
private final RebalanceStatus rebalanceStatus;
3434

3535
public RebalanceResultForBucket(
36-
RebalancePlanForBucket rebalancePlanForBucket,
37-
RebalanceStatusForBucket rebalanceStatusForBucket) {
36+
RebalancePlanForBucket rebalancePlanForBucket, RebalanceStatus rebalanceStatus) {
3837
this.rebalancePlanForBucket = rebalancePlanForBucket;
39-
this.rebalanceStatusForBucket = rebalanceStatusForBucket;
38+
this.rebalanceStatus = rebalanceStatus;
4039
}
4140

4241
public TableBucket tableBucket() {
4342
return rebalancePlanForBucket.getTableBucket();
4443
}
4544

46-
public RebalancePlanForBucket planForBucket() {
45+
public RebalancePlanForBucket plan() {
4746
return rebalancePlanForBucket;
4847
}
4948

5049
public List<Integer> newReplicas() {
5150
return rebalancePlanForBucket.getNewReplicas();
5251
}
5352

54-
public RebalanceResultForBucket setNewStatus(RebalanceStatusForBucket status) {
55-
this.rebalanceStatusForBucket = status;
56-
return this;
57-
}
58-
59-
public RebalanceStatusForBucket status() {
60-
return rebalanceStatusForBucket;
53+
public RebalanceStatus status() {
54+
return rebalanceStatus;
6155
}
6256

6357
public static RebalanceResultForBucket of(
64-
RebalancePlanForBucket planForBucket, RebalanceStatusForBucket status) {
58+
RebalancePlanForBucket planForBucket, RebalanceStatus status) {
6559
return new RebalanceResultForBucket(planForBucket, status);
6660
}
6761

@@ -71,7 +65,7 @@ public String toString() {
7165
+ "rebalancePlanForBucket="
7266
+ rebalancePlanForBucket
7367
+ ", rebalanceStatusForBucket="
74-
+ rebalanceStatusForBucket
68+
+ rebalanceStatus
7569
+ '}';
7670
}
7771
}

fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceStatusForBucket.java renamed to fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceStatus.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,25 +20,29 @@
2020
import org.apache.fluss.annotation.PublicEvolving;
2121

2222
/**
23-
* Rebalance status for single bucket.
23+
* Rebalance status.
2424
*
25-
* @since 0.8
25+
* @since 0.9
2626
*/
2727
@PublicEvolving
28-
public enum RebalanceStatusForBucket {
28+
public enum RebalanceStatus {
2929
PENDING(1),
3030
REBALANCING(2),
3131
FAILED(3),
3232
COMPLETED(4);
3333

3434
private final int code;
3535

36-
RebalanceStatusForBucket(int code) {
36+
RebalanceStatus(int code) {
3737
this.code = code;
3838
}
3939

40-
public static RebalanceStatusForBucket of(int code) {
41-
for (RebalanceStatusForBucket status : RebalanceStatusForBucket.values()) {
40+
public int getCode() {
41+
return code;
42+
}
43+
44+
public static RebalanceStatus of(int code) {
45+
for (RebalanceStatus status : RebalanceStatus.values()) {
4246
if (status.code == code) {
4347
return status;
4448
}

fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/ServerTag.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,17 @@
2424
/**
2525
* The tag of tabletServer.
2626
*
27-
* @since 0.8
27+
* @since 0.9
2828
*/
2929
@PublicEvolving
3030
public enum ServerTag {
31+
/**
32+
* The tabletServer is permanently offline. Such as the host where the tabletServer on is
33+
* upcoming decommissioning.
34+
*/
3135
PERMANENT_OFFLINE(0),
36+
37+
/** The tabletServer is temporarily offline. Such as the tabletServer is upcoming upgrading. */
3238
TEMPORARY_OFFLINE(1);
3339

3440
public final int value;

fluss-common/src/main/java/org/apache/fluss/exception/NoRebalanceInProgressException.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
/**
2323
* Thrown if there are no rebalance tasks in progress when list rebalance process.
2424
*
25-
* @since 0.8
25+
* @since 0.9
2626
*/
2727
@PublicEvolving
2828
public class NoRebalanceInProgressException extends ApiException {

0 commit comments

Comments
 (0)