Skip to content

Commit 8369177

Browse files
committed
Revert "[flink] Allow pass lake catalog property info when create flink catalog (apache#1937)"
This reverts commit 35d92c5.
1 parent a313d4f commit 8369177

File tree

13 files changed: +36 additions, −135 deletions

13 files changed: +36 additions, −135 deletions

fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import java.util.List;
3434
import java.util.Map;
3535
import java.util.Optional;
36-
import java.util.function.Supplier;
3736

3837
/** A {@link FlinkCatalog} used for Flink 2.1. */
3938
public class Flink21Catalog extends FlinkCatalog {
@@ -43,15 +42,8 @@ public Flink21Catalog(
4342
String defaultDatabase,
4443
String bootstrapServers,
4544
ClassLoader classLoader,
46-
Map<String, String> securityConfigs,
47-
Supplier<Map<String, String>> lakeCatalogPropertiesSupplier) {
48-
super(
49-
name,
50-
defaultDatabase,
51-
bootstrapServers,
52-
classLoader,
53-
securityConfigs,
54-
lakeCatalogPropertiesSupplier);
45+
Map<String, String> securityConfigs) {
46+
super(name, defaultDatabase, bootstrapServers, classLoader, securityConfigs);
5547
}
5648

5749
@VisibleForTesting
@@ -61,15 +53,13 @@ public Flink21Catalog(
6153
String bootstrapServers,
6254
ClassLoader classLoader,
6355
Map<String, String> securityConfigs,
64-
Supplier<Map<String, String>> lakeCatalogPropertiesSupplier,
6556
LakeFlinkCatalog lakeFlinkCatalog) {
6657
super(
6758
name,
6859
defaultDatabase,
6960
bootstrapServers,
7061
classLoader,
7162
securityConfigs,
72-
lakeCatalogPropertiesSupplier,
7363
lakeFlinkCatalog);
7464
}
7565

fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ public FlinkCatalog createCatalog(Context context) {
2929
catalog.defaultDatabase,
3030
catalog.bootstrapServers,
3131
catalog.classLoader,
32-
catalog.securityConfigs,
33-
catalog.lakeCatalogPropertiesSupplier);
32+
catalog.securityConfigs);
3433
}
3534
}

fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,7 @@ static void beforeAll() {
4242
catalog.defaultDatabase,
4343
catalog.bootstrapServers,
4444
catalog.classLoader,
45-
catalog.securityConfigs,
46-
catalog.lakeCatalogPropertiesSupplier);
45+
catalog.securityConfigs);
4746
catalog.open();
4847
}
4948

fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalog21Test.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ protected FlinkCatalog initCatalog(
4343
bootstrapServers,
4444
Thread.currentThread().getContextClassLoader(),
4545
Collections.emptyMap(),
46-
Collections::emptyMap,
4746
lakeFlinkCatalog);
4847
}
4948

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java

Lines changed: 4 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@
8080
import java.util.Map;
8181
import java.util.Objects;
8282
import java.util.Optional;
83-
import java.util.function.Supplier;
8483
import java.util.stream.Collectors;
8584

8685
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -116,8 +115,6 @@ public class FlinkCatalog extends AbstractCatalog {
116115
protected final String bootstrapServers;
117116
protected final Map<String, String> securityConfigs;
118117
protected final LakeFlinkCatalog lakeFlinkCatalog;
119-
protected volatile Map<String, String> lakeCatalogProperties;
120-
protected final Supplier<Map<String, String>> lakeCatalogPropertiesSupplier;
121118
protected Connection connection;
122119
protected Admin admin;
123120

@@ -126,15 +123,13 @@ public FlinkCatalog(
126123
String defaultDatabase,
127124
String bootstrapServers,
128125
ClassLoader classLoader,
129-
Map<String, String> securityConfigs,
130-
Supplier<Map<String, String>> lakeCatalogPropertiesSupplier) {
126+
Map<String, String> securityConfigs) {
131127
this(
132128
name,
133129
defaultDatabase,
134130
bootstrapServers,
135131
classLoader,
136132
securityConfigs,
137-
lakeCatalogPropertiesSupplier,
138133
new LakeFlinkCatalog(name, classLoader));
139134
}
140135

@@ -145,15 +140,13 @@ public FlinkCatalog(
145140
String bootstrapServers,
146141
ClassLoader classLoader,
147142
Map<String, String> securityConfigs,
148-
Supplier<Map<String, String>> lakeCatalogPropertiesSupplier,
149143
LakeFlinkCatalog lakeFlinkCatalog) {
150144
super(name, defaultDatabase);
151145
this.catalogName = name;
152146
this.defaultDatabase = defaultDatabase;
153147
this.bootstrapServers = bootstrapServers;
154148
this.classLoader = classLoader;
155149
this.securityConfigs = securityConfigs;
156-
this.lakeCatalogPropertiesSupplier = lakeCatalogPropertiesSupplier;
157150
this.lakeFlinkCatalog = lakeFlinkCatalog;
158151
}
159152

@@ -319,12 +312,8 @@ public CatalogBaseTable getTable(ObjectPath objectPath)
319312
objectPath.getDatabaseName(),
320313
tableName.split("\\" + LAKE_TABLE_SPLITTER)[0])));
321314
}
322-
323315
return getLakeTable(
324-
objectPath.getDatabaseName(),
325-
tableName,
326-
tableInfo.getProperties(),
327-
getLakeCatalogProperties());
316+
objectPath.getDatabaseName(), tableName, tableInfo.getProperties());
328317
} else {
329318
tableInfo = admin.getTableInfo(tablePath).get();
330319
}
@@ -358,10 +347,7 @@ public CatalogBaseTable getTable(ObjectPath objectPath)
358347
}
359348

360349
protected CatalogBaseTable getLakeTable(
361-
String databaseName,
362-
String tableName,
363-
Configuration properties,
364-
Map<String, String> lakeCatalogProperties)
350+
String databaseName, String tableName, Configuration properties)
365351
throws TableNotExistException, CatalogException {
366352
String[] tableComponents = tableName.split("\\" + LAKE_TABLE_SPLITTER);
367353
if (tableComponents.length == 1) {
@@ -373,7 +359,7 @@ protected CatalogBaseTable getLakeTable(
373359
tableName = String.join("", tableComponents);
374360
}
375361
return lakeFlinkCatalog
376-
.getLakeCatalog(properties, lakeCatalogProperties)
362+
.getLakeCatalog(properties)
377363
.getTable(new ObjectPath(databaseName, tableName));
378364
}
379365

@@ -786,16 +772,4 @@ public Procedure getProcedure(ObjectPath procedurePath)
786772
public Map<String, String> getSecurityConfigs() {
787773
return securityConfigs;
788774
}
789-
790-
@VisibleForTesting
791-
public Map<String, String> getLakeCatalogProperties() {
792-
if (lakeCatalogProperties == null) {
793-
synchronized (this) {
794-
if (lakeCatalogProperties == null) {
795-
lakeCatalogProperties = lakeCatalogPropertiesSupplier.get();
796-
}
797-
}
798-
}
799-
return lakeCatalogProperties;
800-
}
801775
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,12 @@
1818
package org.apache.fluss.flink.catalog;
1919

2020
import org.apache.fluss.flink.FlinkConnectorOptions;
21-
import org.apache.fluss.metadata.DataLakeFormat;
2221

2322
import org.apache.flink.configuration.ConfigOption;
2423
import org.apache.flink.table.factories.CatalogFactory;
2524
import org.apache.flink.table.factories.FactoryUtil;
2625

27-
import java.util.ArrayList;
2826
import java.util.Collections;
29-
import java.util.HashMap;
30-
import java.util.List;
3127
import java.util.Map;
3228
import java.util.Set;
3329

@@ -39,15 +35,6 @@ public class FlinkCatalogFactory implements CatalogFactory {
3935

4036
public static final String IDENTIFIER = "fluss";
4137

42-
public static final List<String> PREFIXES_TO_SKIP_VALIDATE = new ArrayList<>();
43-
44-
static {
45-
PREFIXES_TO_SKIP_VALIDATE.add(CLIENT_SECURITY_PREFIX);
46-
for (DataLakeFormat value : DataLakeFormat.values()) {
47-
PREFIXES_TO_SKIP_VALIDATE.add(value.toString());
48-
}
49-
}
50-
5138
@Override
5239
public String factoryIdentifier() {
5340
return IDENTIFIER;
@@ -67,7 +54,7 @@ public Set<ConfigOption<?>> optionalOptions() {
6754
public FlinkCatalog createCatalog(Context context) {
6855
final FactoryUtil.CatalogFactoryHelper helper =
6956
FactoryUtil.createCatalogFactoryHelper(this, context);
70-
helper.validateExcept(PREFIXES_TO_SKIP_VALIDATE.toArray(new String[0]));
57+
helper.validateExcept(CLIENT_SECURITY_PREFIX);
7158
Map<String, String> options = context.getOptions();
7259
Map<String, String> securityConfigs = extractPrefix(options, CLIENT_SECURITY_PREFIX);
7360

@@ -76,13 +63,6 @@ public FlinkCatalog createCatalog(Context context) {
7663
helper.getOptions().get(FlinkCatalogOptions.DEFAULT_DATABASE),
7764
helper.getOptions().get(FlinkConnectorOptions.BOOTSTRAP_SERVERS),
7865
context.getClassLoader(),
79-
securityConfigs,
80-
() -> {
81-
Map<String, String> lakeCatalogProperties = new HashMap<>();
82-
for (DataLakeFormat lakeFormat : DataLakeFormat.values()) {
83-
lakeCatalogProperties.putAll(extractPrefix(options, lakeFormat.toString()));
84-
}
85-
return lakeCatalogProperties;
86-
});
66+
securityConfigs);
8767
}
8868
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.fluss.config.Configuration;
2222
import org.apache.fluss.flink.utils.DataLakeUtils;
2323
import org.apache.fluss.metadata.DataLakeFormat;
24-
import org.apache.fluss.utils.PropertiesUtils;
2524

2625
import org.apache.flink.table.catalog.Catalog;
2726
import org.apache.paimon.catalog.CatalogContext;
@@ -50,8 +49,7 @@ public LakeFlinkCatalog(String catalogName, ClassLoader classLoader) {
5049
this.classLoader = classLoader;
5150
}
5251

53-
public Catalog getLakeCatalog(
54-
Configuration tableOptions, Map<String, String> lakeCatalogProperties) {
52+
public Catalog getLakeCatalog(Configuration tableOptions) {
5553
// TODO: Currently, a Fluss cluster only supports a single DataLake storage.
5654
// However, in the
5755
// future, it may support multiple DataLakes. The following code assumes
@@ -71,19 +69,12 @@ public Catalog getLakeCatalog(
7169
+ ConfigOptions.TABLE_DATALAKE_FORMAT.key()
7270
+ "' is set.");
7371
}
74-
Map<String, String> catalogProperties =
75-
PropertiesUtils.extractAndRemovePrefix(
76-
lakeCatalogProperties, lakeFormat + ".");
77-
78-
catalogProperties.putAll(
79-
DataLakeUtils.extractLakeCatalogProperties(tableOptions));
8072
if (lakeFormat == PAIMON) {
8173
catalog =
82-
PaimonCatalogFactory.create(
83-
catalogName, catalogProperties, classLoader);
74+
PaimonCatalogFactory.create(catalogName, tableOptions, classLoader);
8475
this.lakeFormat = PAIMON;
8576
} else if (lakeFormat == ICEBERG) {
86-
catalog = IcebergCatalogFactory.create(catalogName, catalogProperties);
77+
catalog = IcebergCatalogFactory.create(catalogName, tableOptions);
8778
this.lakeFormat = ICEBERG;
8879
} else {
8980
throw new UnsupportedOperationException(
@@ -120,9 +111,9 @@ public static class PaimonCatalogFactory {
120111
private PaimonCatalogFactory() {}
121112

122113
public static Catalog create(
123-
String catalogName,
124-
Map<String, String> catalogProperties,
125-
ClassLoader classLoader) {
114+
String catalogName, Configuration tableOptions, ClassLoader classLoader) {
115+
Map<String, String> catalogProperties =
116+
DataLakeUtils.extractLakeCatalogProperties(tableOptions);
126117
return FlinkCatalogFactory.createCatalog(
127118
catalogName,
128119
CatalogContext.create(
@@ -140,7 +131,9 @@ private IcebergCatalogFactory() {}
140131
// requires Iceberg 1.5.0+.
141132
// Using reflection to maintain Java 8 compatibility.
142133
// Once Fluss drops Java 8, we can remove the reflection code
143-
public static Catalog create(String catalogName, Map<String, String> catalogProperties) {
134+
public static Catalog create(String catalogName, Configuration tableOptions) {
135+
Map<String, String> catalogProperties =
136+
DataLakeUtils.extractLakeCatalogProperties(tableOptions);
144137
// Map "type" to "catalog-type" (equivalent)
145138
// Required: either "catalog-type" (standard type) or "catalog-impl"
146139
// (fully-qualified custom class, mandatory if "catalog-type" is missing)

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@
2525
import org.apache.flink.table.factories.DynamicTableSourceFactory;
2626
import org.apache.flink.table.factories.FactoryUtil;
2727

28-
import java.util.Collections;
29-
3028
/** A factory to create {@link DynamicTableSource} for lake table. */
3129
public class LakeTableFactory {
3230
private final LakeFlinkCatalog lakeFlinkCatalog;
@@ -85,7 +83,7 @@ private DynamicTableSourceFactory getIcebergFactory() {
8583
lakeFlinkCatalog.getLakeCatalog(
8684
// we can pass empty configuration to get catalog
8785
// since the catalog should already be initialized
88-
new Configuration(), Collections.emptyMap());
86+
new Configuration());
8987

9088
// Create FlinkDynamicTableFactory with the catalog
9189
Class<?> icebergFactoryClass =

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,7 @@ public void testCreateCatalog() {
6464
DB_NAME,
6565
BOOTSTRAP_SERVERS_NAME,
6666
Thread.currentThread().getContextClassLoader(),
67-
Collections.emptyMap(),
68-
Collections::emptyMap);
67+
Collections.emptyMap());
6968

7069
checkEquals(flinkCatalog, actualCatalog);
7170

@@ -76,12 +75,7 @@ public void testCreateCatalog() {
7675
securityMap.put("client.security.sasl.username", "root");
7776
securityMap.put("client.security.sasl.password", "password");
7877

79-
Map<String, String> lakeCatalogMap = new HashMap<>();
80-
lakeCatalogMap.put("paimon.jdbc.user", "admin");
81-
lakeCatalogMap.put("paimon.jdbc.password", "pass");
82-
8378
options.putAll(securityMap);
84-
options.putAll(lakeCatalogMap);
8579
FlinkCatalog actualCatalog2 =
8680
(FlinkCatalog)
8781
FactoryUtil.createCatalog(
@@ -91,7 +85,6 @@ public void testCreateCatalog() {
9185
Thread.currentThread().getContextClassLoader());
9286

9387
assertThat(actualCatalog2.getSecurityConfigs()).isEqualTo(securityMap);
94-
assertThat(actualCatalog2.getLakeCatalogProperties()).isEqualTo(lakeCatalogMap);
9588
}
9689

9790
@Test

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,7 @@ static void beforeAll() {
115115
DEFAULT_DB,
116116
bootstrapServers,
117117
Thread.currentThread().getContextClassLoader(),
118-
Collections.emptyMap(),
119-
Collections::emptyMap);
118+
Collections.emptyMap());
120119
catalog.open();
121120
}
122121

@@ -770,8 +769,7 @@ void testAuthentication() throws Exception {
770769
DEFAULT_DB,
771770
bootstrapServers,
772771
Thread.currentThread().getContextClassLoader(),
773-
Collections.emptyMap(),
774-
Collections::emptyMap);
772+
Collections.emptyMap());
775773
Catalog finalAuthenticateCatalog = authenticateCatalog;
776774
assertThatThrownBy(finalAuthenticateCatalog::open)
777775
.cause()
@@ -789,8 +787,7 @@ void testAuthentication() throws Exception {
789787
DEFAULT_DB,
790788
bootstrapServers,
791789
Thread.currentThread().getContextClassLoader(),
792-
clientConfig,
793-
Collections::emptyMap);
790+
clientConfig);
794791
authenticateCatalog.open();
795792
assertThat(authenticateCatalog.listDatabases())
796793
.containsExactlyInAnyOrderElementsOf(Collections.singletonList(DEFAULT_DB));
@@ -818,20 +815,6 @@ void testCreateCatalogWithUnexistedDatabase() {
818815
"The configured default-database 'non-exist' does not exist in the Fluss cluster.");
819816
}
820817

821-
@Test
822-
void testCreateCatalogWithLakeProperties() {
823-
Map<String, String> properties = new HashMap<>();
824-
properties.put("paimon.jdbc.password", "pass");
825-
tEnv.executeSql(
826-
String.format(
827-
"create catalog test_catalog_with_lake_properties with ('type' = 'fluss', '%s' = '%s', 'paimon.jdbc.password' = 'pass')",
828-
BOOTSTRAP_SERVERS.key(), FLUSS_CLUSTER_EXTENSION.getBootstrapServers()));
829-
FlinkCatalog catalog =
830-
(FlinkCatalog) tEnv.getCatalog("test_catalog_with_lake_properties").get();
831-
832-
assertOptionsEqual(catalog.getLakeCatalogProperties(), properties);
833-
}
834-
835818
/**
836819
* Before Flink 2.1, the {@link Schema} did not include an index field. Starting from Flink 2.1,
837820
* Flink introduced the concept of an index, and in Fluss, the primary key is considered as an

0 commit comments

Comments (0)