Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions .github/workflows/flink-integration-test-action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,9 @@ jobs:

- name: Flink Integration Test
id: integrationTest
# run embedded mode and deploy mode integration tests
run: |
./gradlew -PskipTests -PtestMode=embedded -PskipDockerTests=false :flink-connector:flink:test --tests "org.apache.gravitino.flink.connector.integration.test.**"
./gradlew -PskipTests -PtestMode=deploy -PskipDockerTests=false :flink-connector:flink:test --tests "org.apache.gravitino.flink.connector.integration.test.**"
./gradlew -PskipTests -PtestMode=embedded -PskipDockerTests=false :flink-connector:flink-1.18:test --tests "org.apache.gravitino.flink.connector.integration.test.**"
./gradlew -PskipTests -PtestMode=deploy -PskipDockerTests=false :flink-connector:flink-1.18:test --tests "org.apache.gravitino.flink.connector.integration.test.**"

- name: Upload integrate tests reports
  # actions/upload-artifact has no v7 release; v4 is the current major version.
  uses: actions/upload-artifact@v4
Expand All @@ -58,9 +57,9 @@ jobs:
name: flink-connector-integrate-test-reports-${{ inputs.java-version }}
path: |
build/reports
flink-connector/flink/build/*.log
flink-connector/flink/build/*.tar
flink-connector/v1.18/flink/build/*.log
flink-connector/v1.18/flink/build/*.tar
distribution/package/logs/gravitino-server.out
distribution/package/logs/gravitino-server.log
catalogs/**/*.log
catalogs/**/*.tar
catalogs/**/*.tar
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,18 @@ repositories {
mavenCentral()
}

var paimonVersion: String = libs.versions.paimon.get()
val flinkVersion: String = libs.versions.flink.get()
val flinkVersion: String = libs.versions.flink18.get()
val flinkMajorVersion: String = flinkVersion.substringBeforeLast(".")

val icebergVersion: String = libs.versions.iceberg4connector.get()
val icebergVersion: String = libs.versions.iceberg4flink18.get()
val paimonVersion: String = libs.versions.paimon4flink18.get()

// Flink only supports Scala 2.12, and all Scala APIs will be removed in a future version.
// You can find more detail at the following issues:
// https://issues.apache.org/jira/browse/FLINK-23986,
// https://issues.apache.org/jira/browse/FLINK-20845,
// https://issues.apache.org/jira/browse/FLINK-13414.
val scalaVersion: String = "2.12"
val artifactName = "${rootProject.name}-flink-${flinkMajorVersion}_$scalaVersion"
val artifactName = "${rootProject.name}-flink-common"

dependencies {
implementation(project(":catalogs:catalog-common")) {
Expand All @@ -47,13 +46,12 @@ dependencies {
implementation(libs.guava)

compileOnly(project(":clients:client-java-runtime", configuration = "shadow"))

compileOnly("org.apache.iceberg:iceberg-flink-runtime-$flinkMajorVersion:$icebergVersion")
compileOnly("org.apache.flink:flink-connector-hive_$scalaVersion:$flinkVersion")
compileOnly("org.apache.flink:flink-table-common:$flinkVersion")
compileOnly("org.apache.flink:flink-table-api-java:$flinkVersion")
compileOnly("org.apache.paimon:paimon-flink-1.18:$paimonVersion")
compileOnly(libs.flinkjdbc)
compileOnly("org.apache.paimon:paimon-flink-$flinkMajorVersion:$paimonVersion")
compileOnly(libs.flinkjdbc18)

compileOnly(libs.hive2.exec) {
artifact {
Expand Down Expand Up @@ -96,7 +94,7 @@ dependencies {
testImplementation(libs.testcontainers.junit.jupiter)
testImplementation(libs.testcontainers.mysql)
testImplementation(libs.metrics.core)
testImplementation(libs.flinkjdbc)
testImplementation(libs.flinkjdbc18)
testImplementation(libs.minikdc)

testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion")
Expand Down Expand Up @@ -182,7 +180,6 @@ dependencies {
tasks.test {
val skipITs = project.hasProperty("skipITs")
if (skipITs) {
// Exclude integration tests
exclude("**/integration/test/**")
} else {
dependsOn(tasks.jar)
Expand All @@ -194,10 +191,33 @@ tasks.withType<Jar> {
archiveBaseName.set(artifactName)
}

// Package the compiled test classes into a "-tests" classifier jar so that the
// version-specific modules (e.g. flink-1.18) can reuse the common integration tests.
val testJar by tasks.registering(Jar::class) {
  archiveClassifier.set("tests")
  archiveBaseName.set(artifactName)
  from(sourceSets["test"].output)
}

// Expose the test jar through a dedicated consumable configuration so sibling
// modules can declare a dependency on these test classes.
configurations {
  create("testArtifacts")
}

artifacts {
  add("testArtifacts", testJar)
}

// Publish every Maven publication of this module under the common artifact name
// ("<rootProject>-flink-common") instead of the default project name.
publishing {
  publications {
    withType<MavenPublication>().configureEach {
      artifactId = artifactName
    }
  }
}

// Also remove Derby/Hive metastore leftovers that the tests create in the project dir.
tasks.clean {
  delete("derby.log")
  delete("metastore_db")
}

// Duplicate source entries can occur when assembling the sources jar; keep the first one.
tasks.named<Jar>("sourcesJar") {
  duplicatesStrategy = DuplicatesStrategy.EXCLUDE
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@
import org.apache.gravitino.exceptions.TableAlreadyExistsException;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.flink.connector.SchemaAndTablePropertiesConverter;
import org.apache.gravitino.flink.connector.utils.CatalogCompat;
import org.apache.gravitino.flink.connector.utils.DefaultCatalogCompat;
import org.apache.gravitino.flink.connector.utils.TableUtils;
import org.apache.gravitino.flink.connector.utils.TypeUtils;
import org.apache.gravitino.rel.Column;
Expand Down Expand Up @@ -570,7 +572,21 @@ protected CatalogBaseTable toFlinkTable(Table table, ObjectPath tablePath) {
schemaAndTablePropertiesConverter.toFlinkTableProperties(
catalogOptions, table.properties(), tablePath);
List<String> partitionKeys = partitionConverter.toFlinkPartitionKeys(table.partitioning());
return CatalogTable.of(builder.build(), table.comment(), partitionKeys, flinkTableProperties);
return newCatalogTable(builder.build(), table.comment(), partitionKeys, flinkTableProperties);
}

/**
 * Creates a Flink {@link CatalogTable} from already-converted schema, comment, partition keys,
 * and table options.
 *
 * <p>Delegates to the {@code catalogCompat()} hook so that version-specific modules can adapt
 * to differences in the Flink catalog/table API across minor versions.
 *
 * @param schema the resolved Flink schema of the table
 * @param comment the table comment; semantics of null/empty follow the compat implementation
 * @param partitionKeys the Flink partition key column names
 * @param options the Flink table options/properties
 * @return the constructed Flink catalog table
 */
protected CatalogTable newCatalogTable(
    org.apache.flink.table.api.Schema schema,
    String comment,
    List<String> partitionKeys,
    Map<String, String> options) {
  return catalogCompat().createCatalogTable(schema, comment, partitionKeys, options);
}

protected CatalogCompat catalogCompat() {
// Versioned catalog entry classes override this hook when the Flink minor has a different
// catalog/table API path.
return DefaultCatalogCompat.INSTANCE;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do any other OSS projects that support multiple versions implement compatibility in the same way?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think this needs a code change in this PR. The current structure uses a shared common layer plus version-specific adapters/hooks (catalogCompat() and the versioned catalog/factory classes) to isolate Flink minor-version API differences. That is the main reason for this split: keep the common behavior in one place and localize the Flink-version-specific parts in each version module.

}

private static Optional<List<String>> getFlinkPrimaryKey(Table table) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.table.catalog.CatalogPropertiesUtil;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.gravitino.flink.connector.utils.CatalogCompat;
import org.apache.gravitino.flink.connector.utils.DefaultCatalogCompat;
import org.apache.gravitino.rel.Table;

final class FlinkGenericTableUtil {
Expand Down Expand Up @@ -62,7 +65,12 @@ static boolean isGenericTableWhenLoad(Map<String, String> properties) {
}

static Map<String, String> toGravitinoGenericTableProperties(ResolvedCatalogTable resolvedTable) {
Map<String, String> properties = CatalogPropertiesUtil.serializeCatalogTable(resolvedTable);
return toGravitinoGenericTableProperties(resolvedTable, DefaultCatalogCompat.INSTANCE);
}

static Map<String, String> toGravitinoGenericTableProperties(
ResolvedCatalogTable resolvedTable, CatalogCompat catalogCompat) {
Map<String, String> properties = catalogCompat.serializeCatalogTable(resolvedTable);
if (!properties.containsKey(CONNECTOR)) {
properties.put(CONNECTOR, MANAGED_TABLE_IDENTIFIER);
}
Expand All @@ -72,6 +80,14 @@ static Map<String, String> toGravitinoGenericTableProperties(ResolvedCatalogTabl
}

/**
 * Converts a Gravitino {@link Table} into a Flink generic {@link CatalogTable} using the
 * default catalog compatibility layer.
 *
 * @param table the Gravitino table to convert
 * @return the equivalent Flink catalog table
 */
static CatalogTable toFlinkGenericTable(Table table) {
  return toFlinkGenericTable(table, DefaultCatalogCompat.INSTANCE);
}

/**
 * Converts a Gravitino {@link Table} into a Flink generic {@link CatalogTable}, constructing
 * the result through the supplied version-specific {@link CatalogCompat}.
 *
 * @param table the Gravitino table to convert
 * @param catalogCompat compatibility layer used to build the Flink catalog table
 * @return the equivalent Flink catalog table
 */
static CatalogTable toFlinkGenericTable(Table table, CatalogCompat catalogCompat) {
  // Adapt the compat's factory method to the local CatalogTableBuilder functional interface.
  return toFlinkGenericTable(table, catalogCompat::createCatalogTable);
}

static CatalogTable toFlinkGenericTable(Table table, CatalogTableBuilder tableBuilder) {
Map<String, String> flinkProperties = unmaskFlinkProperties(table.properties());
CatalogTable catalogTable = CatalogPropertiesUtil.deserializeCatalogTable(flinkProperties);
if (catalogTable.getUnresolvedSchema().getColumns().isEmpty()) {
Expand All @@ -82,13 +98,22 @@ static CatalogTable toFlinkGenericTable(Table table) {
if (MANAGED_TABLE_IDENTIFIER.equalsIgnoreCase(options.get(CONNECTOR))) {
options.remove(CONNECTOR);
}
return CatalogTable.of(
return tableBuilder.create(
catalogTable.getUnresolvedSchema(),
catalogTable.getComment(),
catalogTable.getPartitionKeys(),
options);
}

/**
 * Factory abstraction for building a Flink {@link CatalogTable}; allows callers to plug in a
 * version-specific construction path (e.g. via a CatalogCompat method reference).
 */
@FunctionalInterface
interface CatalogTableBuilder {
  /**
   * Creates a catalog table from its constituent parts.
   *
   * @param schema the unresolved Flink schema
   * @param comment the table comment
   * @param partitionKeys the partition key column names
   * @param options the table options
   * @return the constructed catalog table
   */
  CatalogTable create(
      org.apache.flink.table.api.Schema schema,
      String comment,
      List<String> partitionKeys,
      Map<String, String> options);
}

private static String getConnectorFromProperties(Map<String, String> properties) {
String connector = properties.get(CatalogPropertiesUtil.FLINK_PROPERTY_PREFIX + CONNECTOR);
if (connector == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogPropertiesUtil;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
Expand Down Expand Up @@ -111,8 +112,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig

NameIdentifier identifier =
NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName());
Map<String, String> properties =
FlinkGenericTableUtil.toGravitinoGenericTableProperties(resolvedTable);
Map<String, String> properties = toGravitinoGenericTableProperties(resolvedTable);

try {
catalog()
Expand Down Expand Up @@ -146,7 +146,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath)
.asTableCatalog()
.loadTable(NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName()));
if (FlinkGenericTableUtil.isGenericTableWhenLoad(table.properties())) {
return FlinkGenericTableUtil.toFlinkGenericTable(table);
return toFlinkGenericTable(table);
}
return super.toFlinkTable(table, tablePath);
} catch (NoSuchTableException e) {
Expand Down Expand Up @@ -218,8 +218,7 @@ private void applyGenericTableAlter(
throws TableNotExistException, CatalogException {
NameIdentifier identifier =
NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName());
Map<String, String> updatedProperties =
FlinkGenericTableUtil.toGravitinoGenericTableProperties(newTable);
Map<String, String> updatedProperties = toGravitinoGenericTableProperties(newTable);
Map<String, String> currentProperties =
existingTable.properties() == null ? Collections.emptyMap() : existingTable.properties();

Expand Down Expand Up @@ -252,4 +251,13 @@ private void applyGenericTableAlter(
throw new CatalogException(e);
}
}

/**
 * Serializes a resolved Flink table into Gravitino generic-table properties, binding the call
 * to this catalog instance so subclasses can inject a version-specific {@code catalogCompat()}.
 *
 * @param resolvedTable the resolved Flink catalog table to serialize
 * @return the Gravitino table properties representing the Flink table
 */
protected Map<String, String> toGravitinoGenericTableProperties(
    ResolvedCatalogTable resolvedTable) {
  return FlinkGenericTableUtil.toGravitinoGenericTableProperties(resolvedTable, catalogCompat());
}

/**
 * Converts a Gravitino {@link Table} back into a Flink generic {@link CatalogTable}, using this
 * catalog's {@code catalogCompat()} so version-specific subclasses get the right table API path.
 *
 * @param table the Gravitino table to convert
 * @return the equivalent Flink catalog table
 */
protected CatalogTable toFlinkGenericTable(Table table) {
  return FlinkGenericTableUtil.toFlinkGenericTable(table, catalogCompat());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move these methods back to the Util class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer to keep these thin wrapper methods on the catalog class. They delegate to FlinkGenericTableUtil, but they also bind the call to the catalog instance so the version-specific catalogCompat() hook can be applied naturally by subclasses such as the Flink 1.18 catalog. Moving them back to the util class would make the compat path more scattered rather than simpler.

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import static org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactoryOptions.IDENTIFIER;

import com.google.common.collect.ImmutableSet;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.hive.HiveCatalog;
Expand Down Expand Up @@ -63,7 +65,7 @@ public Catalog createCatalog(Context context) {
PropertyUtils.getHadoopAndHiveProperties(context.getOptions()).forEach(hiveConf::set);
SchemaAndTablePropertiesConverter tablePropertiesConverter =
new HiveSchemaAndTablePropertiesConverter(hiveConf);
return new GravitinoHiveCatalog(
return newCatalog(
context.getName(),
helper.getOptions().get(HiveCatalogFactoryOptions.DEFAULT_DATABASE),
context.getOptions(),
Expand All @@ -73,6 +75,24 @@ public Catalog createCatalog(Context context) {
helper.getOptions().get(HiveCatalogFactoryOptions.HIVE_VERSION));
}

/**
 * Instantiates the Gravitino Hive catalog from the prepared factory inputs.
 *
 * <p>Extension hook: Flink-version-specific factory subclasses override this to return a
 * version-specific catalog implementation while reusing the option parsing in
 * {@code createCatalog}.
 *
 * @param catalogName the catalog name from the factory context
 * @param defaultDatabase the configured default database
 * @param catalogOptions the raw catalog options from the factory context
 * @param schemaAndTablePropertiesConverter converter for schema/table properties
 * @param partitionConverter converter for partition specifications
 * @param hiveConf the Hive configuration, may be null
 * @param hiveVersion the configured Hive version, may be null
 * @return the constructed catalog instance
 */
protected Catalog newCatalog(
    String catalogName,
    String defaultDatabase,
    Map<String, String> catalogOptions,
    SchemaAndTablePropertiesConverter schemaAndTablePropertiesConverter,
    PartitionConverter partitionConverter,
    @Nullable HiveConf hiveConf,
    @Nullable String hiveVersion) {
  return new GravitinoHiveCatalog(
      catalogName,
      defaultDatabase,
      catalogOptions,
      schemaAndTablePropertiesConverter,
      partitionConverter,
      hiveConf,
      hiveVersion);
}

@Override
public String factoryIdentifier() {
return IDENTIFIER;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,33 +21,51 @@
import java.util.Map;
import java.util.Optional;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.Factory;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.flink.connector.SchemaAndTablePropertiesConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalog;
import org.apache.iceberg.flink.FlinkCatalog;
import org.apache.iceberg.flink.FlinkCatalogFactory;

/** Gravitino Iceberg Catalog. */
public class GravitinoIcebergCatalog extends BaseCatalog {

private final FlinkCatalog icebergCatalog;
private final AbstractCatalog icebergCatalog;

protected GravitinoIcebergCatalog(
String catalogName,
String defaultDatabase,
SchemaAndTablePropertiesConverter schemaAndTablePropertiesConverter,
PartitionConverter partitionConverter,
Map<String, String> flinkCatalogProperties) {
Map<String, String> catalogOptions,
Map<String, String> icebergCatalogProperties,
CatalogFactory.Context context) {
super(
catalogName,
flinkCatalogProperties,
catalogOptions,
defaultDatabase,
schemaAndTablePropertiesConverter,
partitionConverter);
FlinkCatalogFactory flinkCatalogFactory = new FlinkCatalogFactory();
this.icebergCatalog =
(FlinkCatalog) flinkCatalogFactory.createCatalog(catalogName, flinkCatalogProperties);
(AbstractCatalog)
new FlinkCatalogFactory().createCatalog(catalogName, icebergCatalogProperties);
}

/**
 * Constructs the catalog with a caller-supplied underlying Iceberg catalog.
 *
 * <p>This variant lets version-specific modules pass in their own pre-built
 * {@link AbstractCatalog} instead of having this class create one from properties.
 *
 * @param catalogName the catalog name
 * @param defaultDatabase the default database name
 * @param schemaAndTablePropertiesConverter converter for schema/table properties
 * @param partitionConverter converter for partition specifications
 * @param catalogOptions the raw catalog options passed to the base catalog
 * @param icebergCatalog the underlying Iceberg Flink catalog to delegate to
 */
protected GravitinoIcebergCatalog(
    String catalogName,
    String defaultDatabase,
    SchemaAndTablePropertiesConverter schemaAndTablePropertiesConverter,
    PartitionConverter partitionConverter,
    Map<String, String> catalogOptions,
    AbstractCatalog icebergCatalog) {
  super(
      catalogName,
      catalogOptions,
      defaultDatabase,
      schemaAndTablePropertiesConverter,
      partitionConverter);
  this.icebergCatalog = icebergCatalog;
}

@Override
Expand Down
Loading
Loading