[FLINK-39009] Support PostgreSQL unnest insert in dynamic table. #188
gaurav7261 wants to merge 1 commit into
Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

@wuchong can you please review?

@wuchong can you please check and guide me to the correct maintainer for this repo?

@och5351 can you please look at it?

Hi, @gaurav7261! I am on vacation, so I think I will be able to check in two days. Thank you for your contribution.
och5351 left a comment
Hello, @gaurav7261!
I think most users of PostgreSQL JDBC will welcome this PR. I feel the same way.
However, I have discovered a few issues. Please let me know if I am mistaken about anything.
LogicalType fieldType = fieldTypes.get(fieldIndex);
String arrayTypeName = dialect.getArrayTypeName(fieldType);

Array sqlArray = conn.createArrayOf(arrayTypeName, columnValues.toArray());
It appears that array objects need to be released when the work is finished.
If the batch row size is greater than 1000, we may need to consider back pressure issues.
Please take a look.
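The concern about releasing arrays can be illustrated with a minimal sketch. It is not the PR's code: the class `ColumnArrayBinder` and the method `executeWithArrays` are hypothetical names, and the per-column type names are assumed to come from the dialect. It uses only standard `java.sql` calls (`Connection.createArrayOf`, `PreparedStatement.setArray`, `Array.free`).

```java
import java.sql.Array;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper; class and method names are illustrative, not taken from the PR. */
final class ColumnArrayBinder {

    /**
     * Binds one SQL array per column, executes the statement, and always frees the arrays.
     *
     * @param arrayTypeNames element type name per column (assumed to be supplied by the dialect)
     * @param rows buffered rows, each holding one value per column
     */
    static int executeWithArrays(
            Connection conn, PreparedStatement stmt, String[] arrayTypeNames, List<Object[]> rows)
            throws SQLException {
        List<Array> arrays = new ArrayList<>(arrayTypeNames.length);
        try {
            for (int col = 0; col < arrayTypeNames.length; col++) {
                // Collect the values of one column across all buffered rows.
                Object[] columnValues = new Object[rows.size()];
                for (int row = 0; row < rows.size(); row++) {
                    columnValues[row] = rows.get(row)[col];
                }
                Array sqlArray = conn.createArrayOf(arrayTypeNames[col], columnValues);
                arrays.add(sqlArray);
                stmt.setArray(col + 1, sqlArray); // JDBC parameter indexes are 1-based
            }
            return stmt.executeUpdate();
        } finally {
            // Release the arrays even if binding or execution failed.
            for (Array array : arrays) {
                array.free();
            }
        }
    }
}
```

Tracking the created arrays in a list means that arrays created before a failure are still freed, which matters more as the batch grows.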
cfcf9cf to c9da8a5

@och5351 can you review again please
eskabetxe left a comment
Thank you for your contribution, @gaurav7261.
I left some comments.
 *
 * @return true if UNNEST optimization is enabled, false otherwise
 */
public boolean isPostgresUnnestEnabled() {
Can we review the naming?
The core module should not be database-aware.
    return url;
}

/**
Should we consider moving all these methods to a new interface and avoiding default implementations?
My main concern is that if the 'unnest' config gets activated for any reason, the sink might fail silently for databases that don't implement it.
Then we can check whether unnest is active and whether the dialect implements the unnest interface, and fail otherwise.
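A minimal sketch of this suggestion, assuming a separate opt-in interface with no default implementations. All names here (`Dialect`, `UnnestCapableDialect`, `ExecutorSelection`, `getBatchInsertStatement`) are hypothetical stand-ins and are not the connector's actual types.

```java
/** Hypothetical stand-in for the connector's dialect interface. */
interface Dialect {
    String dialectName();
}

/** Hypothetical opt-in capability: only dialects that can do UNNEST implement it. */
interface UnnestCapableDialect extends Dialect {
    String getBatchInsertStatement(String tableName, String[] fieldNames);
}

final class ExecutorSelection {

    /** Returns the capability view of the dialect, or fails fast if it is missing. */
    static UnnestCapableDialect requireUnnest(Dialect dialect) {
        if (dialect instanceof UnnestCapableDialect) {
            return (UnnestCapableDialect) dialect;
        }
        throw new IllegalStateException(
                String.format(
                        "UNNEST is enabled but dialect '%s' does not support it.",
                        dialect.dialectName()));
    }

    /** Decides whether the UNNEST path is used; never falls back silently. */
    static boolean useUnnest(boolean unnestEnabled, Dialect dialect) {
        if (!unnestEnabled) {
            return false; // standard batching
        }
        requireUnnest(dialect); // throws for dialects without the capability
        return true;
    }
}
```

Because the capability lives in its own interface, a dialect that does not implement it cannot inherit a no-op default by accident, and the check can run once when the sink is built.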
        .defaultValue(3)
        .withDescription("The max retry times if writing records to database failed.");

public static final ConfigOption<Boolean> SINK_POSTGRES_UNNEST_ENABLED =
Please review the naming, as the default JDBC configurations should not be database-aware.
If we end up needing a database-specific configuration, let's look into adding it to the database module.
 * @param type the logical type
 * @return the extracted value in JDBC-compatible format
 */
private Object extractValue(RowData row, int pos, LogicalType type) {
Should this be implemented per database?
c9da8a5 to e9b908a

@eskabetxe changes done, please review
Hi, @gaurav7261!
I found array.free(). Thank you.
Could you please change the PR title to match the commit message?
I also found some issues.
Please take a look and let me know if I have misunderstood anything.
    }
} finally {
    for (Array array : arrays) {
        array.free();
I found the deallocation. Thank you.
default:
    throw new UnsupportedOperationException(
            String.format(
                    "Type %s is not supported for bulk insert. "
Could you please list the supported types in the message when this exception is thrown?
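One way to act on this request is to build the message from the same collection that drives the type check, so the two cannot drift apart. The sketch below is illustrative only: the class `UnnestTypeSupport` is hypothetical, and the four type names are placeholders, since the PR's real list of supported types is not shown here.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical helper; not the PR's implementation. */
final class UnnestTypeSupport {

    // Placeholder list for illustration; the PR's actual supported types are not shown in this text.
    private static final Set<String> SUPPORTED =
            new LinkedHashSet<>(Arrays.asList("INTEGER", "BIGINT", "VARCHAR", "BOOLEAN"));

    /** Returns the array element type name, or throws with the full list of supported types. */
    static String arrayTypeNameFor(String typeName) {
        if (SUPPORTED.contains(typeName)) {
            return typeName;
        }
        throw new UnsupportedOperationException(
                String.format(
                        "Type %s is not supported for bulk insert. Supported types: %s",
                        typeName, String.join(", ", SUPPORTED)));
    }
}
```

A `LinkedHashSet` keeps the listed types in a stable order, which makes the message predictable in tests.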
List<Array> arrays = new ArrayList<>(fieldCount);

for (int fieldIndex = 0; fieldIndex < fieldCount; fieldIndex++) {
    List<Object> columnValues = new ArrayList<>(batch.size());
Could you please consider assigning to the array directly instead of using ArrayList.toArray()?
Object[] columnValues = new Object[batch.size()];
for (int rowIndex = 0; rowIndex < batch.size(); rowIndex++) {
    columnValues[rowIndex] = batch.get(rowIndex)[fieldIndex];
}
...
Array sqlArray = conn.createArrayOf(arrayTypeName, columnValues);
The ArrayList.toArray() method must allocate a new array, so memory usage will increase, especially when the batch size is large.
https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/ArrayList.html#toArray()
e9b908a to 2e3f6b5

done @och5351
och5351 left a comment
I have noticed some minor changes.
Also, please revise all comments to be concise.
 * @throws UnsupportedOperationException if the type is not supported for UNNEST
 */
@Override
public String getArrayTypeName(LogicalType logicalType) {
It appears that the uppercase type is actually being used. It would be best to change it to uppercase.
PostgresDialectTest.java
    typeName = typeName.substring(0, parenIndex).trim();
}

return typeName.toUpperCase();
If you use uppercase for getArrayTypeName, the toUpperCase() method seems unnecessary.
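For context, the normalization under discussion can be sketched as follows. This is an assumption-laden illustration, not the PR's method: the class `TypeNames` is hypothetical, the method name `extractBaseType` is borrowed from the test name mentioned in the PR description, and the use of `Locale.ROOT` is an addition made here so that the result does not depend on the JVM's default locale.

```java
import java.util.Locale;

/** Hypothetical helper; a sketch of the type-name normalization, not the PR's code. */
final class TypeNames {

    /** Strips a parenthesized length or precision and upper-cases the remaining base name. */
    static String extractBaseType(String typeName) {
        String base = typeName.trim();
        int parenIndex = base.indexOf('(');
        if (parenIndex > 0) {
            base = base.substring(0, parenIndex).trim();
        }
        // Locale.ROOT avoids locale-specific case mappings (an addition in this sketch).
        return base.toUpperCase(Locale.ROOT);
    }
}
```

If every caller already passes uppercase names, the final conversion is redundant, which is the point raised in the comment above.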
2e3f6b5 to 3917ab6

done @och5351

@och5351 can you please review
[FLINK-39009] Support PostgreSQL unnest insert in dynamic table. (2e3f6b5)
1. Motivation
Currently, when using Flink's JDBC connector to sink data to PostgreSQL, the connector uses standard JDBC batching, which executes multiple individual INSERT statements. While functional, this approach has significant performance limitations:
Performance Issues
The Problem
Standard JDBC batching in PostgreSQL:
This creates a unique query plan for each batch size, causing:
PostgreSQL Batching Already Works - Why Add UNNEST?
Important: PostgreSQL JDBC driver DOES support efficient batching with multi-value INSERTs, similar to MySQL's rewriteBatchedStatements:
So why add UNNEST?
The issue isn't that batching doesn't work - it's about query planning overhead at scale. As documented in the Tiger Analytics blog on PostgreSQL query plan management, PostgreSQL generates a unique query plan for each distinct prepared statement:
INSERT ... VALUES (?,?,?), ... ← 30 parameters → Plan 1
INSERT ... VALUES (?,?,?), ... ← 300 parameters → Plan 2
INSERT ... VALUES (?,?,?), ... ← 1,569 parameters → Plan 523
In real-world streaming scenarios (CDC, event processing), batch sizes vary constantly due to timing, backpressure, and checkpointing. This creates query plan cache pollution.
UNNEST optimization provides:
pg_stat_statements
This is an optimization ON TOP OF working batching, not a replacement.
Comparison Table
(Multi-value INSERT) | INSERT ... VALUES (...), (...), (...) | (3 × row count) | (one per batch size) | (re-planning per size)
(This PR!) | INSERT ... UNNEST(?::type[]...) | (one array per column) | (same for all batch sizes) | (plan reuse)
Key Insight: PostgreSQL batching works well, but UNNEST reduces query planning overhead by maintaining a stable query plan across varying batch sizes. This is especially valuable in streaming scenarios where batch sizes fluctuate.
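The difference in statement shape can be made concrete with a small string-building sketch. It is a simplified model under stated assumptions: the class `InsertSqlShapes` and both methods are hypothetical, and the `SELECT * FROM UNNEST(...)` form is one valid PostgreSQL way to write the statement, not necessarily the exact SQL this PR generates.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.stream.Collectors;

/** Hypothetical helper that models the SQL text of both approaches. */
final class InsertSqlShapes {

    /** Multi-value INSERT: the text grows with the row count, so each batch size is a distinct statement. */
    static String multiValueInsert(String table, String[] fields, int rowCount) {
        String row = "(" + String.join(", ", Collections.nCopies(fields.length, "?")) + ")";
        return "INSERT INTO " + table + " (" + String.join(", ", fields) + ") VALUES "
                + String.join(", ", Collections.nCopies(rowCount, row));
    }

    /** UNNEST INSERT: one array parameter per column, so the text does not depend on the row count. */
    static String unnestInsert(String table, String[] fields, String[] arrayTypes) {
        String params = Arrays.stream(arrayTypes)
                .map(type -> "?::" + type + "[]")
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + table + " (" + String.join(", ", fields) + ") "
                + "SELECT * FROM UNNEST(" + params + ")";
    }
}
```

Calling `multiValueInsert` with 10 and with 100 rows yields two different strings, while `unnestInsert` takes no row count at all, which is why a single prepared statement can serve every batch size.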
Industry Solution
PostgreSQL's UNNEST() function provides a native way to bulk insert multiple rows in a single SQL statement:
Benefits:
This approach has been successfully implemented in:
This PR brings this proven optimization to Apache Flink!
2. Solution
This PR introduces PostgreSQL UNNEST optimization for bulk inserts and upserts in Flink's JDBC connector. The implementation:
sink.postgres.unnest.enabled (default: false)
Architecture
The UNNEST executor is integrated as an inner executor within Flink's existing executor hierarchy:
This design ensures:
3. Implementation Details
3.1. Core Changes
A. Configuration (JdbcExecutionOptions.java)
B. Table API Configuration (JdbcConnectorOptions.java)
C. Dialect Interface (JdbcDialect.java)
Design Note: Using interface methods (not reflection!) following Flink's existing patterns like getUpsertStatement().
D. PostgreSQL Implementation (PostgresDialect.java)
E. UNNEST Executor (TableUnnestStatementExecutor.java)
Key Design Decisions:
RowData: Simpler than using JdbcDialectConverter
createArrayOf(): Standard JDBC API, no PostgreSQL-specific driver dependencies
F. Executor Selection (JdbcOutputFormatBuilder.java)
Fail-Fast Approach: No silent fallback. If UNNEST is enabled but not supported, the job fails immediately with a clear error message.
3.2. Supported Data Types
3.3. Integration Points
JdbcDynamicTableFactory: Reads sink.postgres.unnest.enabled from Table DDL and passes to JdbcExecutionOptions
JdbcOutputFormatBuilder: Decides whether to use UNNEST based on configuration and batch size
4. Usage Examples
4.1. Table API / SQL (Recommended)
Simple INSERT (Append-Only)
UPSERT (with Primary Key / CDC)
4.2. DataStream API
4.3. Disabling UNNEST (Fallback to Standard Batching)
6. Error Handling
Unsupported Type
Error Message:
Wrong Database Dialect
Error Message:
7. Testing
Unit Tests - PostgresDialectTest (9 new tests ✅)
Location: flink-connector-jdbc-postgres/src/test/java/.../PostgresDialectTest.java
New UNNEST-specific tests (compiles successfully, requires Docker to run):
testBatchInsertStatementWithUnnest - Verify UNNEST INSERT SQL structure
testBatchInsertStatementEmptyFields - Edge case: empty field arrays
testBatchUpsertStatementWithUnnest - Verify UNNEST UPSERT SQL with ON CONFLICT
testBatchUpsertStatementDoNothing - Test DO NOTHING when all fields are keys
testExtractBaseType - Verify type normalization (VARCHAR(255) → VARCHAR)
testBatchStatementQueryPlanStability - Critical: Verify SQL is identical across calls
testGetArrayTypeName - Test PostgreSQL type name mapping
Plus existing tests:
testUpsertStatement - Standard UPSERT SQL
Integration Tests
PostgresDynamicTableSinkITCase (1 new test ✅)
Location: flink-connector-jdbc-postgres/src/test/java/.../PostgresDynamicTableSinkITCase.java
This test class extends JdbcDynamicTableSinkITCase, which means:
New test:
testUpsertWithUnnest() - Validates UPSERT with UNNEST enabled
'sink.postgres.unnest.enabled' = 'true'
Why this approach?
Other PostgreSQL Integration Tests (existing)
PostgresDynamicTableSourceITCase - Source operations
PostgresCatalogITCase - Catalog operations
Total: 32 integration test files across all database modules
8. Backward Compatibility
✅ Fully Backward Compatible
sink.postgres.unnest.enabled = false
9. Related Work
10. Future Work
11. Checklist
13. References
This PR significantly improves Flink's PostgreSQL sink performance while maintaining full backward compatibility. The implementation follows Flink's existing patterns and has been proven in production by other projects like Debezium.