Skip to content

Commit 0cfd2be

Browse files
committed
[test] Fix some unstable test cases while running CI
1 parent bb8b1ea commit 0cfd2be

File tree

34 files changed

+747
-382
lines changed

34 files changed

+747
-382
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,15 +212,17 @@ private int getBucketId(byte[] keyBytes, InternalRow key) {
212212
public CompletableFuture<List<ScanRecord>> limitScan(
213213
TableBucket tableBucket, int limit, @Nullable int[] projectedFields) {
214214
// because that rocksdb is not suitable to projection, thus do it in client.
215-
int leader = metadataUpdater.leaderFor(tableBucket);
216215
LimitScanRequest limitScanRequest =
217216
new LimitScanRequest()
218217
.setTableId(tableBucket.getTableId())
219218
.setBucketId(tableBucket.getBucket())
220219
.setLimit(limit);
221220
if (tableBucket.getPartitionId() != null) {
222221
limitScanRequest.setPartitionId(tableBucket.getPartitionId());
222+
metadataUpdater.checkAndUpdateMetadata(tablePath, tableBucket);
223223
}
224+
225+
int leader = metadataUpdater.leaderFor(tableBucket);
224226
TabletServerGateway gateway = metadataUpdater.newTabletServerClientForNode(leader);
225227

226228
CompletableFuture<List<ScanRecord>> future = new CompletableFuture<>();

fluss-client/src/test/java/com/alibaba/fluss/client/admin/ClientToServerITCaseBase.java renamed to fluss-client/src/test/java/com/alibaba/fluss/client/admin/ClientToServerITCaseUtils.java

Lines changed: 15 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616

1717
package com.alibaba.fluss.client.admin;
1818

19-
import com.alibaba.fluss.client.Connection;
20-
import com.alibaba.fluss.client.ConnectionFactory;
2119
import com.alibaba.fluss.client.admin.OffsetSpec.LatestSpec;
2220
import com.alibaba.fluss.client.admin.OffsetSpec.TimestampSpec;
2321
import com.alibaba.fluss.client.scanner.ScanRecord;
@@ -33,13 +31,8 @@
3331
import com.alibaba.fluss.metadata.TableDescriptor;
3432
import com.alibaba.fluss.metadata.TablePath;
3533
import com.alibaba.fluss.row.InternalRow;
36-
import com.alibaba.fluss.server.testutils.FlussClusterExtension;
3734
import com.alibaba.fluss.types.RowType;
3835

39-
import org.junit.jupiter.api.AfterEach;
40-
import org.junit.jupiter.api.BeforeEach;
41-
import org.junit.jupiter.api.extension.RegisterExtension;
42-
4336
import javax.annotation.Nullable;
4437

4538
import java.time.Duration;
@@ -55,48 +48,20 @@
5548
* The base test class for client to server request and response. The server include
5649
* CoordinatorServer and TabletServer.
5750
*/
58-
public abstract class ClientToServerITCaseBase {
59-
60-
@RegisterExtension
61-
public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
62-
FlussClusterExtension.builder()
63-
.setNumOfTabletServers(3)
64-
.setClusterConf(initConfig())
65-
.build();
66-
67-
protected Connection conn;
68-
protected Admin admin;
69-
protected Configuration clientConf;
70-
71-
@BeforeEach
72-
protected void setup() throws Exception {
73-
clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
74-
conn = ConnectionFactory.createConnection(clientConf);
75-
admin = conn.getAdmin();
76-
}
77-
78-
@AfterEach
79-
protected void teardown() throws Exception {
80-
if (admin != null) {
81-
admin.close();
82-
admin = null;
83-
}
51+
public class ClientToServerITCaseUtils {
8452

85-
if (conn != null) {
86-
conn.close();
87-
conn = null;
88-
}
89-
}
90-
91-
protected long createTable(
92-
TablePath tablePath, TableDescriptor tableDescriptor, boolean ignoreIfExists)
53+
public static long createTable(
54+
Admin admin,
55+
TablePath tablePath,
56+
TableDescriptor tableDescriptor,
57+
boolean ignoreIfExists)
9358
throws Exception {
9459
admin.createDatabase(tablePath.getDatabaseName(), ignoreIfExists).get();
9560
admin.createTable(tablePath, tableDescriptor, ignoreIfExists).get();
9661
return admin.getTable(tablePath).get().getTableId();
9762
}
9863

99-
private static Configuration initConfig() {
64+
public static Configuration initConfig() {
10065
Configuration conf = new Configuration();
10166
conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
10267
// set a shorter interval for testing purpose
@@ -109,22 +74,22 @@ private static Configuration initConfig() {
10974
return conf;
11075
}
11176

112-
protected static LogScanner createLogScanner(Table table) {
77+
public static LogScanner createLogScanner(Table table) {
11378
return table.getLogScanner(new LogScan());
11479
}
11580

116-
protected static LogScanner createLogScanner(Table table, int[] projectFields) {
81+
public static LogScanner createLogScanner(Table table, int[] projectFields) {
11782
return table.getLogScanner(new LogScan().withProjectedFields(projectFields));
11883
}
11984

120-
protected static void subscribeFromBeginning(LogScanner logScanner, Table table) {
85+
public static void subscribeFromBeginning(LogScanner logScanner, Table table) {
12186
int bucketCount = getBucketCount(table);
12287
for (int i = 0; i < bucketCount; i++) {
12388
logScanner.subscribeFromBeginning(i);
12489
}
12590
}
12691

127-
protected static void subscribeFromTimestamp(
92+
public static void subscribeFromTimestamp(
12893
PhysicalTablePath physicalTablePath,
12994
@Nullable Long partitionId,
13095
Table table,
@@ -147,7 +112,7 @@ protected static void subscribeFromTimestamp(
147112
}
148113
}
149114

150-
protected static void subscribeFromLatestOffset(
115+
public static void subscribeFromLatestOffset(
151116
PhysicalTablePath physicalTablePath,
152117
@Nullable Long partitionId,
153118
Table table,
@@ -166,7 +131,7 @@ protected static void subscribeFromLatestOffset(
166131
}
167132
}
168133

169-
protected static List<Integer> getAllBuckets(Table table) {
134+
public static List<Integer> getAllBuckets(Table table) {
170135
List<Integer> buckets = new ArrayList<>();
171136
int bucketCount = getBucketCount(table);
172137
for (int i = 0; i < bucketCount; i++) {
@@ -175,7 +140,7 @@ protected static List<Integer> getAllBuckets(Table table) {
175140
return buckets;
176141
}
177142

178-
private static int getBucketCount(Table table) {
143+
public static int getBucketCount(Table table) {
179144
return table.getDescriptor()
180145
.getTableDistribution()
181146
.flatMap(TableDescriptor.TableDistribution::getBucketCount)
@@ -211,7 +176,7 @@ public static void verifyPartitionLogs(
211176
verifyRows(rowType, actualRows, expectPartitionsRows);
212177
}
213178

214-
protected static void verifyRows(
179+
public static void verifyRows(
215180
RowType rowType,
216181
Map<Long, List<InternalRow>> actualRows,
217182
Map<Long, List<InternalRow>> expectedRows) {

0 commit comments

Comments
 (0)