
Commit 7b5f809

xx789633 authored and vamossagar12 committed
[flink] Allow pass lake catalog property info when create flink catalog (apache#1937)
1 parent 29545c3 commit 7b5f809

13 files changed: +135 −36 lines
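In user-facing terms, this change lets a Flink CREATE CATALOG statement carry lake catalog options (for example paimon.*-prefixed keys such as JDBC credentials) through to the lake catalog backing lake tables, instead of only accepting Fluss client and security options. Below is a minimal usage sketch mirroring the new testCreateCatalogWithLakeProperties case further down; it assumes a running Fluss cluster, that the bootstrap-servers option key is 'bootstrap.servers', and uses a placeholder address.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CreateFlussCatalogWithLakeProps {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Register a Fluss catalog and pass lake catalog properties as
        // 'paimon.'-prefixed options; with this commit they are no longer
        // rejected by option validation and reach the lake catalog.
        tEnv.executeSql(
                "CREATE CATALOG fluss_catalog WITH ("
                        + " 'type' = 'fluss',"
                        + " 'bootstrap.servers' = 'localhost:9123',"  // placeholder address
                        + " 'paimon.jdbc.user' = 'admin',"
                        + " 'paimon.jdbc.password' = 'pass'"
                        + ")");
        tEnv.executeSql("USE CATALOG fluss_catalog");
    }
}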

fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java

Lines changed: 12 additions & 2 deletions
@@ -33,6 +33,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Supplier;

 /** A {@link FlinkCatalog} used for Flink 2.1. */
 public class Flink21Catalog extends FlinkCatalog {
@@ -42,8 +43,15 @@ public Flink21Catalog(
             String defaultDatabase,
             String bootstrapServers,
             ClassLoader classLoader,
-            Map<String, String> securityConfigs) {
-        super(name, defaultDatabase, bootstrapServers, classLoader, securityConfigs);
+            Map<String, String> securityConfigs,
+            Supplier<Map<String, String>> lakeCatalogPropertiesSupplier) {
+        super(
+                name,
+                defaultDatabase,
+                bootstrapServers,
+                classLoader,
+                securityConfigs,
+                lakeCatalogPropertiesSupplier);
     }

     @VisibleForTesting
@@ -53,13 +61,15 @@ public Flink21Catalog(
             String bootstrapServers,
             ClassLoader classLoader,
             Map<String, String> securityConfigs,
+            Supplier<Map<String, String>> lakeCatalogPropertiesSupplier,
             LakeFlinkCatalog lakeFlinkCatalog) {
         super(
                 name,
                 defaultDatabase,
                 bootstrapServers,
                 classLoader,
                 securityConfigs,
+                lakeCatalogPropertiesSupplier,
                 lakeFlinkCatalog);
     }

fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java

Lines changed: 2 additions & 1 deletion
@@ -29,6 +29,7 @@ public FlinkCatalog createCatalog(Context context) {
                 catalog.defaultDatabase,
                 catalog.bootstrapServers,
                 catalog.classLoader,
-                catalog.securityConfigs);
+                catalog.securityConfigs,
+                catalog.lakeCatalogPropertiesSupplier);
     }
 }

fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java

Lines changed: 2 additions & 1 deletion
@@ -42,7 +42,8 @@ static void beforeAll() {
                         catalog.defaultDatabase,
                         catalog.bootstrapServers,
                         catalog.classLoader,
-                        catalog.securityConfigs);
+                        catalog.securityConfigs,
+                        catalog.lakeCatalogPropertiesSupplier);
         catalog.open();
     }

fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalog21Test.java

Lines changed: 1 addition & 0 deletions
@@ -43,6 +43,7 @@ protected FlinkCatalog initCatalog(
                 bootstrapServers,
                 Thread.currentThread().getContextClassLoader(),
                 Collections.emptyMap(),
+                Collections::emptyMap,
                 lakeFlinkCatalog);
     }

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java

Lines changed: 30 additions & 4 deletions
@@ -80,6 +80,7 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;

 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -115,6 +116,8 @@ public class FlinkCatalog extends AbstractCatalog {
     protected final String bootstrapServers;
     protected final Map<String, String> securityConfigs;
     protected final LakeFlinkCatalog lakeFlinkCatalog;
+    protected volatile Map<String, String> lakeCatalogProperties;
+    protected final Supplier<Map<String, String>> lakeCatalogPropertiesSupplier;
     protected Connection connection;
     protected Admin admin;

@@ -123,13 +126,15 @@ public FlinkCatalog(
             String defaultDatabase,
             String bootstrapServers,
             ClassLoader classLoader,
-            Map<String, String> securityConfigs) {
+            Map<String, String> securityConfigs,
+            Supplier<Map<String, String>> lakeCatalogPropertiesSupplier) {
         this(
                 name,
                 defaultDatabase,
                 bootstrapServers,
                 classLoader,
                 securityConfigs,
+                lakeCatalogPropertiesSupplier,
                 new LakeFlinkCatalog(name, classLoader));
     }

@@ -140,13 +145,15 @@ public FlinkCatalog(
             String bootstrapServers,
             ClassLoader classLoader,
             Map<String, String> securityConfigs,
+            Supplier<Map<String, String>> lakeCatalogPropertiesSupplier,
             LakeFlinkCatalog lakeFlinkCatalog) {
         super(name, defaultDatabase);
         this.catalogName = name;
         this.defaultDatabase = defaultDatabase;
         this.bootstrapServers = bootstrapServers;
         this.classLoader = classLoader;
         this.securityConfigs = securityConfigs;
+        this.lakeCatalogPropertiesSupplier = lakeCatalogPropertiesSupplier;
         this.lakeFlinkCatalog = lakeFlinkCatalog;
     }

@@ -312,8 +319,12 @@ public CatalogBaseTable getTable(ObjectPath objectPath)
                                         objectPath.getDatabaseName(),
                                         tableName.split("\\" + LAKE_TABLE_SPLITTER)[0])));
                 }
+
                 return getLakeTable(
-                        objectPath.getDatabaseName(), tableName, tableInfo.getProperties());
+                        objectPath.getDatabaseName(),
+                        tableName,
+                        tableInfo.getProperties(),
+                        getLakeCatalogProperties());
             } else {
                 tableInfo = admin.getTableInfo(tablePath).get();
             }
@@ -347,7 +358,10 @@ public CatalogBaseTable getTable(ObjectPath objectPath)
     }

     protected CatalogBaseTable getLakeTable(
-            String databaseName, String tableName, Configuration properties)
+            String databaseName,
+            String tableName,
+            Configuration properties,
+            Map<String, String> lakeCatalogProperties)
             throws TableNotExistException, CatalogException {
         String[] tableComponents = tableName.split("\\" + LAKE_TABLE_SPLITTER);
         if (tableComponents.length == 1) {
@@ -359,7 +373,7 @@ protected CatalogBaseTable getLakeTable(
             tableName = String.join("", tableComponents);
         }
         return lakeFlinkCatalog
-                .getLakeCatalog(properties)
+                .getLakeCatalog(properties, lakeCatalogProperties)
                 .getTable(new ObjectPath(databaseName, tableName));
     }

@@ -772,4 +786,16 @@ public Procedure getProcedure(ObjectPath procedurePath)
     public Map<String, String> getSecurityConfigs() {
         return securityConfigs;
     }
+
+    @VisibleForTesting
+    public Map<String, String> getLakeCatalogProperties() {
+        if (lakeCatalogProperties == null) {
+            synchronized (this) {
+                if (lakeCatalogProperties == null) {
+                    lakeCatalogProperties = lakeCatalogPropertiesSupplier.get();
+                }
+            }
+        }
+        return lakeCatalogProperties;
+    }
 }
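The new getLakeCatalogProperties() accessor resolves the supplier at most once, using double-checked locking on the volatile lakeCatalogProperties field, so the lake options are materialized lazily on first access rather than when the catalog is constructed. The same pattern in isolation, as a generic sketch (the class below is illustrative and not part of the Fluss code base):

import java.util.Map;
import java.util.function.Supplier;

// Generic illustration of the lazy, thread-safe caching used by
// FlinkCatalog#getLakeCatalogProperties(): the supplier runs at most once,
// and the volatile field publishes the resulting map to all threads.
class LazyProperties {
    private final Supplier<Map<String, String>> supplier;
    private volatile Map<String, String> cached;

    LazyProperties(Supplier<Map<String, String>> supplier) {
        this.supplier = supplier;
    }

    Map<String, String> get() {
        if (cached == null) {                 // cheap unsynchronized first check
            synchronized (this) {
                if (cached == null) {         // re-check under the lock
                    cached = supplier.get();
                }
            }
        }
        return cached;
    }
}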

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java

Lines changed: 22 additions & 2 deletions
@@ -18,12 +18,16 @@
 package org.apache.fluss.flink.catalog;

 import org.apache.fluss.flink.FlinkConnectorOptions;
+import org.apache.fluss.metadata.DataLakeFormat;

 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.table.factories.CatalogFactory;
 import org.apache.flink.table.factories.FactoryUtil;

+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;

@@ -35,6 +39,15 @@ public class FlinkCatalogFactory implements CatalogFactory {

     public static final String IDENTIFIER = "fluss";

+    public static final List<String> PREFIXES_TO_SKIP_VALIDATE = new ArrayList<>();
+
+    static {
+        PREFIXES_TO_SKIP_VALIDATE.add(CLIENT_SECURITY_PREFIX);
+        for (DataLakeFormat value : DataLakeFormat.values()) {
+            PREFIXES_TO_SKIP_VALIDATE.add(value.toString());
+        }
+    }
+
     @Override
     public String factoryIdentifier() {
         return IDENTIFIER;
@@ -54,7 +67,7 @@ public Set<ConfigOption<?>> optionalOptions() {
     public FlinkCatalog createCatalog(Context context) {
         final FactoryUtil.CatalogFactoryHelper helper =
                 FactoryUtil.createCatalogFactoryHelper(this, context);
-        helper.validateExcept(CLIENT_SECURITY_PREFIX);
+        helper.validateExcept(PREFIXES_TO_SKIP_VALIDATE.toArray(new String[0]));
         Map<String, String> options = context.getOptions();
         Map<String, String> securityConfigs = extractPrefix(options, CLIENT_SECURITY_PREFIX);

@@ -63,6 +76,13 @@ public FlinkCatalog createCatalog(Context context) {
                 helper.getOptions().get(FlinkCatalogOptions.DEFAULT_DATABASE),
                 helper.getOptions().get(FlinkConnectorOptions.BOOTSTRAP_SERVERS),
                 context.getClassLoader(),
-                securityConfigs);
+                securityConfigs,
+                () -> {
+                    Map<String, String> lakeCatalogProperties = new HashMap<>();
+                    for (DataLakeFormat lakeFormat : DataLakeFormat.values()) {
+                        lakeCatalogProperties.putAll(extractPrefix(options, lakeFormat.toString()));
+                    }
+                    return lakeCatalogProperties;
+                });
     }
 }
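The factory now tells the FactoryUtil helper to skip validation for every key whose prefix is a DataLakeFormat name, and the supplier it installs gathers exactly those keys. A rough sketch of what that supplier would yield for a given options map, using a hand-rolled prefix filter in place of the project's extractPrefix helper (whose implementation is not shown in this diff); only the Paimon prefix is exercised here:

import java.util.HashMap;
import java.util.Map;

public class LakeOptionExtractionSketch {
    // Hypothetical stand-in for the factory's extractPrefix helper: keep every
    // option whose key starts with the given prefix, prefix included (matching
    // the behavior asserted in FlinkCatalogFactoryTest below).
    static Map<String, String> extractPrefix(Map<String, String> options, String prefix) {
        Map<String, String> result = new HashMap<>();
        options.forEach((k, v) -> {
            if (k.startsWith(prefix)) {
                result.put(k, v);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("bootstrap.servers", "localhost:9123"); // placeholder
        options.put("paimon.jdbc.user", "admin");
        options.put("paimon.jdbc.password", "pass");

        // Mirrors the supplier installed in createCatalog(): collect all options
        // carrying a lake-format prefix.
        Map<String, String> lakeCatalogProperties = extractPrefix(options, "paimon");
        // Contains paimon.jdbc.user=admin and paimon.jdbc.password=pass,
        // but not bootstrap.servers.
        System.out.println(lakeCatalogProperties);
    }
}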

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java

Lines changed: 16 additions & 9 deletions
@@ -21,6 +21,7 @@
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.flink.utils.DataLakeUtils;
 import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.utils.PropertiesUtils;

 import org.apache.flink.table.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
@@ -49,7 +50,8 @@ public LakeFlinkCatalog(String catalogName, ClassLoader classLoader) {
         this.classLoader = classLoader;
     }

-    public Catalog getLakeCatalog(Configuration tableOptions) {
+    public Catalog getLakeCatalog(
+            Configuration tableOptions, Map<String, String> lakeCatalogProperties) {
         // TODO: Currently, a Fluss cluster only supports a single DataLake storage.
         // However, in the
         // future, it may support multiple DataLakes. The following code assumes
@@ -69,12 +71,19 @@ public Catalog getLakeCatalog(Configuration tableOptions) {
                                     + ConfigOptions.TABLE_DATALAKE_FORMAT.key()
                                     + "' is set.");
                 }
+                Map<String, String> catalogProperties =
+                        PropertiesUtils.extractAndRemovePrefix(
+                                lakeCatalogProperties, lakeFormat + ".");
+
+                catalogProperties.putAll(
+                        DataLakeUtils.extractLakeCatalogProperties(tableOptions));
                 if (lakeFormat == PAIMON) {
                     catalog =
-                            PaimonCatalogFactory.create(catalogName, tableOptions, classLoader);
+                            PaimonCatalogFactory.create(
+                                    catalogName, catalogProperties, classLoader);
                     this.lakeFormat = PAIMON;
                 } else if (lakeFormat == ICEBERG) {
-                    catalog = IcebergCatalogFactory.create(catalogName, tableOptions);
+                    catalog = IcebergCatalogFactory.create(catalogName, catalogProperties);
                     this.lakeFormat = ICEBERG;
                 } else {
                     throw new UnsupportedOperationException(
@@ -111,9 +120,9 @@ public static class PaimonCatalogFactory {
         private PaimonCatalogFactory() {}

         public static Catalog create(
-                String catalogName, Configuration tableOptions, ClassLoader classLoader) {
-            Map<String, String> catalogProperties =
-                    DataLakeUtils.extractLakeCatalogProperties(tableOptions);
+                String catalogName,
+                Map<String, String> catalogProperties,
+                ClassLoader classLoader) {
             return FlinkCatalogFactory.createCatalog(
                     catalogName,
                     CatalogContext.create(
@@ -131,9 +140,7 @@ private IcebergCatalogFactory() {}
         // requires Iceberg 1.5.0+.
         // Using reflection to maintain Java 8 compatibility.
         // Once Fluss drops Java 8, we can remove the reflection code
-        public static Catalog create(String catalogName, Configuration tableOptions) {
-            Map<String, String> catalogProperties =
-                    DataLakeUtils.extractLakeCatalogProperties(tableOptions);
+        public static Catalog create(String catalogName, Map<String, String> catalogProperties) {
             // Map "type" to "catalog-type" (equivalent)
             // Required: either "catalog-type" (standard type) or "catalog-impl"
             // (fully-qualified custom class, mandatory if "catalog-type" is missing)
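Before handing the properties to the Paimon or Iceberg factory, LakeFlinkCatalog strips the format prefix: PropertiesUtils.extractAndRemovePrefix takes the catalog-level map plus a prefix such as "paimon." and returns the bare keys, which are then merged with the table-scoped lake options. A small stand-in showing that transformation (the helper below is illustrative, not the actual PropertiesUtils implementation):

import java.util.HashMap;
import java.util.Map;

public class PrefixStrippingSketch {
    // Illustrative equivalent of PropertiesUtils.extractAndRemovePrefix:
    // pick keys that start with the prefix and drop the prefix from each key.
    static Map<String, String> extractAndRemovePrefix(
            Map<String, String> properties, String prefix) {
        Map<String, String> result = new HashMap<>();
        properties.forEach((k, v) -> {
            if (k.startsWith(prefix)) {
                result.put(k.substring(prefix.length()), v);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> lakeCatalogProperties = new HashMap<>();
        lakeCatalogProperties.put("paimon.jdbc.user", "admin");
        lakeCatalogProperties.put("paimon.jdbc.password", "pass");

        // After stripping "paimon.", the map holds jdbc.user and jdbc.password,
        // which is what the Paimon catalog factory receives as catalog properties.
        Map<String, String> catalogProperties =
                extractAndRemovePrefix(lakeCatalogProperties, "paimon.");
        System.out.println(catalogProperties);
    }
}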

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java

Lines changed: 3 additions & 1 deletion
@@ -25,6 +25,8 @@
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;

+import java.util.Collections;
+
 /** A factory to create {@link DynamicTableSource} for lake table. */
 public class LakeTableFactory {
     private final LakeFlinkCatalog lakeFlinkCatalog;
@@ -83,7 +85,7 @@ private DynamicTableSourceFactory getIcebergFactory() {
                 lakeFlinkCatalog.getLakeCatalog(
                         // we can pass empty configuration to get catalog
                         // since the catalog should already be initialized
-                        new Configuration());
+                        new Configuration(), Collections.emptyMap());

         // Create FlinkDynamicTableFactory with the catalog
         Class<?> icebergFactoryClass =

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java

Lines changed: 8 additions & 1 deletion
@@ -64,7 +64,8 @@ public void testCreateCatalog() {
                         DB_NAME,
                         BOOTSTRAP_SERVERS_NAME,
                         Thread.currentThread().getContextClassLoader(),
-                        Collections.emptyMap());
+                        Collections.emptyMap(),
+                        Collections::emptyMap);

         checkEquals(flinkCatalog, actualCatalog);

@@ -75,7 +76,12 @@ public void testCreateCatalog() {
         securityMap.put("client.security.sasl.username", "root");
         securityMap.put("client.security.sasl.password", "password");

+        Map<String, String> lakeCatalogMap = new HashMap<>();
+        lakeCatalogMap.put("paimon.jdbc.user", "admin");
+        lakeCatalogMap.put("paimon.jdbc.password", "pass");
+
         options.putAll(securityMap);
+        options.putAll(lakeCatalogMap);
         FlinkCatalog actualCatalog2 =
                 (FlinkCatalog)
                         FactoryUtil.createCatalog(
@@ -85,6 +91,7 @@ public void testCreateCatalog() {
                                 Thread.currentThread().getContextClassLoader());

         assertThat(actualCatalog2.getSecurityConfigs()).isEqualTo(securityMap);
+        assertThat(actualCatalog2.getLakeCatalogProperties()).isEqualTo(lakeCatalogMap);
     }

     @Test

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java

Lines changed: 20 additions & 3 deletions
@@ -115,7 +115,8 @@ static void beforeAll() {
                         DEFAULT_DB,
                         bootstrapServers,
                         Thread.currentThread().getContextClassLoader(),
-                        Collections.emptyMap());
+                        Collections.emptyMap(),
+                        Collections::emptyMap);
         catalog.open();
     }

@@ -769,7 +770,8 @@ void testAuthentication() throws Exception {
                         DEFAULT_DB,
                         bootstrapServers,
                         Thread.currentThread().getContextClassLoader(),
-                        Collections.emptyMap());
+                        Collections.emptyMap(),
+                        Collections::emptyMap);
         Catalog finalAuthenticateCatalog = authenticateCatalog;
         assertThatThrownBy(finalAuthenticateCatalog::open)
                 .cause()
@@ -787,7 +789,8 @@ void testAuthentication() throws Exception {
                         DEFAULT_DB,
                         bootstrapServers,
                         Thread.currentThread().getContextClassLoader(),
-                        clientConfig);
+                        clientConfig,
+                        Collections::emptyMap);
         authenticateCatalog.open();
         assertThat(authenticateCatalog.listDatabases())
                 .containsExactlyInAnyOrderElementsOf(Collections.singletonList(DEFAULT_DB));
@@ -815,6 +818,20 @@ void testCreateCatalogWithUnexistedDatabase() {
                         "The configured default-database 'non-exist' does not exist in the Fluss cluster.");
     }

+    @Test
+    void testCreateCatalogWithLakeProperties() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("paimon.jdbc.password", "pass");
+        tEnv.executeSql(
+                String.format(
+                        "create catalog test_catalog_with_lake_properties with ('type' = 'fluss', '%s' = '%s', 'paimon.jdbc.password' = 'pass')",
+                        BOOTSTRAP_SERVERS.key(), FLUSS_CLUSTER_EXTENSION.getBootstrapServers()));
+        FlinkCatalog catalog =
+                (FlinkCatalog) tEnv.getCatalog("test_catalog_with_lake_properties").get();
+
+        assertOptionsEqual(catalog.getLakeCatalogProperties(), properties);
+    }
+
     /**
      * Before Flink 2.1, the {@link Schema} did not include an index field. Starting from Flink 2.1,
      * Flink introduced the concept of an index, and in Fluss, the primary key is considered as an
