Commit 968d3a4

beryllw and luoyuxia authored
[flink] Supports get iceberg lake table (#1799)
--------- Co-authored-by: luoyuxia <[email protected]>
1 parent be6fc3b commit 968d3a4

File tree

4 files changed: 219 additions, 48 deletions
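Note: what this commit enables, from the user side, is reading the lake (Iceberg or Paimon) copy of a datalake-enabled Fluss table through the Fluss Flink catalog by appending the $lake suffix to the table name. A minimal sketch follows; the catalog, database, and table names (fluss_catalog, fluss, orders) are hypothetical, and it assumes a Fluss catalog has already been registered with the TableEnvironment.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Sketch only: catalog/database/table names are hypothetical.
public class ReadIcebergLakeTableExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Assumes a Fluss catalog was registered as "fluss_catalog" beforehand.
        tEnv.useCatalog("fluss_catalog");
        // The $lake suffix routes FlinkCatalog#getTable to getLakeTable, which
        // with this commit can resolve Iceberg tables as well as Paimon ones.
        tEnv.executeSql("SELECT * FROM fluss.`orders$lake`").print();
    }
}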

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java

Lines changed: 6 additions & 28 deletions
@@ -22,12 +22,10 @@
 import org.apache.fluss.client.admin.Admin;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
-import org.apache.fluss.exception.FlussRuntimeException;
 import org.apache.fluss.exception.InvalidTableException;
 import org.apache.fluss.flink.lake.LakeCatalog;
 import org.apache.fluss.flink.procedure.ProcedureManager;
 import org.apache.fluss.flink.utils.CatalogExceptionUtils;
-import org.apache.fluss.flink.utils.DataLakeUtils;
 import org.apache.fluss.flink.utils.FlinkConversions;
 import org.apache.fluss.metadata.DatabaseDescriptor;
 import org.apache.fluss.metadata.PartitionInfo;
@@ -74,8 +72,6 @@
 import org.apache.flink.table.factories.Factory;
 import org.apache.flink.table.procedures.Procedure;
 
-import javax.annotation.Nullable;
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -118,9 +114,9 @@ public class FlinkCatalog extends AbstractCatalog {
     protected final String defaultDatabase;
     protected final String bootstrapServers;
     protected final Map<String, String> securityConfigs;
+    private final LakeCatalog lakeCatalog;
     protected Connection connection;
     protected Admin admin;
-    private volatile @Nullable LakeCatalog lakeCatalog;
 
     public FlinkCatalog(
             String name,
@@ -134,6 +130,7 @@ public FlinkCatalog(
         this.bootstrapServers = bootstrapServers;
         this.classLoader = classLoader;
         this.securityConfigs = securityConfigs;
+        this.lakeCatalog = new LakeCatalog(catalogName, classLoader);
     }
 
     @Override
@@ -334,16 +331,17 @@ public CatalogBaseTable getTable(ObjectPath objectPath)
     protected CatalogBaseTable getLakeTable(
             String databaseName, String tableName, Configuration properties)
             throws TableNotExistException, CatalogException {
-        mayInitLakeCatalogCatalog(properties);
         String[] tableComponents = tableName.split("\\" + LAKE_TABLE_SPLITTER);
         if (tableComponents.length == 1) {
             // should be pattern like table_name$lake
             tableName = tableComponents[0];
         } else {
-            // be some thing like table_name$lake$snapshot
+            // be something like table_name$lake$snapshot
             tableName = String.join("", tableComponents);
         }
-        return lakeCatalog.getTable(new ObjectPath(databaseName, tableName));
+        return lakeCatalog
+                .getLakeCatalog(properties)
+                .getTable(new ObjectPath(databaseName, tableName));
     }
 
     @Override
@@ -751,26 +749,6 @@ public Procedure getProcedure(ObjectPath procedurePath)
         }
     }
 
-    private void mayInitLakeCatalogCatalog(Configuration tableOptions) {
-        // TODO: Currently, a Fluss cluster only supports a single DataLake storage. However, in the
-        // future, it may support multiple DataLakes. The following code assumes that a single
-        // lakeCatalog is shared across multiple tables, which will no longer be valid in such
-        // cases and should be updated accordingly.
-        if (lakeCatalog == null) {
-            synchronized (this) {
-                if (lakeCatalog == null) {
-                    try {
-                        Map<String, String> catalogProperties =
-                                DataLakeUtils.extractLakeCatalogProperties(tableOptions);
-                        lakeCatalog = new LakeCatalog(catalogName, catalogProperties, classLoader);
-                    } catch (Exception e) {
-                        throw new FlussRuntimeException("Failed to init paimon catalog.", e);
-                    }
-                }
-            }
-        }
-    }
-
     @VisibleForTesting
     public Map<String, String> getSecurityConfigs() {
         return securityConfigs;
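Note: the table-name handling in getLakeTable above is easy to see standalone. A small sketch, assuming LAKE_TABLE_SPLITTER is the literal "$lake" (inferred from the table_name$lake comment and the test below, not copied from Fluss source):

// Standalone sketch of the suffix handling in getLakeTable.
public class LakeTableNameSketch {
    // Assumption: the real constant lives in Fluss; "$lake" is inferred from context.
    private static final String LAKE_TABLE_SPLITTER = "$lake";

    static String resolveLakeTableName(String tableName) {
        // "\\" + "$lake" builds the regex \$lake, i.e. a literal "$lake" match.
        String[] parts = tableName.split("\\" + LAKE_TABLE_SPLITTER);
        if (parts.length == 1) {
            // "orders$lake" -> ["orders"]: read the lake table itself.
            return parts[0];
        }
        // "orders$lake$snapshots" -> ["orders", "$snapshots"], joined back to
        // "orders$snapshots": read a lake system table.
        return String.join("", parts);
    }

    public static void main(String[] args) {
        System.out.println(resolveLakeTableName("orders$lake"));           // orders
        System.out.println(resolveLakeTableName("orders$lake$snapshots")); // orders$snapshots
    }
}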

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java

Lines changed: 99 additions & 19 deletions
@@ -17,36 +17,116 @@
 
 package org.apache.fluss.flink.lake;
 
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.flink.utils.DataLakeUtils;
+import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.utils.MapUtils;
+
+import org.apache.flink.table.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.flink.FlinkCatalog;
 import org.apache.paimon.flink.FlinkCatalogFactory;
 import org.apache.paimon.flink.FlinkFileIOLoader;
 import org.apache.paimon.options.Options;
 
+import java.lang.reflect.Method;
 import java.util.Map;
 
+import static org.apache.fluss.metadata.DataLakeFormat.ICEBERG;
+import static org.apache.fluss.metadata.DataLakeFormat.PAIMON;
+
 /** A lake catalog to delegate the operations on lake table. */
 public class LakeCatalog {
+    private static final Map<DataLakeFormat, Catalog> LAKE_CATALOG_CACHE =
+            MapUtils.newConcurrentHashMap();
+
+    private final String catalogName;
+    private final ClassLoader classLoader;
+
+    public LakeCatalog(String catalogName, ClassLoader classLoader) {
+        this.catalogName = catalogName;
+        this.classLoader = classLoader;
+    }
+
+    public Catalog getLakeCatalog(Configuration tableOptions) {
+        DataLakeFormat lakeFormat = tableOptions.get(ConfigOptions.TABLE_DATALAKE_FORMAT);
+        // TODO: Currently, a Fluss cluster only supports a single DataLake storage. However, in
+        // the future, it may support multiple DataLakes. The following code assumes that a
+        // single lakeCatalog is shared across multiple tables, which will no longer be valid in
+        // such cases and should be updated accordingly.
+        return LAKE_CATALOG_CACHE.computeIfAbsent(
+                lakeFormat,
+                (dataLakeFormat) -> {
+                    if (dataLakeFormat == PAIMON) {
+                        return PaimonCatalogFactory.create(catalogName, tableOptions, classLoader);
+                    } else if (dataLakeFormat == ICEBERG) {
+                        return IcebergCatalogFactory.create(catalogName, tableOptions);
+                    } else {
+                        throw new UnsupportedOperationException(
+                                "Unsupported datalake format: " + dataLakeFormat);
+                    }
+                });
+    }
+
+    /**
+     * Factory for creating Paimon Catalog instances.
+     *
+     * <p>Purpose: Encapsulates Paimon-related dependencies (e.g. FlinkFileIOLoader) to avoid a
+     * direct dependency in the main LakeCatalog class.
+     */
+    public static class PaimonCatalogFactory {
+
+        private PaimonCatalogFactory() {}
 
-    // currently, only support paimon
-    // todo make it pluggable
-    private final FlinkCatalog paimonFlinkCatalog;
-
-    public LakeCatalog(
-            String catalogName, Map<String, String> catalogProperties, ClassLoader classLoader) {
-        CatalogContext catalogContext =
-                CatalogContext.create(
-                        Options.fromMap(catalogProperties), null, new FlinkFileIOLoader());
-        paimonFlinkCatalog =
-                FlinkCatalogFactory.createCatalog(catalogName, catalogContext, classLoader);
+        public static Catalog create(
+                String catalogName, Configuration tableOptions, ClassLoader classLoader) {
+            Map<String, String> catalogProperties =
+                    DataLakeUtils.extractLakeCatalogProperties(tableOptions);
+            return FlinkCatalogFactory.createCatalog(
+                    catalogName,
+                    CatalogContext.create(
+                            Options.fromMap(catalogProperties), null, new FlinkFileIOLoader()),
+                    classLoader);
+        }
     }
 
-    public CatalogBaseTable getTable(ObjectPath objectPath)
-            throws TableNotExistException, CatalogException {
-        return paimonFlinkCatalog.getTable(objectPath);
+    /** Factory using reflection to create Iceberg Catalog instances. */
+    public static class IcebergCatalogFactory {
+
+        private IcebergCatalogFactory() {}
+
+        // Iceberg 1.4.3 is the last Java 8 compatible version, while the Flink 1.18+ connector
+        // requires Iceberg 1.5.0+.
+        // Using reflection to maintain Java 8 compatibility.
+        // Once Fluss drops Java 8, we can remove the reflection code.
+        public static Catalog create(String catalogName, Configuration tableOptions) {
+            Map<String, String> catalogProperties =
+                    DataLakeUtils.extractLakeCatalogProperties(tableOptions);
+            // Map "type" to "catalog-type" (equivalent).
+            // Required: either "catalog-type" (standard type) or "catalog-impl"
+            // (fully-qualified custom class, mandatory if "catalog-type" is missing).
+            if (catalogProperties.containsKey("type")) {
+                catalogProperties.put("catalog-type", catalogProperties.get("type"));
+            }
+            try {
+                Class<?> flinkCatalogFactoryClass =
+                        Class.forName("org.apache.iceberg.flink.FlinkCatalogFactory");
+                Object factoryInstance =
+                        flinkCatalogFactoryClass.getDeclaredConstructor().newInstance();
+
+                Method createCatalogMethod =
+                        flinkCatalogFactoryClass.getMethod(
+                                "createCatalog", String.class, Map.class);
+                return (Catalog)
+                        createCatalogMethod.invoke(factoryInstance, catalogName, catalogProperties);
+            } catch (Exception e) {
+                throw new RuntimeException(
+                        "Failed to create Iceberg catalog using reflection. Please make sure iceberg-flink-runtime is on the classpath.",
+                        e);
+            }
+        }
     }
 }
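Note: the reflective block in IcebergCatalogFactory is the indirect form of the direct calls below, which cannot appear in Fluss source while it still compiles on Java 8 (Iceberg 1.5.0+ requires Java 11). A sketch, assuming iceberg-flink is on the compile classpath; the class name and the createCatalog(String, Map) signature are exactly what the reflection in the diff targets.

import java.util.Map;

import org.apache.flink.table.catalog.Catalog;
import org.apache.iceberg.flink.FlinkCatalogFactory;

// Sketch: direct-call equivalent of the reflective IcebergCatalogFactory.create.
public class DirectIcebergCatalogSketch {
    public static Catalog create(String catalogName, Map<String, String> catalogProperties) {
        // Same "type" -> "catalog-type" normalization as the reflective path.
        if (catalogProperties.containsKey("type")) {
            catalogProperties.put("catalog-type", catalogProperties.get("type"));
        }
        return new FlinkCatalogFactory().createCatalog(catalogName, catalogProperties);
    }
}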

fluss-lake/fluss-lake-iceberg/pom.xml

Lines changed: 6 additions & 1 deletion
@@ -236,7 +236,12 @@
             <scope>test</scope>
         </dependency>
 
-
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-flink-${flink.major.version}</artifactId>
+            <version>${iceberg.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkCatalogLakeTest.java

Lines changed: 108 additions & 0 deletions
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.flink;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.flink.catalog.FlinkCatalog;
+import org.apache.fluss.lake.iceberg.testutils.FlinkIcebergTieringTestBase;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_ENABLED;
+import static org.apache.fluss.lake.iceberg.IcebergLakeCatalog.SYSTEM_COLUMNS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test class for {@link FlinkCatalog}. */
+class FlinkCatalogLakeTest extends FlinkIcebergTieringTestBase {
+
+    protected static final String DEFAULT_DB = "fluss";
+
+    protected static final String CATALOG_NAME = "test_iceberg_lake";
+
+    FlinkCatalog catalog;
+
+    @BeforeEach
+    public void beforeEach() {
+        super.beforeEach();
+        buildCatalog();
+    }
+
+    @Test
+    // TODO: remove this test in #1803
+    void testGetLakeTable() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(TABLE_DATALAKE_ENABLED.key(), "true");
+        ObjectPath lakeTablePath = new ObjectPath(DEFAULT_DB, "lake_table");
+        CatalogTable table = this.newCatalogTable(options);
+        catalog.createTable(lakeTablePath, table, false);
+        assertThat(catalog.tableExists(lakeTablePath)).isTrue();
+        CatalogBaseTable lakeTable =
+                catalog.getTable(new ObjectPath(DEFAULT_DB, "lake_table$lake"));
+        Schema schema = lakeTable.getUnresolvedSchema();
+        assertThat(schema.getColumns().size()).isEqualTo(3 + SYSTEM_COLUMNS.size());
+        assertThat(schema.getPrimaryKey().isPresent()).isTrue();
+        assertThat(schema.getPrimaryKey().get().getColumnNames()).isEqualTo(List.of("first"));
+    }
+
+    private CatalogTable newCatalogTable(Map<String, String> options) {
+        ResolvedSchema resolvedSchema =
+                new ResolvedSchema(
+                        Arrays.asList(
+                                Column.physical("first", DataTypes.STRING().notNull()),
+                                Column.physical("second", DataTypes.INT()),
+                                Column.physical("third", DataTypes.STRING().notNull())),
+                        Collections.emptyList(),
+                        UniqueConstraint.primaryKey("PK_first", List.of("first")));
+        CatalogTable origin =
+                CatalogTable.of(
+                        Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
+                        "test comment",
+                        Collections.emptyList(),
+                        options);
+        return new ResolvedCatalogTable(origin, resolvedSchema);
+    }
+
+    public void buildCatalog() {
+        String bootstrapServers = String.join(",", clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
+        catalog =
+                new FlinkCatalog(
+                        CATALOG_NAME,
+                        DEFAULT_DB,
+                        bootstrapServers,
+                        Thread.currentThread().getContextClassLoader(),
+                        Collections.emptyMap());
+        catalog.open();
+    }
+}
