
Commit 184eb8a

Mehul Batra authored
[flink] Support $lake table for iceberg (#1812)
--------- Co-authored-by: Mehul Batra <[email protected]>
1 parent ab80723 commit 184eb8a

6 files changed: +193 -22 lines changed

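In short, this change lets Flink SQL read the Iceberg-tiered copy of a Fluss table through the $lake suffix, including Iceberg metadata tables such as $lake$snapshots. A minimal usage sketch against a Flink TableEnvironment (the table name "orders" and the variable tEnv are illustrative; the same statements are exercised by the new IT case at the end of this diff):

    // tEnv is an org.apache.flink.table.api.TableEnvironment configured with a Fluss catalog.
    tEnv.executeSql("SELECT * FROM orders$lake").print();            // data tiered to Iceberg
    tEnv.executeSql("SELECT * FROM orders$lake$snapshots").print();  // Iceberg snapshots metadata table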

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java

Lines changed: 3 additions & 2 deletions
@@ -135,7 +135,7 @@ public FlinkCatalog(
 
     @Override
     public Optional<Factory> getFactory() {
-        return Optional.of(new FlinkTableFactory());
+        return Optional.of(new FlinkTableFactory(lakeCatalog));
     }
 
     @Override
@@ -336,7 +336,8 @@ protected CatalogBaseTable getLakeTable(
             // should be pattern like table_name$lake
             tableName = tableComponents[0];
         } else {
-            // be something like table_name$lake$snapshot
+            // pattern is table_name$lake$snapshots
+            // Need to reconstruct: table_name + $snapshots
             tableName = String.join("", tableComponents);
         }
         return lakeCatalog
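For context, a minimal sketch of the name handling this hunk describes (the splitter literal and variable names below are assumptions for illustration, not taken from the surrounding code): splitting on "$lake" yields a single component for a plain lake table, while a metadata table keeps a "$snapshots" tail that is re-joined onto the table name.

    // Hypothetical illustration of the table_name$lake / table_name$lake$snapshots handling above.
    String[] plain = "orders$lake".split("\\$lake");             // ["orders"]
    String[] system = "orders$lake$snapshots".split("\\$lake");  // ["orders", "$snapshots"]
    String plainTable = plain[0];                  // "orders"
    String systemTable = String.join("", system);  // "orders$snapshots"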

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java

Lines changed: 12 additions & 3 deletions
@@ -20,6 +20,7 @@
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.flink.FlinkConnectorOptions;
+import org.apache.fluss.flink.lake.LakeCatalog;
 import org.apache.fluss.flink.lake.LakeTableFactory;
 import org.apache.fluss.flink.sink.FlinkTableSink;
 import org.apache.fluss.flink.source.FlinkTableSource;
@@ -68,17 +69,25 @@
 /** Factory to create table source and table sink for Fluss. */
 public class FlinkTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
 
+    private final LakeCatalog lakeCatalog;
     private volatile LakeTableFactory lakeTableFactory;
 
+    public FlinkTableFactory(LakeCatalog lakeCatalog) {
+        this.lakeCatalog = lakeCatalog;
+    }
+
     @Override
     public DynamicTableSource createDynamicTableSource(Context context) {
         // check whether should read from datalake
        ObjectIdentifier tableIdentifier = context.getObjectIdentifier();
        String tableName = tableIdentifier.getObjectName();
        if (tableName.contains(LAKE_TABLE_SPLITTER)) {
-            tableName = tableName.substring(0, tableName.indexOf(LAKE_TABLE_SPLITTER));
+            // Extract the lake table name: for "table$lake" -> "table"
+            // for "table$lake$snapshots" -> "table$snapshots"
+            String lakeTableName = tableName.replaceFirst("\\$lake", "");
+
             lakeTableFactory = mayInitLakeTableFactory();
-            return lakeTableFactory.createDynamicTableSource(context, tableName);
+            return lakeTableFactory.createDynamicTableSource(context, lakeTableName);
         }
 
         FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
@@ -248,7 +257,7 @@ private LakeTableFactory mayInitLakeTableFactory() {
         if (lakeTableFactory == null) {
             synchronized (this) {
                 if (lakeTableFactory == null) {
-                    lakeTableFactory = new LakeTableFactory();
+                    lakeTableFactory = new LakeTableFactory(lakeCatalog);
                 }
             }
         }
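As a quick check of the rewrite the new replaceFirst call performs (plain java.lang.String behavior; the table names are illustrative):

    String data = "orders$lake".replaceFirst("\\$lake", "");            // -> "orders"
    String meta = "orders$lake$snapshots".replaceFirst("\\$lake", "");  // -> "orders$snapshots"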

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java

Lines changed: 25 additions & 7 deletions
@@ -50,13 +50,31 @@ public LakeCatalog(String catalogName, ClassLoader classLoader) {
 
     public Catalog getLakeCatalog(Configuration tableOptions) {
         DataLakeFormat lakeFormat = tableOptions.get(ConfigOptions.TABLE_DATALAKE_FORMAT);
-        // TODO: Currently, a Fluss cluster only supports a single DataLake storage.
-        // However, in the
-        // future, it may support multiple DataLakes. The following code assumes
-        // that a single
-        // lakeCatalog is shared across multiple tables, which will no longer be
-        // valid in such
-        // cases and should be updated accordingly.
+        if (lakeFormat == null) {
+            throw new IllegalArgumentException(
+                    "DataLake format is not specified in table options. "
+                            + "Please ensure '"
+                            + ConfigOptions.TABLE_DATALAKE_FORMAT.key()
+                            + "' is set.");
+        }
+        return LAKE_CATALOG_CACHE.computeIfAbsent(
+                lakeFormat,
+                (dataLakeFormat) -> {
+                    if (dataLakeFormat == PAIMON) {
+                        return PaimonCatalogFactory.create(catalogName, tableOptions, classLoader);
+                    } else if (dataLakeFormat == ICEBERG) {
+                        return IcebergCatalogFactory.create(catalogName, tableOptions);
+                    } else {
+                        throw new UnsupportedOperationException(
+                                "Unsupported datalake format: " + dataLakeFormat);
+                    }
+                });
+    }
+
+    public Catalog getLakeCatalog(Configuration tableOptions, DataLakeFormat lakeFormat) {
+        if (lakeFormat == null) {
+            throw new IllegalArgumentException("DataLake format cannot be null");
+        }
         return LAKE_CATALOG_CACHE.computeIfAbsent(
                 lakeFormat,
                 (dataLakeFormat) -> {
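A hedged usage sketch of the two overloads above (the option key and value in the map are assumed for illustration; Configuration.fromMap is used the same way by LakeTableFactory later in this commit):

    // Resolve the format from the table options; throws if the datalake format option is missing.
    org.apache.fluss.config.Configuration opts =
            org.apache.fluss.config.Configuration.fromMap(
                    java.util.Collections.singletonMap("table.datalake.format", "iceberg")); // assumed key/value
    org.apache.flink.table.catalog.Catalog fromOptions = lakeCatalog.getLakeCatalog(opts);

    // Or bypass the option lookup with an explicit format; both paths share LAKE_CATALOG_CACHE.
    org.apache.flink.table.catalog.Catalog explicit =
            lakeCatalog.getLakeCatalog(opts, org.apache.fluss.metadata.DataLakeFormat.ICEBERG);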

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java

Lines changed: 74 additions & 10 deletions
@@ -20,37 +20,101 @@
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.paimon.flink.FlinkTableFactory;
+
+import java.util.Map;
 
 /** A factory to create {@link DynamicTableSource} for lake table. */
 public class LakeTableFactory {
+    private final LakeCatalog lakeCatalog;
 
-    // now, always assume is paimon, todo need to describe lake storage from
-    // to know which lake storage used
-    private final org.apache.paimon.flink.FlinkTableFactory paimonFlinkTableFactory;
-
-    public LakeTableFactory() {
-        paimonFlinkTableFactory = new FlinkTableFactory();
+    public LakeTableFactory(LakeCatalog lakeCatalog) {
+        this.lakeCatalog = lakeCatalog;
     }
 
     public DynamicTableSource createDynamicTableSource(
             DynamicTableFactory.Context context, String tableName) {
         ObjectIdentifier originIdentifier = context.getObjectIdentifier();
-        ObjectIdentifier paimonIdentifier =
+        ObjectIdentifier lakeIdentifier =
                 ObjectIdentifier.of(
                         originIdentifier.getCatalogName(),
                         originIdentifier.getDatabaseName(),
                         tableName);
+
+        // Determine the lake format from the table options
+        Map<String, String> tableOptions = context.getCatalogTable().getOptions();
+
+        // If not present, fallback to 'fluss.table.datalake.format' (set by Fluss)
+        String connector = tableOptions.get("connector");
+        if (connector == null) {
+            connector = tableOptions.get("fluss.table.datalake.format");
+        }
+
+        if (connector == null) {
+            // For Paimon system tables (like table_name$options), the table options are empty
+            // Default to Paimon for backward compatibility
+            connector = "paimon";
+        }
+
+        // For Iceberg and Paimon, pass the table name as-is to their factory.
+        // Metadata tables will be handled internally by their respective factories.
         DynamicTableFactory.Context newContext =
                 new FactoryUtil.DefaultDynamicTableContext(
-                        paimonIdentifier,
+                        lakeIdentifier,
                         context.getCatalogTable(),
                         context.getEnrichmentOptions(),
                         context.getConfiguration(),
                         context.getClassLoader(),
                         context.isTemporary());
 
-        return paimonFlinkTableFactory.createDynamicTableSource(newContext);
+        // Get the appropriate factory based on connector type
+        DynamicTableSourceFactory factory = getLakeTableFactory(connector, tableOptions);
+        return factory.createDynamicTableSource(newContext);
+    }
+
+    private DynamicTableSourceFactory getLakeTableFactory(
+            String connector, Map<String, String> tableOptions) {
+        if ("paimon".equalsIgnoreCase(connector)) {
+            return getPaimonFactory();
+        } else if ("iceberg".equalsIgnoreCase(connector)) {
+            return getIcebergFactory(tableOptions);
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unsupported lake connector: "
+                            + connector
+                            + ". Only 'paimon' and 'iceberg' are supported.");
+        }
+    }
+
+    private DynamicTableSourceFactory getPaimonFactory() {
+        return new org.apache.paimon.flink.FlinkTableFactory();
+    }
+
+    private DynamicTableSourceFactory getIcebergFactory(Map<String, String> tableOptions) {
+        try {
+            // Get the Iceberg FlinkCatalog instance from LakeCatalog
+            org.apache.fluss.config.Configuration flussConfig =
+                    org.apache.fluss.config.Configuration.fromMap(tableOptions);
+
+            // Get catalog with explicit ICEBERG format
+            org.apache.flink.table.catalog.Catalog catalog =
+                    lakeCatalog.getLakeCatalog(
+                            flussConfig, org.apache.fluss.metadata.DataLakeFormat.ICEBERG);
+
+            // Create FlinkDynamicTableFactory with the catalog
+            Class<?> icebergFactoryClass =
+                    Class.forName("org.apache.iceberg.flink.FlinkDynamicTableFactory");
+            Class<?> flinkCatalogClass = Class.forName("org.apache.iceberg.flink.FlinkCatalog");
+
+            return (DynamicTableSourceFactory)
+                    icebergFactoryClass
+                            .getDeclaredConstructor(flinkCatalogClass)
+                            .newInstance(catalog);
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Failed to create Iceberg table factory. Please ensure iceberg-flink-runtime is on the classpath.",
+                    e);
+        }
     }
 }
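To make the reflective block easier to read: if iceberg-flink were a compile-time dependency, getIcebergFactory would reduce to a direct construction like the sketch below (shown only to explain what the reflection does; the reflective form avoids a hard compile-time dependency on Iceberg's Flink runtime):

    // Hypothetical non-reflective equivalent, assuming iceberg-flink is on the compile classpath.
    org.apache.iceberg.flink.FlinkCatalog icebergCatalog =
            (org.apache.iceberg.flink.FlinkCatalog) catalog;
    return new org.apache.iceberg.flink.FlinkDynamicTableFactory(icebergCatalog);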

fluss-lake/fluss-lake-iceberg/pom.xml

Lines changed: 6 additions & 0 deletions
@@ -242,6 +242,12 @@
             <version>${iceberg.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-files</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java

Lines changed: 73 additions & 0 deletions
@@ -32,9 +32,11 @@
 import org.apache.fluss.types.DataTypes;
 
 import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.table.api.TableResult;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
@@ -51,6 +53,7 @@
 import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsExactOrder;
 import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertRowResultsIgnoreOrder;
 import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test case for union read primary key table. */
 public class FlinkUnionReadPrimaryKeyTableITCase extends FlinkUnionReadTestBase {
@@ -273,6 +276,76 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
         }
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testReadIcebergLakeTableAndSystemTable(boolean isPartitioned) throws Exception {
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        String tableName = "lake_pk_table_" + (isPartitioned ? "partitioned" : "non_partitioned");
+
+        TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+        Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+        // create table & write initial data
+        long tableId =
+                preparePKTableFullType(t1, DEFAULT_BUCKET_NUM, isPartitioned, bucketLogEndOffset);
+
+        // wait until records have been synced to Iceberg
+        waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
+
+        // Test 1: Read Iceberg lake table directly using $lake suffix
+        TableResult lakeTableResult =
+                batchTEnv.executeSql(String.format("select * from %s$lake", tableName));
+        List<Row> icebergRows = CollectionUtil.iteratorToList(lakeTableResult.collect());
+
+        // Verify that we can read data from Iceberg via $lake suffix
+        assertThat(icebergRows).isNotEmpty();
+
+        // Note: The expected row count should be based on how many rows were written.
+        // In preparePKTableFullType, we write 2 unique rows (by PK) per iteration, 2 iterations.
+        // Since this is a primary key table, duplicate PKs are deduplicated, so only 2 unique rows
+        // per partition
+        int expectedUserRowCount = isPartitioned ? 2 * waitUntilPartitions(t1).size() : 2;
+        assertThat(icebergRows).hasSize(expectedUserRowCount);
+
+        // verify rows have expected number of columns
+        int userColumnCount = lakeTableResult.getResolvedSchema().getColumnCount();
+        Row firstRow = icebergRows.get(0);
+        assertThat(firstRow.getArity())
+                .as("Iceberg row should have at least user columns")
+                .isGreaterThanOrEqualTo(userColumnCount);
+
+        // Test 2: Read Iceberg system table (snapshots) using $lake$snapshots suffix
+        TableResult snapshotsResult =
+                batchTEnv.executeSql(String.format("select * from %s$lake$snapshots", tableName));
+        List<Row> snapshotRows = CollectionUtil.iteratorToList(snapshotsResult.collect());
+
+        // Verify that we can read snapshots from Iceberg via $lake$snapshots suffix
+        assertThat(snapshotRows).as("Should have at least one snapshot").isNotEmpty();
+
+        // Verify snapshot structure based on Iceberg snapshots table schema
+        // Expected columns: committed_at, snapshot_id, parent_id, operation, manifest_list, summary
+        Row firstSnapshot = snapshotRows.get(0);
+        assertThat(firstSnapshot.getArity()).as("Snapshot row should have 6 columns").isEqualTo(6);
+
+        // Verify committed_at field (index 0) is not null
+        assertThat(firstSnapshot.getField(0)).as("committed_at should not be null").isNotNull();
+
+        // Verify snapshot_id field (index 1) is not null
+        assertThat(firstSnapshot.getField(1)).as("snapshot_id should not be null").isNotNull();
+
+        // Verify manifest_list field (index 4) is not null and is a string path
+        assertThat(firstSnapshot.getField(4))
+                .as("manifest_list should be a non-null path")
+                .isNotNull()
+                .isInstanceOf(String.class);
+
+        // Verify summary field (index 5) contains expected metadata
+        assertThat(firstSnapshot.getField(5)).as("summary should not be null").isNotNull();
+
+        jobClient.cancel().get();
+    }
+
     private void writeFullTypeRow(TablePath tablePath, String partition) throws Exception {
         List<InternalRow> rows =
                 Collections.singletonList(
