Skip to content

Commit f199e33

Browse files
committed
[test] Fix some unstable test cases while running CI
1 parent a40bf13 commit f199e33

File tree

33 files changed

+530
-314
lines changed

33 files changed

+530
-314
lines changed

fluss-client/src/test/java/com/alibaba/fluss/client/admin/ClientToServerITCaseBase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ protected long createTable(
9696
return admin.getTable(tablePath).get().getTableId();
9797
}
9898

99-
private static Configuration initConfig() {
99+
public static Configuration initConfig() {
100100
Configuration conf = new Configuration();
101101
conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
102102
// set a shorter interval for testing purpose
@@ -117,7 +117,7 @@ protected static LogScanner createLogScanner(Table table, int[] projectFields) {
117117
return table.getLogScanner(new LogScan().withProjectedFields(projectFields));
118118
}
119119

120-
protected static void subscribeFromBeginning(LogScanner logScanner, Table table) {
120+
public static void subscribeFromBeginning(LogScanner logScanner, Table table) {
121121
int bucketCount = getBucketCount(table);
122122
for (int i = 0; i < bucketCount; i++) {
123123
logScanner.subscribeFromBeginning(i);

fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java

Lines changed: 68 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@
6565
/** Test for {@link FlussAdmin}. */
6666
class FlussAdminITCase extends ClientToServerITCaseBase {
6767

68-
protected static final TablePath DEFAULT_TABLE_PATH = TablePath.of("test_db", "person");
6968
protected static final Schema DEFAULT_SCHEMA =
7069
Schema.newBuilder()
7170
.primaryKey("id")
@@ -80,26 +79,27 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
8079
TableDescriptor.builder()
8180
.schema(DEFAULT_SCHEMA)
8281
.comment("test table")
83-
.distributedBy(10, "id")
82+
.distributedBy(3, "id")
8483
.property(ConfigOptions.TABLE_LOG_TTL, Duration.ofDays(1))
8584
.customProperty("connector", "fluss")
8685
.build();
8786

8887
@BeforeEach
8988
protected void setup() throws Exception {
9089
super.setup();
91-
// create a default table in fluss.
92-
createTable(DEFAULT_TABLE_PATH, DEFAULT_TABLE_DESCRIPTOR, false);
9390
}
9491

9592
@Test
9693
void testMultiClient() throws Exception {
94+
TablePath tablePath = TablePath.of("test_db", "multi_client_test_t1");
95+
createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false);
96+
9797
Admin admin1 = conn.getAdmin();
9898
Admin admin2 = conn.getAdmin();
9999
assertThat(admin1).isNotSameAs(admin2);
100100

101-
TableInfo t1 = admin1.getTable(DEFAULT_TABLE_PATH).get();
102-
TableInfo t2 = admin2.getTable(DEFAULT_TABLE_PATH).get();
101+
TableInfo t1 = admin1.getTable(tablePath).get();
102+
TableInfo t2 = admin2.getTable(tablePath).get();
103103
assertThat(t1).isEqualTo(t2);
104104

105105
admin1.close();
@@ -108,14 +108,17 @@ void testMultiClient() throws Exception {
108108

109109
@Test
110110
void testGetTableAndSchema() throws Exception {
111-
SchemaInfo schemaInfo = admin.getTableSchema(DEFAULT_TABLE_PATH).get();
111+
TablePath tablePath = TablePath.of("test_db", "test_get_table_and_schema");
112+
createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false);
113+
114+
SchemaInfo schemaInfo = admin.getTableSchema(tablePath).get();
112115
assertThat(schemaInfo.getSchema()).isEqualTo(DEFAULT_SCHEMA);
113116
assertThat(schemaInfo.getSchemaId()).isEqualTo(1);
114-
SchemaInfo schemaInfo2 = admin.getTableSchema(DEFAULT_TABLE_PATH, 1).get();
117+
SchemaInfo schemaInfo2 = admin.getTableSchema(tablePath, 1).get();
115118
assertThat(schemaInfo2).isEqualTo(schemaInfo);
116119

117120
// get default table.
118-
TableInfo tableInfo = admin.getTable(DEFAULT_TABLE_PATH).get();
121+
TableInfo tableInfo = admin.getTable(tablePath).get();
119122
assertThat(tableInfo.getSchemaId()).isEqualTo(schemaInfo.getSchemaId());
120123
assertThat(tableInfo.getTableDescriptor()).isEqualTo(DEFAULT_TABLE_DESCRIPTOR);
121124

@@ -157,8 +160,11 @@ void testCreateInvalidDatabaseAndTable() {
157160
}
158161

159162
@Test
160-
void testCreateTableWithInvalidProperty() {
161-
TablePath tablePath = TablePath.of(DEFAULT_TABLE_PATH.getDatabaseName(), "test_property");
163+
void testCreateTableWithInvalidProperty() throws Exception {
164+
TablePath tablePath = TablePath.of("test_db", "test_create_table_with_property");
165+
createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false);
166+
167+
TablePath tablePath1 = TablePath.of("test_db", "test_property");
162168
TableDescriptor t1 =
163169
TableDescriptor.builder()
164170
.schema(DEFAULT_SCHEMA)
@@ -167,7 +173,7 @@ void testCreateTableWithInvalidProperty() {
167173
.property("connector", "fluss")
168174
.build();
169175
// should throw exception
170-
assertThatThrownBy(() -> admin.createTable(tablePath, t1, false).get())
176+
assertThatThrownBy(() -> admin.createTable(tablePath1, t1, false).get())
171177
.cause()
172178
.isInstanceOf(InvalidConfigException.class)
173179
.hasMessageContaining("'connector' is not a Fluss table property.");
@@ -180,7 +186,7 @@ void testCreateTableWithInvalidProperty() {
180186
.property("table.log.ttl", "unknown")
181187
.build();
182188
// should throw exception
183-
assertThatThrownBy(() -> admin.createTable(tablePath, t2, false).get())
189+
assertThatThrownBy(() -> admin.createTable(tablePath1, t2, false).get())
184190
.cause()
185191
.isInstanceOf(InvalidConfigException.class)
186192
.hasMessageContaining(
@@ -194,15 +200,21 @@ void testCreateTableWithInvalidProperty() {
194200
.property(ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key(), "0")
195201
.build();
196202
// should throw exception
197-
assertThatThrownBy(() -> admin.createTable(tablePath, t3, false).get())
203+
assertThatThrownBy(() -> admin.createTable(tablePath1, t3, false).get())
198204
.cause()
199205
.isInstanceOf(InvalidConfigException.class)
200206
.hasMessage("'table.log.tiered.local-segments' must be greater than 0.");
201207
}
202208

203209
@Test
204210
void testCreateTableWithInvalidReplicationFactor() throws Exception {
205-
TablePath tablePath = TablePath.of(DEFAULT_TABLE_PATH.getDatabaseName(), "t1");
211+
TablePath tablePath =
212+
TablePath.of("test_db", "test_create_table_with_invalid_replication_factor_t1");
213+
createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false);
214+
215+
TablePath tablePath1 =
216+
TablePath.of("test_db", "test_create_table_with_invalid_replication_factor_t2");
217+
206218
// set replica factor to a non positive number, should also throw exception
207219
TableDescriptor nonPositiveReplicaFactorTable =
208220
TableDescriptor.builder()
@@ -214,7 +226,7 @@ void testCreateTableWithInvalidReplicationFactor() throws Exception {
214226
// should throw exception
215227
assertThatThrownBy(
216228
() ->
217-
admin.createTable(tablePath, nonPositiveReplicaFactorTable, false)
229+
admin.createTable(tablePath1, nonPositiveReplicaFactorTable, false)
218230
.get())
219231
.cause()
220232
.isInstanceOf(InvalidReplicationFactorException.class)
@@ -234,7 +246,7 @@ void testCreateTableWithInvalidReplicationFactor() throws Exception {
234246
.customProperty("connector", "fluss")
235247
.property(ConfigOptions.TABLE_REPLICATION_FACTOR.key(), "3")
236248
.build();
237-
assertThatThrownBy(() -> admin.createTable(tablePath, tableDescriptor, false).get())
249+
assertThatThrownBy(() -> admin.createTable(tablePath1, tableDescriptor, false).get())
238250
.cause()
239251
.isInstanceOf(InvalidReplicationFactorException.class)
240252
.hasMessageContaining(
@@ -248,18 +260,21 @@ void testCreateTableWithInvalidReplicationFactor() throws Exception {
248260
FLUSS_CLUSTER_EXTENSION.waitUtilAllGatewayHasSameMetadata();
249261

250262
// we can create the table now
251-
admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get();
252-
TableInfo tableInfo = admin.getTable(DEFAULT_TABLE_PATH).get();
263+
admin.createTable(tablePath1, DEFAULT_TABLE_DESCRIPTOR, false).get();
264+
TableInfo tableInfo = admin.getTable(tablePath).get();
253265
assertThat(tableInfo.getTableDescriptor()).isEqualTo(DEFAULT_TABLE_DESCRIPTOR);
254266
}
255267

256268
@Test
257269
void testCreateExistedTable() throws Exception {
258-
assertThatThrownBy(() -> createTable(DEFAULT_TABLE_PATH, DEFAULT_TABLE_DESCRIPTOR, false))
270+
TablePath tablePath = TablePath.of("test_db", "test_create_table_existed_t1");
271+
createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false);
272+
273+
assertThatThrownBy(() -> createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false))
259274
.cause()
260275
.isInstanceOf(DatabaseAlreadyExistException.class);
261276
// no exception
262-
createTable(DEFAULT_TABLE_PATH, DEFAULT_TABLE_DESCRIPTOR, true);
277+
createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, true);
263278

264279
// database not exists, throw exception
265280
assertThatThrownBy(
@@ -275,6 +290,9 @@ void testCreateExistedTable() throws Exception {
275290

276291
@Test
277292
void testDropDatabaseAndTable() throws Exception {
293+
TablePath tablePath = TablePath.of("test_db", "test_drop_database_and_table_t1");
294+
createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false);
295+
278296
// drop not existed database with ignoreIfNotExists false.
279297
assertThatThrownBy(() -> admin.deleteDatabase("unknown_db", false, true).get())
280298
.cause()
@@ -294,7 +312,7 @@ void testDropDatabaseAndTable() throws Exception {
294312
assertThat(admin.databaseExists("test_db").get()).isFalse();
295313

296314
// re-create.
297-
createTable(DEFAULT_TABLE_PATH, DEFAULT_TABLE_DESCRIPTOR, false);
315+
createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false);
298316

299317
// drop not existed table with ignoreIfNotExists false.
300318
assertThatThrownBy(
@@ -308,22 +326,35 @@ void testDropDatabaseAndTable() throws Exception {
308326
admin.deleteTable(TablePath.of("test_db", "unknown_table"), true).get();
309327

310328
// drop existed table.
311-
assertThat(admin.tableExists(DEFAULT_TABLE_PATH).get()).isTrue();
312-
admin.deleteTable(DEFAULT_TABLE_PATH, true).get();
313-
assertThat(admin.tableExists(DEFAULT_TABLE_PATH).get()).isFalse();
329+
assertThat(admin.tableExists(tablePath).get()).isTrue();
330+
admin.deleteTable(tablePath, true).get();
331+
assertThat(admin.tableExists(tablePath).get()).isFalse();
314332
}
315333

316334
@Test
317335
void testListDatabasesAndTables() throws Exception {
336+
TablePath tablePath = TablePath.of("test_db", "test_drop_database_and_table");
337+
createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false);
338+
318339
admin.createDatabase("db1", true).get();
319340
admin.createDatabase("db2", true).get();
320341
admin.createDatabase("db3", true).get();
321342
assertThat(admin.listDatabases().get())
322343
.containsExactlyInAnyOrder("test_db", "db1", "db2", "db3", "fluss");
323344

324-
admin.createTable(TablePath.of("db1", "table1"), DEFAULT_TABLE_DESCRIPTOR, true).get();
325-
admin.createTable(TablePath.of("db1", "table2"), DEFAULT_TABLE_DESCRIPTOR, true).get();
326-
assertThat(admin.listTables("db1").get()).containsExactlyInAnyOrder("table1", "table2");
345+
admin.createTable(
346+
TablePath.of("db1", "list_database_and_table_t1"),
347+
DEFAULT_TABLE_DESCRIPTOR,
348+
true)
349+
.get();
350+
admin.createTable(
351+
TablePath.of("db1", "list_database_and_table_t2"),
352+
DEFAULT_TABLE_DESCRIPTOR,
353+
true)
354+
.get();
355+
assertThat(admin.listTables("db1").get())
356+
.containsExactlyInAnyOrder(
357+
"list_database_and_table_t1", "list_database_and_table_t2");
327358
assertThat(admin.listTables("db2").get()).isEmpty();
328359

329360
assertThatThrownBy(() -> admin.listTables("unknown_db").get())
@@ -333,8 +364,10 @@ void testListDatabasesAndTables() throws Exception {
333364

334365
@Test
335366
void testListPartitionInfos() throws Exception {
336-
String dbName = DEFAULT_TABLE_PATH.getDatabaseName();
367+
String dbName = "test_db";
337368
TablePath nonPartitionedTablePath = TablePath.of(dbName, "test_non_partitioned_table");
369+
createTable(nonPartitionedTablePath, DEFAULT_TABLE_DESCRIPTOR, false);
370+
338371
admin.createTable(nonPartitionedTablePath, DEFAULT_TABLE_DESCRIPTOR, true).get();
339372
assertThatThrownBy(() -> admin.listPartitionInfos(nonPartitionedTablePath).get())
340373
.cause()
@@ -350,7 +383,7 @@ void testListPartitionInfos() throws Exception {
350383
.column("pt", DataTypes.STRING())
351384
.build())
352385
.comment("test table")
353-
.distributedBy(10, "id")
386+
.distributedBy(3, "id")
354387
.partitionedBy("pt")
355388
.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true)
356389
.property(
@@ -372,8 +405,11 @@ void testListPartitionInfos() throws Exception {
372405

373406
@Test
374407
void testGetKvSnapshot() throws Exception {
375-
TablePath tablePath1 =
376-
TablePath.of(DEFAULT_TABLE_PATH.getDatabaseName(), "test-table-snapshot");
408+
TablePath tablePath = TablePath.of("test_db", "test-table-snapshot_t1");
409+
createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false);
410+
411+
TablePath tablePath1 = TablePath.of("test_db", "test-table-snapshot_t2");
412+
377413
int bucketNum = 3;
378414
TableDescriptor tableDescriptor =
379415
TableDescriptor.builder()

fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/FlussLogScannerITCase.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.alibaba.fluss.types.DataTypes;
3232
import com.alibaba.fluss.types.RowType;
3333

34+
import org.junit.jupiter.api.BeforeEach;
3435
import org.junit.jupiter.api.Test;
3536
import org.junit.jupiter.params.ParameterizedTest;
3637
import org.junit.jupiter.params.provider.ValueSource;
@@ -46,7 +47,6 @@
4647
import static com.alibaba.fluss.record.TestData.DATA1_ROW_TYPE;
4748
import static com.alibaba.fluss.record.TestData.DATA1_SCHEMA;
4849
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_INFO;
49-
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH;
5050
import static com.alibaba.fluss.testutils.DataTestUtils.compactedRow;
5151
import static com.alibaba.fluss.testutils.DataTestUtils.row;
5252
import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
@@ -55,14 +55,20 @@
5555
/** ITCase for {@link FlussLogScanner}. */
5656
public class FlussLogScannerITCase extends ClientToServerITCaseBase {
5757

58+
@BeforeEach
59+
protected void setup() throws Exception {
60+
super.setup();
61+
}
62+
5863
@Test
5964
void testPoll() throws Exception {
60-
createTable(DATA1_TABLE_PATH, DATA1_TABLE_INFO.getTableDescriptor(), false);
65+
TablePath tablePath = new TablePath("test_db_1", "test_poll_t1");
66+
createTable(tablePath, DATA1_TABLE_INFO.getTableDescriptor(), false);
6167

6268
// append a batch of data.
6369
int recordSize = 10;
6470
List<IndexedRow> expectedRows = new ArrayList<>();
65-
try (Table table = conn.getTable(DATA1_TABLE_PATH)) {
71+
try (Table table = conn.getTable(tablePath)) {
6672
AppendWriter appendWriter = table.getAppendWriter();
6773
for (int i = 0; i < recordSize; i++) {
6874
IndexedRow row = row(DATA1_ROW_TYPE, new Object[] {i, "a"});
@@ -89,16 +95,18 @@ void testPoll() throws Exception {
8995

9096
@Test
9197
void testPollWhileCreateTableNotReady() throws Exception {
98+
TablePath tablePath =
99+
new TablePath("test_db_1", "test_poll_while_create_table_not_ready_t1");
92100
// create one table with 100 buckets.
93101
int bucketNumber = 100;
94102
TableDescriptor tableDescriptor =
95103
TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(bucketNumber).build();
96-
createTable(DATA1_TABLE_PATH, tableDescriptor, false);
104+
createTable(tablePath, tableDescriptor, false);
97105

98106
// append a batch of data.
99107
int recordSize = 10;
100108
List<IndexedRow> expectedRows = new ArrayList<>();
101-
try (Table table = conn.getTable(DATA1_TABLE_PATH)) {
109+
try (Table table = conn.getTable(tablePath)) {
102110
AppendWriter appendWriter = table.getAppendWriter();
103111
for (int i = 0; i < recordSize; i++) {
104112
IndexedRow row = row(DATA1_ROW_TYPE, new Object[] {i, "a"});
@@ -125,12 +133,13 @@ void testPollWhileCreateTableNotReady() throws Exception {
125133

126134
@Test
127135
void testLogScannerMultiThreadAccess() throws Exception {
128-
createTable(DATA1_TABLE_PATH, DATA1_TABLE_INFO.getTableDescriptor(), false);
136+
TablePath tablePath = new TablePath("test_db_1", "test_log_scanner_multi_thread_access_t1");
137+
createTable(tablePath, DATA1_TABLE_INFO.getTableDescriptor(), false);
129138

130139
// append a batch of data.
131140
int recordSize = 10;
132141
List<IndexedRow> expectedRows = new ArrayList<>();
133-
try (Table table = conn.getTable(DATA1_TABLE_PATH)) {
142+
try (Table table = conn.getTable(tablePath)) {
134143
AppendWriter appendWriter = table.getAppendWriter();
135144
for (int i = 0; i < recordSize; i++) {
136145
IndexedRow row = row(DATA1_ROW_TYPE, new Object[] {i, "a"});
@@ -166,7 +175,7 @@ void testLogScannerMultiThreadAccess() throws Exception {
166175
@Test
167176
void testLogHeavyWriteAndScan() throws Exception {
168177
final String db = "db";
169-
final String tbl = "kv_heavy_table";
178+
final String tbl = "log_heavy_table_and_scan";
170179
// create table
171180
TableDescriptor descriptor =
172181
TableDescriptor.builder()
@@ -220,7 +229,7 @@ void testLogHeavyWriteAndScan() throws Exception {
220229
@Test
221230
void testKvHeavyWriteAndScan() throws Exception {
222231
final String db = "db";
223-
final String tbl = "kv_heavy_table";
232+
final String tbl = "kv_heavy_table_and_scan";
224233
// create table
225234
TableDescriptor descriptor =
226235
TableDescriptor.builder()

0 commit comments

Comments
 (0)