Skip to content

Commit 7de5bd6

Browse files
committed
fix
1 parent 715997d commit 7de5bd6

File tree

4 files changed

+8
-7
lines changed

4 files changed

+8
-7
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ public CatalogBaseTable getTable(ObjectPath objectPath)
303303
objectPath.getDatabaseName(),
304304
tableName,
305305
tableInfo.getProperties(),
306-
lakeCatalogProperties.get());
306+
lakeCatalogProperties);
307307
} else {
308308
tableInfo = admin.getTableInfo(tablePath).get();
309309
}
@@ -340,7 +340,7 @@ protected CatalogBaseTable getLakeTable(
340340
String databaseName,
341341
String tableName,
342342
Configuration properties,
343-
Map<String, String> lakeCatalogProperties)
343+
Supplier<Map<String, String>> lakeCatalogProperties)
344344
throws TableNotExistException, CatalogException {
345345
String[] tableComponents = tableName.split("\\" + LAKE_TABLE_SPLITTER);
346346
if (tableComponents.length == 1) {
@@ -767,7 +767,7 @@ public Map<String, String> getSecurityConfigs() {
767767
}
768768

769769
@VisibleForTesting
770-
public Map<String, String> getLakeCatalogProperties() {
770+
public Supplier<Map<String, String>> getLakeCatalogProperties() {
771771
return lakeCatalogProperties;
772772
}
773773
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
import java.lang.reflect.Method;
3333
import java.util.Map;
34+
import java.util.function.Supplier;
3435

3536
import static org.apache.fluss.metadata.DataLakeFormat.ICEBERG;
3637
import static org.apache.fluss.metadata.DataLakeFormat.PAIMON;
@@ -51,7 +52,7 @@ public LakeFlinkCatalog(String catalogName, ClassLoader classLoader) {
5152
}
5253

5354
public Catalog getLakeCatalog(
54-
Configuration tableOptions, Map<String, String> lakeCatalogProperties) {
55+
Configuration tableOptions, Supplier<Map<String, String>> lakeCatalogProperties) {
5556
// TODO: Currently, a Fluss cluster only supports a single DataLake storage.
5657
// However, in the
5758
// future, it may support multiple DataLakes. The following code assumes
@@ -73,7 +74,7 @@ public Catalog getLakeCatalog(
7374
}
7475
Map<String, String> catalogProperties =
7576
PropertiesUtils.extractAndRemovePrefix(
76-
lakeCatalogProperties, lakeFormat + ".");
77+
lakeCatalogProperties.get(), lakeFormat + ".");
7778

7879
catalogProperties.putAll(
7980
DataLakeUtils.extractLakeCatalogProperties(tableOptions));

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ private DynamicTableSourceFactory getIcebergFactory() {
8585
lakeFlinkCatalog.getLakeCatalog(
8686
// we can pass empty configuration to get catalog
8787
// since the catalog should already be initialized
88-
new Configuration(), Collections.emptyMap());
88+
new Configuration(), Collections::emptyMap);
8989

9090
// Create FlinkDynamicTableFactory with the catalog
9191
Class<?> icebergFactoryClass =

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public void testCreateCatalog() {
9191
Thread.currentThread().getContextClassLoader());
9292

9393
assertThat(actualCatalog2.getSecurityConfigs()).isEqualTo(securityMap);
94-
assertThat(actualCatalog2.getLakeCatalogProperties()).isEqualTo(lakeCatalogMap);
94+
assertThat(actualCatalog2.getLakeCatalogProperties().get()).isEqualTo(lakeCatalogMap);
9595
}
9696

9797
@Test

0 commit comments

Comments
 (0)