
Commit d863bb4

[flink] Add restore test for streaming union read log table (apache#1663)
1 parent ee29bc4 commit d863bb4

File tree

6 files changed: +103 −17 lines

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java

Lines changed: 1 addition & 1 deletion
@@ -68,7 +68,7 @@ public BoundedSplitReader getBoundedSplitScanner(SourceSplitBase split) {
             LakeSnapshotScanner lakeSnapshotScanner =
                     new LakeSnapshotScanner(lakeSource, lakeSnapshotSplit);
             return new BoundedSplitReader(
-                    lakeSnapshotScanner, lakeSnapshotSplit.getRecordsToSplit());
+                    lakeSnapshotScanner, lakeSnapshotSplit.getRecordsToSkip());
         } else if (split instanceof LakeSnapshotAndFlussLogSplit) {
             LakeSnapshotAndFlussLogSplit lakeSplit = (LakeSnapshotAndFlussLogSplit) split;
             return new BoundedSplitReader(getBatchScanner(lakeSplit), lakeSplit.getRecordsToSkip());
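For context, the renamed accessor feeds the second constructor argument of BoundedSplitReader, which is a skip count: the number of records already emitted before a checkpoint that must be fast-forwarded past on restore. A minimal, self-contained sketch of that semantics (hypothetical types, not the actual Fluss classes):

import java.util.Arrays;
import java.util.Iterator;

final class SkipSemanticsSketch {
    // hypothetical stand-in for what a bounded reader does with the count:
    // fast-forward past records the downstream already saw before the savepoint
    static <T> T pollAfterSkip(Iterator<T> scanner, long recordsToSkip) {
        while (recordsToSkip > 0 && scanner.hasNext()) {
            scanner.next();
            recordsToSkip--;
        }
        return scanner.hasNext() ? scanner.next() : null;
    }

    public static void main(String[] args) {
        Iterator<String> scanner = Arrays.asList("r0", "r1", "r2", "r3").iterator();
        // two records were emitted before the savepoint, so resume at r2
        System.out.println(pollAfterSkip(scanner, 2)); // prints r2
    }
}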

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotSplit.java

Lines changed: 7 additions & 7 deletions
@@ -30,7 +30,7 @@ public class LakeSnapshotSplit extends SourceSplitBase {
 
     private final LakeSplit lakeSplit;
 
-    private final long recordsToSplit;
+    private final long recordsToSkip;
 
     private final int splitIndex;
 
@@ -47,19 +47,19 @@ public LakeSnapshotSplit(
             @Nullable String partitionName,
             LakeSplit lakeSplit,
             int splitIndex,
-            long recordsToSplit) {
+            long recordsToSkip) {
         super(tableBucket, partitionName);
         this.lakeSplit = lakeSplit;
         this.splitIndex = splitIndex;
-        this.recordsToSplit = recordsToSplit;
+        this.recordsToSkip = recordsToSkip;
     }
 
     public LakeSplit getLakeSplit() {
         return lakeSplit;
     }
 
-    public long getRecordsToSplit() {
-        return recordsToSplit;
+    public long getRecordsToSkip() {
+        return recordsToSkip;
     }
 
     public int getSplitIndex() {
@@ -93,8 +93,8 @@ public String toString() {
         return "LakeSnapshotSplit{"
                 + "lakeSplit="
                 + lakeSplit
-                + ", recordsToSplit="
-                + recordsToSplit
+                + ", recordsToSkip="
+                + recordsToSkip
                 + ", splitIndex="
                 + splitIndex
                 + ", tableBucket="

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotSplitState.java

Lines changed: 4 additions & 4 deletions
@@ -25,16 +25,16 @@
 public class LakeSnapshotSplitState extends SourceSplitState {
 
     private final LakeSnapshotSplit split;
-    private long recordsToSplit;
+    private long recordsToSkip;
 
     public LakeSnapshotSplitState(LakeSnapshotSplit split) {
         super(split);
         this.split = split;
-        this.recordsToSplit = split.getRecordsToSplit();
+        this.recordsToSkip = split.getRecordsToSkip();
     }
 
     public void setRecordsToSkip(long recordsToSkip) {
-        this.recordsToSplit = recordsToSkip;
+        this.recordsToSkip = recordsToSkip;
     }
 
     @Override
@@ -44,6 +44,6 @@ public SourceSplitBase toSourceSplit() {
                 split.getPartitionName(),
                 split.getLakeSplit(),
                 split.getSplitIndex(),
-                recordsToSplit);
+                recordsToSkip);
     }
 }
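The state object is the mutable counterpart of the immutable split: the reader advances the skip count as it emits records, and on checkpoint toSourceSplit() freezes the counter back into a split. A self-contained sketch of that round trip (simplified stand-ins, not the actual Fluss classes):

import java.util.Arrays;
import java.util.Iterator;

final class SplitStateRoundTripSketch {
    static final class Split {
        final long recordsToSkip;
        Split(long recordsToSkip) { this.recordsToSkip = recordsToSkip; }
    }

    static final class SplitState {
        private long recordsToSkip;
        SplitState(Split split) { this.recordsToSkip = split.recordsToSkip; }
        void setRecordsToSkip(long n) { this.recordsToSkip = n; }
        Split toSourceSplit() { return new Split(recordsToSkip); } // checkpoint path
    }

    public static void main(String[] args) {
        SplitState state = new SplitState(new Split(0));
        Iterator<String> records = Arrays.asList("r0", "r1", "r2").iterator();
        long emitted = 0;
        // emit two records, advancing the state after each one
        while (records.hasNext() && emitted < 2) {
            System.out.println("emit " + records.next());
            state.setRecordsToSkip(++emitted);
        }
        // a checkpoint taken now remembers to skip exactly the emitted records
        System.out.println("recordsToSkip after restore: " + state.toSourceSplit().recordsToSkip);
    }
}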

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java

Lines changed: 65 additions & 0 deletions
@@ -24,15 +24,20 @@
 import org.apache.fluss.row.TimestampNtz;
 
 import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.CollectionUtil;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import javax.annotation.Nullable;
 
+import java.io.File;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
@@ -42,13 +47,16 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsExactOrder;
 import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
 import static org.apache.fluss.testutils.DataTestUtils.row;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** The IT case for Flink union data in lake and fluss for log table. */
 class FlinkUnionReadLogTableITCase extends FlinkUnionReadTestBase {
 
+    @TempDir public static File savepointDir;
+
     @BeforeAll
     protected static void beforeAll() {
         FlinkUnionReadTestBase.beforeAll();
@@ -169,6 +177,63 @@ void testReadLogTableInStreamMode(boolean isPartitioned) throws Exception {
                 actual, writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testUnionReadLogTableFailover(boolean isPartitioned) throws Exception {
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        String tableName1 =
+                "restore_logTable_" + (isPartitioned ? "partitioned" : "non_partitioned");
+        String resultTableName =
+                "result_table" + (isPartitioned ? "partitioned" : "non_partitioned");
+
+        TablePath table1 = TablePath.of(DEFAULT_DB, tableName1);
+        TablePath resultTable = TablePath.of(DEFAULT_DB, resultTableName);
+        List<Row> writtenRows = new LinkedList<>();
+        long tableId = prepareLogTable(table1, DEFAULT_BUCKET_NUM, isPartitioned, writtenRows);
+        // wait until the records have been synced
+        waitUntilBucketSynced(table1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
+
+        StreamTableEnvironment streamTEnv = buildSteamTEnv(null);
+        // now, start reading the log table and writing to a fluss result table;
+        // it may or may not read from fluss, depending on the log offset of the paimon snapshot
+        createFullTypeLogTable(resultTable, DEFAULT_BUCKET_NUM, isPartitioned, false);
+        TableResult insertResult =
+                streamTEnv.executeSql(
+                        "insert into " + resultTableName + " select * from " + tableName1);
+
+        CloseableIterator<Row> actual =
+                streamTEnv.executeSql("select * from " + resultTableName).collect();
+        assertResultsExactOrder(actual, writtenRows, false);
+
+        // now, stop the job with a savepoint
+        String savepointPath =
+                insertResult
+                        .getJobClient()
+                        .get()
+                        .stopWithSavepoint(
+                                false,
+                                savepointDir.getAbsolutePath(),
+                                SavepointFormatType.CANONICAL)
+                        .get();
+
+        // rebuild the stream table environment, restoring from the savepoint
+        streamTEnv = buildSteamTEnv(savepointPath);
+        insertResult =
+                streamTEnv.executeSql(
+                        "insert into " + resultTableName + " select * from " + tableName1);
+
+        // write some log data again
+        List<Row> rows = writeRows(table1, 3, isPartitioned);
+
+        assertResultsExactOrder(actual, rows, true);
+
+        // cancel the jobs
+        insertResult.getJobClient().get().cancel().get();
+        jobClient.cancel().get();
+    }
+
     private long prepareLogTable(
             TablePath tablePath, int bucketNum, boolean isPartitioned, List<Row> flinkRows)
             throws Exception {
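Stripped of the test harness, the stop/restore cycle exercised above rests on two standard Flink APIs: JobClient.stopWithSavepoint(...) and the execution.savepoint.path option. A minimal sketch, assuming `source` and `sink` tables are already registered in the catalog (table names and the savepoint directory are placeholders):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SavepointRestoreSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv =
                StreamTableEnvironment.create(env, EnvironmentSettings.inStreamingMode());

        TableResult insert = tEnv.executeSql("insert into sink select * from source");

        // stop the job, writing a canonical-format savepoint under /tmp/savepoints
        String savepointPath =
                insert.getJobClient()
                        .get()
                        .stopWithSavepoint(false, "/tmp/savepoints", SavepointFormatType.CANONICAL)
                        .get();

        // point the environment at the savepoint; the next job submitted from it
        // restores its operator state (e.g. the split's recordsToSkip) from there
        Configuration conf = new Configuration();
        conf.setString("execution.savepoint.path", savepointPath);
        env.configure(conf);

        tEnv.executeSql("insert into sink select * from source");
    }
}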

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java

Lines changed: 14 additions & 1 deletion
@@ -21,12 +21,15 @@
 import org.apache.fluss.lake.paimon.testutils.FlinkPaimonTieringTestBase;
 import org.apache.fluss.server.testutils.FlussClusterExtension;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import javax.annotation.Nullable;
+
 import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
 
 /** Base class for Flink union read test. */
@@ -60,7 +63,12 @@ protected FlussClusterExtension getFlussClusterExtension() {
         return FLUSS_CLUSTER_EXTENSION;
     }
 
-    private void buildStreamTEnv() {
+    protected StreamTableEnvironment buildSteamTEnv(@Nullable String savepointPath) {
+        Configuration conf = new Configuration();
+        if (savepointPath != null) {
+            conf.setString("execution.savepoint.path", savepointPath);
+            execEnv.configure(conf);
+        }
         String bootstrapServers = String.join(",", clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
         // create table environment
         streamTEnv = StreamTableEnvironment.create(execEnv, EnvironmentSettings.inStreamingMode());
@@ -71,6 +79,11 @@ private void buildStreamTEnv() {
                         CATALOG_NAME, BOOTSTRAP_SERVERS.key(), bootstrapServers));
         streamTEnv.executeSql("use catalog " + CATALOG_NAME);
         streamTEnv.executeSql("use " + DEFAULT_DB);
+        return streamTEnv;
+    }
+
+    private void buildStreamTEnv() {
+        buildSteamTEnv(null);
     }
 
     public void buildBatchTEnv() {
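The zero-argument buildStreamTEnv() keeps its old behavior by delegating with null, so existing callers are untouched; a restoring caller passes the path returned by stopWithSavepoint. Note that because execEnv is shared, a configured savepoint path stays in effect for jobs submitted from that environment afterwards. Usage, as in the test above:

// fresh start: no savepoint, same behavior as the old buildStreamTEnv()
StreamTableEnvironment fresh = buildSteamTEnv(null);

// resume: the next job submitted from execEnv restores from the savepoint
StreamTableEnvironment resumed = buildSteamTEnv(savepointPath);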

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java

Lines changed: 12 additions & 4 deletions
@@ -282,6 +282,12 @@ protected long createLogTable(TablePath tablePath, int bucketNum, boolean isPart
 
     protected long createFullTypeLogTable(TablePath tablePath, int bucketNum, boolean isPartitioned)
             throws Exception {
+        return createFullTypeLogTable(tablePath, bucketNum, isPartitioned, true);
+    }
+
+    protected long createFullTypeLogTable(
+            TablePath tablePath, int bucketNum, boolean isPartitioned, boolean lakeEnabled)
+            throws Exception {
         Schema.Builder schemaBuilder =
                 Schema.newBuilder()
                         .column("f_boolean", DataTypes.BOOLEAN())
@@ -301,10 +307,12 @@ protected long createFullTypeLogTable(TablePath tablePath, int bucketNum, boolea
                         .column("f_binary", DataTypes.BINARY(4));
 
         TableDescriptor.Builder tableBuilder =
-                TableDescriptor.builder()
-                        .distributedBy(bucketNum, "f_int")
-                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
-                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500));
+                TableDescriptor.builder().distributedBy(bucketNum, "f_int");
+        if (lakeEnabled) {
+            tableBuilder
+                    .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
+                    .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500));
+        }
 
         if (isPartitioned) {
             schemaBuilder.column("p", DataTypes.STRING());
