
Commit aec141d

[flink] Optimize the getLakeTableFactory in LakeTableFactory (#1816)
1 parent: 07847f5

5 files changed (+75, -96 lines)

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java

Lines changed: 5 additions & 5 deletions
@@ -23,7 +23,7 @@
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.InvalidTableException;
-import org.apache.fluss.flink.lake.LakeCatalog;
+import org.apache.fluss.flink.lake.LakeFlinkCatalog;
 import org.apache.fluss.flink.procedure.ProcedureManager;
 import org.apache.fluss.flink.utils.CatalogExceptionUtils;
 import org.apache.fluss.flink.utils.FlinkConversions;
@@ -114,7 +114,7 @@ public class FlinkCatalog extends AbstractCatalog {
     protected final String defaultDatabase;
     protected final String bootstrapServers;
     protected final Map<String, String> securityConfigs;
-    private final LakeCatalog lakeCatalog;
+    private final LakeFlinkCatalog lakeFlinkCatalog;
     protected Connection connection;
     protected Admin admin;

@@ -130,12 +130,12 @@ public FlinkCatalog(
         this.bootstrapServers = bootstrapServers;
         this.classLoader = classLoader;
         this.securityConfigs = securityConfigs;
-        this.lakeCatalog = new LakeCatalog(catalogName, classLoader);
+        this.lakeFlinkCatalog = new LakeFlinkCatalog(catalogName, classLoader);
     }

     @Override
     public Optional<Factory> getFactory() {
-        return Optional.of(new FlinkTableFactory(lakeCatalog));
+        return Optional.of(new FlinkTableFactory(lakeFlinkCatalog));
     }

     @Override
@@ -340,7 +340,7 @@ protected CatalogBaseTable getLakeTable(
             // Need to reconstruct: table_name + $snapshots
             tableName = String.join("", tableComponents);
         }
-        return lakeCatalog
+        return lakeFlinkCatalog
                 .getLakeCatalog(properties)
                 .getTable(new ObjectPath(databaseName, tableName));
     }

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java

Lines changed: 5 additions & 5 deletions
@@ -20,7 +20,7 @@
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.flink.FlinkConnectorOptions;
-import org.apache.fluss.flink.lake.LakeCatalog;
+import org.apache.fluss.flink.lake.LakeFlinkCatalog;
 import org.apache.fluss.flink.lake.LakeTableFactory;
 import org.apache.fluss.flink.sink.FlinkTableSink;
 import org.apache.fluss.flink.source.FlinkTableSource;
@@ -69,11 +69,11 @@
 /** Factory to create table source and table sink for Fluss. */
 public class FlinkTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {

-    private final LakeCatalog lakeCatalog;
+    private final LakeFlinkCatalog lakeFlinkCatalog;
     private volatile LakeTableFactory lakeTableFactory;

-    public FlinkTableFactory(LakeCatalog lakeCatalog) {
-        this.lakeCatalog = lakeCatalog;
+    public FlinkTableFactory(LakeFlinkCatalog lakeFlinkCatalog) {
+        this.lakeFlinkCatalog = lakeFlinkCatalog;
     }

     @Override
@@ -257,7 +257,7 @@ private LakeTableFactory mayInitLakeTableFactory() {
         if (lakeTableFactory == null) {
             synchronized (this) {
                 if (lakeTableFactory == null) {
-                    lakeTableFactory = new LakeTableFactory(lakeCatalog);
+                    lakeTableFactory = new LakeTableFactory(lakeFlinkCatalog);
                 }
             }
         }
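
mayInitLakeTableFactory() above is a standard double-checked locking idiom: the volatile lakeTableFactory field is read once without locking, and the lock is taken (with the check repeated) only until the first initialization wins. A minimal self-contained sketch of the same idiom, with a hypothetical generic LazyHolder standing in for the Fluss classes:

import java.util.function.Supplier;

/** Double-checked-locking sketch; LazyHolder is illustrative, not Fluss code. */
public final class LazyHolder<T> {
    private final Supplier<T> factory;
    // volatile guarantees that a fully constructed value is visible to all threads
    private volatile T value;

    public LazyHolder(Supplier<T> factory) {
        this.factory = factory;
    }

    public T get() {
        if (value == null) { // fast path: no lock once initialized
            synchronized (this) {
                if (value == null) { // re-check: another thread may have won the race
                    value = factory.get();
                }
            }
        }
        return value;
    }
}

The volatile modifier is what makes the idiom safe: without it, a second thread could observe a non-null reference to a partially constructed object.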

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java renamed to fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java

Lines changed: 42 additions & 38 deletions
@@ -21,7 +21,6 @@
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.flink.utils.DataLakeUtils;
 import org.apache.fluss.metadata.DataLakeFormat;
-import org.apache.fluss.utils.MapUtils;

 import org.apache.flink.table.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
@@ -34,59 +33,64 @@

 import static org.apache.fluss.metadata.DataLakeFormat.ICEBERG;
 import static org.apache.fluss.metadata.DataLakeFormat.PAIMON;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;

-/** A lake catalog to delegate the operations on lake table. */
-public class LakeCatalog {
-    private static final Map<DataLakeFormat, Catalog> LAKE_CATALOG_CACHE =
-            MapUtils.newConcurrentHashMap();
+/** A lake Flink catalog to delegate the operations on lake tables. */
+public class LakeFlinkCatalog {

     private final String catalogName;
     private final ClassLoader classLoader;

-    public LakeCatalog(String catalogName, ClassLoader classLoader) {
+    private volatile Catalog catalog;
+    private volatile DataLakeFormat lakeFormat;
+
+    public LakeFlinkCatalog(String catalogName, ClassLoader classLoader) {
         this.catalogName = catalogName;
         this.classLoader = classLoader;
     }

     public Catalog getLakeCatalog(Configuration tableOptions) {
-        DataLakeFormat lakeFormat = tableOptions.get(ConfigOptions.TABLE_DATALAKE_FORMAT);
-        if (lakeFormat == null) {
-            throw new IllegalArgumentException(
-                    "DataLake format is not specified in table options. "
-                            + "Please ensure '"
-                            + ConfigOptions.TABLE_DATALAKE_FORMAT.key()
-                            + "' is set.");
-        }
-        return LAKE_CATALOG_CACHE.computeIfAbsent(
-                lakeFormat,
-                (dataLakeFormat) -> {
-                    if (dataLakeFormat == PAIMON) {
-                        return PaimonCatalogFactory.create(catalogName, tableOptions, classLoader);
-                    } else if (dataLakeFormat == ICEBERG) {
-                        return IcebergCatalogFactory.create(catalogName, tableOptions);
+        // TODO: Currently, a Fluss cluster only supports a single DataLake storage.
+        // However, in the future it may support multiple DataLakes. The following
+        // code assumes that a single lakeCatalog is shared across multiple tables,
+        // which will no longer be valid in such cases and should be updated accordingly.
+        if (catalog == null) {
+            synchronized (this) {
+                if (catalog == null) {
+                    DataLakeFormat lakeFormat =
+                            tableOptions.get(ConfigOptions.TABLE_DATALAKE_FORMAT);
+                    if (lakeFormat == null) {
+                        throw new IllegalArgumentException(
+                                "DataLake format is not specified in table options. "
+                                        + "Please ensure '"
+                                        + ConfigOptions.TABLE_DATALAKE_FORMAT.key()
+                                        + "' is set.");
+                    }
+                    if (lakeFormat == PAIMON) {
+                        catalog =
+                                PaimonCatalogFactory.create(catalogName, tableOptions, classLoader);
+                        this.lakeFormat = PAIMON;
+                    } else if (lakeFormat == ICEBERG) {
+                        catalog = IcebergCatalogFactory.create(catalogName, tableOptions);
+                        this.lakeFormat = ICEBERG;
                     } else {
                         throw new UnsupportedOperationException(
-                                "Unsupported datalake format: " + dataLakeFormat);
+                                "Unsupported data lake format: " + lakeFormat);
                     }
-                });
+                }
+            }
+        }
+        return catalog;
     }

-    public Catalog getLakeCatalog(Configuration tableOptions, DataLakeFormat lakeFormat) {
-        if (lakeFormat == null) {
-            throw new IllegalArgumentException("DataLake format cannot be null");
-        }
-        return LAKE_CATALOG_CACHE.computeIfAbsent(
+    public DataLakeFormat getLakeFormat() {
+        checkNotNull(
                 lakeFormat,
-                (dataLakeFormat) -> {
-                    if (dataLakeFormat == PAIMON) {
-                        return PaimonCatalogFactory.create(catalogName, tableOptions, classLoader);
-                    } else if (dataLakeFormat == ICEBERG) {
-                        return IcebergCatalogFactory.create(catalogName, tableOptions);
-                    } else {
-                        throw new UnsupportedOperationException(
-                                "Unsupported datalake format: " + dataLakeFormat);
-                    }
-                });
+                "DataLake format is null, must call getLakeCatalog first to initialize lake format.");
+        return lakeFormat;
     }

     /**
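
The rewrite replaces the static per-format LAKE_CATALOG_CACHE with one lazily initialized catalog per LakeFlinkCatalog instance, and records the resolved lakeFormat so callers can query it later. A hedged usage sketch of the resulting contract (this is not code from the commit; it assumes the TABLE_DATALAKE_FORMAT option parses the string value "paimon" into the DataLakeFormat enum):

import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.flink.lake.LakeFlinkCatalog;
import org.apache.fluss.metadata.DataLakeFormat;

import org.apache.flink.table.catalog.Catalog;

import java.util.Collections;

public class LakeFlinkCatalogUsage {
    public static void main(String[] args) {
        LakeFlinkCatalog lakeFlinkCatalog =
                new LakeFlinkCatalog(
                        "demo_catalog", LakeFlinkCatalogUsage.class.getClassLoader());

        // The first call must carry the data lake format; it initializes the
        // catalog exactly once (assumption: the string value maps to the enum).
        Configuration tableOptions =
                Configuration.fromMap(
                        Collections.singletonMap(
                                ConfigOptions.TABLE_DATALAKE_FORMAT.key(), "paimon"));
        Catalog catalog = lakeFlinkCatalog.getLakeCatalog(tableOptions);

        // Later callers may pass an empty Configuration; the cached catalog is returned.
        Catalog same = lakeFlinkCatalog.getLakeCatalog(new Configuration());

        // Only valid after getLakeCatalog() has run; otherwise checkNotNull throws.
        DataLakeFormat format = lakeFlinkCatalog.getLakeFormat();
        System.out.println(format + ", cached: " + (catalog == same));
    }
}

This ordering dependency (initialize first, query the format later) is exactly what lets LakeTableFactory below pass an empty Configuration when it fetches the catalog for Iceberg.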

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java

Lines changed: 22 additions & 40 deletions
@@ -17,20 +17,20 @@

 package org.apache.fluss.flink.lake;

+import org.apache.fluss.config.Configuration;
+
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;

-import java.util.Map;
-
 /** A factory to create {@link DynamicTableSource} for lake table. */
 public class LakeTableFactory {
-    private final LakeCatalog lakeCatalog;
+    private final LakeFlinkCatalog lakeFlinkCatalog;

-    public LakeTableFactory(LakeCatalog lakeCatalog) {
-        this.lakeCatalog = lakeCatalog;
+    public LakeTableFactory(LakeFlinkCatalog lakeFlinkCatalog) {
+        this.lakeFlinkCatalog = lakeFlinkCatalog;
     }

     public DynamicTableSource createDynamicTableSource(
@@ -42,21 +42,6 @@ public DynamicTableSource createDynamicTableSource(
                 originIdentifier.getDatabaseName(),
                 tableName);

-        // Determine the lake format from the table options
-        Map<String, String> tableOptions = context.getCatalogTable().getOptions();
-
-        // If not present, fall back to 'fluss.table.datalake.format' (set by Fluss)
-        String connector = tableOptions.get("connector");
-        if (connector == null) {
-            connector = tableOptions.get("fluss.table.datalake.format");
-        }
-
-        if (connector == null) {
-            // For Paimon system tables (like table_name$options), the table options are empty.
-            // Default to Paimon for backward compatibility.
-            connector = "paimon";
-        }
-
         // For Iceberg and Paimon, pass the table name as-is to their factory.
         // Metadata tables will be handled internally by their respective factories.
         DynamicTableFactory.Context newContext =
@@ -69,44 +54,41 @@ public DynamicTableSource createDynamicTableSource(
                         context.isTemporary());

         // Get the appropriate factory based on connector type
-        DynamicTableSourceFactory factory = getLakeTableFactory(connector, tableOptions);
+        DynamicTableSourceFactory factory = getLakeTableFactory();
        return factory.createDynamicTableSource(newContext);
     }

-    private DynamicTableSourceFactory getLakeTableFactory(
-            String connector, Map<String, String> tableOptions) {
-        if ("paimon".equalsIgnoreCase(connector)) {
-            return getPaimonFactory();
-        } else if ("iceberg".equalsIgnoreCase(connector)) {
-            return getIcebergFactory(tableOptions);
-        } else {
-            throw new UnsupportedOperationException(
-                    "Unsupported lake connector: "
-                            + connector
-                            + ". Only 'paimon' and 'iceberg' are supported.");
+    private DynamicTableSourceFactory getLakeTableFactory() {
+        switch (lakeFlinkCatalog.getLakeFormat()) {
+            case PAIMON:
+                return getPaimonFactory();
+            case ICEBERG:
+                return getIcebergFactory();
+            default:
+                throw new UnsupportedOperationException(
+                        "Unsupported lake connector: "
+                                + lakeFlinkCatalog.getLakeFormat()
+                                + ". Only 'paimon' and 'iceberg' are supported.");
         }
     }

     private DynamicTableSourceFactory getPaimonFactory() {
         return new org.apache.paimon.flink.FlinkTableFactory();
     }

-    private DynamicTableSourceFactory getIcebergFactory(Map<String, String> tableOptions) {
+    private DynamicTableSourceFactory getIcebergFactory() {
         try {
-            // Get the Iceberg FlinkCatalog instance from LakeCatalog
-            org.apache.fluss.config.Configuration flussConfig =
-                    org.apache.fluss.config.Configuration.fromMap(tableOptions);
-
             // Get catalog with explicit ICEBERG format
             org.apache.flink.table.catalog.Catalog catalog =
-                    lakeCatalog.getLakeCatalog(
-                            flussConfig, org.apache.fluss.metadata.DataLakeFormat.ICEBERG);
+                    lakeFlinkCatalog.getLakeCatalog(
+                            // we can pass an empty configuration to get the catalog,
+                            // since the catalog should already be initialized
+                            new Configuration());

             // Create FlinkDynamicTableFactory with the catalog
             Class<?> icebergFactoryClass =
                     Class.forName("org.apache.iceberg.flink.FlinkDynamicTableFactory");
             Class<?> flinkCatalogClass = Class.forName("org.apache.iceberg.flink.FlinkCatalog");
-
             return (DynamicTableSourceFactory)
                     icebergFactoryClass
                             .getDeclaredConstructor(flinkCatalogClass)
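
The Iceberg factory is constructed reflectively so fluss-flink-common keeps no compile-time dependency on iceberg-flink; the hunk above is cut off right after getDeclaredConstructor. A minimal sketch that completes the same reflective pattern (the wrapper class and method names are illustrative, not the commit's code; only the two Class.forName targets come from the diff):

import java.lang.reflect.Constructor;

/** Reflective-construction sketch; not part of the PR. */
public final class ReflectiveFactoryLoader {
    public static Object createIcebergFactory(Object flinkCatalog) {
        try {
            // Resolve the factory and its constructor-argument type by name,
            // keeping the Iceberg dependency optional at compile time.
            Class<?> factoryClass =
                    Class.forName("org.apache.iceberg.flink.FlinkDynamicTableFactory");
            Class<?> catalogClass = Class.forName("org.apache.iceberg.flink.FlinkCatalog");
            Constructor<?> ctor = factoryClass.getDeclaredConstructor(catalogClass);
            // flinkCatalog must actually be an org.apache.iceberg.flink.FlinkCatalog
            return ctor.newInstance(flinkCatalog);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(
                    "Failed to create Iceberg FlinkDynamicTableFactory reflectively", e);
        }
    }
}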

fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java

Lines changed: 1 addition & 8 deletions
@@ -278,7 +278,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {

     @ParameterizedTest
     @ValueSource(booleans = {false, true})
-    void testReadIcebergLakeTableAndSystemTable(boolean isPartitioned) throws Exception {
+    void testReadIcebergLakeTable(boolean isPartitioned) throws Exception {
         // first of all, start tiering
         JobClient jobClient = buildTieringJob(execEnv);

@@ -308,13 +308,6 @@ void testReadIcebergLakeTableAndSystemTable(boolean isPartitioned) throws Exception {
         int expectedUserRowCount = isPartitioned ? 2 * waitUntilPartitions(t1).size() : 2;
         assertThat(icebergRows).hasSize(expectedUserRowCount);

-        // verify rows have expected number of columns
-        int userColumnCount = lakeTableResult.getResolvedSchema().getColumnCount();
-        Row firstRow = icebergRows.get(0);
-        assertThat(firstRow.getArity())
-                .as("Iceberg row should have at least user columns")
-                .isGreaterThanOrEqualTo(userColumnCount);
-
         // Test 2: Read Iceberg system table (snapshots) using $lake$snapshots suffix
         TableResult snapshotsResult =
                 batchTEnv.executeSql(String.format("select * from %s$lake$snapshots", tableName));
