import org.apache.fluss.types.DataTypes;

import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import javax.annotation.Nullable;

import java.io.File;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsExactOrder;
import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertRowResultsIgnoreOrder;
import static org.apache.fluss.testutils.DataTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
/** Test union read of a log table with full types. */
public class FlinkUnionReadLogTableITCase extends FlinkUnionReadTestBase {
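    // temporary directory used as the target for stop-with-savepoint in the failover test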
    @TempDir public static File savepointDir;

    @BeforeAll
    protected static void beforeAll() {
        FlinkUnionReadTestBase.beforeAll();
@@ -129,6 +141,114 @@ void testReadLogTableFullType(boolean isPartitioned) throws Exception {
        }
    }

    @ParameterizedTest
    @ValueSource(booleans = {false, true})
    void testReadLogTableInStreamMode(boolean isPartitioned) throws Exception {
        // first of all, start the tiering job
        JobClient jobClient = buildTieringJob(execEnv);

        String tableName = "stream_logTable_" + (isPartitioned ? "partitioned" : "non_partitioned");

        TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
        List<Row> writtenRows = new LinkedList<>();
        long tableId = prepareLogTable(t1, DEFAULT_BUCKET_NUM, isPartitioned, writtenRows);
        // wait until the records have been synced
        waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);

        // now, start to read the log table, which will read the iceberg snapshot and
        // may or may not read the fluss log, depending on the log offset of the iceberg snapshot
        CloseableIterator<Row> actual =
                streamTEnv.executeSql("select * from " + tableName).collect();
        assertResultsIgnoreOrder(
                actual, writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);

        // cancel the tiering job
        jobClient.cancel().get();

        // write some log data again
        writtenRows.addAll(writeRows(t1, 3, isPartitioned));

        // query the log table again and check the data;
        // it should read both the iceberg snapshot and the fluss log
        actual =
                streamTEnv
                        .executeSql(
                                "select * from "
                                        + tableName
                                        + " /*+ OPTIONS('scan.partition.discovery.interval'='100ms') */")
                        .collect();
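        // the OPTIONS hint shortens the partition discovery interval so that a partition
        // created after the scan has started is picked up quickly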
        if (isPartitioned) {
            // we write to a new partition to verify partition discovery
            writtenRows.addAll(writeFullTypeRows(t1, 10, "3027"));
        }
        assertResultsIgnoreOrder(
                actual, writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);
    }

    @ParameterizedTest
    @ValueSource(booleans = {false, true})
    void testUnionReadLogTableFailover(boolean isPartitioned) throws Exception {
        // first of all, start the tiering job
        JobClient jobClient = buildTieringJob(execEnv);

        String tableName1 =
                "restore_logTable_" + (isPartitioned ? "partitioned" : "non_partitioned");
        String resultTableName =
                "result_table" + (isPartitioned ? "partitioned" : "non_partitioned");

        TablePath table1 = TablePath.of(DEFAULT_DB, tableName1);
        TablePath resultTable = TablePath.of(DEFAULT_DB, resultTableName);
        List<Row> writtenRows = new LinkedList<>();
        long tableId = prepareLogTable(table1, DEFAULT_BUCKET_NUM, isPartitioned, writtenRows);
        // wait until the records have been synced
        waitUntilBucketSynced(table1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);

        StreamTableEnvironment streamTEnv = buildStreamTEnv(null);
        // now, start a job that reads the log table and writes into a fluss result table;
        // it may or may not read the fluss log, depending on the log offset of the iceberg snapshot
        createFullTypeLogTable(resultTable, DEFAULT_BUCKET_NUM, isPartitioned, false);
        TableResult insertResult =
                streamTEnv.executeSql(
                        "insert into " + resultTableName + " select * from " + tableName1);

        CloseableIterator<Row> actual =
                streamTEnv.executeSql("select * from " + resultTableName).collect();
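        // this unbounded SELECT keeps running across the stop-with-savepoint below,
        // so rows produced by the restarted insert job will also show up in 'actual'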
        if (isPartitioned) {
            assertRowResultsIgnoreOrder(actual, writtenRows, false);
        } else {
            assertResultsExactOrder(actual, writtenRows, false);
        }

        // now, stop the insert job with a savepoint
        String savepointPath =
                insertResult
                        .getJobClient()
                        .get()
                        .stopWithSavepoint(
                                false,
                                savepointDir.getAbsolutePath(),
                                SavepointFormatType.CANONICAL)
                        .get();

        // rebuild the stream table environment, restoring from the savepoint
        streamTEnv = buildStreamTEnv(savepointPath);
        insertResult =
                streamTEnv.executeSql(
                        "insert into " + resultTableName + " select * from " + tableName1);

        // write some log data again; the restored insert job should pick it up
        List<Row> rows = writeRows(table1, 3, isPartitioned);
        if (isPartitioned) {
            assertRowResultsIgnoreOrder(actual, rows, true);
        } else {
            assertResultsExactOrder(actual, rows, true);
        }

        // cancel the jobs
        insertResult.getJobClient().get().cancel().get();
        jobClient.cancel().get();
    }

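    // A minimal sketch of what the buildStreamTEnv(@Nullable String savepointPath) helper from
    // FlinkUnionReadTestBase is assumed to do: create a streaming StreamTableEnvironment and,
    // when a savepoint path is given, let the next job submission restore from it via
    // 'execution.savepoint.path' (the actual helper lives in the base class):
    //
    //     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //     env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
    //     Configuration conf = new Configuration();
    //     if (savepointPath != null) {
    //         conf.setString("execution.savepoint.path", savepointPath);
    //     }
    //     env.configure(conf, Thread.currentThread().getContextClassLoader());
    //     return StreamTableEnvironment.create(env);
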
    private long prepareLogTable(
            TablePath tablePath, int bucketNum, boolean isPartitioned, List<Row> flinkRows)
            throws Exception {
@@ -152,6 +272,12 @@ private long prepareLogTable(

    protected long createFullTypeLogTable(TablePath tablePath, int bucketNum, boolean isPartitioned)
            throws Exception {
        return createFullTypeLogTable(tablePath, bucketNum, isPartitioned, true);
    }

    protected long createFullTypeLogTable(
            TablePath tablePath, int bucketNum, boolean isPartitioned, boolean lakeEnabled)
            throws Exception {
        Schema.Builder schemaBuilder =
                Schema.newBuilder()
                        .column("f_boolean", DataTypes.BOOLEAN())
@@ -176,6 +302,12 @@ protected long createFullTypeLogTable(TablePath tablePath, int bucketNum, boolean isPartitioned)
                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500));

        if (!lakeEnabled) {
            // the builder above enables the datalake by default, so explicitly disable it
            // for tables that should not be tiered (e.g. the result table in the failover test)
            tableBuilder.property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "false");
        }

        if (isPartitioned) {
            schemaBuilder.column("p", DataTypes.STRING());
            tableBuilder.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true);