Skip to content

[FLINK-37288] Add Google Cloud Spanner dialect and catalog#156

Open
laughingman7743 wants to merge 8 commits into
apache:mainfrom
laughingman7743:jdbc_spanner_connector
Open

[FLINK-37288] Add Google Cloud Spanner dialect and catalog#156
laughingman7743 wants to merge 8 commits into
apache:mainfrom
laughingman7743:jdbc_spanner_connector

Conversation

@laughingman7743

@laughingman7743 laughingman7743 commented Feb 9, 2025

Copy link
Copy Markdown
Contributor

What is the purpose of the change

Adds Google Cloud Spanner support to flink-connector-jdbc as a new flink-connector-jdbc-spanner module.

  • Dialect: Type mapping between Spanner and Flink types, including array type write support
  • Catalog: Database/table discovery via Spanner Admin API and INFORMATION_SCHEMA, with semicolon-delimited URL parameter handling (;param=value)
  • Lineage: Extracts Spanner-specific location info (project, instance, database) from JDBC URLs
  • Docs: Added Spanner section to JDBC connector docs (English and Chinese)

Changes to core module

  • AbstractJdbcCatalog: Added protected constructor and hooks (calculateUrlFunction, validateConnectionProperties) for dialects with non-standard URL formats
  • FieldNamedPreparedStatement: Array type support

Testing

All features are tested against the Spanner emulator via Testcontainers: dialect type mapping, catalog operations, end-to-end read/write, and factory SPI registration.

Jira

https://issues.apache.org/jira/browse/FLINK-37288

Discussion

https://lists.apache.org/thread/4344z501pszv5h7gmr0311fw7h6h4sw4

@boring-cyborg

boring-cyborg Bot commented Feb 9, 2025

Copy link
Copy Markdown

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

@laughingman7743 laughingman7743 marked this pull request as ready for review February 9, 2025 06:45
@laughingman7743 laughingman7743 changed the title [FLINK-37288] Add flink-connector-jdbc-spnner [FLINK-37288] Add Google Cloud Spanner dialect and catalog Feb 9, 2025
Comment on lines -133 to -138
this.connectionProperties = Preconditions.checkNotNull(connectionProperties);
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(connectionProperties.getProperty(USER_KEY)));
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(
connectionProperties.getProperty(PASSWORD_KEY)));

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spanner does not use password authentication.

Comment on lines -579 to -588
/**
* URL has to be without database, like "jdbc:dialect://localhost:1234/" or
* "jdbc:dialect://localhost:1234" rather than "jdbc:dialect://localhost:1234/db".
*/
protected static void validateJdbcUrl(String url) {
String[] parts = url.trim().split("\\/+");

checkArgument(parts.length == 2);
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case of Spanner, the URL is as follows, so I have deleted this validation.

jdbc:cloudspanner://hostname/projects/gcp_project_id/instances/instance_id/databases/database_id


/** Test for {@link AbstractJdbcCatalog}. */
class AbstractJdbcCatalogTest {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is no longer needed because URL validation has been removed.

}

public Schema getTableSchema() {
public Schema getTableSchema(String pkConstraintName) {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case of Spanner, the contract name of the primary key is different, so it is possible to specify the contract name of the primary key.

@laughingman7743 laughingman7743 force-pushed the jdbc_spanner_connector branch 8 times, most recently from c7f0a5a to 3a73e0b Compare February 9, 2025 15:17
@laughingman7743

Copy link
Copy Markdown
Contributor Author

I have applied code formatting using Spotless and updated the documentation. It is ready for review.

@laughingman7743 laughingman7743 force-pushed the jdbc_spanner_connector branch 2 times, most recently from e4559c9 to 4ec422d Compare February 12, 2025 06:19
| Db2 | `com.ibm.db2.jcc` | `db2jcc` | [Download](https://www.ibm.com/support/pages/download-db2-fix-packs-version-db2-linux-unix-and-windows) |
| Trino | `io.trino` | `trino-jdbc` | [Download](https://repo1.maven.org/maven2/io/trino/trino-jdbc/) |
| OceanBase | `com.oceanbase` | `oceanbase-client` | [Download](https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/) |
| Spanner | `com.google.cloud` | `google-cloud-spanner-jdbc` | [Download](https://central.sonatype.com/artifact/com.google.cloud/google-cloud-spanner-jdbc) |

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the download url points to Maven like the spanner doc. How do I get the jar file?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Data Type Mapping
----------------
Flink supports connect to several databases which uses dialect like MySQL, Oracle, PostgreSQL, CrateDB, Derby, SQL Server, Db2 and OceanBase. The Derby dialect usually used for testing purpose. The field data type mappings from relational databases data types to Flink SQL data types are listed in the following table, the mapping table can help define JDBC table in Flink easily.
Flink supports connect to several databases which uses dialect like MySQL, Oracle, PostgreSQL, CrateDB, Derby, SQL Server, Db2, OceanBase and Spanner. The Derby dialect usually used for testing purpose. The field data type mappings from relational databases data types to Flink SQL data types are listed in the following table, the mapping table can help define JDBC table in Flink easily.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nits:
connect -> the connection
which uses dialect like -> using dialects e.g.
The Derby dialect usually used for testing purpose. -> The Derby dialect is usually used for testing purpose.
JDBC table -> a JDBC table

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed: 3c7d225

@laughingman7743

Copy link
Copy Markdown
Contributor Author

Does this repository not have an owner? Many PRs adding dialects are being left unreviewed. Does this mean that contributions are not accepted?

Comment on lines +135 to +148
// Check if base-url contains query parameters
int questionMarkIndex = baseUrl.indexOf('?');

if (questionMarkIndex == -1) {
// No parameters: traditional baseUrl + databaseName
return baseUrl + databaseName;
}

// Parameters present: insert database name before '?'
// Example: "jdbc:postgresql://localhost:5432/?sslmode=require"
// -> "jdbc:postgresql://localhost:5432/mydb?sslmode=require"
String urlWithoutParams = baseUrl.substring(0, questionMarkIndex);
String params = baseUrl.substring(questionMarkIndex);
return urlWithoutParams + databaseName + params;

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For Spanner tests, it is necessary to configure autoConfigEmulator to automatically set up the emulator. This enables parameter passing.

Comment on lines +177 to +187
@Override
public void setArray(int fieldIndex, Array x) throws SQLException {
for (int index : indexMapping[fieldIndex]) {
statement.setArray(index, x);
}
}

@Override
public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
return connection.createArrayOf(typeName, elements);
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These implementations are necessary to support writing to Array types.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This implementation introduces the first full ARRAY type write support in the
flink-connector-jdbc project. Currently, PostgreSQL dialect only supports reading
ARRAY types (throws IllegalStateException on write - see
PostgresDialectConverter.java:61-67).

If this change is merged, the implementation pattern here could serve as a reference
for adding ARRAY write support to PostgreSQL and other dialects in the future.

@laughingman7743

Copy link
Copy Markdown
Contributor Author

@davidradl Could you take another look when you have time? I've addressed the previous feedback and added some fixes for the test issues.

@laughingman7743 laughingman7743 force-pushed the jdbc_spanner_connector branch 3 times, most recently from fb7917c to 053359f Compare March 12, 2026 16:09
@github-actions

Copy link
Copy Markdown

This PR is being marked as stale since it has not had any activity in the last 90 days.
If you would like to keep this PR alive, please leave a comment asking for a review.
If the PR has merge conflicts, update it with the latest from the base branch.

If you are having difficulty finding a reviewer, please reach out to the
community, contact details can be found here: https://flink.apache.org/what-is-flink/community/

If this PR is no longer valid or desired, please feel free to close it.
If no activity occurs in the next 30 days, it will be automatically closed.

@github-actions github-actions Bot added the stale label Jun 11, 2026
@laughingman7743 laughingman7743 force-pushed the jdbc_spanner_connector branch from 053359f to df497eb Compare June 11, 2026 08:12
@laughingman7743

Copy link
Copy Markdown
Contributor Author

Rebased onto the latest main to resolve the stale status. This PR is still active and awaiting review. Any feedback would be greatly appreciated.

@github-actions github-actions Bot removed the stale label Jun 12, 2026
@laughingman7743 laughingman7743 force-pushed the jdbc_spanner_connector branch from df497eb to 8851d69 Compare June 18, 2026 05:15
@RocMarshal RocMarshal self-assigned this Jun 22, 2026
@RocMarshal

Copy link
Copy Markdown
Contributor

Hi, @laughingman7743 Could you help Run 'mvn spotless:apply' before the next review?
Thanks

@laughingman7743 laughingman7743 force-pushed the jdbc_spanner_connector branch from 8851d69 to cac553f Compare June 23, 2026 00:19
@laughingman7743

Copy link
Copy Markdown
Contributor Author

Hi @RocMarshal, thanks for taking a look!

I've run mvn spotless:apply and pushed the fix (the violation was a comment line in SpannerCatalog.java exceeding the line-length limit). I also rebased the branch onto the latest main. spotless:check now passes for all modules locally.

It looks like the CI needs maintainer approval to run — could you kindly approve the workflow when you have a chance? Thanks again for the review! 🙏

@laughingman7743 laughingman7743 force-pushed the jdbc_spanner_connector branch from cac553f to c0e976d Compare June 23, 2026 01:09
@laughingman7743

Copy link
Copy Markdown
Contributor Author

CI failure is a pre-existing flaky test on main, not caused by this PR

The red job (https://github.com/apache/flink-connector-jdbc/actions/runs/27995114250/job/82864660173) fails in DerbyDynamicTableSourceITCase.testLimit:

Caused by: java.sql.SQLNonTransientConnectionException: No current connection.  (ERROR 08003)
  at ...EmbedPreparedStatement.executeQuery(...)

This is not introduced by this PR:

Root cause of the flaky testLimit

testLimit runs SELECT * FROM t LIMIT 1 over a source split into 2 partitions (scan.partition.num=2). With LIMIT 1 the job completes after the first row and cancels the source while the split-fetcher thread is still opening the second split. Cancellation tears the JDBC connection down, so the in-flight JdbcSourceSplitReader.openResultSetForSplitWhenAtLeastOnce() -> statement.executeQuery() runs against an already-closed connection -> 08003: No current connection.

Two gaps in JdbcSourceSplitReader make this fatal and racy:

How the fix differs from the closed PR #191

That PR guarded only resultSet.isClosed() before extract() / resultSet.next() inside the record loop. But per the stack trace the failure is at executeQuery() when opening the next split, which that PR does not touch — so it would not reliably fix this. It was closed without merge.

The fix I'd propose is different and targets the actual cause — cooperative cancellation:

  1. wakeUp() / close() set a volatile boolean wakeup flag.
  2. checkSplitOrStartNext() checks the flag before opening a new split (the executeQuery() path) and returns "no more records" instead of starting a query during shutdown.
  3. In fetch(), if a SQLException occurs while the reader is shutting down (flag set / thread interrupted), treat it as a graceful end-of-split instead of rethrowing — this closes the residual race window where cancellation lands during the Derby call.

This covers the executeQuery() failure point that #191 missed, and removes the "shutdown error -> job failure" behavior. I'm happy to open a separate, focused PR + JIRA for this so both this PR and main go green.

@laughingman7743

Copy link
Copy Markdown
Contributor Author

Follow-up on the root cause above: I filed FLINK-39975 and opened #200 to fix the flaky DerbyDynamicTableSourceITCase.testLimit.

The fix makes JdbcSourceSplitReader recover from a connection that is closed while opening a split during source cancellation: it re-establishes the connection and retries the split-open (bounded, mirroring the retry/reconnect handling in JdbcOutputFormat.flush()). It is non-masking — a genuine query error on a healthy connection is rethrown immediately, and an unreachable database still fails the job after the retry budget. A deterministic regression test reproduces the exact 08003 "No current connection" error (the test fails without the fix and passes with it).

Once #200 lands on main, this flaky test should stop reddening unrelated PRs (including this one).

@laughingman7743 laughingman7743 force-pushed the jdbc_spanner_connector branch from c0e976d to b5f1c30 Compare June 23, 2026 15:25
@RocMarshal

Copy link
Copy Markdown
Contributor

Hi, @davidradl @eskabetxe
Could you help take a look if you had the free time ? thx

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants