Skip to content

Commit 60fd7ef

Browse files
committed
change test way
1 parent b5d9f76 commit 60fd7ef

File tree

4 files changed

+43
-26
lines changed

4 files changed

+43
-26
lines changed

fluss-client/src/test/java/org/apache/fluss/client/admin/RebalanceITCase.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.apache.fluss.metadata.TablePath;
3030
import org.apache.fluss.server.replica.ReplicaManager;
3131
import org.apache.fluss.server.testutils.FlussClusterExtension;
32-
3332
import org.junit.jupiter.api.AfterEach;
3433
import org.junit.jupiter.api.BeforeEach;
3534
import org.junit.jupiter.api.Test;
@@ -40,7 +39,6 @@
4039
import java.util.Collections;
4140

4241
import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
43-
import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
4442
import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
4543
import static org.assertj.core.api.Assertions.assertThat;
4644

@@ -79,26 +77,38 @@ protected void teardown() throws Exception {
7977
void testRebalanceForLogTable() throws Exception {
8078
String dbName = "db-balance";
8179
admin.createDatabase(dbName, DatabaseDescriptor.EMPTY, false).get();
82-
// create somne none partitioned log table.
80+
81+
TableDescriptor logDescriptor =
82+
TableDescriptor.builder()
83+
.schema(DATA1_SCHEMA)
84+
.distributedBy(3)
85+
.property(
86+
ConfigOptions.TABLE_GENERATE_UNBALANCE_TABLE_ASSIGNMENT.key(),
87+
"true")
88+
.build();
89+
// create some none partitioned log table.
8390
for (int i = 0; i < 6; i++) {
8491
long tableId =
8592
createTable(
8693
new TablePath(dbName, "test-rebalance_table-" + i),
87-
DATA1_TABLE_DESCRIPTOR,
94+
logDescriptor,
8895
false);
8996
FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
9097
}
9198

9299
// create some partitioned table with partition.
93-
TableDescriptor descriptor =
100+
TableDescriptor partitionedDescriptor =
94101
TableDescriptor.builder()
95102
.schema(DATA1_SCHEMA)
96103
.distributedBy(3)
104+
.property(
105+
ConfigOptions.TABLE_GENERATE_UNBALANCE_TABLE_ASSIGNMENT.key(),
106+
"true")
97107
.partitionedBy("b")
98108
.build();
99109
for (int i = 0; i < 3; i++) {
100110
TablePath tablePath = new TablePath(dbName, "test-rebalance_partitioned_table-" + i);
101-
long tableId = createTable(tablePath, descriptor, false);
111+
long tableId = createTable(tablePath, partitionedDescriptor, false);
102112
for (int j = 0; j < 2; j++) {
103113
PartitionSpec partitionSpec =
104114
new PartitionSpec(Collections.singletonMap("b", String.valueOf(j)));
@@ -185,9 +195,6 @@ void testRebalanceForLogTable() throws Exception {
185195

186196
private static Configuration initConfig() {
187197
Configuration configuration = new Configuration();
188-
// As we want to test rebalance, we need to set this option to true for generate unbalance
189-
// buckets location in server.
190-
configuration.set(ConfigOptions.SERVER_GENERATE_UNBALANCE_ASSIGNMENT_FOR_TEST, true);
191198
configuration.set(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
192199
configuration.set(ConfigOptions.DEFAULT_BUCKET_NUMBER, 3);
193200
return configuration;

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -481,15 +481,6 @@ public class ConfigOptions {
481481
.withDescription(
482482
"Defines how long the buffer pool will block when waiting for segments to become available.");
483483

484-
public static final ConfigOption<Boolean> SERVER_GENERATE_UNBALANCE_ASSIGNMENT_FOR_TEST =
485-
key("server.generate-unbalance-assignment-for-test")
486-
.booleanType()
487-
.defaultValue(false)
488-
.withDescription(
489-
"Whether to generate unbalance table or partition assignment. This parameter is only used "
490-
+ "for itCase. If set to true, the assignment will always be [0,1,2] as replica factor "
491-
+ "set as 3 even if there are tabletServers more than 3.");
492-
493484
// ------------------------------------------------------------------
494485
// ZooKeeper Settings
495486
// ------------------------------------------------------------------
@@ -1403,6 +1394,14 @@ public class ConfigOptions {
14031394
+ "The `disable` behavior rejects delete requests with a clear error message. "
14041395
+ "For tables with FIRST_ROW or VERSIONED merge engines, this option defaults to `ignore`.");
14051396

1397+
public static final ConfigOption<Boolean> TABLE_GENERATE_UNBALANCE_TABLE_ASSIGNMENT =
1398+
key("table.generate-unbalance-table-assignment")
1399+
.booleanType()
1400+
.defaultValue(false)
1401+
.withDescription(
1402+
"This parameter is only used for test. If set to true, the assignment will always be [0,1,2] "
1403+
+ "as replica factor set as 3 even if there are tabletServers more than 3.");
1404+
14061405
// ------------------------------------------------------------------------
14071406
// ConfigOptions for Kv
14081407
// ------------------------------------------------------------------------

fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,4 +126,9 @@ public ArrowCompressionInfo getArrowCompressionInfo() {
126126
public AutoPartitionStrategy getAutoPartitionStrategy() {
127127
return AutoPartitionStrategy.from(config);
128128
}
129+
130+
/** Whether to generate unbalance assignment fot this table. */
131+
public boolean generateUnbalanceAssignment() {
132+
return config.get(ConfigOptions.TABLE_GENERATE_UNBALANCE_TABLE_ASSIGNMENT);
133+
}
129134
}

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,6 @@
132132
import org.apache.fluss.utils.concurrent.FutureUtils;
133133

134134
import javax.annotation.Nullable;
135-
136135
import java.io.UncheckedIOException;
137136
import java.util.ArrayList;
138137
import java.util.Arrays;
@@ -176,9 +175,6 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
176175
private final LakeCatalogDynamicLoader lakeCatalogDynamicLoader;
177176
private final Supplier<RebalanceManager> rebalanceManagerSupplier;
178177

179-
// This parameter is only used for testing.
180-
private final boolean generateUnBalanceAssignment;
181-
182178
public CoordinatorService(
183179
Configuration conf,
184180
FileSystem remoteFileSystem,
@@ -201,8 +197,6 @@ public CoordinatorService(
201197
this.defaultReplicationFactor = conf.getInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR);
202198
this.logTableAllowCreation = conf.getBoolean(ConfigOptions.LOG_TABLE_ALLOW_CREATION);
203199
this.kvTableAllowCreation = conf.getBoolean(ConfigOptions.KV_TABLE_ALLOW_CREATION);
204-
this.generateUnBalanceAssignment =
205-
conf.getBoolean(ConfigOptions.SERVER_GENERATE_UNBALANCE_ASSIGNMENT_FOR_TEST);
206200
this.eventManagerSupplier =
207201
() -> coordinatorEventProcessorSupplier.get().getCoordinatorEventManager();
208202
this.coordinatorEpochSupplier =
@@ -303,12 +297,22 @@ public CompletableFuture<CreateTableResponse> createTable(CreateTableRequest req
303297

304298
// first, generate the assignment
305299
TableAssignment tableAssignment = null;
300+
Map<String, String> properties = tableDescriptor.getProperties();
301+
boolean generateUnbalanceAssignment;
302+
if (properties.containsKey(ConfigOptions.TABLE_GENERATE_UNBALANCE_TABLE_ASSIGNMENT.key())) {
303+
generateUnbalanceAssignment =
304+
Boolean.parseBoolean(
305+
properties.get(
306+
ConfigOptions.TABLE_GENERATE_UNBALANCE_TABLE_ASSIGNMENT.key()));
307+
} else {
308+
generateUnbalanceAssignment = false;
309+
}
306310
// only when it's no partitioned table do we generate the assignment for it
307311
if (!tableDescriptor.isPartitioned()) {
308312
// the replication factor must be set now
309313
int replicaFactor = tableDescriptor.getReplicationFactor();
310314
TabletServerInfo[] servers = metadataCache.getLiveServers();
311-
if (generateUnBalanceAssignment) {
315+
if (generateUnbalanceAssignment) {
312316
// this branch is only used for testing.
313317
tableAssignment =
314318
new TableAssignment(
@@ -509,7 +513,9 @@ public CompletableFuture<CreatePartitionResponse> createPartition(
509513
int replicaFactor = table.getTableConfig().getReplicationFactor();
510514
TabletServerInfo[] servers = metadataCache.getLiveServers();
511515
Map<Integer, BucketAssignment> bucketAssignments;
512-
if (generateUnBalanceAssignment) {
516+
517+
boolean generateUnbalanceAssignment = table.getTableConfig().generateUnbalanceAssignment();
518+
if (generateUnbalanceAssignment) {
513519
// This branch is only used for testing.
514520
bucketAssignments = generateUnBalanceAssignment(table.bucketCount, replicaFactor);
515521
} else {

0 commit comments

Comments
 (0)