Merged
===== File 1 =====
@@ -33,6 +33,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Supplier;
 
 /** A {@link FlinkCatalog} used for Flink 2.1. */
 public class Flink21Catalog extends FlinkCatalog {
@@ -42,8 +43,15 @@ public Flink21Catalog(
             String defaultDatabase,
             String bootstrapServers,
             ClassLoader classLoader,
-            Map<String, String> securityConfigs) {
-        super(name, defaultDatabase, bootstrapServers, classLoader, securityConfigs);
+            Map<String, String> securityConfigs,
+            Supplier<Map<String, String>> lakeCatalogPropertiesSupplier) {
+        super(
+                name,
+                defaultDatabase,
+                bootstrapServers,
+                classLoader,
+                securityConfigs,
+                lakeCatalogPropertiesSupplier);
     }
 
     @VisibleForTesting
@@ -53,13 +61,15 @@ public Flink21Catalog(
             String bootstrapServers,
             ClassLoader classLoader,
             Map<String, String> securityConfigs,
+            Supplier<Map<String, String>> lakeCatalogPropertiesSupplier,
             LakeFlinkCatalog lakeFlinkCatalog) {
         super(
                 name,
                 defaultDatabase,
                 bootstrapServers,
                 classLoader,
                 securityConfigs,
+                lakeCatalogPropertiesSupplier,
                 lakeFlinkCatalog);
     }
 
===== File 2 =====
@@ -29,6 +29,7 @@ public FlinkCatalog createCatalog(Context context) {
                 catalog.defaultDatabase,
                 catalog.bootstrapServers,
                 catalog.classLoader,
-                catalog.securityConfigs);
+                catalog.securityConfigs,
+                catalog.lakeCatalogPropertiesSupplier);
     }
 }
===== File 3 =====
@@ -42,7 +42,8 @@ static void beforeAll() {
                 catalog.defaultDatabase,
                 catalog.bootstrapServers,
                 catalog.classLoader,
-                catalog.securityConfigs);
+                catalog.securityConfigs,
+                catalog.lakeCatalogPropertiesSupplier);
         catalog.open();
     }
 
===== File 4 =====
@@ -43,6 +43,7 @@ protected FlinkCatalog initCatalog(
                 bootstrapServers,
                 Thread.currentThread().getContextClassLoader(),
                 Collections.emptyMap(),
+                Collections::emptyMap,
                 lakeFlinkCatalog);
     }
 
===== File 5 =====
@@ -80,6 +80,7 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -115,6 +116,8 @@ public class FlinkCatalog extends AbstractCatalog {
     protected final String bootstrapServers;
     protected final Map<String, String> securityConfigs;
     protected final LakeFlinkCatalog lakeFlinkCatalog;
+    protected volatile Map<String, String> lakeCatalogProperties;
+    protected final Supplier<Map<String, String>> lakeCatalogPropertiesSupplier;
     protected Connection connection;
     protected Admin admin;
 
@@ -123,13 +126,15 @@ public FlinkCatalog(
             String defaultDatabase,
             String bootstrapServers,
             ClassLoader classLoader,
-            Map<String, String> securityConfigs) {
+            Map<String, String> securityConfigs,
+            Supplier<Map<String, String>> lakeCatalogPropertiesSupplier) {
         this(
                 name,
                 defaultDatabase,
                 bootstrapServers,
                 classLoader,
                 securityConfigs,
+                lakeCatalogPropertiesSupplier,
                 new LakeFlinkCatalog(name, classLoader));
     }
 
@@ -140,13 +145,15 @@ public FlinkCatalog(
             String bootstrapServers,
             ClassLoader classLoader,
             Map<String, String> securityConfigs,
+            Supplier<Map<String, String>> lakeCatalogPropertiesSupplier,
             LakeFlinkCatalog lakeFlinkCatalog) {
         super(name, defaultDatabase);
         this.catalogName = name;
         this.defaultDatabase = defaultDatabase;
         this.bootstrapServers = bootstrapServers;
         this.classLoader = classLoader;
         this.securityConfigs = securityConfigs;
+        this.lakeCatalogPropertiesSupplier = lakeCatalogPropertiesSupplier;
         this.lakeFlinkCatalog = lakeFlinkCatalog;
     }
 
@@ -312,8 +319,12 @@ public CatalogBaseTable getTable(ObjectPath objectPath)
                                         objectPath.getDatabaseName(),
                                         tableName.split("\\" + LAKE_TABLE_SPLITTER)[0])));
             }
+
             return getLakeTable(
-                    objectPath.getDatabaseName(), tableName, tableInfo.getProperties());
+                    objectPath.getDatabaseName(),
+                    tableName,
+                    tableInfo.getProperties(),
+                    getLakeCatalogProperties());
         } else {
             tableInfo = admin.getTableInfo(tablePath).get();
         }
@@ -347,7 +358,10 @@ public CatalogBaseTable getTable(ObjectPath objectPath)
     }
 
     protected CatalogBaseTable getLakeTable(
-            String databaseName, String tableName, Configuration properties)
+            String databaseName,
+            String tableName,
+            Configuration properties,
+            Map<String, String> lakeCatalogProperties)
             throws TableNotExistException, CatalogException {
         String[] tableComponents = tableName.split("\\" + LAKE_TABLE_SPLITTER);
         if (tableComponents.length == 1) {
@@ -359,7 +373,7 @@ protected CatalogBaseTable getLakeTable(
             tableName = String.join("", tableComponents);
         }
         return lakeFlinkCatalog
-                .getLakeCatalog(properties)
+                .getLakeCatalog(properties, lakeCatalogProperties)
                 .getTable(new ObjectPath(databaseName, tableName));
     }
 
@@ -772,4 +786,16 @@ public Procedure getProcedure(ObjectPath procedurePath)
     public Map<String, String> getSecurityConfigs() {
         return securityConfigs;
     }
+
+    @VisibleForTesting
+    public Map<String, String> getLakeCatalogProperties() {
+        if (lakeCatalogProperties == null) {
+            synchronized (this) {
+                if (lakeCatalogProperties == null) {
+                    lakeCatalogProperties = lakeCatalogPropertiesSupplier.get();
+                }
+            }
+        }
+        return lakeCatalogProperties;
+    }
 }
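
Aside: the getLakeCatalogProperties() accessor added above is a textbook double-checked-locking lazy initializer. The Supplier defers reading the lake options until a lake table is first touched, the volatile field guarantees safe publication, and the second null check under the monitor ensures the supplier runs at most once. A minimal standalone sketch of the same idiom (class and field names here are illustrative, not from the PR):

    import java.util.Map;
    import java.util.function.Supplier;

    // Illustrative sketch of the lazy-init idiom used by getLakeCatalogProperties().
    class LazyProperties {
        // volatile: a fully constructed map is safely published to other threads
        private volatile Map<String, String> cached;
        private final Supplier<Map<String, String>> supplier;

        LazyProperties(Supplier<Map<String, String>> supplier) {
            this.supplier = supplier;
        }

        Map<String, String> get() {
            if (cached == null) { // fast path: no locking once initialized
                synchronized (this) {
                    if (cached == null) { // re-check: another thread may have won
                        cached = supplier.get(); // runs at most once
                    }
                }
            }
            return cached;
        }
    }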
===== File 6 =====
@@ -18,12 +18,16 @@
 package org.apache.fluss.flink.catalog;
 
 import org.apache.fluss.flink.FlinkConnectorOptions;
+import org.apache.fluss.metadata.DataLakeFormat;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.table.factories.CatalogFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -35,6 +39,15 @@ public class FlinkCatalogFactory implements CatalogFactory {
 
     public static final String IDENTIFIER = "fluss";
 
+    public static final List<String> PREFIXES_TO_SKIP_VALIDATE = new ArrayList<>();
+
+    static {
+        PREFIXES_TO_SKIP_VALIDATE.add(CLIENT_SECURITY_PREFIX);
+        for (DataLakeFormat value : DataLakeFormat.values()) {
+            PREFIXES_TO_SKIP_VALIDATE.add(value.toString());
+        }
+    }
+
     @Override
     public String factoryIdentifier() {
         return IDENTIFIER;
@@ -54,7 +67,7 @@ public Set<ConfigOption<?>> optionalOptions() {
     public FlinkCatalog createCatalog(Context context) {
         final FactoryUtil.CatalogFactoryHelper helper =
                 FactoryUtil.createCatalogFactoryHelper(this, context);
-        helper.validateExcept(CLIENT_SECURITY_PREFIX);
+        helper.validateExcept(PREFIXES_TO_SKIP_VALIDATE.toArray(new String[0]));
         Map<String, String> options = context.getOptions();
         Map<String, String> securityConfigs = extractPrefix(options, CLIENT_SECURITY_PREFIX);
 
@@ -63,6 +76,13 @@ public FlinkCatalog createCatalog(Context context) {
                 helper.getOptions().get(FlinkCatalogOptions.DEFAULT_DATABASE),
                 helper.getOptions().get(FlinkConnectorOptions.BOOTSTRAP_SERVERS),
                 context.getClassLoader(),
-                securityConfigs);
+                securityConfigs,
+                () -> {
+                    Map<String, String> lakeCatalogProperties = new HashMap<>();
+                    for (DataLakeFormat lakeFormat : DataLakeFormat.values()) {
+                        lakeCatalogProperties.putAll(extractPrefix(options, lakeFormat.toString()));
+                    }
+                    return lakeCatalogProperties;
+                });
     }
 }
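
The factory runs the raw catalog options through extractPrefix twice: once for client.security.* keys and once per DataLakeFormat inside the new supplier lambda. extractPrefix itself is not part of this diff; judging from the factory test below, which asserts that getLakeCatalogProperties() still contains the full key paimon.jdbc.user, it appears to select entries whose keys start with the prefix without stripping it. A hypothetical sketch of such a helper (not the project's actual implementation):

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical: filters options by key prefix, keeping the keys unchanged.
    static Map<String, String> extractPrefix(Map<String, String> options, String prefix) {
        Map<String, String> matched = new HashMap<>();
        for (Map.Entry<String, String> entry : options.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                matched.put(entry.getKey(), entry.getValue());
            }
        }
        return matched;
    }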
===== File 7 =====
@@ -21,6 +21,7 @@
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.flink.utils.DataLakeUtils;
 import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.utils.PropertiesUtils;
 
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
@@ -49,7 +50,8 @@ public LakeFlinkCatalog(String catalogName, ClassLoader classLoader) {
         this.classLoader = classLoader;
     }
 
-    public Catalog getLakeCatalog(Configuration tableOptions) {
+    public Catalog getLakeCatalog(
+            Configuration tableOptions, Map<String, String> lakeCatalogProperties) {
         // TODO: Currently, a Fluss cluster only supports a single DataLake storage.
         // However, in the
         // future, it may support multiple DataLakes. The following code assumes
@@ -69,12 +71,19 @@ public Catalog getLakeCatalog(Configuration tableOptions) {
                                 + ConfigOptions.TABLE_DATALAKE_FORMAT.key()
                                 + "' is set.");
             }
+            Map<String, String> catalogProperties =
+                    PropertiesUtils.extractAndRemovePrefix(
+                            lakeCatalogProperties, lakeFormat + ".");
+
+            catalogProperties.putAll(
+                    DataLakeUtils.extractLakeCatalogProperties(tableOptions));
             if (lakeFormat == PAIMON) {
                 catalog =
-                        PaimonCatalogFactory.create(catalogName, tableOptions, classLoader);
+                        PaimonCatalogFactory.create(
+                                catalogName, catalogProperties, classLoader);
                 this.lakeFormat = PAIMON;
             } else if (lakeFormat == ICEBERG) {
-                catalog = IcebergCatalogFactory.create(catalogName, tableOptions);
+                catalog = IcebergCatalogFactory.create(catalogName, catalogProperties);
                 this.lakeFormat = ICEBERG;
             } else {
                 throw new UnsupportedOperationException(
 
Review comment (Contributor) on the catalogProperties lines:
    Can be
 
        Map<String, String> catalogProperties =
                PropertiesUtils.extractAndRemovePrefix(
                        lakeCatalogProperties, lakeFormat + ".");
 
    ?
 
Reply (Author): done
 
@@ -111,9 +120,9 @@ public static class PaimonCatalogFactory {
         private PaimonCatalogFactory() {}
 
         public static Catalog create(
-                String catalogName, Configuration tableOptions, ClassLoader classLoader) {
-            Map<String, String> catalogProperties =
-                    DataLakeUtils.extractLakeCatalogProperties(tableOptions);
+                String catalogName,
+                Map<String, String> catalogProperties,
+                ClassLoader classLoader) {
             return FlinkCatalogFactory.createCatalog(
                     catalogName,
                     CatalogContext.create(
@@ -131,9 +140,7 @@ private IcebergCatalogFactory() {}
         // requires Iceberg 1.5.0+.
         // Using reflection to maintain Java 8 compatibility.
         // Once Fluss drops Java 8, we can remove the reflection code
-        public static Catalog create(String catalogName, Configuration tableOptions) {
-            Map<String, String> catalogProperties =
-                    DataLakeUtils.extractLakeCatalogProperties(tableOptions);
+        public static Catalog create(String catalogName, Map<String, String> catalogProperties) {
             // Map "type" to "catalog-type" (equivalent)
             // Required: either "catalog-type" (standard type) or "catalog-impl"
             // (fully-qualified custom class, mandatory if "catalog-type" is missing)
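
Taken together with the factory change, a lake key now makes a two-step trip: the catalog-level supplier collects it with its format prefix intact, and getLakeCatalog() strips the prefix via PropertiesUtils.extractAndRemovePrefix before handing the map to the Paimon or Iceberg factory. A small sketch of that flow, assuming extractAndRemovePrefix behaves as its name (and the review thread above) suggests:

    import java.util.HashMap;
    import java.util.Map;

    public class LakePropertyFlowSketch {
        // Assumed behavior of PropertiesUtils.extractAndRemovePrefix (not shown in
        // this diff): keep entries whose keys start with the prefix, minus the prefix.
        static Map<String, String> extractAndRemovePrefix(
                Map<String, String> props, String prefix) {
            Map<String, String> result = new HashMap<>();
            for (Map.Entry<String, String> e : props.entrySet()) {
                if (e.getKey().startsWith(prefix)) {
                    result.put(e.getKey().substring(prefix.length()), e.getValue());
                }
            }
            return result;
        }

        public static void main(String[] args) {
            // Step 1: the supplier in FlinkCatalogFactory hands over prefixed keys.
            Map<String, String> lakeCatalogProperties = new HashMap<>();
            lakeCatalogProperties.put("paimon.jdbc.user", "admin");
            lakeCatalogProperties.put("paimon.jdbc.password", "pass");

            // Step 2: LakeFlinkCatalog strips the "paimon." prefix for the lake factory.
            System.out.println(extractAndRemovePrefix(lakeCatalogProperties, "paimon."));
            // prints {jdbc.user=admin, jdbc.password=pass} (order may vary)
        }
    }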
===== File 8 =====
@@ -25,6 +25,8 @@
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 
+import java.util.Collections;
+
 /** A factory to create {@link DynamicTableSource} for lake table. */
 public class LakeTableFactory {
     private final LakeFlinkCatalog lakeFlinkCatalog;
@@ -83,7 +85,7 @@ private DynamicTableSourceFactory getIcebergFactory() {
                 lakeFlinkCatalog.getLakeCatalog(
                         // we can pass empty configuration to get catalog
                         // since the catalog should already be initialized
-                        new Configuration());
+                        new Configuration(), Collections.emptyMap());
 
         // Create FlinkDynamicTableFactory with the catalog
         Class<?> icebergFactoryClass =
===== File 9 =====
@@ -64,7 +64,8 @@ public void testCreateCatalog() {
                         DB_NAME,
                         BOOTSTRAP_SERVERS_NAME,
                         Thread.currentThread().getContextClassLoader(),
-                        Collections.emptyMap());
+                        Collections.emptyMap(),
+                        Collections::emptyMap);
 
         checkEquals(flinkCatalog, actualCatalog);
 
@@ -75,7 +76,12 @@ public void testCreateCatalog() {
         securityMap.put("client.security.sasl.username", "root");
         securityMap.put("client.security.sasl.password", "password");
 
+        Map<String, String> lakeCatalogMap = new HashMap<>();
+        lakeCatalogMap.put("paimon.jdbc.user", "admin");
+        lakeCatalogMap.put("paimon.jdbc.password", "pass");
+
         options.putAll(securityMap);
+        options.putAll(lakeCatalogMap);
         FlinkCatalog actualCatalog2 =
                 (FlinkCatalog)
                         FactoryUtil.createCatalog(
@@ -85,6 +91,7 @@
                                 Thread.currentThread().getContextClassLoader());
 
         assertThat(actualCatalog2.getSecurityConfigs()).isEqualTo(securityMap);
+        assertThat(actualCatalog2.getLakeCatalogProperties()).isEqualTo(lakeCatalogMap);
     }
 
     @Test
 
Review comment (Contributor) on the new lakeCatalogMap lines:
    add test in FlinkCatalogITCase to verify SQL can pass the properties.
 
Reply (Author): a new test testCreateCatalogWithLakeProperties is added.
===== File 10 =====
@@ -115,7 +115,8 @@ static void beforeAll() {
                         DEFAULT_DB,
                         bootstrapServers,
                         Thread.currentThread().getContextClassLoader(),
-                        Collections.emptyMap());
+                        Collections.emptyMap(),
+                        Collections::emptyMap);
         catalog.open();
     }
 
@@ -769,7 +770,8 @@ void testAuthentication() throws Exception {
                         DEFAULT_DB,
                         bootstrapServers,
                         Thread.currentThread().getContextClassLoader(),
-                        Collections.emptyMap());
+                        Collections.emptyMap(),
+                        Collections::emptyMap);
         Catalog finalAuthenticateCatalog = authenticateCatalog;
         assertThatThrownBy(finalAuthenticateCatalog::open)
                 .cause()
@@ -787,7 +789,8 @@
                         DEFAULT_DB,
                         bootstrapServers,
                         Thread.currentThread().getContextClassLoader(),
-                        clientConfig);
+                        clientConfig,
+                        Collections::emptyMap);
         authenticateCatalog.open();
         assertThat(authenticateCatalog.listDatabases())
                 .containsExactlyInAnyOrderElementsOf(Collections.singletonList(DEFAULT_DB));
 
@@ -815,6 +818,20 @@ void testCreateCatalogWithUnexistedDatabase() {
                 "The configured default-database 'non-exist' does not exist in the Fluss cluster.");
     }
 
+    @Test
+    void testCreateCatalogWithLakeProperties() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("paimon.jdbc.password", "pass");
+        tEnv.executeSql(
+                String.format(
+                        "create catalog test_catalog_with_lake_properties with ('type' = 'fluss', '%s' = '%s', 'paimon.jdbc.password' = 'pass')",
+                        BOOTSTRAP_SERVERS.key(), FLUSS_CLUSTER_EXTENSION.getBootstrapServers()));
+        FlinkCatalog catalog =
+                (FlinkCatalog) tEnv.getCatalog("test_catalog_with_lake_properties").get();
+
+        assertOptionsEqual(catalog.getLakeCatalogProperties(), properties);
+    }
+
     /**
      * Before Flink 2.1, the {@link Schema} did not include an index field. Starting from Flink 2.1,
      * Flink introduced the concept of an index, and in Fluss, the primary key is considered as an
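
For completeness, this is what the feature looks like from the user side, mirroring the new ITCase: lake-prefixed keys ride along in the CREATE CATALOG DDL, skip option validation, and reach the lake catalog. A usage sketch with hypothetical values (the bootstrap address and JDBC credentials are placeholders, not taken from the PR):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class FlussCatalogUsageSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // "paimon."-prefixed keys skip validation (PREFIXES_TO_SKIP_VALIDATE)
            // and are forwarded to the Paimon lake catalog.
            tEnv.executeSql(
                    "CREATE CATALOG fluss_catalog WITH ("
                            + "'type' = 'fluss', "
                            + "'bootstrap.servers' = 'localhost:9123', "
                            + "'paimon.jdbc.user' = 'admin', "
                            + "'paimon.jdbc.password' = 'pass')");
            tEnv.useCatalog("fluss_catalog");
        }
    }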