
[#10541] refactor(flink-connector): split versioned framework with Flink 1.18 baseline #10517

Open
FANNG1 wants to merge 6 commits into apache:main from FANNG1:flink_versioned_catalog_entries_118

Conversation

Contributor

FANNG1 commented Mar 23, 2026

What changes were proposed in this pull request?

This PR implements the first step of the Flink multi-version refactor and limits the scope to the framework split plus Flink 1.18 support.

The main changes are:

  • split the original single Flink connector module into flink-common, flink-1.18, and flink-runtime-1.18
  • move shared connector logic and shared test bases into flink-common
  • add versioned Flink 1.18 catalog/factory entry classes and Flink 1.18 runtime packaging
  • introduce typed catalog compatibility hooks for the versioned layout
  • run Flink integration coverage from the Flink 1.18 module instead of the old shared module
  • keep the scope of this PR limited to Flink 1.18 as the baseline lane

This PR intentionally does not include Flink 1.19 or 1.20 support. Those will follow in separate PRs.
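The "versioned entry plus typed compatibility hook" layout described above can be sketched as follows. This is an illustrative model only: the class names (`CatalogCompat`, `DefaultCatalogCompat`, `CatalogCompatFlink118`) mirror names mentioned in this PR, but the method shapes here are simplified stand-ins, not the actual Gravitino connector API.

```java
// Illustrative sketch of the versioned-entry layout; method signatures are
// simplified stand-ins, not the real Gravitino connector API.

// Shared lane (flink-common): a typed hook implemented per Flink minor version.
interface CatalogCompat {
    String createCatalogTable(String comment);
}

// Default implementation used by shared code paths and tests.
class DefaultCatalogCompat implements CatalogCompat {
    @Override
    public String createCatalogTable(String comment) {
        return "table[" + comment + "]";
    }
}

// Versioned lane (flink-1.18): a thin entry class pinning the hook to the
// Flink 1.18 behavior; a real implementation would call 1.18-specific builders.
class CatalogCompatFlink118 implements CatalogCompat {
    @Override
    public String createCatalogTable(String comment) {
        return "table118[" + comment + "]";
    }
}

public class VersionedEntrySketch {
    public static void main(String[] args) {
        CatalogCompat compat = new CatalogCompatFlink118();
        System.out.println(compat.createCatalogTable("demo")); // table118[demo]
    }
}
```

The point of the pattern is that `flink-common` only ever programs against the `CatalogCompat` interface, so adding a 1.19 or 1.20 lane later means adding another small implementation class rather than touching shared logic.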

Why are the changes needed?

The current single-module Flink connector layout is not a good foundation for maintainable multi-version support. A Flink 1.18 baseline is the first step toward establishing the new versioned architecture.

This PR is the first implementation stage under parent issue #9710.

Fix: #10541

Does this PR introduce any user-facing change?

Yes.

The Flink connector build layout changes from a single Flink module to:

  • flink-common
  • flink-1.18
  • flink-runtime-1.18

The user-facing SQL behavior is unchanged for Flink 1.18, but the connector packaging and internal implementation are now version-scoped to support later minor-version expansion.

How was this patch tested?

The following validations were run:

  • ./gradlew :common:jar :flink-connector:flink-1.18:test -PskipITs :flink-connector:flink-runtime-1.18:test
  • ./gradlew :flink-connector:flink-1.18:test -PskipTests -PtestMode=embedded -PskipDockerTests=false --tests "org.apache.gravitino.flink.connector.integration.test.catalog.GravitinoCatalogManagerIT118" --console=plain


github-actions bot commented Mar 23, 2026

Code Coverage Report

Overall Project 64.84% -0.44% 🟢
Files changed 38.04% 🔴

Module Coverage
aliyun 1.73% 🔴
api 47.14% 🟢
authorization-common 85.96% 🟢
aws 1.1% 🔴
azure 2.6% 🔴
catalog-common 10.0% 🔴
catalog-fileset 80.02% 🟢
catalog-hive 80.98% 🟢
catalog-jdbc-clickhouse 79.06% 🟢
catalog-jdbc-common 42.89% 🟢
catalog-jdbc-doris 80.28% 🟢
catalog-jdbc-hologres 54.03% 🟢
catalog-jdbc-mysql 79.23% 🟢
catalog-jdbc-oceanbase 78.38% 🟢
catalog-jdbc-postgresql 82.05% 🟢
catalog-jdbc-starrocks 78.27% 🟢
catalog-kafka 77.01% 🟢
catalog-lakehouse-generic 45.07% 🟢
catalog-lakehouse-hudi 79.1% 🟢
catalog-lakehouse-iceberg 87.15% 🟢
catalog-lakehouse-paimon 77.71% 🟢
catalog-model 77.72% 🟢
cli 44.51% 🟢
client-java 77.83% 🟢
common 49.42% 🟢
core 80.89% 🟢
filesystem-hadoop3 76.97% 🟢
flink 0.0% 🔴
flink-common 38.96% 🔴
flink-runtime 0.0% 🔴
gcp 14.2% 🔴
hadoop-common 10.39% 🔴
hive-metastore-common 45.82% 🟢
iceberg-common 50.21% 🟢
iceberg-rest-server 66.51% 🟢
integration-test-common 0.0% 🔴
jobs 66.17% 🟢
lance-common 23.88% 🔴
lance-rest-server 57.84% 🟢
lineage 53.02% 🟢
optimizer 82.87% 🟢
optimizer-api 21.95% 🔴
server 85.62% 🟢
server-common 70.14% 🟢
spark 32.79% 🔴
spark-common 39.09% 🔴
trino-connector 31.62% 🔴
Files
Module File Coverage
flink GravitinoHiveCatalogFactoryFlink118.java 0.0% 🔴
GravitinoHiveCatalogFlink118.java 0.0% 🔴
GravitinoIcebergCatalogFactoryFlink118.java 0.0% 🔴
GravitinoIcebergCatalogFlink118.java 0.0% 🔴
GravitinoJdbcCatalogFlink118.java 0.0% 🔴
GravitinoMysqlJdbcCatalogFactoryFlink118.java 0.0% 🔴
GravitinoPostgresJdbcCatalogFactoryFlink118.java 0.0% 🔴
GravitinoPaimonCatalogFactoryFlink118.java 0.0% 🔴
GravitinoPaimonCatalogFlink118.java 0.0% 🔴
CatalogCompatFlink118.java 0.0% 🔴
flink-common CatalogPropertiesConverter.java 100.0% 🟢
HiveCatalogPropertiesConverter.java 100.0% 🟢
JdbcPropertiesConstants.java 100.0% 🟢
MysqlPropertiesConverter.java 100.0% 🟢
PostgresqlPropertiesConverter.java 100.0% 🟢
PaimonPropertiesConverter.java 100.0% 🟢
GravitinoCatalogStoreFactoryOptions.java 100.0% 🟢
DefaultCatalogCompat.java 100.0% 🟢
TypeUtils.java 94.78% 🟢
IcebergPropertiesConverter.java 91.67% 🟢
FlinkGenericTableUtil.java 88.89% 🟢
HiveSchemaAndTablePropertiesConverter.java 80.81% 🟢
TableUtils.java 77.78% 🟢
IcebergPropertiesConstants.java 66.67% 🟢
JdbcPropertiesConverter.java 58.82% 🔴
GravitinoCatalogStore.java 33.8% 🔴
GravitinoCatalogStoreFactory.java 32.0% 🔴
BaseCatalog.java 27.42% 🔴
GravitinoCatalogManager.java 1.08% 🔴
DefaultPartitionConverter.java 0.0% 🔴
PartitionConverter.java 0.0% 🔴
SchemaAndTablePropertiesConverter.java 0.0% 🔴
UnsupportPartitionConverter.java 0.0% 🔴
BaseCatalogFactory.java 0.0% 🔴
GravitinoHiveCatalog.java 0.0% 🔴
GravitinoHiveCatalogFactory.java 0.0% 🔴
GravitinoHiveCatalogFactoryOptions.java 0.0% 🔴
GravitinoIcebergCatalog.java 0.0% 🔴
GravitinoIcebergCatalogFactory.java 0.0% 🔴
GravitinoIcebergCatalogFactoryOptions.java 0.0% 🔴
GravitinoJdbcCatalog.java 0.0% 🔴
GravitinoJdbcCatalogFactory.java 0.0% 🔴
GravitinoJdbcCatalogFactoryOptions.java 0.0% 🔴
GravitinoMysqlJdbcCatalogFactory.java 0.0% 🔴
GravitinoPostgresJdbcCatalogFactory.java 0.0% 🔴
GravitinoPaimonCatalog.java 0.0% 🔴
GravitinoPaimonCatalogFactory.java 0.0% 🔴
GravitinoPaimonCatalogFactoryOptions.java 0.0% 🔴
CatalogCompat.java 0.0% 🔴
FactoryUtils.java 0.0% 🔴
PropertyUtils.java 0.0% 🔴

FANNG1 force-pushed the flink_versioned_catalog_entries_118 branch from a42a75f to 3ef82d0 on March 25, 2026 02:23
FANNG1 requested a review from Copilot on March 25, 2026 08:42
Contributor

Copilot AI left a comment


Pull request overview

Establishes the new multi-version Flink connector framework by splitting shared logic into flink-common and introducing a Flink 1.18 versioned lane (flink-1.18 + flink-runtime-1.18), including updated SPI registrations and integration test wiring.

Changes:

  • Split Flink connector into flink-common, flink-1.18, and flink-runtime-1.18, and update Gradle settings/workflows accordingly.
  • Introduce version-scoped catalog/factory entrypoints for Flink 1.18 plus typed compatibility hooks (CatalogCompat*).
  • Move/expand shared converters and integration test bases into flink-common, including runtime-jar service-descriptor merge validation.

Reviewed changes

Copilot reviewed 57 out of 100 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
settings.gradle.kts Switch includes to flink-common + versioned 1.18 modules and set projectDirs.
integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/TestBaseIT.java Add unit tests for new BaseIT helpers.
integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/BaseIT.java Factor auxiliary-service name building + normalize local-access host.
gradle/libs.versions.toml Add Flink 1.18-scoped version aliases (flink18, flinkjdbc18, iceberg/paimon for 1.18).
flink-connector/v1.18/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonLocalFileSystemBackendIT118.java Flink 1.18 test class shim.
flink-connector/v1.18/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonJdbcBackendIT118.java Flink 1.18 test class shim.
flink-connector/v1.18/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonHiveBackendIT118.java Flink 1.18 test class shim.
flink-connector/v1.18/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/jdbc/FlinkJdbcMysqlCatalogIT118.java Flink 1.18 test class shim.
flink-connector/v1.18/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergRestCatalogIT118.java Version-specific IT override for Iceberg REST catalog SQL creation test.
flink-connector/v1.18/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergHiveCatalogIT118.java Flink 1.18 test class shim.
flink-connector/v1.18/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveKerberosClientIT118.java Flink 1.18 test class shim.
flink-connector/v1.18/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT118.java Flink 1.18 test class shim.
flink-connector/v1.18/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/catalog/GravitinoCatalogManagerIT118.java Flink 1.18 test class shim.
flink-connector/v1.18/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory Register Flink 1.18-specific catalog factories via SPI.
flink-connector/v1.18/flink/src/main/java/org/apache/gravitino/flink/connector/utils/CatalogCompatFlink118.java Flink 1.18 implementation of typed catalog/table compatibility hooks.
flink-connector/v1.18/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFlink118.java Flink 1.18 Paimon catalog entry overriding compat hook.
flink-connector/v1.18/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactoryFlink118.java Flink 1.18 Paimon factory entry creating Flink118 catalog.
flink-connector/v1.18/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/postgresql/GravitinoPostgresJdbcCatalogFactoryFlink118.java Flink 1.18 PostgreSQL JDBC factory entry.
flink-connector/v1.18/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/mysql/GravitinoMysqlJdbcCatalogFactoryFlink118.java Flink 1.18 MySQL JDBC factory entry.
flink-connector/v1.18/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalogFlink118.java Flink 1.18 JDBC catalog entry overriding compat hook.
flink-connector/v1.18/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFlink118.java Flink 1.18 Iceberg catalog entry overriding compat hook.
flink-connector/v1.18/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactoryFlink118.java Flink 1.18 Iceberg factory entry.
flink-connector/v1.18/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalogFlink118.java Flink 1.18 Hive catalog entry overriding compat hook.
flink-connector/v1.18/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalogFactoryFlink118.java Flink 1.18 Hive factory entry.
flink-connector/v1.18/flink/build.gradle.kts Define Flink 1.18 module deps + test classpath composition from flink-common.
flink-connector/v1.18/flink-runtime/src/test/java/org/apache/gravitino/flink/runtime/TestRuntimeJarDependencies.java Add runtime-jar test to ensure service descriptors are merged.
flink-connector/v1.18/flink-runtime/build.gradle.kts Point runtime packaging at flink-1.18 and enable mergeServiceFiles().
flink-connector/flink-common/src/test/resources/log4j2.properties Add shared Log4j2 test configuration for Flink connector tests.
flink-connector/flink-common/src/test/resources/flink-tests/hive-site.xml Add shared hive-site.xml test resource.
flink-connector/flink-common/src/test/java/org/apache/gravitino/flink/connector/utils/TestTypeUtils.java Add unit tests for new TypeUtils conversions.
flink-connector/flink-common/src/test/java/org/apache/gravitino/flink/connector/utils/TestDefaultCatalogCompat.java Add tests for DefaultCatalogCompat behavior.
flink-connector/flink-common/src/test/java/org/apache/gravitino/flink/connector/store/TestGravitinoFlinkConfig.java Add tests for catalog-store client config extraction.
flink-connector/flink-common/src/test/java/org/apache/gravitino/flink/connector/store/TestGravitinoCatalogStore.java Extend tests for factory discovery error-handling.
flink-connector/flink-common/src/test/java/org/apache/gravitino/flink/connector/paimon/TestPaimonPropertiesConverter.java Add unit tests for Paimon properties converter.
flink-connector/flink-common/src/test/java/org/apache/gravitino/flink/connector/jdbc/TestPostgresqlPropertiesConverter.java Add Postgres converter test suite binding.
flink-connector/flink-common/src/test/java/org/apache/gravitino/flink/connector/jdbc/TestMysqlPropertiesConverter.java Add MySQL converter test suite binding.
flink-connector/flink-common/src/test/java/org/apache/gravitino/flink/connector/jdbc/AbstractJdbcPropertiesConverterTestSuite.java Add shared JDBC converter test suite.
flink-connector/flink-common/src/test/java/org/apache/gravitino/flink/connector/integration/test/utils/TestUtils.java Add shared IT assertion/util helpers.
flink-connector/flink-common/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonLocalFileSystemBackendIT.java Convert shared IT base to abstract for versioned subclasses.
flink-connector/flink-common/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonJdbcBackendIT.java Convert shared IT base to abstract for versioned subclasses.
flink-connector/flink-common/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonHiveBackendIT.java Convert shared IT base to abstract for versioned subclasses.
flink-connector/flink-common/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java Add shared Paimon IT base for versioned lanes.
flink-connector/flink-common/src/test/java/org/apache/gravitino/flink/connector/integration/test/jdbc/FlinkJdbcMysqlCatalogIT.java Convert shared IT base to abstract for versioned subclasses.
flink-connector/flink-common/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergRestCatalogIT.java Convert shared IT base to abstract for versioned subclasses.
flink-connector/flink-common/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergHiveCatalogIT.java Convert shared IT base to abstract for versioned subclasses.
flink-connector/flink-common/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveKerberosClientIT.java Convert shared IT base to abstract + use shared hive test resources path.
flink-connector/flink-common/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java Convert shared IT base to abstract + use shared hive test resources path.
flink-connector/flink-common/src/test/java/org/apache/gravitino/flink/connector/integration/test/catalog/GravitinoCatalogManagerIT.java Convert shared IT base to abstract for versioned subclasses.
flink-connector/flink-common/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java Add shared Flink IT environment base wiring Gravitino/Hive/HDFS/Flink.
flink-connector/flink-common/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java Route CatalogTable creation through DefaultCatalogCompat.
flink-connector/flink-common/src/test/java/org/apache/gravitino/flink/connector/hive/TestFlinkGenericTableUtil.java Update generic-table util tests to inject CatalogCompat.
flink-connector/flink-common/src/test/java/org/apache/gravitino/flink/connector/catalog/TestBaseCatalog.java Use DefaultCatalogCompat for CatalogTable creation in tests.
flink-connector/flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory Register shared catalog-store factory via SPI.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/utils/TypeUtils.java Introduce Flink↔Gravitino type conversion utilities.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/utils/TableUtils.java Add TableChange column-position conversion utility.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/utils/PropertyUtils.java Add property filtering helper for Hadoop/Hive-prefixed keys.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/utils/FactoryUtils.java Add helper to validate options and warn on unsupported keys.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/utils/DefaultCatalogCompat.java Default CatalogCompat implementation for shared lane.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/utils/CatalogCompat.java Define typed compatibility interface for per-minor hooks.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactoryOptions.java Define catalog-store factory options (incl. OAuth2 keys).
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactory.java Implement Flink CatalogStoreFactory for Gravitino-backed catalog store.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java Harden factory discovery by skipping ServiceConfigurationError/LinkageError.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java Add Paimon catalog property conversions.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactoryOptions.java Add Paimon factory options (default DB).
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java Refactor to allow version-specific catalog instantiation.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java Add shared Paimon catalog implementation delegating to Paimon Flink catalog.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/jdbc/postgresql/PostgresqlPropertiesConverter.java Add Postgres JDBC property converter.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/jdbc/postgresql/GravitinoPostgresJdbcCatalogFactory.java Add Postgres JDBC catalog factory implementation.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/jdbc/mysql/MysqlPropertiesConverter.java Add MySQL JDBC property converter.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/jdbc/mysql/GravitinoMysqlJdbcCatalogFactory.java Add MySQL JDBC catalog factory implementation.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/jdbc/JdbcPropertiesConverter.java Add shared JDBC catalog/table property conversion logic.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/jdbc/JdbcPropertiesConstants.java Define shared JDBC property key constants + mappings.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalogFactoryOptions.java Define required/optional JDBC options using shared constants.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalogFactory.java Validate JDBC options and allow version-specific catalog instantiation.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalog.java Generalize underlying catalog type + improve construction flexibility.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java Add Iceberg catalog property conversions.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConstants.java Centralize Iceberg property keys/constants.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactoryOptions.java Add Iceberg factory options (default DB).
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactory.java Refactor to create Gravitino Iceberg catalog with derived Iceberg options.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalog.java Generalize underlying catalog type + support injected catalog instance.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/hive/HiveSchemaAndTablePropertiesConverter.java Add Hive table/schema property conversion logic.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/hive/HiveCatalogPropertiesConverter.java Add Hive catalog property conversions.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalogFactoryOptions.java Add Hive factory options (metastore URIs).
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalogFactory.java Refactor to allow version-specific catalog instantiation.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalog.java Route generic-table conversion through CatalogCompat-aware helpers.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/hive/FlinkGenericTableUtil.java Make generic-table conversion accept CatalogCompat-provided builders/serializers.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java Add Gravitino-backed catalog manager with simple/Kerberos/OAuth2 auth selection.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalogFactory.java Add shared catalog-factory interface for Gravitino providers.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java Add CatalogCompat hook and route CatalogTable creation through it.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/UnsupportPartitionConverter.java Add partition converter for catalogs without partition support.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/SchemaAndTablePropertiesConverter.java Add shared schema/table properties conversion interface.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/PartitionConverter.java Add shared partition conversion interface.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/DefaultPartitionConverter.java Add default identity-only partition converter.
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/CatalogPropertiesConverter.java Add shared catalog properties conversion interface with bypass-prefix handling.
flink-connector/flink-common/build.gradle.kts Define flink-common artifact + add test-jar publication and cleanup behavior.
.github/workflows/flink-integration-test-action.yml Run Flink ITs from :flink-connector:flink-1.18 and upload versioned logs.
Comments suppressed due to low confidence (5)

flink-connector/flink-common/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveKerberosClientIT.java:252

  • getSharedHiveConfDir() uses fully qualified names (java.nio.file.Paths, org.apache.flink.util.Preconditions) inside the method. Prefer adding imports and using simple names for consistency and readability.

flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalogFactory.java:83

  • newCatalog(...) uses fully qualified types (java.util.Map, @javax.annotation.Nullable) in the signature. Prefer adding imports and using simple names to keep the code consistent and easier to read.

flink-connector/flink-common/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java:208

  • getSharedHiveConfDir() uses fully qualified names (e.g., java.nio.file.Paths) inside the method. Prefer standard imports and simpler formatting to match the repo's Java import hygiene guideline and improve readability.

flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactory.java:43

  • createCatalog(...) builds a CatalogFactoryHelper but never calls validate() (or validateExcept(...)). This bypasses required/optional option validation and the intended "ignore unconsumed keys" warning behavior in FactoryUtils.GravitinoCatalogFactoryHelper.

flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java:45

  • createCatalog(...) creates a CatalogFactoryHelper but never calls validate() (or equivalent). This skips option validation and prevents FactoryUtils from warning about unsupported/unconsumed options.
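The missing-validate() concern in the last two items boils down to the following behavior, shown here as a simplified self-contained model (this is not Flink's actual FactoryUtil implementation; method and key names are illustrative): fail fast on missing required options, and surface keys the factory does not recognize.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified model of what a catalog-factory helper's validate() provides:
// reject incomplete configurations early and warn about unconsumed keys.
public class OptionValidationSketch {

    public static void validate(Map<String, String> options,
                                Set<String> required,
                                Set<String> optional) {
        for (String key : required) {
            if (!options.containsKey(key)) {
                throw new IllegalArgumentException("Missing required option: " + key);
            }
        }
        Set<String> unconsumed = new HashSet<>(options.keySet());
        unconsumed.removeAll(required);
        unconsumed.removeAll(optional);
        if (!unconsumed.isEmpty()) {
            // Flink's helper logs a warning here; this sketch just prints one.
            System.out.println("Ignoring unsupported options: " + unconsumed);
        }
    }

    public static void main(String[] args) {
        // Valid config with one unrecognized key: accepted, with a warning.
        validate(Map.of("uri", "jdbc:mysql://host/db", "flink.extra", "x"),
                 Set.of("uri"), Set.of("default-database"));
    }
}
```

Skipping this step does not make a misconfigured catalog fail less; it just moves the failure to catalog creation time, where the error message no longer names the offending option.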

Comment on lines +38 to +41
Map<String, String> catalogOptions,
Map<String, String> icebergCatalogProperties,
CatalogFactory.Context context) {
super(

Copilot AI Mar 25, 2026


The constructor takes CatalogFactory.Context context but doesn’t use it. If it’s not needed for Flink 1.18, consider removing the parameter (and adjusting the factory) or using it (e.g., for classloader-specific behavior) to avoid a dead parameter.

Contributor Author


Fixed in b68f1e918. The 1.18 catalog implementation does not use CatalogFactory.Context, so I removed the dead constructor parameter from GravitinoIcebergCatalogFlink118 and kept the versioned factory path aligned with the actual usage.

java.util.Map<String, String> catalogOptions,
SchemaAndTablePropertiesConverter schemaAndTablePropertiesConverter,
PartitionConverter partitionConverter,
@javax.annotation.Nullable HiveConf hiveConf,
Contributor Author


remove this? @javax.annotation.Nullable

Contributor Author


Fixed in b68f1e9. I replaced the fully qualified @javax.annotation.Nullable with the normal import to match the repository style.

return DefaultPartitionConverter.INSTANCE;
}

private Map<String, String> toIcebergCatalogOptions(Map<String, String> catalogOptions) {
Contributor Author


why add this method?

Contributor Author


I kept this method. In the versioned split, the common Iceberg factory still needs one place to translate Gravitino catalog properties into the Iceberg-specific option set before delegating to the versioned catalog implementation. This helper keeps that mapping local and avoids duplicating the conversion logic in each version module.
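The translation this reply describes can be sketched as a plain key mapping. Everything here is illustrative: the key names (`catalog-backend`, `catalog-type`, `uri`, `warehouse`) are hypothetical stand-ins for the real Gravitino/Iceberg property keys, and the method merely models the shape of a toIcebergCatalogOptions-style helper.

```java
import java.util.HashMap;
import java.util.Map;

// Models translating Gravitino-side catalog properties into the option set
// a backing Iceberg catalog expects: rename some keys, pass some through,
// and drop the rest. Key names are illustrative, not the real property keys.
public class IcebergOptionMappingSketch {

    public static Map<String, String> toIcebergCatalogOptions(Map<String, String> catalogOptions) {
        Map<String, String> iceberg = new HashMap<>();
        catalogOptions.forEach((k, v) -> {
            switch (k) {
                case "catalog-backend":
                    iceberg.put("catalog-type", v); // hypothetical rename
                    break;
                case "uri":
                case "warehouse":
                    iceberg.put(k, v); // passed through unchanged
                    break;
                default:
                    break; // drop keys the Iceberg catalog does not understand
            }
        });
        return iceberg;
    }

    public static void main(String[] args) {
        System.out.println(toIcebergCatalogOptions(
            Map.of("catalog-backend", "hive", "uri", "thrift://h:9083", "gravitino.x", "y")));
    }
}
```

Keeping one such helper in the common factory means each version module receives an already-translated option map instead of re-implementing the mapping.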

context, defaultDatabase, schemaAndTablePropertiesConverter(), partitionConverter());
}

protected org.apache.flink.table.catalog.Catalog newCatalog(
Contributor Author


rename org.apache.flink.table.catalog.Catalog to Catalog

Contributor Author


Fixed in b68f1e918. I switched this back to the imported Catalog type.

context.getOptions().remove(JdbcPropertiesConstants.FLINK_DRIVER);
final FactoryUtil.CatalogFactoryHelper helper =
FactoryUtils.createCatalogFactoryHelper(this, context);
helper.validate();
Contributor Author


why add this method?

Contributor Author


I kept this validation step. The JDBC factory needs to validate the effective option set after the versioned refactor, especially because the Flink-facing options and the backend JDBC catalog options are not identical. Keeping the check here fails invalid configurations early instead of letting them surface later during catalog creation.

@Override
public Set<ConfigOption<?>> requiredOptions() {
return Collections.emptySet();
return ImmutableSet.of(
Contributor Author


why change this?

Contributor Author


I kept this change. The required option set still needs to be enforced in the common JDBC factory after the versioned split; otherwise the factory can accept incomplete definitions and fail later in a less obvious way. This is a behavior-preserving validation change rather than a formatting change.

context, defaultDatabase, schemaAndTablePropertiesConverter(), partitionConverter());
}

protected Catalog newCatalog(
Contributor Author


Should we use an abstract method for newCatalog?

Contributor Author


I left this as-is for now. The current protected template method keeps the version extension point while still allowing a shared default path in the common factory. Making it abstract would force every version module to re-declare the method even when the common behavior is still valid.
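The trade-off in this reply is the classic protected-template-method versus abstract-method choice, which can be sketched minimally (class names here are illustrative, not the actual factory classes):

```java
// Sketch of the extension-point choice: the common factory supplies a default
// newCatalog(...) that version modules MAY override, instead of an abstract
// method every module MUST implement. Names are illustrative.
class CommonCatalogFactory {
    // Shared default path, valid for versions needing no special handling.
    protected String newCatalog(String name) {
        return "common:" + name;
    }
}

class CatalogFactoryFlink118 extends CommonCatalogFactory {
    // A version module overrides only when its Flink minor needs different wiring.
    @Override
    protected String newCatalog(String name) {
        return "flink118:" + name;
    }
}

public class TemplateMethodSketch {
    public static void main(String[] args) {
        System.out.println(new CommonCatalogFactory().newCatalog("c"));   // common:c
        System.out.println(new CatalogFactoryFlink118().newCatalog("c")); // flink118:c
    }
}
```

With an abstract method, `CommonCatalogFactory` could not be used directly and every future version lane would carry a forced override even when the shared behavior suffices; the protected default keeps the override optional.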

/** Typed compatibility hook that is implemented per Flink minor version. */
public interface CatalogCompat {

CatalogTable createCatalogTable(
Contributor Author


add java doc for the method

Contributor Author


Fixed in b68f1e918. I added method-level javadocs here to make the compatibility contract clearer across versioned Flink modules.


protected String getSharedHiveConfDir() {
try {
return java.nio.file.Paths.get(
Contributor Author


don't use long package name for this PR, rename java.nio.file.Paths to Paths.

Contributor Author


Fixed in b68f1e918. I changed this to use the imported Paths type.


private static final Logger LOG = LoggerFactory.getLogger(BaseIT.class);
private static final Splitter COMMA = Splitter.on(",").omitEmptyStrings().trimResults();
private static final String LOOPBACK_HOST = "127.0.0.1";
Contributor Author


why change BaseIT?

Contributor Author


Follow-up: I removed the BaseIT-related changes from this PR in 978cac0. After revisiting the scope, I agree this helper/host-normalization change is not required for the Flink versioned split itself, so I dropped it from the PR instead of keeping a revised variant here.


rootProject.name = "gravitino"

val scalaVersion: String = gradle.startParameter.projectProperties["scalaVersion"]?.toString()
Contributor Author

Why change this?

Contributor Author

I reverted the formatting-only part in b68f1e918. There was no need to keep this stylistic change in the PR.

include("lance:lance-common")
include("lance:lance-rest-server")
include("authorizations:authorization-ranger", "authorizations:authorization-common", "authorizations:authorization-chain")
include(
Contributor Author

don't change this

Contributor Author

I reverted this formatting-only change in b68f1e918 to keep the PR diff focused.

project(":spark-connector:spark-3.3").projectDir = file("spark-connector/v3.3/spark")
project(":spark-connector:spark-runtime-3.3").projectDir = file("spark-connector/v3.3/spark-runtime")
include("spark-connector:spark-3.4", "spark-connector:spark-runtime-3.4", "spark-connector:spark-3.5", "spark-connector:spark-runtime-3.5")
project(":spark-connector:spark-runtime-3.3").projectDir =
Contributor Author

don't change this

Contributor Author

I kept this change. This include wiring is the actual semantic part of the PR because the versioned Flink modules need to be added to the build. I only reverted the unrelated formatting churn around it.

@lasdf1234
Contributor

cc @FANNG1 I'm here.

protected CatalogCompat catalogCompat() {
// Versioned catalog entry classes override this hook when the Flink minor has a different
// catalog/table API path.
return DefaultCatalogCompat.INSTANCE;
Contributor

Are there other OSS projects that support multiple versions like this? Do they implement it in the same way?

Contributor Author

I do not think this needs a code change in this PR. The current structure uses a shared common layer plus version-specific adapters/hooks (catalogCompat() and the versioned catalog/factory classes) to isolate Flink minor-version API differences. That is the main reason for this split: keep the common behavior in one place and localize the Flink-version-specific parts in each version module.
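The layered layout described above can be sketched as follows; the names are illustrative stand-ins, not the real connector classes. Shared behavior lives once in the common layer, and only the version-sensitive call is delegated to a per-version adapter.

```java
// Minimal sketch of a common layer plus per-version adapters.
interface VersionAdapter {
    // Each Flink minor supplies its own implementation of the
    // version-sensitive API call.
    String apiCall();
}

class CommonLayer {
    private final VersionAdapter adapter;

    CommonLayer(VersionAdapter adapter) {
        this.adapter = adapter;
    }

    // Shared logic is written once; the adapter localizes the
    // version-specific part to each version module.
    String run() {
        return "shared+" + adapter.apiCall();
    }
}
```

A version module would then wire in its own adapter, e.g. `new CommonLayer(() -> "flink1.18-api").run()`.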


protected CatalogTable toFlinkGenericTable(Table table) {
return FlinkGenericTableUtil.toFlinkGenericTable(table, catalogCompat());
}
Contributor

Can we move these methods back to the Util class?

Contributor Author

I prefer to keep these thin wrapper methods on the catalog class. They delegate to FlinkGenericTableUtil, but they also bind the call to the catalog instance so the version-specific catalogCompat() hook can be applied naturally by subclasses such as the Flink 1.18 catalog. Moving them back to the util class would make the compat path more scattered rather than simpler.
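The binding argument can be illustrated with a small sketch (all names hypothetical): the util stays stateless and takes the compat strategy as a parameter, while the thin wrapper on the catalog supplies the instance's own hook, so a versioned subclass changes the path just by overriding the hook.

```java
// Sketch of a thin wrapper delegating to a stateless util with a
// compat strategy; names are illustrative, not the real classes.
interface Compat {
    String convert(String table);
}

final class GenericTableUtil {
    private GenericTableUtil() {}

    // The util has no state: the compat strategy is passed in explicitly.
    static String toGenericTable(String table, Compat compat) {
        return compat.convert(table);
    }
}

class BaseCatalog {
    protected Compat compat() {
        return t -> "generic(" + t + ")";
    }

    // Thin wrapper: binds the util call to this catalog instance, so
    // subclasses redirect the compat path by overriding compat() alone.
    String toGenericTable(String table) {
        return GenericTableUtil.toGenericTable(table, compat());
    }
}

class V118Catalog extends BaseCatalog {
    @Override
    protected Compat compat() {
        return t -> "v118(" + t + ")";
    }
}
```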


public static final ConfigOption<String> DRIVER =
ConfigOptions.key(JdbcPropertiesConstants.FLINK_DRIVER).stringType().noDefaultValue();
}
Contributor

I used the old version and configured these properties, and Flink ran normally. Is it necessary to define these options here?

Contributor Author

I kept these options here intentionally. This does not introduce a new user-facing requirement compared with the old implementation; it centralizes the Flink-side JDBC option definitions so requiredOptions(), optionalOptions(), and option lookups use the same source of truth after the versioned refactor. That keeps the factory logic clearer and avoids scattering the option keys across the implementation.
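The single-source-of-truth point can be sketched as below; the option keys and class names are illustrative, not the actual Gravitino constants. Because both option lists read from one constants holder, a key cannot drift between the required and optional sets.

```java
import java.util.Set;

// Sketch of centralized option-key definitions; keys are illustrative.
final class JdbcOptionKeys {
    static final String USER = "jdbc.user";
    static final String PASSWORD = "jdbc.password";
    static final String DRIVER = "jdbc.driver";

    private JdbcOptionKeys() {}
}

class JdbcFactorySketch {
    // requiredOptions() and optionalOptions() draw from the same
    // constants, mirroring how a centralized ConfigOption holder keeps
    // the factory's option handling consistent.
    Set<String> requiredOptions() {
        return Set.of(JdbcOptionKeys.USER, JdbcOptionKeys.PASSWORD);
    }

    Set<String> optionalOptions() {
        return Set.of(JdbcOptionKeys.DRIVER);
    }
}
```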

@FANNG1 changed the title [#9710] refactor(flink-connector): split versioned framework with Flink 1.18 baseline [#10541] refactor(flink-connector): split versioned framework with Flink 1.18 baseline Mar 25, 2026


Development

Successfully merging this pull request may close these issues.

[Flink] Refactor connector framework and add Flink 1.18 baseline support

3 participants