Skip to content

Commit ea30fd6

Browse files
authored
[chore][improve][fluss] Improve flaky test case FlussSinkITCase (#4256)
1 parent 7f2fd1f commit ea30fd6

1 file changed

Lines changed: 26 additions & 1 deletion

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkITCase.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@
4040
import org.apache.flink.types.Row;
4141
import org.apache.flink.util.CloseableIterator;
4242

43+
import com.alibaba.fluss.client.Connection;
44+
import com.alibaba.fluss.client.ConnectionFactory;
4345
import com.alibaba.fluss.server.testutils.FlussClusterExtension;
4446
import org.junit.jupiter.api.AfterEach;
4547
import org.junit.jupiter.api.BeforeEach;
@@ -74,7 +76,9 @@ public class FlussSinkITCase extends AbstractTestBase {
7476
protected TableEnvironment tBatchEnv;
7577

7678
@BeforeEach
77-
void before() {
79+
void before() throws Exception {
80+
waitForFlussClusterReady();
81+
7882
// open a catalog so that we can get table from the catalog
7983
String bootstrapServers = FLUSS_CLUSTER_EXTENSION.getBootstrapServers();
8084

@@ -94,6 +98,27 @@ void before() {
9498
tBatchEnv.useDatabase(DEFAULT_DB);
9599
}
96100

101+
private void waitForFlussClusterReady() throws Exception {
102+
int maxRetries = 30;
103+
int retryIntervalMs = 1000;
104+
Exception lastException = null;
105+
106+
for (int i = 0; i < maxRetries; i++) {
107+
try (Connection connection =
108+
ConnectionFactory.createConnection(FLUSS_CLUSTER_EXTENSION.getClientConfig())) {
109+
// Connection successful, cluster is ready
110+
return;
111+
} catch (Exception e) {
112+
lastException = e;
113+
Thread.sleep(retryIntervalMs);
114+
}
115+
}
116+
117+
throw new IllegalStateException(
118+
"Failed to connect to Fluss cluster after " + maxRetries + " attempts",
119+
lastException);
120+
}
121+
97122
@AfterEach
98123
void after() {
99124
tBatchEnv.useDatabase(BUILTIN_DATABASE);

0 commit comments

Comments
 (0)