
Commit 15de5fd

fix supplier
1 parent 32826f5 commit 15de5fd

File tree

4 files changed: +16 −12 lines

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java

Lines changed: 12 additions & 4 deletions
@@ -116,6 +116,7 @@ public class FlinkCatalog extends AbstractCatalog {
     protected final String bootstrapServers;
     protected final Map<String, String> securityConfigs;
     protected final LakeFlinkCatalog lakeFlinkCatalog;
+    protected volatile Map<String, String> lakeCatalogProperties;
     protected final Supplier<Map<String, String>> lakeCatalogPropertiesSupplier;
     protected Connection connection;
     protected Admin admin;
@@ -323,7 +324,7 @@ public CatalogBaseTable getTable(ObjectPath objectPath)
                             objectPath.getDatabaseName(),
                             tableName,
                             tableInfo.getProperties(),
-                            lakeCatalogPropertiesSupplier);
+                            getLakeCatalogProperties());
         } else {
             tableInfo = admin.getTableInfo(tablePath).get();
         }
@@ -360,7 +361,7 @@ protected CatalogBaseTable getLakeTable(
             String databaseName,
             String tableName,
             Configuration properties,
-            Supplier<Map<String, String>> lakeCatalogPropertiesSupplier)
+            Map<String, String> lakeCatalogProperties)
             throws TableNotExistException, CatalogException {
         String[] tableComponents = tableName.split("\\" + LAKE_TABLE_SPLITTER);
         if (tableComponents.length == 1) {
@@ -372,7 +373,7 @@ protected CatalogBaseTable getLakeTable(
             tableName = String.join("", tableComponents);
         }
         return lakeFlinkCatalog
-                .getLakeCatalog(properties, lakeCatalogPropertiesSupplier)
+                .getLakeCatalog(properties, lakeCatalogProperties)
                 .getTable(new ObjectPath(databaseName, tableName));
     }

@@ -788,6 +789,13 @@ public Map<String, String> getSecurityConfigs() {

     @VisibleForTesting
     public Map<String, String> getLakeCatalogProperties() {
-        return lakeCatalogPropertiesSupplier.get();
+        if (lakeCatalogProperties == null) {
+            synchronized (this) {
+                if (lakeCatalogProperties == null) {
+                    lakeCatalogProperties = lakeCatalogPropertiesSupplier.get();
+                }
+            }
+        }
+        return lakeCatalogProperties;
     }
 }
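Note: the new getLakeCatalogProperties body is the double-checked locking idiom: the field is declared volatile, the first null check skips the lock on the hot path, and the second check under synchronized ensures the supplier runs at most once even when several threads race. A minimal standalone sketch of the same idiom (the LazyProperties class and the example supplier below are illustrative, not code from this commit):

import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public final class LazyProperties {

    private final Supplier<Map<String, String>> supplier;
    // volatile so a fully initialized map published by one thread
    // is visible to later readers without taking the lock
    private volatile Map<String, String> cached;

    public LazyProperties(Supplier<Map<String, String>> supplier) {
        this.supplier = supplier;
    }

    public Map<String, String> get() {
        if (cached == null) {                  // fast path: no lock once initialized
            synchronized (this) {
                if (cached == null) {          // re-check: another thread may have won the race
                    cached = supplier.get();   // expensive computation runs at most once
                }
            }
        }
        return cached;
    }

    public static void main(String[] args) {
        LazyProperties props =
                new LazyProperties(
                        () -> {
                            Map<String, String> m = new HashMap<>();
                            m.put("warehouse", "/tmp/warehouse");
                            return m;
                        });
        System.out.println(props.get()); // supplier invoked here
        System.out.println(props.get()); // cached value reused
    }
}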

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java

Lines changed: 2 additions & 4 deletions
@@ -31,7 +31,6 @@

 import java.lang.reflect.Method;
 import java.util.Map;
-import java.util.function.Supplier;

 import static org.apache.fluss.metadata.DataLakeFormat.ICEBERG;
 import static org.apache.fluss.metadata.DataLakeFormat.PAIMON;
@@ -52,8 +51,7 @@ public LakeFlinkCatalog(String catalogName, ClassLoader classLoader) {
     }

     public Catalog getLakeCatalog(
-            Configuration tableOptions,
-            Supplier<Map<String, String>> lakeCatalogPropertiesSupplier) {
+            Configuration tableOptions, Map<String, String> lakeCatalogProperties) {
         // TODO: Currently, a Fluss cluster only supports a single DataLake storage.
         // However, in the
         // future, it may support multiple DataLakes. The following code assumes
@@ -75,7 +73,7 @@ public Catalog getLakeCatalog(
         }
         Map<String, String> catalogProperties =
                 PropertiesUtils.extractAndRemovePrefix(
-                        lakeCatalogPropertiesSupplier.get(), lakeFormat + ".");
+                        lakeCatalogProperties, lakeFormat + ".");

         catalogProperties.putAll(
                 DataLakeUtils.extractLakeCatalogProperties(tableOptions));
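Note: getLakeCatalog now receives the already resolved properties map instead of re-invoking a supplier on every call, and filters it by the lake format prefix before building the delegate catalog. A rough sketch of that prefix-stripping step, assuming PropertiesUtils.extractAndRemovePrefix keeps only entries whose key starts with the given prefix and strips it (my reading of the call site, not verified against the utility's source):

import java.util.HashMap;
import java.util.Map;

public final class PrefixExtractSketch {

    // Assumed behaviour of PropertiesUtils.extractAndRemovePrefix: keep entries
    // whose key starts with `prefix` and remove the prefix from the key.
    static Map<String, String> extractAndRemovePrefix(Map<String, String> props, String prefix) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> e : props.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                result.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> lakeCatalogProperties = new HashMap<>();
        lakeCatalogProperties.put("paimon.warehouse", "/tmp/warehouse");
        lakeCatalogProperties.put("other.key", "ignored");

        // After this commit the resolved map is passed in directly instead of a Supplier.
        System.out.println(extractAndRemovePrefix(lakeCatalogProperties, "paimon."));
        // prints: {warehouse=/tmp/warehouse}
    }
}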

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java

Lines changed: 1 addition & 1 deletion
@@ -85,7 +85,7 @@ private DynamicTableSourceFactory getIcebergFactory() {
                 lakeFlinkCatalog.getLakeCatalog(
                         // we can pass empty configuration to get catalog
                         // since the catalog should already be initialized
-                        new Configuration(), Collections::emptyMap);
+                        new Configuration(), Collections.emptyMap());

         // Create FlinkDynamicTableFactory with the catalog
         Class<?> icebergFactoryClass =
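Note: because getLakeCatalog now takes a Map rather than a Supplier, the method reference Collections::emptyMap no longer type-checks here (a method reference only compiles against a functional-interface target), so the call site passes the empty map itself. A small standalone sketch of the distinction (not code from the repository):

import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;

public final class SupplierVsMapSketch {

    static void takesSupplier(Supplier<Map<String, String>> supplier) {
        System.out.println("lazily resolved: " + supplier.get());
    }

    static void takesMap(Map<String, String> map) {
        System.out.println("eagerly resolved: " + map);
    }

    public static void main(String[] args) {
        // Before the change: the parameter was a Supplier, so a method reference fit.
        takesSupplier(Collections::emptyMap);

        // After the change: the parameter is the map itself, resolved up front.
        takesMap(Collections.emptyMap());
    }
}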

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java

Lines changed: 1 addition & 3 deletions
@@ -72,7 +72,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.function.Supplier;

 import static org.apache.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS;
 import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_ENABLED;
@@ -1012,8 +1011,7 @@ public MockLakeFlinkCatalog(String catalogName, ClassLoader classLoader) {

     @Override
     public Catalog getLakeCatalog(
-            Configuration tableOptions,
-            Supplier<Map<String, String>> lakeCatalogPropertiesSupplier) {
+            Configuration tableOptions, Map<String, String> lakeCatalogProperties) {
         return catalog;
     }
