Skip to content

Commit b03b9eb

Browse files
committed
[server] Support generating and executing rebalance plan
1 parent 25887e8 commit b03b9eb

File tree

58 files changed

+5394
-207
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+5394
-207
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.apache.fluss.rpc.messages.AddServerTagRequest;
5252
import org.apache.fluss.rpc.messages.AlterClusterConfigsRequest;
5353
import org.apache.fluss.rpc.messages.AlterTableRequest;
54+
import org.apache.fluss.rpc.messages.CancelRebalanceRequest;
5455
import org.apache.fluss.rpc.messages.CreateAclsRequest;
5556
import org.apache.fluss.rpc.messages.CreateDatabaseRequest;
5657
import org.apache.fluss.rpc.messages.CreateTableRequest;
@@ -77,6 +78,7 @@
7778
import org.apache.fluss.rpc.messages.PbListOffsetsRespForBucket;
7879
import org.apache.fluss.rpc.messages.PbPartitionSpec;
7980
import org.apache.fluss.rpc.messages.PbTablePath;
81+
import org.apache.fluss.rpc.messages.RebalanceRequest;
8082
import org.apache.fluss.rpc.messages.RemoveServerTagRequest;
8183
import org.apache.fluss.rpc.messages.TableExistsRequest;
8284
import org.apache.fluss.rpc.messages.TableExistsResponse;
@@ -559,7 +561,9 @@ public CompletableFuture<Void> removeServerTag(
559561
@Override
560562
public CompletableFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
561563
List<GoalType> priorityGoals, boolean dryRun) {
562-
throw new UnsupportedOperationException("Support soon");
564+
RebalanceRequest request = new RebalanceRequest().setDryRun(dryRun);
565+
priorityGoals.forEach(goal -> request.addGoal(goal.value));
566+
return gateway.rebalance(request).thenApply(ClientRpcMessageUtils::toRebalancePlan);
563567
}
564568

565569
@Override
@@ -569,7 +573,8 @@ public CompletableFuture<Map<TableBucket, RebalanceResultForBucket>> listRebalan
569573

570574
@Override
571575
public CompletableFuture<Void> cancelRebalance() {
572-
throw new UnsupportedOperationException("Support soon");
576+
CancelRebalanceRequest request = new CancelRebalanceRequest();
577+
return gateway.cancelRebalance(request).thenApply(r -> null);
573578
}
574579

575580
@Override

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.fluss.client.metadata.LakeSnapshot;
2626
import org.apache.fluss.client.write.KvWriteBatch;
2727
import org.apache.fluss.client.write.ReadyWriteBatch;
28+
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
2829
import org.apache.fluss.config.cluster.AlterConfigOpType;
2930
import org.apache.fluss.config.cluster.ConfigEntry;
3031
import org.apache.fluss.fs.FsPath;
@@ -56,10 +57,14 @@
5657
import org.apache.fluss.rpc.messages.PbPrefixLookupReqForBucket;
5758
import org.apache.fluss.rpc.messages.PbProduceLogReqForBucket;
5859
import org.apache.fluss.rpc.messages.PbPutKvReqForBucket;
60+
import org.apache.fluss.rpc.messages.PbRebalancePlanForBucket;
61+
import org.apache.fluss.rpc.messages.PbRebalancePlanForPartition;
62+
import org.apache.fluss.rpc.messages.PbRebalancePlanForTable;
5963
import org.apache.fluss.rpc.messages.PbRemotePathAndLocalFile;
6064
import org.apache.fluss.rpc.messages.PrefixLookupRequest;
6165
import org.apache.fluss.rpc.messages.ProduceLogRequest;
6266
import org.apache.fluss.rpc.messages.PutKvRequest;
67+
import org.apache.fluss.rpc.messages.RebalanceResponse;
6368

6469
import javax.annotation.Nullable;
6570

@@ -328,6 +333,48 @@ public static DropPartitionRequest makeDropPartitionRequest(
328333
return dropPartitionRequest;
329334
}
330335

336+
public static Map<TableBucket, RebalancePlanForBucket> toRebalancePlan(
337+
RebalanceResponse response) {
338+
Map<TableBucket, RebalancePlanForBucket> rebalancePlan = new HashMap<>();
339+
for (PbRebalancePlanForTable pbTable : response.getPlanForTablesList()) {
340+
long tableId = pbTable.getTableId();
341+
if (pbTable.getPartitionsPlansCount() == 0) {
342+
// non-partitioned table.
343+
for (PbRebalancePlanForBucket pbBucket : pbTable.getBucketsPlansList()) {
344+
int bucketId = pbBucket.getBucketId();
345+
rebalancePlan.put(
346+
new TableBucket(tableId, null, bucketId),
347+
toRebalancePlanForBucket(tableId, null, bucketId, pbBucket));
348+
}
349+
} else {
350+
// partitioned table.
351+
for (PbRebalancePlanForPartition pbPartition : pbTable.getPartitionsPlansList()) {
352+
long partitionId = pbPartition.getPartitionId();
353+
for (PbRebalancePlanForBucket pbBucket : pbPartition.getBucketsPlansList()) {
354+
int bucketId = pbBucket.getBucketId();
355+
rebalancePlan.put(
356+
new TableBucket(tableId, partitionId, bucketId),
357+
toRebalancePlanForBucket(tableId, partitionId, bucketId, pbBucket));
358+
}
359+
}
360+
}
361+
}
362+
return rebalancePlan;
363+
}
364+
365+
private static RebalancePlanForBucket toRebalancePlanForBucket(
366+
long tableId,
367+
@Nullable Long partitionId,
368+
int bucketId,
369+
PbRebalancePlanForBucket pbBucket) {
370+
return new RebalancePlanForBucket(
371+
new TableBucket(tableId, partitionId, bucketId),
372+
pbBucket.getOriginalLeader(),
373+
pbBucket.getNewLeader(),
374+
Arrays.stream(pbBucket.getOriginalReplicas()).boxed().collect(Collectors.toList()),
375+
Arrays.stream(pbBucket.getNewReplicas()).boxed().collect(Collectors.toList()));
376+
}
377+
331378
public static List<PartitionInfo> toPartitionInfos(ListPartitionInfosResponse response) {
332379
return response.getPartitionsInfosList().stream()
333380
.map(
Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.admin;
19+
20+
import org.apache.fluss.client.Connection;
21+
import org.apache.fluss.client.ConnectionFactory;
22+
import org.apache.fluss.cluster.rebalance.GoalType;
23+
import org.apache.fluss.cluster.rebalance.ServerTag;
24+
import org.apache.fluss.config.ConfigOptions;
25+
import org.apache.fluss.config.Configuration;
26+
import org.apache.fluss.metadata.DatabaseDescriptor;
27+
import org.apache.fluss.metadata.PartitionSpec;
28+
import org.apache.fluss.metadata.TableDescriptor;
29+
import org.apache.fluss.metadata.TablePath;
30+
import org.apache.fluss.server.replica.ReplicaManager;
31+
import org.apache.fluss.server.testutils.FlussClusterExtension;
32+
33+
import org.junit.jupiter.api.AfterEach;
34+
import org.junit.jupiter.api.BeforeEach;
35+
import org.junit.jupiter.api.Test;
36+
import org.junit.jupiter.api.extension.RegisterExtension;
37+
38+
import java.time.Duration;
39+
import java.util.Arrays;
40+
import java.util.Collections;
41+
42+
import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
43+
import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
44+
import static org.assertj.core.api.Assertions.assertThat;
45+
46+
/** ITCase for rebalance. */
47+
public class RebalanceITCase {
48+
@RegisterExtension
49+
public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
50+
FlussClusterExtension.builder()
51+
.setNumOfTabletServers(4)
52+
.setClusterConf(initConfig())
53+
.build();
54+
55+
private Connection conn;
56+
private Admin admin;
57+
58+
@BeforeEach
59+
protected void setup() throws Exception {
60+
conn = ConnectionFactory.createConnection(FLUSS_CLUSTER_EXTENSION.getClientConfig());
61+
admin = conn.getAdmin();
62+
}
63+
64+
@AfterEach
65+
protected void teardown() throws Exception {
66+
if (admin != null) {
67+
admin.close();
68+
admin = null;
69+
}
70+
71+
if (conn != null) {
72+
conn.close();
73+
conn = null;
74+
}
75+
}
76+
77+
@Test
78+
void testRebalanceForLogTable() throws Exception {
79+
String dbName = "db-balance";
80+
admin.createDatabase(dbName, DatabaseDescriptor.EMPTY, false).get();
81+
82+
TableDescriptor logDescriptor =
83+
TableDescriptor.builder()
84+
.schema(DATA1_SCHEMA)
85+
.distributedBy(3)
86+
.property(
87+
ConfigOptions.TABLE_GENERATE_UNBALANCE_TABLE_ASSIGNMENT.key(),
88+
"true")
89+
.build();
90+
// create some non-partitioned log tables.
91+
for (int i = 0; i < 6; i++) {
92+
long tableId =
93+
createTable(
94+
new TablePath(dbName, "test-rebalance_table-" + i),
95+
logDescriptor,
96+
false);
97+
FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
98+
}
99+
100+
// create some partitioned tables, each with partitions.
101+
TableDescriptor partitionedDescriptor =
102+
TableDescriptor.builder()
103+
.schema(DATA1_SCHEMA)
104+
.distributedBy(3)
105+
.property(
106+
ConfigOptions.TABLE_GENERATE_UNBALANCE_TABLE_ASSIGNMENT.key(),
107+
"true")
108+
.partitionedBy("b")
109+
.build();
110+
for (int i = 0; i < 3; i++) {
111+
TablePath tablePath = new TablePath(dbName, "test-rebalance_partitioned_table-" + i);
112+
long tableId = createTable(tablePath, partitionedDescriptor, false);
113+
for (int j = 0; j < 2; j++) {
114+
PartitionSpec partitionSpec =
115+
new PartitionSpec(Collections.singletonMap("b", String.valueOf(j)));
116+
admin.createPartition(tablePath, partitionSpec, false).get();
117+
long partitionId =
118+
admin.listPartitionInfos(tablePath, partitionSpec)
119+
.get()
120+
.get(0)
121+
.getPartitionId();
122+
FLUSS_CLUSTER_EXTENSION.waitUntilTablePartitionReady(tableId, partitionId);
123+
}
124+
}
125+
126+
// verify before rebalance. As we use unbalanced assignment, all replicas will be located on
127+
// servers [0,1,2], and all leaders will be located on server 0.
128+
for (int i = 0; i < 3; i++) {
129+
ReplicaManager replicaManager =
130+
FLUSS_CLUSTER_EXTENSION.getTabletServerById(i).getReplicaManager();
131+
assertThat(replicaManager.onlineReplicas().count()).isEqualTo(36);
132+
if (i == 0) {
133+
assertThat(replicaManager.leaderCount()).isEqualTo(36);
134+
} else {
135+
assertThat(replicaManager.leaderCount()).isEqualTo(0);
136+
}
137+
}
138+
ReplicaManager replicaManager3 =
139+
FLUSS_CLUSTER_EXTENSION.getTabletServerById(3).getReplicaManager();
140+
assertThat(replicaManager3.onlineReplicas().count()).isEqualTo(0);
141+
assertThat(replicaManager3.leaderCount()).isEqualTo(0);
142+
143+
// trigger rebalance with goal set[ReplicaDistributionGoal, LeaderReplicaDistributionGoal]
144+
admin.rebalance(
145+
Arrays.asList(
146+
GoalType.REPLICA_DISTRIBUTION_GOAL,
147+
GoalType.LEADER_REPLICA_DISTRIBUTION_GOAL),
148+
false)
149+
.get();
150+
151+
retry(
152+
Duration.ofMinutes(2),
153+
() -> {
154+
// TODO use admin#listRebalanceProcess to verify rebalance is finished.
155+
assertThat(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient().getRebalancePlan())
156+
.isNotPresent();
157+
for (int i = 0; i < 4; i++) {
158+
ReplicaManager replicaManager =
159+
FLUSS_CLUSTER_EXTENSION.getTabletServerById(i).getReplicaManager();
160+
// average will be 27
161+
assertThat(replicaManager.onlineReplicas().count()).isBetween(24L, 30L);
162+
long leaderCount = replicaManager.leaderCount();
163+
// average will be 9
164+
assertThat(leaderCount).isBetween(7L, 11L);
165+
}
166+
});
167+
168+
// add server tag PERMANENT_OFFLINE for server 3, so that all leaders and replicas are removed
169+
// from server 3.
170+
admin.addServerTag(Collections.singletonList(3), ServerTag.PERMANENT_OFFLINE).get();
171+
admin.rebalance(
172+
Arrays.asList(
173+
GoalType.REPLICA_DISTRIBUTION_GOAL,
174+
GoalType.LEADER_REPLICA_DISTRIBUTION_GOAL),
175+
false)
176+
.get();
177+
retry(
178+
Duration.ofMinutes(2),
179+
() -> {
180+
// TODO use admin#listRebalanceProcess to verify rebalance is finished.
181+
assertThat(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient().getRebalancePlan())
182+
.isNotPresent();
183+
assertThat(replicaManager3.onlineReplicas().count()).isEqualTo(0);
184+
assertThat(replicaManager3.leaderCount()).isEqualTo(0);
185+
for (int i = 0; i < 3; i++) {
186+
ReplicaManager replicaManager =
187+
FLUSS_CLUSTER_EXTENSION.getTabletServerById(i).getReplicaManager();
188+
// average will be 36
189+
assertThat(replicaManager.onlineReplicas().count()).isBetween(34L, 38L);
190+
long leaderCount = replicaManager.leaderCount();
191+
// average will be 12
192+
assertThat(leaderCount).isBetween(10L, 14L);
193+
}
194+
});
195+
}
196+
197+
private static Configuration initConfig() {
198+
Configuration configuration = new Configuration();
199+
configuration.set(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
200+
configuration.set(ConfigOptions.DEFAULT_BUCKET_NUMBER, 3);
201+
return configuration;
202+
}
203+
204+
private long createTable(
205+
TablePath tablePath, TableDescriptor tableDescriptor, boolean ignoreIfExists)
206+
throws Exception {
207+
admin.createTable(tablePath, tableDescriptor, ignoreIfExists).get();
208+
return admin.getTableInfo(tablePath).get().getTableId();
209+
}
210+
}

fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalancePlanForBucket.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,10 @@ public List<Integer> getNewReplicas() {
7373
return newReplicas;
7474
}
7575

76+
public boolean isLeaderAction() {
77+
return originalLeader != newLeader;
78+
}
79+
7680
@Override
7781
public String toString() {
7882
return "RebalancePlanForBucket{"

fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceResultForBucket.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public class RebalanceResultForBucket {
3232
private final RebalancePlanForBucket rebalancePlanForBucket;
3333
private RebalanceStatusForBucket rebalanceStatusForBucket;
3434

35-
public RebalanceResultForBucket(
35+
private RebalanceResultForBucket(
3636
RebalancePlanForBucket rebalancePlanForBucket,
3737
RebalanceStatusForBucket rebalanceStatusForBucket) {
3838
this.rebalancePlanForBucket = rebalancePlanForBucket;

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1394,6 +1394,14 @@ public class ConfigOptions {
13941394
+ "The `disable` behavior rejects delete requests with a clear error message. "
13951395
+ "For tables with FIRST_ROW or VERSIONED merge engines, this option defaults to `ignore`.");
13961396

1397+
public static final ConfigOption<Boolean> TABLE_GENERATE_UNBALANCE_TABLE_ASSIGNMENT =
1398+
key("table.generate-unbalance-table-assignment")
1399+
.booleanType()
1400+
.defaultValue(false)
1401+
.withDescription(
1402+
"This parameter is only used for test. If set to true, the assignment will always be [0,1,2] "
1403+
+ "as replica factor set as 3 even if there are tabletServers more than 3.");
1404+
13971405
// ------------------------------------------------------------------------
13981406
// ConfigOptions for Kv
13991407
// ------------------------------------------------------------------------

fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,4 +126,9 @@ public ArrowCompressionInfo getArrowCompressionInfo() {
126126
public AutoPartitionStrategy getAutoPartitionStrategy() {
127127
return AutoPartitionStrategy.from(config);
128128
}
129+
130+
/** Whether to generate an unbalanced assignment for this table. */
131+
public boolean generateUnbalanceAssignment() {
132+
return config.get(ConfigOptions.TABLE_GENERATE_UNBALANCE_TABLE_ASSIGNMENT);
133+
}
129134
}

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ public Map<Long, TablePath> allTables() {
184184
return tablePathById;
185185
}
186186

187-
public Set<TableBucket> allBuckets() {
187+
public Set<TableBucket> getAllBuckets() {
188188
Set<TableBucket> allBuckets = new HashSet<>();
189189
for (Map.Entry<Long, Map<Integer, List<Integer>>> tableAssign :
190190
tableAssignments.entrySet()) {

0 commit comments

Comments
 (0)