Skip to content

Commit 85341ca

Browse files
committed
[server] Support generate and execute reblance plan
1 parent 00f70b0 commit 85341ca

File tree

60 files changed

+5552
-45
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

60 files changed

+5552
-45
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
4747
import org.apache.fluss.rpc.gateway.TabletServerGateway;
4848
import org.apache.fluss.rpc.messages.AddServerTagRequest;
49+
import org.apache.fluss.rpc.messages.CancelRebalanceRequest;
4950
import org.apache.fluss.rpc.messages.CreateAclsRequest;
5051
import org.apache.fluss.rpc.messages.CreateDatabaseRequest;
5152
import org.apache.fluss.rpc.messages.CreateTableRequest;
@@ -70,6 +71,7 @@
7071
import org.apache.fluss.rpc.messages.PbListOffsetsRespForBucket;
7172
import org.apache.fluss.rpc.messages.PbPartitionSpec;
7273
import org.apache.fluss.rpc.messages.PbTablePath;
74+
import org.apache.fluss.rpc.messages.RebalanceRequest;
7375
import org.apache.fluss.rpc.messages.RemoveServerTagRequest;
7476
import org.apache.fluss.rpc.messages.TableExistsRequest;
7577
import org.apache.fluss.rpc.messages.TableExistsResponse;
@@ -488,7 +490,9 @@ public CompletableFuture<Void> removeServerTag(
488490
@Override
489491
public CompletableFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
490492
List<GoalType> priorityGoals, boolean dryRun) {
491-
throw new UnsupportedOperationException("Support soon");
493+
RebalanceRequest request = new RebalanceRequest().setDryRun(dryRun);
494+
priorityGoals.forEach(goal -> request.addGoal(goal.value));
495+
return gateway.rebalance(request).thenApply(ClientRpcMessageUtils::toRebalancePlan);
492496
}
493497

494498
@Override
@@ -498,7 +502,8 @@ public CompletableFuture<Map<TableBucket, RebalanceResultForBucket>> listRebalan
498502

499503
@Override
500504
public CompletableFuture<Void> cancelRebalance() {
501-
throw new UnsupportedOperationException("Support soon");
505+
CancelRebalanceRequest request = new CancelRebalanceRequest();
506+
return gateway.cancelRebalance(request).thenApply(r -> null);
502507
}
503508

504509
@Override

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.fluss.client.metadata.LakeSnapshot;
2626
import org.apache.fluss.client.write.KvWriteBatch;
2727
import org.apache.fluss.client.write.ReadyWriteBatch;
28+
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
2829
import org.apache.fluss.fs.FsPath;
2930
import org.apache.fluss.fs.FsPathAndFileName;
3031
import org.apache.fluss.fs.token.ObtainedSecurityToken;
@@ -51,10 +52,14 @@
5152
import org.apache.fluss.rpc.messages.PbPrefixLookupReqForBucket;
5253
import org.apache.fluss.rpc.messages.PbProduceLogReqForBucket;
5354
import org.apache.fluss.rpc.messages.PbPutKvReqForBucket;
55+
import org.apache.fluss.rpc.messages.PbRebalancePlanForBucket;
56+
import org.apache.fluss.rpc.messages.PbRebalancePlanForPartition;
57+
import org.apache.fluss.rpc.messages.PbRebalancePlanForTable;
5458
import org.apache.fluss.rpc.messages.PbRemotePathAndLocalFile;
5559
import org.apache.fluss.rpc.messages.PrefixLookupRequest;
5660
import org.apache.fluss.rpc.messages.ProduceLogRequest;
5761
import org.apache.fluss.rpc.messages.PutKvRequest;
62+
import org.apache.fluss.rpc.messages.RebalanceResponse;
5863

5964
import javax.annotation.Nullable;
6065

@@ -323,6 +328,48 @@ public static DropPartitionRequest makeDropPartitionRequest(
323328
return dropPartitionRequest;
324329
}
325330

331+
public static Map<TableBucket, RebalancePlanForBucket> toRebalancePlan(
332+
RebalanceResponse response) {
333+
Map<TableBucket, RebalancePlanForBucket> rebalancePlan = new HashMap<>();
334+
for (PbRebalancePlanForTable pbTable : response.getPlanForTablesList()) {
335+
long tableId = pbTable.getTableId();
336+
if (pbTable.getPartitionsPlansCount() == 0) {
337+
// none-partition table.
338+
for (PbRebalancePlanForBucket pbBucket : pbTable.getBucketsPlansList()) {
339+
int bucketId = pbBucket.getBucketId();
340+
rebalancePlan.put(
341+
new TableBucket(tableId, null, bucketId),
342+
toRebalancePlanForBucket(tableId, null, bucketId, pbBucket));
343+
}
344+
} else {
345+
// partition table.
346+
for (PbRebalancePlanForPartition pbPartition : pbTable.getPartitionsPlansList()) {
347+
long partitionId = pbPartition.getPartitionId();
348+
for (PbRebalancePlanForBucket pbBucket : pbPartition.getBucketsPlansList()) {
349+
int bucketId = pbBucket.getBucketId();
350+
rebalancePlan.put(
351+
new TableBucket(tableId, partitionId, bucketId),
352+
toRebalancePlanForBucket(tableId, partitionId, bucketId, pbBucket));
353+
}
354+
}
355+
}
356+
}
357+
return rebalancePlan;
358+
}
359+
360+
private static RebalancePlanForBucket toRebalancePlanForBucket(
361+
long tableId,
362+
@Nullable Long partitionId,
363+
int bucketId,
364+
PbRebalancePlanForBucket pbBucket) {
365+
return new RebalancePlanForBucket(
366+
new TableBucket(tableId, partitionId, bucketId),
367+
pbBucket.getOriginalLeader(),
368+
pbBucket.getNewLeader(),
369+
Arrays.stream(pbBucket.getOriginalReplicas()).boxed().collect(Collectors.toList()),
370+
Arrays.stream(pbBucket.getNewReplicas()).boxed().collect(Collectors.toList()));
371+
}
372+
326373
public static List<PartitionInfo> toPartitionInfos(ListPartitionInfosResponse response) {
327374
return response.getPartitionsInfosList().stream()
328375
.map(
Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.admin;
19+
20+
import org.apache.fluss.client.Connection;
21+
import org.apache.fluss.client.ConnectionFactory;
22+
import org.apache.fluss.cluster.rebalance.GoalType;
23+
import org.apache.fluss.cluster.rebalance.ServerTag;
24+
import org.apache.fluss.config.ConfigOptions;
25+
import org.apache.fluss.config.Configuration;
26+
import org.apache.fluss.metadata.DatabaseDescriptor;
27+
import org.apache.fluss.metadata.PartitionSpec;
28+
import org.apache.fluss.metadata.TableDescriptor;
29+
import org.apache.fluss.metadata.TablePath;
30+
import org.apache.fluss.server.replica.ReplicaManager;
31+
import org.apache.fluss.server.testutils.FlussClusterExtension;
32+
33+
import org.junit.jupiter.api.AfterEach;
34+
import org.junit.jupiter.api.BeforeEach;
35+
import org.junit.jupiter.api.Test;
36+
import org.junit.jupiter.api.extension.RegisterExtension;
37+
38+
import java.time.Duration;
39+
import java.util.Arrays;
40+
import java.util.Collections;
41+
42+
import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
43+
import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
44+
import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
45+
import static org.assertj.core.api.Assertions.assertThat;
46+
47+
/** ITCase for rebalance. */
48+
public class RebalanceITCase {
49+
@RegisterExtension
50+
public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
51+
FlussClusterExtension.builder()
52+
.setNumOfTabletServers(4)
53+
.setClusterConf(initConfig())
54+
.build();
55+
56+
private Connection conn;
57+
private Admin admin;
58+
59+
@BeforeEach
60+
protected void setup() throws Exception {
61+
conn = ConnectionFactory.createConnection(FLUSS_CLUSTER_EXTENSION.getClientConfig());
62+
admin = conn.getAdmin();
63+
}
64+
65+
@AfterEach
66+
protected void teardown() throws Exception {
67+
if (admin != null) {
68+
admin.close();
69+
admin = null;
70+
}
71+
72+
if (conn != null) {
73+
conn.close();
74+
conn = null;
75+
}
76+
}
77+
78+
@Test
79+
void testRebalanceForLogTable() throws Exception {
80+
String dbName = "db-balance";
81+
admin.createDatabase(dbName, DatabaseDescriptor.EMPTY, false).get();
82+
// create somne none partitioned log table.
83+
for (int i = 0; i < 6; i++) {
84+
long tableId =
85+
createTable(
86+
new TablePath(dbName, "test-rebalance_table-" + i),
87+
DATA1_TABLE_DESCRIPTOR,
88+
false);
89+
FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
90+
}
91+
92+
// create some partitioned table with partition.
93+
TableDescriptor descriptor =
94+
TableDescriptor.builder()
95+
.schema(DATA1_SCHEMA)
96+
.distributedBy(3)
97+
.partitionedBy("b")
98+
.build();
99+
for (int i = 0; i < 3; i++) {
100+
TablePath tablePath = new TablePath(dbName, "test-rebalance_partitioned_table-" + i);
101+
long tableId = createTable(tablePath, descriptor, false);
102+
for (int j = 0; j < 2; j++) {
103+
PartitionSpec partitionSpec =
104+
new PartitionSpec(Collections.singletonMap("b", String.valueOf(j)));
105+
admin.createPartition(tablePath, partitionSpec, false).get();
106+
long partitionId =
107+
admin.listPartitionInfos(tablePath, partitionSpec)
108+
.get()
109+
.get(0)
110+
.getPartitionId();
111+
FLUSS_CLUSTER_EXTENSION.waitUntilTablePartitionReady(tableId, partitionId);
112+
}
113+
}
114+
115+
// verify before rebalance. As we use unbalance assignment, all replicas will be location on
116+
// servers [0,1,2], all leader will be location on server 0.
117+
for (int i = 0; i < 3; i++) {
118+
ReplicaManager replicaManager =
119+
FLUSS_CLUSTER_EXTENSION.getTabletServerById(i).getReplicaManager();
120+
assertThat(replicaManager.onlineReplicas().count()).isEqualTo(36);
121+
if (i == 0) {
122+
assertThat(replicaManager.leaderCount()).isEqualTo(36);
123+
} else {
124+
assertThat(replicaManager.leaderCount()).isEqualTo(0);
125+
}
126+
}
127+
ReplicaManager replicaManager3 =
128+
FLUSS_CLUSTER_EXTENSION.getTabletServerById(3).getReplicaManager();
129+
assertThat(replicaManager3.onlineReplicas().count()).isEqualTo(0);
130+
assertThat(replicaManager3.leaderCount()).isEqualTo(0);
131+
132+
// trigger rebalance with goal set[ReplicaDistributionGoal, LeaderReplicaDistributionGoal]
133+
admin.rebalance(
134+
Arrays.asList(
135+
GoalType.REPLICA_DISTRIBUTION_GOAL,
136+
GoalType.LEADER_REPLICA_DISTRIBUTION_GOAL),
137+
false)
138+
.get();
139+
140+
retry(
141+
Duration.ofMinutes(2),
142+
() -> {
143+
// TODO use admin#listRebalanceProcess to verify rebalance is finished.
144+
assertThat(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient().getRebalancePlan())
145+
.isNotPresent();
146+
for (int i = 0; i < 4; i++) {
147+
ReplicaManager replicaManager =
148+
FLUSS_CLUSTER_EXTENSION.getTabletServerById(i).getReplicaManager();
149+
// average will be 27
150+
assertThat(replicaManager.onlineReplicas().count()).isBetween(24L, 30L);
151+
long leaderCount = replicaManager.leaderCount();
152+
// average will be 9
153+
assertThat(leaderCount).isBetween(7L, 11L);
154+
}
155+
});
156+
157+
// add server tag PERMANENT_OFFLINE for server 3, trigger all leader and replica removed
158+
// from server 3.
159+
admin.addServerTag(Collections.singletonList(3), ServerTag.PERMANENT_OFFLINE).get();
160+
admin.rebalance(
161+
Arrays.asList(
162+
GoalType.REPLICA_DISTRIBUTION_GOAL,
163+
GoalType.LEADER_REPLICA_DISTRIBUTION_GOAL),
164+
false)
165+
.get();
166+
retry(
167+
Duration.ofMinutes(2),
168+
() -> {
169+
// TODO use admin#listRebalanceProcess to verify rebalance is finished.
170+
assertThat(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient().getRebalancePlan())
171+
.isNotPresent();
172+
assertThat(replicaManager3.onlineReplicas().count()).isEqualTo(0);
173+
assertThat(replicaManager3.leaderCount()).isEqualTo(0);
174+
for (int i = 0; i < 3; i++) {
175+
ReplicaManager replicaManager =
176+
FLUSS_CLUSTER_EXTENSION.getTabletServerById(i).getReplicaManager();
177+
// average will be 36
178+
assertThat(replicaManager.onlineReplicas().count()).isBetween(34L, 38L);
179+
long leaderCount = replicaManager.leaderCount();
180+
// average will be 12
181+
assertThat(leaderCount).isBetween(10L, 14L);
182+
}
183+
});
184+
}
185+
186+
private static Configuration initConfig() {
187+
Configuration configuration = new Configuration();
188+
// As we want to test rebalance, we need to set this option to true for generate unbalance
189+
// buckets location in server.
190+
configuration.set(ConfigOptions.SERVER_GENERATE_UNBALANCE_ASSIGNMENT_FOR_TEST, true);
191+
configuration.set(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
192+
configuration.set(ConfigOptions.DEFAULT_BUCKET_NUMBER, 3);
193+
return configuration;
194+
}
195+
196+
private long createTable(
197+
TablePath tablePath, TableDescriptor tableDescriptor, boolean ignoreIfExists)
198+
throws Exception {
199+
admin.createTable(tablePath, tableDescriptor, ignoreIfExists).get();
200+
return admin.getTableInfo(tablePath).get().getTableId();
201+
}
202+
}

fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalancePlanForBucket.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,10 @@ public List<Integer> getNewReplicas() {
7373
return newReplicas;
7474
}
7575

76+
public boolean isLeaderAction() {
77+
return originalLeader != newLeader;
78+
}
79+
7680
@Override
7781
public String toString() {
7882
return "RebalancePlanForBucket{"

fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceResultForBucket.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public class RebalanceResultForBucket {
3232
private final RebalancePlanForBucket rebalancePlanForBucket;
3333
private RebalanceStatusForBucket rebalanceStatusForBucket;
3434

35-
public RebalanceResultForBucket(
35+
private RebalanceResultForBucket(
3636
RebalancePlanForBucket rebalancePlanForBucket,
3737
RebalanceStatusForBucket rebalanceStatusForBucket) {
3838
this.rebalancePlanForBucket = rebalancePlanForBucket;

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,15 @@ public class ConfigOptions {
422422
.withDescription(
423423
"Defines how long the buffer pool will block when waiting for segments to become available.");
424424

425+
public static final ConfigOption<Boolean> SERVER_GENERATE_UNBALANCE_ASSIGNMENT_FOR_TEST =
426+
key("server.generate-unbalance-assignment-for-test")
427+
.booleanType()
428+
.defaultValue(false)
429+
.withDescription(
430+
"Whether to generate unbalance table or partition assignment. This parameter is only used "
431+
+ "for itCase. If set to true, the assignment will always be [0,1,2] as replica factor "
432+
+ "set as 3 even if there are tabletServers more than 3.");
433+
425434
// ------------------------------------------------------------------
426435
// ZooKeeper Settings
427436
// ------------------------------------------------------------------

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ public Map<Long, TablePath> allTables() {
165165
return tablePathById;
166166
}
167167

168-
public Set<TableBucket> allBuckets() {
168+
public Set<TableBucket> getAllBuckets() {
169169
Set<TableBucket> allBuckets = new HashSet<>();
170170
for (Map.Entry<Long, Map<Integer, List<Integer>>> tableAssign :
171171
tableAssignments.entrySet()) {

0 commit comments

Comments
 (0)