Skip to content

Commit a900238

Browse files
committed
[server] Support generate and execute reblance plan
1 parent 4b82ea4 commit a900238

File tree

58 files changed

+5394
-207
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+5394
-207
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.apache.fluss.rpc.messages.AddServerTagRequest;
5252
import org.apache.fluss.rpc.messages.AlterClusterConfigsRequest;
5353
import org.apache.fluss.rpc.messages.AlterTableRequest;
54+
import org.apache.fluss.rpc.messages.CancelRebalanceRequest;
5455
import org.apache.fluss.rpc.messages.CreateAclsRequest;
5556
import org.apache.fluss.rpc.messages.CreateDatabaseRequest;
5657
import org.apache.fluss.rpc.messages.CreateTableRequest;
@@ -77,6 +78,7 @@
7778
import org.apache.fluss.rpc.messages.PbListOffsetsRespForBucket;
7879
import org.apache.fluss.rpc.messages.PbPartitionSpec;
7980
import org.apache.fluss.rpc.messages.PbTablePath;
81+
import org.apache.fluss.rpc.messages.RebalanceRequest;
8082
import org.apache.fluss.rpc.messages.RemoveServerTagRequest;
8183
import org.apache.fluss.rpc.messages.TableExistsRequest;
8284
import org.apache.fluss.rpc.messages.TableExistsResponse;
@@ -555,7 +557,9 @@ public CompletableFuture<Void> removeServerTag(
555557
@Override
556558
public CompletableFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
557559
List<GoalType> priorityGoals, boolean dryRun) {
558-
throw new UnsupportedOperationException("Support soon");
560+
RebalanceRequest request = new RebalanceRequest().setDryRun(dryRun);
561+
priorityGoals.forEach(goal -> request.addGoal(goal.value));
562+
return gateway.rebalance(request).thenApply(ClientRpcMessageUtils::toRebalancePlan);
559563
}
560564

561565
@Override
@@ -565,7 +569,8 @@ public CompletableFuture<Map<TableBucket, RebalanceResultForBucket>> listRebalan
565569

566570
@Override
567571
public CompletableFuture<Void> cancelRebalance() {
568-
throw new UnsupportedOperationException("Support soon");
572+
CancelRebalanceRequest request = new CancelRebalanceRequest();
573+
return gateway.cancelRebalance(request).thenApply(r -> null);
569574
}
570575

571576
@Override

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.fluss.client.metadata.LakeSnapshot;
2626
import org.apache.fluss.client.write.KvWriteBatch;
2727
import org.apache.fluss.client.write.ReadyWriteBatch;
28+
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
2829
import org.apache.fluss.config.cluster.AlterConfigOpType;
2930
import org.apache.fluss.config.cluster.ColumnPositionType;
3031
import org.apache.fluss.config.cluster.ConfigEntry;
@@ -61,11 +62,15 @@
6162
import org.apache.fluss.rpc.messages.PbPrefixLookupReqForBucket;
6263
import org.apache.fluss.rpc.messages.PbProduceLogReqForBucket;
6364
import org.apache.fluss.rpc.messages.PbPutKvReqForBucket;
65+
import org.apache.fluss.rpc.messages.PbRebalancePlanForBucket;
66+
import org.apache.fluss.rpc.messages.PbRebalancePlanForPartition;
67+
import org.apache.fluss.rpc.messages.PbRebalancePlanForTable;
6468
import org.apache.fluss.rpc.messages.PbRemotePathAndLocalFile;
6569
import org.apache.fluss.rpc.messages.PbRenameColumn;
6670
import org.apache.fluss.rpc.messages.PrefixLookupRequest;
6771
import org.apache.fluss.rpc.messages.ProduceLogRequest;
6872
import org.apache.fluss.rpc.messages.PutKvRequest;
73+
import org.apache.fluss.rpc.messages.RebalanceResponse;
6974
import org.apache.fluss.utils.json.DataTypeJsonSerde;
7075
import org.apache.fluss.utils.json.JsonSerdeUtils;
7176

@@ -370,6 +375,48 @@ public static AlterTableRequest makeAlterTableRequest(
370375
return request;
371376
}
372377

378+
public static Map<TableBucket, RebalancePlanForBucket> toRebalancePlan(
379+
RebalanceResponse response) {
380+
Map<TableBucket, RebalancePlanForBucket> rebalancePlan = new HashMap<>();
381+
for (PbRebalancePlanForTable pbTable : response.getPlanForTablesList()) {
382+
long tableId = pbTable.getTableId();
383+
if (pbTable.getPartitionsPlansCount() == 0) {
384+
// none-partition table.
385+
for (PbRebalancePlanForBucket pbBucket : pbTable.getBucketsPlansList()) {
386+
int bucketId = pbBucket.getBucketId();
387+
rebalancePlan.put(
388+
new TableBucket(tableId, null, bucketId),
389+
toRebalancePlanForBucket(tableId, null, bucketId, pbBucket));
390+
}
391+
} else {
392+
// partition table.
393+
for (PbRebalancePlanForPartition pbPartition : pbTable.getPartitionsPlansList()) {
394+
long partitionId = pbPartition.getPartitionId();
395+
for (PbRebalancePlanForBucket pbBucket : pbPartition.getBucketsPlansList()) {
396+
int bucketId = pbBucket.getBucketId();
397+
rebalancePlan.put(
398+
new TableBucket(tableId, partitionId, bucketId),
399+
toRebalancePlanForBucket(tableId, partitionId, bucketId, pbBucket));
400+
}
401+
}
402+
}
403+
}
404+
return rebalancePlan;
405+
}
406+
407+
private static RebalancePlanForBucket toRebalancePlanForBucket(
408+
long tableId,
409+
@Nullable Long partitionId,
410+
int bucketId,
411+
PbRebalancePlanForBucket pbBucket) {
412+
return new RebalancePlanForBucket(
413+
new TableBucket(tableId, partitionId, bucketId),
414+
pbBucket.getOriginalLeader(),
415+
pbBucket.getNewLeader(),
416+
Arrays.stream(pbBucket.getOriginalReplicas()).boxed().collect(Collectors.toList()),
417+
Arrays.stream(pbBucket.getNewReplicas()).boxed().collect(Collectors.toList()));
418+
}
419+
373420
public static List<PartitionInfo> toPartitionInfos(ListPartitionInfosResponse response) {
374421
return response.getPartitionsInfosList().stream()
375422
.map(
Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.admin;
19+
20+
import org.apache.fluss.client.Connection;
21+
import org.apache.fluss.client.ConnectionFactory;
22+
import org.apache.fluss.cluster.rebalance.GoalType;
23+
import org.apache.fluss.cluster.rebalance.ServerTag;
24+
import org.apache.fluss.config.ConfigOptions;
25+
import org.apache.fluss.config.Configuration;
26+
import org.apache.fluss.metadata.DatabaseDescriptor;
27+
import org.apache.fluss.metadata.PartitionSpec;
28+
import org.apache.fluss.metadata.TableDescriptor;
29+
import org.apache.fluss.metadata.TablePath;
30+
import org.apache.fluss.server.replica.ReplicaManager;
31+
import org.apache.fluss.server.testutils.FlussClusterExtension;
32+
33+
import org.junit.jupiter.api.AfterEach;
34+
import org.junit.jupiter.api.BeforeEach;
35+
import org.junit.jupiter.api.Test;
36+
import org.junit.jupiter.api.extension.RegisterExtension;
37+
38+
import java.time.Duration;
39+
import java.util.Arrays;
40+
import java.util.Collections;
41+
42+
import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
43+
import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
44+
import static org.assertj.core.api.Assertions.assertThat;
45+
46+
/** ITCase for rebalance. */
47+
public class RebalanceITCase {
48+
@RegisterExtension
49+
public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
50+
FlussClusterExtension.builder()
51+
.setNumOfTabletServers(4)
52+
.setClusterConf(initConfig())
53+
.build();
54+
55+
private Connection conn;
56+
private Admin admin;
57+
58+
@BeforeEach
59+
protected void setup() throws Exception {
60+
conn = ConnectionFactory.createConnection(FLUSS_CLUSTER_EXTENSION.getClientConfig());
61+
admin = conn.getAdmin();
62+
}
63+
64+
@AfterEach
65+
protected void teardown() throws Exception {
66+
if (admin != null) {
67+
admin.close();
68+
admin = null;
69+
}
70+
71+
if (conn != null) {
72+
conn.close();
73+
conn = null;
74+
}
75+
}
76+
77+
@Test
78+
void testRebalanceForLogTable() throws Exception {
79+
String dbName = "db-balance";
80+
admin.createDatabase(dbName, DatabaseDescriptor.EMPTY, false).get();
81+
82+
TableDescriptor logDescriptor =
83+
TableDescriptor.builder()
84+
.schema(DATA1_SCHEMA)
85+
.distributedBy(3)
86+
.property(
87+
ConfigOptions.TABLE_GENERATE_UNBALANCE_TABLE_ASSIGNMENT.key(),
88+
"true")
89+
.build();
90+
// create some none partitioned log table.
91+
for (int i = 0; i < 6; i++) {
92+
long tableId =
93+
createTable(
94+
new TablePath(dbName, "test-rebalance_table-" + i),
95+
logDescriptor,
96+
false);
97+
FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
98+
}
99+
100+
// create some partitioned table with partition.
101+
TableDescriptor partitionedDescriptor =
102+
TableDescriptor.builder()
103+
.schema(DATA1_SCHEMA)
104+
.distributedBy(3)
105+
.property(
106+
ConfigOptions.TABLE_GENERATE_UNBALANCE_TABLE_ASSIGNMENT.key(),
107+
"true")
108+
.partitionedBy("b")
109+
.build();
110+
for (int i = 0; i < 3; i++) {
111+
TablePath tablePath = new TablePath(dbName, "test-rebalance_partitioned_table-" + i);
112+
long tableId = createTable(tablePath, partitionedDescriptor, false);
113+
for (int j = 0; j < 2; j++) {
114+
PartitionSpec partitionSpec =
115+
new PartitionSpec(Collections.singletonMap("b", String.valueOf(j)));
116+
admin.createPartition(tablePath, partitionSpec, false).get();
117+
long partitionId =
118+
admin.listPartitionInfos(tablePath, partitionSpec)
119+
.get()
120+
.get(0)
121+
.getPartitionId();
122+
FLUSS_CLUSTER_EXTENSION.waitUntilTablePartitionReady(tableId, partitionId);
123+
}
124+
}
125+
126+
// verify before rebalance. As we use unbalance assignment, all replicas will be location on
127+
// servers [0,1,2], all leader will be location on server 0.
128+
for (int i = 0; i < 3; i++) {
129+
ReplicaManager replicaManager =
130+
FLUSS_CLUSTER_EXTENSION.getTabletServerById(i).getReplicaManager();
131+
assertThat(replicaManager.onlineReplicas().count()).isEqualTo(36);
132+
if (i == 0) {
133+
assertThat(replicaManager.leaderCount()).isEqualTo(36);
134+
} else {
135+
assertThat(replicaManager.leaderCount()).isEqualTo(0);
136+
}
137+
}
138+
ReplicaManager replicaManager3 =
139+
FLUSS_CLUSTER_EXTENSION.getTabletServerById(3).getReplicaManager();
140+
assertThat(replicaManager3.onlineReplicas().count()).isEqualTo(0);
141+
assertThat(replicaManager3.leaderCount()).isEqualTo(0);
142+
143+
// trigger rebalance with goal set[ReplicaDistributionGoal, LeaderReplicaDistributionGoal]
144+
admin.rebalance(
145+
Arrays.asList(
146+
GoalType.REPLICA_DISTRIBUTION_GOAL,
147+
GoalType.LEADER_REPLICA_DISTRIBUTION_GOAL),
148+
false)
149+
.get();
150+
151+
retry(
152+
Duration.ofMinutes(2),
153+
() -> {
154+
// TODO use admin#listRebalanceProcess to verify rebalance is finished.
155+
assertThat(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient().getRebalancePlan())
156+
.isNotPresent();
157+
for (int i = 0; i < 4; i++) {
158+
ReplicaManager replicaManager =
159+
FLUSS_CLUSTER_EXTENSION.getTabletServerById(i).getReplicaManager();
160+
// average will be 27
161+
assertThat(replicaManager.onlineReplicas().count()).isBetween(24L, 30L);
162+
long leaderCount = replicaManager.leaderCount();
163+
// average will be 9
164+
assertThat(leaderCount).isBetween(7L, 11L);
165+
}
166+
});
167+
168+
// add server tag PERMANENT_OFFLINE for server 3, trigger all leader and replica removed
169+
// from server 3.
170+
admin.addServerTag(Collections.singletonList(3), ServerTag.PERMANENT_OFFLINE).get();
171+
admin.rebalance(
172+
Arrays.asList(
173+
GoalType.REPLICA_DISTRIBUTION_GOAL,
174+
GoalType.LEADER_REPLICA_DISTRIBUTION_GOAL),
175+
false)
176+
.get();
177+
retry(
178+
Duration.ofMinutes(2),
179+
() -> {
180+
// TODO use admin#listRebalanceProcess to verify rebalance is finished.
181+
assertThat(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient().getRebalancePlan())
182+
.isNotPresent();
183+
assertThat(replicaManager3.onlineReplicas().count()).isEqualTo(0);
184+
assertThat(replicaManager3.leaderCount()).isEqualTo(0);
185+
for (int i = 0; i < 3; i++) {
186+
ReplicaManager replicaManager =
187+
FLUSS_CLUSTER_EXTENSION.getTabletServerById(i).getReplicaManager();
188+
// average will be 36
189+
assertThat(replicaManager.onlineReplicas().count()).isBetween(34L, 38L);
190+
long leaderCount = replicaManager.leaderCount();
191+
// average will be 12
192+
assertThat(leaderCount).isBetween(10L, 14L);
193+
}
194+
});
195+
}
196+
197+
private static Configuration initConfig() {
198+
Configuration configuration = new Configuration();
199+
configuration.set(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
200+
configuration.set(ConfigOptions.DEFAULT_BUCKET_NUMBER, 3);
201+
return configuration;
202+
}
203+
204+
private long createTable(
205+
TablePath tablePath, TableDescriptor tableDescriptor, boolean ignoreIfExists)
206+
throws Exception {
207+
admin.createTable(tablePath, tableDescriptor, ignoreIfExists).get();
208+
return admin.getTableInfo(tablePath).get().getTableId();
209+
}
210+
}

fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalancePlanForBucket.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,10 @@ public List<Integer> getNewReplicas() {
7373
return newReplicas;
7474
}
7575

76+
public boolean isLeaderAction() {
77+
return originalLeader != newLeader;
78+
}
79+
7680
@Override
7781
public String toString() {
7882
return "RebalancePlanForBucket{"

fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceResultForBucket.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public class RebalanceResultForBucket {
3232
private final RebalancePlanForBucket rebalancePlanForBucket;
3333
private RebalanceStatusForBucket rebalanceStatusForBucket;
3434

35-
public RebalanceResultForBucket(
35+
private RebalanceResultForBucket(
3636
RebalancePlanForBucket rebalancePlanForBucket,
3737
RebalanceStatusForBucket rebalanceStatusForBucket) {
3838
this.rebalancePlanForBucket = rebalancePlanForBucket;

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1471,6 +1471,14 @@ public class ConfigOptions {
14711471
+ "This mode reduces storage and transmission costs but loses the ability to track previous values. "
14721472
+ "This option only affects primary key tables.");
14731473

1474+
public static final ConfigOption<Boolean> TABLE_GENERATE_UNBALANCE_TABLE_ASSIGNMENT =
1475+
key("table.generate-unbalance-table-assignment")
1476+
.booleanType()
1477+
.defaultValue(false)
1478+
.withDescription(
1479+
"This parameter is only used for test. If set to true, the assignment will always be [0,1,2] "
1480+
+ "as replica factor set as 3 even if there are tabletServers more than 3.");
1481+
14741482
// ------------------------------------------------------------------------
14751483
// ConfigOptions for Kv
14761484
// ------------------------------------------------------------------------

fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,4 +140,9 @@ public ArrowCompressionInfo getArrowCompressionInfo() {
140140
public AutoPartitionStrategy getAutoPartitionStrategy() {
141141
return AutoPartitionStrategy.from(config);
142142
}
143+
144+
/** Whether to generate unbalance assignment fot this table. */
145+
public boolean generateUnbalanceAssignment() {
146+
return config.get(ConfigOptions.TABLE_GENERATE_UNBALANCE_TABLE_ASSIGNMENT);
147+
}
143148
}

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ public Map<Long, TablePath> allTables() {
184184
return tablePathById;
185185
}
186186

187-
public Set<TableBucket> allBuckets() {
187+
public Set<TableBucket> getAllBuckets() {
188188
Set<TableBucket> allBuckets = new HashSet<>();
189189
for (Map.Entry<Long, Map<Integer, List<Integer>>> tableAssign :
190190
tableAssignments.entrySet()) {

0 commit comments

Comments
 (0)