-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Improve][Connector-V2] Support dynamic write mode for JDBC Sink #8305
base: dev
Are you sure you want to change the base?
Conversation
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkReplaceNameConstant.java
Outdated
Show resolved
Hide resolved
...or-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
Outdated
Show resolved
Hide resolved
...g/apache/seatunnel/connectors/seatunnel/jdbc/sink/savemode/JdbcTempTableSaveModeHandler.java
Outdated
Show resolved
Hide resolved
...or-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
Outdated
Show resolved
Hide resolved
...jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkConfig.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR adds support for dynamic write modes (SQL, COPY, COPY_SQL, MERGE, COPY_MERGE) for the JDBC Sink and updates related logic for temporary tables, configuration options, and documentation. Key changes include:
- Enhancements to the sink implementation (e.g., DynamicBufferedBatchStatementExecutor) to support merge operations via temporary tables.
- Updates to configuration options and JDBC dialects (e.g., PostgresDialect) to incorporate the new write modes.
- Modifications to e2e tests and documentation to reflect the new functionality.
Reviewed Changes
Copilot reviewed 20 out of 21 changed files in this pull request and generated no comments.
Show a summary per file
File | Description |
---|---|
seatunnel-e2e/.../JdbcPostgresIT.java | Updated test configuration files and enhanced error messages with stderr output. |
seatunnel-connectors-v2/connector-jdbc/sink/savemode/JdbcSaveModeHandler.java | Modified to accept temporary table parameters and update schema creation accordingly. |
seatunnel-connectors-v2/connector-jdbc/sink/JdbcSinkFactory.java & JdbcSink.java | Added handling for new write modes and temporary table logic in sink creation. |
seatunnel-connectors-v2/connector-jdbc/internal/executor/DynamicBufferedBatchStatementExecutor.java | New executor supports dynamic buffering and merge operations using a temporary table. |
seatunnel-connectors-v2/connector-jdbc/internal/dialect/psql/PostgresDialect.java | Extended to provide a merge statement for PostgreSQL dialect. |
seatunnel-connectors-v2/connector-jdbc/internal/dialect/psql/PostgresCopyBatchStatementExecutor.java | Updated CSV handling and copy command preparation for the new write modes. |
Various config and documentation files | Added new options (write_mode, temp_table_name, temp_column_batch_code, temp_column_row_kind) and updated documentation accordingly. |
Files not reviewed (1)
- seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_copy.conf: Language not supported
Comments suppressed due to low confidence (2)
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/DynamicBufferedBatchStatementExecutor.java:242
- [nitpick] The variable name 'pariRow' appears to be a typo. Consider renaming it to 'pair' or 'rowPair' for improved clarity.
for (Pair<Boolean, SeaTunnelRow> pariRow : buffer.values()) {
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java:210
- If 'updateClause' is empty, the generated MERGE statement may be invalid. Consider handling this case (for example, by using 'DO NOTHING') when there is nothing to update.
String mergeSQL = String.format("%s ON CONFLICT (%s) DO UPDATE SET %s", insertIntoBySQLStatement, uniqueColumns, updateClause);
Purpose of this pull request
Support dynamic write mode: sql, copy/copy_sql/merge/copy_merge for JDBC Sink
Does this PR introduce any user-facing change?
no
How was this patch tested?
exist e2e test.
Check list
New License Guide
release-note
.