Skip to content

Commit 593d27e

Browse files
authored
[hotfix] Flink catalog should pass to lake properties to lake table (apache#2205)
1 parent 5049663 commit 593d27e

File tree

2 files changed

+22
-1
lines changed

2 files changed

+22
-1
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,14 @@ public CatalogBaseTable getTable(ObjectPath objectPath)
335335
Map<String, String> newOptions = new HashMap<>(catalogBaseTable.getOptions());
336336
newOptions.put(BOOTSTRAP_SERVERS.key(), bootstrapServers);
337337
newOptions.putAll(securityConfigs);
338+
// add lake properties
339+
if (tableInfo.getTableConfig().isDataLakeEnabled()) {
340+
for (Map.Entry<String, String> lakePropertyEntry :
341+
getLakeCatalogProperties().entrySet()) {
342+
String key = "table.datalake." + lakePropertyEntry.getKey();
343+
newOptions.put(key, lakePropertyEntry.getValue());
344+
}
345+
}
338346
if (CatalogBaseTable.TableKind.TABLE == catalogBaseTable.getTableKind()) {
339347
return ((CatalogTable) catalogBaseTable).copy(newOptions);
340348
} else if (CatalogBaseTable.TableKind.MATERIALIZED_TABLE

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.flink.table.api.ValidationException;
3636
import org.apache.flink.table.api.config.TableConfigOptions;
3737
import org.apache.flink.table.catalog.Catalog;
38+
import org.apache.flink.table.catalog.CatalogBaseTable;
3839
import org.apache.flink.table.catalog.CatalogTable;
3940
import org.apache.flink.table.catalog.ObjectPath;
4041
import org.apache.flink.table.catalog.exceptions.CatalogException;
@@ -819,7 +820,7 @@ void testCreateCatalogWithUnexistedDatabase() {
819820
}
820821

821822
@Test
822-
void testCreateCatalogWithLakeProperties() {
823+
void testCreateCatalogWithLakeProperties() throws Exception {
823824
Map<String, String> properties = new HashMap<>();
824825
properties.put("paimon.jdbc.password", "pass");
825826
tEnv.executeSql(
@@ -830,6 +831,18 @@ void testCreateCatalogWithLakeProperties() {
830831
(FlinkCatalog) tEnv.getCatalog("test_catalog_with_lake_properties").get();
831832

832833
assertOptionsEqual(catalog.getLakeCatalogProperties(), properties);
834+
835+
String ddl =
836+
"create table test_get_lake_table ("
837+
+ "a string, "
838+
+ "b int) "
839+
+ "with ('bucket.num' = '5', 'table.datalake.enabled' = 'true')";
840+
tEnv.executeSql(ddl);
841+
842+
CatalogBaseTable catalogTable =
843+
catalog.getTable(new ObjectPath(DEFAULT_DB, "test_get_lake_table"));
844+
assertThat(catalogTable.getOptions())
845+
.containsEntry("table.datalake.paimon.jdbc.password", "pass");
833846
}
834847

835848
/**

0 commit comments

Comments
 (0)