Skip to content

Commit 2e1577e

Browse files
authored
Flink: Backport support for source watermarks in Flink SQL windows (#12697)
Backport of #12191
1 parent 9635fb4 commit 2e1577e

File tree

6 files changed

+166
-4
lines changed

6 files changed

+166
-4
lines changed

flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,14 @@
3535
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
3636
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
3737
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
38+
import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark;
3839
import org.apache.flink.table.data.RowData;
3940
import org.apache.flink.table.expressions.ResolvedExpression;
4041
import org.apache.flink.table.types.DataType;
4142
import org.apache.iceberg.expressions.Expression;
4243
import org.apache.iceberg.flink.FlinkConfigOptions;
4344
import org.apache.iceberg.flink.FlinkFilters;
45+
import org.apache.iceberg.flink.FlinkReadOptions;
4446
import org.apache.iceberg.flink.TableLoader;
4547
import org.apache.iceberg.flink.source.assigner.SplitAssignerType;
4648
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -53,7 +55,8 @@ public class IcebergTableSource
5355
implements ScanTableSource,
5456
SupportsProjectionPushDown,
5557
SupportsFilterPushDown,
56-
SupportsLimitPushDown {
58+
SupportsLimitPushDown,
59+
SupportsSourceWatermark {
5760

5861
private int[] projectedFields;
5962
private Long limit;
@@ -175,6 +178,17 @@ public Result applyFilters(List<ResolvedExpression> flinkFilters) {
175178
return Result.of(acceptedFilters, flinkFilters);
176179
}
177180

181+
@Override
182+
public void applySourceWatermark() {
183+
Preconditions.checkArgument(
184+
readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE),
185+
"Source watermarks are supported only in flip-27 iceberg source implementation");
186+
187+
Preconditions.checkNotNull(
188+
properties.get(FlinkReadOptions.WATERMARK_COLUMN),
189+
"watermark-column needs to be configured to use source watermark.");
190+
}
191+
178192
@Override
179193
public boolean supportsNestedProjection() {
180194
// TODO: support nested projection

flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java

+53-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.iceberg.flink.source;
2020

2121
import static org.apache.iceberg.types.Types.NestedField.required;
22+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2223

2324
import java.io.IOException;
2425
import java.time.Instant;
@@ -40,6 +41,7 @@
4041
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
4142
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
4243
import org.apache.iceberg.types.Types;
44+
import org.junit.jupiter.api.AfterEach;
4345
import org.junit.jupiter.api.BeforeEach;
4446
import org.junit.jupiter.api.Test;
4547

@@ -53,7 +55,11 @@ public class TestIcebergSourceSql extends TestSqlBase {
5355
@BeforeEach
5456
@Override
5557
public void before() throws IOException {
56-
TableEnvironment tableEnvironment = getTableEnv();
58+
setUpTableEnv(getTableEnv());
59+
setUpTableEnv(getStreamingTableEnv());
60+
}
61+
62+
private static void setUpTableEnv(TableEnvironment tableEnvironment) {
5763
Configuration tableConf = tableEnvironment.getConfig().getConfiguration();
5864
tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, true);
5965
// Disable inferring parallelism to avoid interfering watermark tests
@@ -72,6 +78,11 @@ public void before() throws IOException {
7278
tableConf.set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
7379
}
7480

81+
@AfterEach
82+
public void after() throws IOException {
83+
CATALOG_EXTENSION.catalog().dropTable(TestFixtures.TABLE_IDENTIFIER);
84+
}
85+
7586
private Record generateRecord(Instant t1, long t2) {
7687
Record record = GenericRecord.create(SCHEMA_TS);
7788
record.setField("t1", t1.atZone(ZoneId.systemDefault()).toLocalDateTime());
@@ -178,4 +189,45 @@ public void testReadFlinkDynamicTable() throws Exception {
178189
expected,
179190
SCHEMA_TS);
180191
}
192+
193+
@Test
194+
public void testWatermarkInvalidConfig() {
195+
CATALOG_EXTENSION.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, SCHEMA_TS);
196+
197+
String flinkTable = "`default_catalog`.`default_database`.flink_table";
198+
SqlHelpers.sql(
199+
getStreamingTableEnv(),
200+
"CREATE TABLE %s "
201+
+ "(eventTS AS CAST(t1 AS TIMESTAMP(3)), "
202+
+ "WATERMARK FOR eventTS AS SOURCE_WATERMARK()) LIKE iceberg_catalog.`default`.%s",
203+
flinkTable,
204+
TestFixtures.TABLE);
205+
206+
assertThatThrownBy(() -> SqlHelpers.sql(getStreamingTableEnv(), "SELECT * FROM %s", flinkTable))
207+
.isInstanceOf(NullPointerException.class)
208+
.hasMessage("watermark-column needs to be configured to use source watermark.");
209+
}
210+
211+
@Test
212+
public void testWatermarkValidConfig() throws Exception {
213+
List<Record> expected = generateExpectedRecords(true);
214+
215+
String flinkTable = "`default_catalog`.`default_database`.flink_table";
216+
217+
SqlHelpers.sql(
218+
getStreamingTableEnv(),
219+
"CREATE TABLE %s "
220+
+ "(eventTS AS CAST(t1 AS TIMESTAMP(3)), "
221+
+ "WATERMARK FOR eventTS AS SOURCE_WATERMARK()) WITH ('watermark-column'='t1') LIKE iceberg_catalog.`default`.%s",
222+
flinkTable,
223+
TestFixtures.TABLE);
224+
225+
TestHelpers.assertRecordsWithOrder(
226+
SqlHelpers.sql(
227+
getStreamingTableEnv(),
228+
"SELECT t1, t2 FROM TABLE(TUMBLE(TABLE %s, DESCRIPTOR(eventTS), INTERVAL '1' SECOND))",
229+
flinkTable),
230+
expected,
231+
SCHEMA_TS);
232+
}
181233
}

flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java

+15
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ public abstract class TestSqlBase {
6363

6464
private volatile TableEnvironment tEnv;
6565

66+
private volatile TableEnvironment streamingTEnv;
67+
6668
protected TableEnvironment getTableEnv() {
6769
if (tEnv == null) {
6870
synchronized (this) {
@@ -75,6 +77,19 @@ protected TableEnvironment getTableEnv() {
7577
return tEnv;
7678
}
7779

80+
protected TableEnvironment getStreamingTableEnv() {
81+
if (streamingTEnv == null) {
82+
synchronized (this) {
83+
if (streamingTEnv == null) {
84+
this.streamingTEnv =
85+
TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
86+
}
87+
}
88+
}
89+
90+
return streamingTEnv;
91+
}
92+
7893
@BeforeEach
7994
public abstract void before() throws IOException;
8095

flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,14 @@
3535
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
3636
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
3737
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
38+
import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark;
3839
import org.apache.flink.table.data.RowData;
3940
import org.apache.flink.table.expressions.ResolvedExpression;
4041
import org.apache.flink.table.types.DataType;
4142
import org.apache.iceberg.expressions.Expression;
4243
import org.apache.iceberg.flink.FlinkConfigOptions;
4344
import org.apache.iceberg.flink.FlinkFilters;
45+
import org.apache.iceberg.flink.FlinkReadOptions;
4446
import org.apache.iceberg.flink.TableLoader;
4547
import org.apache.iceberg.flink.source.assigner.SplitAssignerType;
4648
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -53,7 +55,8 @@ public class IcebergTableSource
5355
implements ScanTableSource,
5456
SupportsProjectionPushDown,
5557
SupportsFilterPushDown,
56-
SupportsLimitPushDown {
58+
SupportsLimitPushDown,
59+
SupportsSourceWatermark {
5760

5861
private int[] projectedFields;
5962
private Long limit;
@@ -175,6 +178,17 @@ public Result applyFilters(List<ResolvedExpression> flinkFilters) {
175178
return Result.of(acceptedFilters, flinkFilters);
176179
}
177180

181+
@Override
182+
public void applySourceWatermark() {
183+
Preconditions.checkArgument(
184+
readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE),
185+
"Source watermarks are supported only in flip-27 iceberg source implementation");
186+
187+
Preconditions.checkNotNull(
188+
properties.get(FlinkReadOptions.WATERMARK_COLUMN),
189+
"watermark-column needs to be configured to use source watermark.");
190+
}
191+
178192
@Override
179193
public boolean supportsNestedProjection() {
180194
// TODO: support nested projection

flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java

+53-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.iceberg.flink.source;
2020

2121
import static org.apache.iceberg.types.Types.NestedField.required;
22+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2223

2324
import java.io.IOException;
2425
import java.time.Instant;
@@ -40,6 +41,7 @@
4041
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
4142
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
4243
import org.apache.iceberg.types.Types;
44+
import org.junit.jupiter.api.AfterEach;
4345
import org.junit.jupiter.api.BeforeEach;
4446
import org.junit.jupiter.api.Test;
4547

@@ -53,7 +55,11 @@ public class TestIcebergSourceSql extends TestSqlBase {
5355
@BeforeEach
5456
@Override
5557
public void before() throws IOException {
56-
TableEnvironment tableEnvironment = getTableEnv();
58+
setUpTableEnv(getTableEnv());
59+
setUpTableEnv(getStreamingTableEnv());
60+
}
61+
62+
private static void setUpTableEnv(TableEnvironment tableEnvironment) {
5763
Configuration tableConf = tableEnvironment.getConfig().getConfiguration();
5864
tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, true);
5965
// Disable inferring parallelism to avoid interfering watermark tests
@@ -72,6 +78,11 @@ public void before() throws IOException {
7278
tableConf.set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
7379
}
7480

81+
@AfterEach
82+
public void after() throws IOException {
83+
CATALOG_EXTENSION.catalog().dropTable(TestFixtures.TABLE_IDENTIFIER);
84+
}
85+
7586
private Record generateRecord(Instant t1, long t2) {
7687
Record record = GenericRecord.create(SCHEMA_TS);
7788
record.setField("t1", t1.atZone(ZoneId.systemDefault()).toLocalDateTime());
@@ -178,4 +189,45 @@ public void testReadFlinkDynamicTable() throws Exception {
178189
expected,
179190
SCHEMA_TS);
180191
}
192+
193+
@Test
194+
public void testWatermarkInvalidConfig() {
195+
CATALOG_EXTENSION.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, SCHEMA_TS);
196+
197+
String flinkTable = "`default_catalog`.`default_database`.flink_table";
198+
SqlHelpers.sql(
199+
getStreamingTableEnv(),
200+
"CREATE TABLE %s "
201+
+ "(eventTS AS CAST(t1 AS TIMESTAMP(3)), "
202+
+ "WATERMARK FOR eventTS AS SOURCE_WATERMARK()) LIKE iceberg_catalog.`default`.%s",
203+
flinkTable,
204+
TestFixtures.TABLE);
205+
206+
assertThatThrownBy(() -> SqlHelpers.sql(getStreamingTableEnv(), "SELECT * FROM %s", flinkTable))
207+
.isInstanceOf(NullPointerException.class)
208+
.hasMessage("watermark-column needs to be configured to use source watermark.");
209+
}
210+
211+
@Test
212+
public void testWatermarkValidConfig() throws Exception {
213+
List<Record> expected = generateExpectedRecords(true);
214+
215+
String flinkTable = "`default_catalog`.`default_database`.flink_table";
216+
217+
SqlHelpers.sql(
218+
getStreamingTableEnv(),
219+
"CREATE TABLE %s "
220+
+ "(eventTS AS CAST(t1 AS TIMESTAMP(3)), "
221+
+ "WATERMARK FOR eventTS AS SOURCE_WATERMARK()) WITH ('watermark-column'='t1') LIKE iceberg_catalog.`default`.%s",
222+
flinkTable,
223+
TestFixtures.TABLE);
224+
225+
TestHelpers.assertRecordsWithOrder(
226+
SqlHelpers.sql(
227+
getStreamingTableEnv(),
228+
"SELECT t1, t2 FROM TABLE(TUMBLE(TABLE %s, DESCRIPTOR(eventTS), INTERVAL '1' SECOND))",
229+
flinkTable),
230+
expected,
231+
SCHEMA_TS);
232+
}
181233
}

flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java

+15
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ public abstract class TestSqlBase {
6363

6464
private volatile TableEnvironment tEnv;
6565

66+
private volatile TableEnvironment streamingTEnv;
67+
6668
protected TableEnvironment getTableEnv() {
6769
if (tEnv == null) {
6870
synchronized (this) {
@@ -75,6 +77,19 @@ protected TableEnvironment getTableEnv() {
7577
return tEnv;
7678
}
7779

80+
protected TableEnvironment getStreamingTableEnv() {
81+
if (streamingTEnv == null) {
82+
synchronized (this) {
83+
if (streamingTEnv == null) {
84+
this.streamingTEnv =
85+
TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
86+
}
87+
}
88+
}
89+
90+
return streamingTEnv;
91+
}
92+
7893
@BeforeEach
7994
public abstract void before() throws IOException;
8095

0 commit comments

Comments (0)