Skip to content

Commit ed3a37a

Browse files
committed
change test way
1 parent b5d9f76 commit ed3a37a

File tree

4 files changed

+43
-24
lines changed

4 files changed

+43
-24
lines changed

fluss-client/src/test/java/org/apache/fluss/client/admin/RebalanceITCase.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import java.util.Collections;
4141

4242
import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
43-
import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
4443
import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
4544
import static org.assertj.core.api.Assertions.assertThat;
4645

@@ -79,26 +78,38 @@ protected void teardown() throws Exception {
7978
void testRebalanceForLogTable() throws Exception {
8079
String dbName = "db-balance";
8180
admin.createDatabase(dbName, DatabaseDescriptor.EMPTY, false).get();
82-
// create somne none partitioned log table.
81+
82+
TableDescriptor logDescriptor =
83+
TableDescriptor.builder()
84+
.schema(DATA1_SCHEMA)
85+
.distributedBy(3)
86+
.property(
87+
ConfigOptions.TABLE_GENERATE_UNBALANCE_TABLE_ASSIGNMENT.key(),
88+
"true")
89+
.build();
90+
// create some none partitioned log table.
8391
for (int i = 0; i < 6; i++) {
8492
long tableId =
8593
createTable(
8694
new TablePath(dbName, "test-rebalance_table-" + i),
87-
DATA1_TABLE_DESCRIPTOR,
95+
logDescriptor,
8896
false);
8997
FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
9098
}
9199

92100
// create some partitioned table with partition.
93-
TableDescriptor descriptor =
101+
TableDescriptor partitionedDescriptor =
94102
TableDescriptor.builder()
95103
.schema(DATA1_SCHEMA)
96104
.distributedBy(3)
105+
.property(
106+
ConfigOptions.TABLE_GENERATE_UNBALANCE_TABLE_ASSIGNMENT.key(),
107+
"true")
97108
.partitionedBy("b")
98109
.build();
99110
for (int i = 0; i < 3; i++) {
100111
TablePath tablePath = new TablePath(dbName, "test-rebalance_partitioned_table-" + i);
101-
long tableId = createTable(tablePath, descriptor, false);
112+
long tableId = createTable(tablePath, partitionedDescriptor, false);
102113
for (int j = 0; j < 2; j++) {
103114
PartitionSpec partitionSpec =
104115
new PartitionSpec(Collections.singletonMap("b", String.valueOf(j)));
@@ -185,9 +196,6 @@ void testRebalanceForLogTable() throws Exception {
185196

186197
private static Configuration initConfig() {
187198
Configuration configuration = new Configuration();
188-
// As we want to test rebalance, we need to set this option to true for generate unbalance
189-
// buckets location in server.
190-
configuration.set(ConfigOptions.SERVER_GENERATE_UNBALANCE_ASSIGNMENT_FOR_TEST, true);
191199
configuration.set(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
192200
configuration.set(ConfigOptions.DEFAULT_BUCKET_NUMBER, 3);
193201
return configuration;

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -481,15 +481,6 @@ public class ConfigOptions {
481481
.withDescription(
482482
"Defines how long the buffer pool will block when waiting for segments to become available.");
483483

484-
public static final ConfigOption<Boolean> SERVER_GENERATE_UNBALANCE_ASSIGNMENT_FOR_TEST =
485-
key("server.generate-unbalance-assignment-for-test")
486-
.booleanType()
487-
.defaultValue(false)
488-
.withDescription(
489-
"Whether to generate unbalance table or partition assignment. This parameter is only used "
490-
+ "for itCase. If set to true, the assignment will always be [0,1,2] as replica factor "
491-
+ "set as 3 even if there are tabletServers more than 3.");
492-
493484
// ------------------------------------------------------------------
494485
// ZooKeeper Settings
495486
// ------------------------------------------------------------------
@@ -1403,6 +1394,14 @@ public class ConfigOptions {
14031394
+ "The `disable` behavior rejects delete requests with a clear error message. "
14041395
+ "For tables with FIRST_ROW or VERSIONED merge engines, this option defaults to `ignore`.");
14051396

1397+
public static final ConfigOption<Boolean> TABLE_GENERATE_UNBALANCE_TABLE_ASSIGNMENT =
1398+
key("table.generate-unbalance-table-assignment")
1399+
.booleanType()
1400+
.defaultValue(false)
1401+
.withDescription(
1402+
"This parameter is only used for test. If set to true, the assignment will always be [0,1,2] "
1403+
+ "as replica factor set as 3 even if there are tabletServers more than 3.");
1404+
14061405
// ------------------------------------------------------------------------
14071406
// ConfigOptions for Kv
14081407
// ------------------------------------------------------------------------

fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,4 +126,9 @@ public ArrowCompressionInfo getArrowCompressionInfo() {
126126
public AutoPartitionStrategy getAutoPartitionStrategy() {
127127
return AutoPartitionStrategy.from(config);
128128
}
129+
130+
/** Whether to generate unbalance assignment fot this table. */
131+
public boolean generateUnbalanceAssignment() {
132+
return config.get(ConfigOptions.TABLE_GENERATE_UNBALANCE_TABLE_ASSIGNMENT);
133+
}
129134
}

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -176,9 +176,6 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
176176
private final LakeCatalogDynamicLoader lakeCatalogDynamicLoader;
177177
private final Supplier<RebalanceManager> rebalanceManagerSupplier;
178178

179-
// This parameter is only used for testing.
180-
private final boolean generateUnBalanceAssignment;
181-
182179
public CoordinatorService(
183180
Configuration conf,
184181
FileSystem remoteFileSystem,
@@ -201,8 +198,6 @@ public CoordinatorService(
201198
this.defaultReplicationFactor = conf.getInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR);
202199
this.logTableAllowCreation = conf.getBoolean(ConfigOptions.LOG_TABLE_ALLOW_CREATION);
203200
this.kvTableAllowCreation = conf.getBoolean(ConfigOptions.KV_TABLE_ALLOW_CREATION);
204-
this.generateUnBalanceAssignment =
205-
conf.getBoolean(ConfigOptions.SERVER_GENERATE_UNBALANCE_ASSIGNMENT_FOR_TEST);
206201
this.eventManagerSupplier =
207202
() -> coordinatorEventProcessorSupplier.get().getCoordinatorEventManager();
208203
this.coordinatorEpochSupplier =
@@ -303,12 +298,22 @@ public CompletableFuture<CreateTableResponse> createTable(CreateTableRequest req
303298

304299
// first, generate the assignment
305300
TableAssignment tableAssignment = null;
301+
Map<String, String> properties = tableDescriptor.getProperties();
302+
boolean generateUnbalanceAssignment;
303+
if (properties.containsKey(ConfigOptions.TABLE_GENERATE_UNBALANCE_TABLE_ASSIGNMENT.key())) {
304+
generateUnbalanceAssignment =
305+
Boolean.parseBoolean(
306+
properties.get(
307+
ConfigOptions.TABLE_GENERATE_UNBALANCE_TABLE_ASSIGNMENT.key()));
308+
} else {
309+
generateUnbalanceAssignment = false;
310+
}
306311
// only when it's no partitioned table do we generate the assignment for it
307312
if (!tableDescriptor.isPartitioned()) {
308313
// the replication factor must be set now
309314
int replicaFactor = tableDescriptor.getReplicationFactor();
310315
TabletServerInfo[] servers = metadataCache.getLiveServers();
311-
if (generateUnBalanceAssignment) {
316+
if (generateUnbalanceAssignment) {
312317
// this branch is only used for testing.
313318
tableAssignment =
314319
new TableAssignment(
@@ -509,7 +514,9 @@ public CompletableFuture<CreatePartitionResponse> createPartition(
509514
int replicaFactor = table.getTableConfig().getReplicationFactor();
510515
TabletServerInfo[] servers = metadataCache.getLiveServers();
511516
Map<Integer, BucketAssignment> bucketAssignments;
512-
if (generateUnBalanceAssignment) {
517+
518+
boolean generateUnbalanceAssignment = table.getTableConfig().generateUnbalanceAssignment();
519+
if (generateUnbalanceAssignment) {
513520
// This branch is only used for testing.
514521
bucketAssignments = generateUnBalanceAssignment(table.bucketCount, replicaFactor);
515522
} else {

0 commit comments

Comments
 (0)