diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md index 1508ccd7774..676346b0c77 100644 --- a/docs/en/connector-v2/sink/Jdbc.md +++ b/docs/en/connector-v2/sink/Jdbc.md @@ -31,7 +31,7 @@ support `Xa transactions`. You can set `is_exactly_once=true` to enable it. ## Options -| Name | Type | Required | Default | +| Name | Type | Required | Default | |-------------------------------------------|---------|----------|------------------------------| | url | String | Yes | - | | driver | String | Yes | - | @@ -59,8 +59,12 @@ support `Xa transactions`. You can set `is_exactly_once=true` to enable it. | data_save_mode | Enum | No | APPEND_DATA | | custom_sql | String | No | - | | enable_upsert | Boolean | No | true | -| use_copy_statement | Boolean | No | false | | create_index | Boolean | No | true | +| use_copy_statement | Boolean | No | false | +| write_mode | Enum | No | sql | +| temp_table_name | String | No | - | +| temp_column_batch_code | String | No | - | +| temp_column_row_kind | String | No | - | ### driver [string] @@ -205,18 +209,36 @@ When data_save_mode selects CUSTOM_PROCESSING, you should fill in the CUSTOM_SQL Enable upsert by primary_keys exist, If the task has no key duplicate data, setting this parameter to `false` can speed up data import -### use_copy_statement [boolean] - -Use `COPY ${table} FROM STDIN` statement to import data. Only drivers with `getCopyAPI()` method connections are supported. e.g.: Postgresql driver `org.postgresql.Driver`. - -NOTICE: `MAP`, `ARRAY`, `ROW` types are not supported. - ### create_index [boolean] Create the index(contains primary key and any other indexes) or not when auto-create table. You can use this option to improve the performance of jdbc writes when migrating large tables. Notice: Note that this will sacrifice read performance, so you'll need to manually create indexes after the table migration to improve read performance +### write_mode [Enum] + +The write modes support five modes: SQL, COPY, COPY_SQL, MERGE, COPY_MERGE. + +- SQL (default): The traditional SQL mode using JDBC, supporting both full and incremental writes. +- COPY: Import data using the COPY command (requires DB support such as Postgres), only supports full writes. +- COPY_SQL: Import data using the COPY command (requires DB support), and dynamically switch to SQL mode for writing if there is incremental data. +- MERGE: Import into a temporary table using the COPY command (requires DB support), and then MERGE into the target table (requires DB support), supporting both full and incremental writes. +- COPY_MERGE: Use the COPY command to import full data into the target table (requires DB support); if there is incremental data, dynamically switch to using the COPY command to import into a temporary table, and then MERGE into the target table (requires DB support), supporting both full and incremental writes. + +NOTICE: when use MERGE/COPY_MERGE write mode, it will create a temporary table with the same structure as the target table automatically. + +### temp_table_name [String] + +The temporary table name used in the MERGE/COPY_MERGE write mode. If not specified, the system will generate by origin table name with suffix `_tmp`. + +### temp_column_batch_code [String] + +The temporary column used to batch write data in the MERGE/COPY_MERGE write mode. If not specified, the system will default to `__st_batch_code` column. + +### temp_column_row_kind [String] + +The temporary column used to identify the type of data in the MERGE/COPY_MERGE write mode. If not specified, the system will default to `__st_row_kind` column. + ## tips In the case of is_exactly_once = "true", Xa transactions are used. This requires database support, and some databases require some setup : diff --git a/docs/zh/connector-v2/sink/Jdbc.md b/docs/zh/connector-v2/sink/Jdbc.md index bcfd5d5a6de..0b0f6384f0e 100644 --- a/docs/zh/connector-v2/sink/Jdbc.md +++ b/docs/zh/connector-v2/sink/Jdbc.md @@ -58,6 +58,11 @@ import ChangeLog from '../changelog/connector-jdbc.md'; | custom_sql | String | 否 | - | | enable_upsert | Boolean | 否 | true | | use_copy_statement | Boolean | 否 | false | +| write_mode | Enum | 否 | sql | +| temp_table_name | String | 否 | - | +| temp_column_batch_code | String | 否 | - | +| temp_column_row_kind | String | 否 | - | + ### driver [string] @@ -199,12 +204,28 @@ Sink插件常用参数,请参考 [Sink常用选项](../sink-common-options.md) 启用通过主键更新插入,如果任务没有key重复数据,设置该参数为 false 可以加快数据导入速度 -### use_copy_statement [boolean] +### write_mode [Enum] + +写入模式支持五种模式:SQL, COPY, COPY_SQL, MERGE, COPY_MERGE +- SQL(默认): 传统使用JDBC的SQL模式,支持全量和增量的写入 +- COPY: 使用COPY命令导入数据(需数据库支持如Postgres),仅支持全量写入 +- COPY_SQL: 使用COPY命令导入数据(需数据库支持),如果有增量数据动态切换为SQL模式写入 +- MERGE: 使用COPY命令导入临时表(需数据库支持),然后MERGE到目标表(需数据库支持),支持全量和增量的写入 +- COPY_MERGE: 使用COPY命令将全量数据导入目标表(需数据库支持);如果有增量数据动态切换为使用COPY命令导入到临时表,然后MERGE到目标表(需数据库支持),支持全量和增量的写入 + +注意: 当使用 MERGE/COPY_MERGE 写入模式时,它将自动创建一个与目标表结构相同的临时表。 + +### temp_table_name [String] + +在 MERGE/COPY_MERGE 写入模式中使用的临时表名称。如果未指定,系统将根据原始表名称生成,并添加后缀 `_tmp`。 + +### temp_column_batch_code [String] + +在 MERGE/COPY_MERGE 写入模式中用于批量写入数据的临时列。如果未指定,系统将默认使用 `__st_batch_code` 作为列名。 -使用 `COPY ${table} FROM STDIN` 语句导入数据。仅支持具有 `getCopyAPI()` 方法连接的驱动程序。例如:Postgresql -驱动程序 `org.postgresql.Driver` +### temp_column_row_kind [String] -注意:不支持 `MAP`、`ARRAY`、`ROW`类型 +在 MERGE/COPY_MERGE 写入模式中用于识别数据类型的临时列。如果未指定,系统将默认使用 `__st_row_kind` 作为列名。 ## tips diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java index e34b8ba4374..f3fb4aa8cb8 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java @@ -172,7 +172,8 @@ protected void dropTable() { catalog.dropTable(tablePath, true); } - protected void createTablePreCheck() { + public static void createTablePreCheck( + TablePath tablePath, Catalog catalog, CatalogTable catalogTable) { if (!catalog.databaseExists(tablePath.getDatabaseName())) { try { log.info( @@ -199,7 +200,7 @@ protected void createTablePreCheck() { } protected void createTable() { - createTablePreCheck(); + createTablePreCheck(tablePath, catalog, catalogTable); catalog.createTable(tablePath, catalogTable, true); isNewTableCreated = true; } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java index 053ab71a413..cc8f033a70a 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java @@ -170,6 +170,8 @@ public static final class Builder { public String kerberosKeytabPath; public String krb5Path = JdbcOptions.KRB5_PATH.defaultValue(); + public String tempTableName; + private Builder() {} public Builder url(String url) { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java index ce1ac866cbe..d08d31ac87b 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java @@ -27,6 +27,8 @@ import java.util.List; import java.util.Map; +import static org.apache.seatunnel.api.sink.TablePlaceholder.REPLACE_TABLE_NAME_KEY; + @SuppressWarnings("checkstyle:MagicNumber") public interface JdbcOptions { @@ -152,17 +154,41 @@ public interface JdbcOptions { .defaultValue(true) .withDescription( "is the primary key updated when performing an update operation"); - Option SUPPORT_UPSERT_BY_INSERT_ONLY = - Options.key("support_upsert_by_insert_only") - .booleanType() - .defaultValue(false) - .withDescription("support upsert by insert only"); + + Option WRITE_MODE = + Options.key("write_mode") + .enumType(JdbcSinkConfig.WriteMode.class) + .defaultValue(JdbcSinkConfig.WriteMode.SQL) + .withDescription("write mode: SQL/COPY/COPY_SQL/MERGE/COPY_MERGE"); Option USE_COPY_STATEMENT = Options.key("use_copy_statement") .booleanType() .defaultValue(false) .withDescription("support copy in statement (postgresql)"); + Option TEMP_TABLE_NAME = + Options.key("temp_table_name") + .stringType() + .defaultValue("${" + REPLACE_TABLE_NAME_KEY.getPlaceholder() + "}_tmp") + .withDescription("temp table name"); + + Option TEMP_COLUMN_BATCH_CODE = + Options.key("temp_column_batch_code") + .stringType() + .defaultValue("__st_batch_code") + .withDescription("temp column batch code for merge"); + + Option TEMP_COLUMN_ROW_KIND = + Options.key("temp_column_row_kind") + .stringType() + .defaultValue("__st_row_kind") + .withDescription("temp column row kind for merge"); + + Option SUPPORT_UPSERT_BY_INSERT_ONLY = + Options.key("support_upsert_by_insert_only") + .booleanType() + .defaultValue(false) + .withDescription("support upsert by insert only"); /** source config */ Option PARTITION_COLUMN = diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkConfig.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkConfig.java index 2b90c3a7a7d..cb5d94dd83f 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkConfig.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkConfig.java @@ -18,8 +18,11 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.config; import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.sink.TablePlaceholder; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions; +import org.apache.commons.lang3.StringUtils; + import lombok.Builder; import lombok.Data; @@ -43,6 +46,10 @@ public class JdbcSinkConfig implements Serializable { private List primaryKeys; private boolean enableUpsert; @Builder.Default private boolean isPrimaryKeyUpdated = true; + private WriteMode writeMode; + private String tempTableName; + private String tempColumnRowKind; + private String tempColumnBatchCode; private boolean supportUpsertByInsertOnly; private boolean useCopyStatement; @Builder.Default private boolean createIndex = true; @@ -60,6 +67,31 @@ public static JdbcSinkConfig of(ReadonlyConfig config) { builder.simpleSql(config.get(JdbcOptions.QUERY)); builder.useCopyStatement(config.get(JdbcOptions.USE_COPY_STATEMENT)); builder.createIndex(config.get(JdbcCatalogOptions.CREATE_INDEX)); + builder.writeMode(config.get(JdbcOptions.WRITE_MODE)); + String tempTableName = config.get(JdbcOptions.TEMP_TABLE_NAME); + if (StringUtils.isNotBlank(tempTableName) + && config.getOptional(JdbcOptions.TABLE).isPresent()) { + String tableName = config.get(JdbcOptions.TABLE); + int index = tableName.lastIndexOf("."); + if (index > -1) { + tableName = tableName.substring(index + 1); + } + tempTableName = + tempTableName.replace( + "${" + TablePlaceholder.REPLACE_TABLE_NAME_KEY.getPlaceholder() + "}", + tableName); + } + builder.tempTableName(tempTableName); + builder.tempColumnBatchCode(config.get(JdbcOptions.TEMP_COLUMN_BATCH_CODE)); + builder.tempColumnRowKind(config.get(JdbcOptions.TEMP_COLUMN_ROW_KIND)); return builder.build(); } + + public enum WriteMode { + SQL, + COPY, + COPY_SQL, + MERGE, + COPY_MERGE, + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java index 7748823ca48..11619225490 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java @@ -27,7 +27,7 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferReducedBatchStatementExecutor; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferedBatchStatementExecutor; -import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.CopyManagerBatchStatementExecutor; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.DynamicBufferedBatchStatementExecutor; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.InsertOrUpdateBatchStatementExecutor; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor; @@ -67,12 +67,12 @@ public JdbcOutputFormat build() { jdbcSinkConfig.getDatabase() + "." + jdbcSinkConfig.getTable())); final List primaryKeys = jdbcSinkConfig.getPrimaryKeys(); - if (jdbcSinkConfig.isUseCopyStatement()) { - statementExecutorFactory = - () -> - createCopyInBufferStatementExecutor( - createCopyInBatchStatementExecutor( - dialect, table, tableSchema)); + if ((JdbcSinkConfig.WriteMode.COPY.equals(jdbcSinkConfig.getWriteMode()) + || JdbcSinkConfig.WriteMode.MERGE.equals(jdbcSinkConfig.getWriteMode()) + || JdbcSinkConfig.WriteMode.COPY_MERGE.equals(jdbcSinkConfig.getWriteMode()) + || JdbcSinkConfig.WriteMode.COPY_SQL.equals(jdbcSinkConfig.getWriteMode())) + || jdbcSinkConfig.isUseCopyStatement()) { + statementExecutorFactory = this::createDynamicBufferedExecutor; } else if (StringUtils.isNotBlank(jdbcSinkConfig.getSimpleSql())) { statementExecutorFactory = () -> @@ -107,6 +107,42 @@ public JdbcOutputFormat build() { statementExecutorFactory); } + private JdbcBatchStatementExecutor createDynamicBufferedExecutor() { + final String database = jdbcSinkConfig.getDatabase(); + final TablePath tablePath = + TablePath.of(jdbcSinkConfig.getDatabase() + "." + jdbcSinkConfig.getTable()); + final String table = dialect.extractTableName(tablePath); + + JdbcBatchStatementExecutor bufferReducedBatchStatementExecutor = null; + final List primaryKeys = jdbcSinkConfig.getPrimaryKeys(); + if (JdbcSinkConfig.WriteMode.COPY_SQL.equals(jdbcSinkConfig.getWriteMode())) { + if (primaryKeys == null || primaryKeys.isEmpty()) { + throw new RuntimeException( + "Primary key is not set, can not execute upsert operation"); + } + bufferReducedBatchStatementExecutor = + createUpsertBufferedExecutor( + dialect, + database, + table, + tableSchema, + databaseTableSchema, + primaryKeys.toArray(new String[0]), + jdbcSinkConfig.isEnableUpsert(), + jdbcSinkConfig.isPrimaryKeyUpdated(), + jdbcSinkConfig.isSupportUpsertByInsertOnly()); + } + final JdbcBatchStatementExecutor finalBufferReducedBatchStatementExecutor = + bufferReducedBatchStatementExecutor; + return new DynamicBufferedBatchStatementExecutor( + tablePath, + tableSchema, + dialect, + jdbcSinkConfig, + finalBufferReducedBatchStatementExecutor, + Function.identity()); + } + private static JdbcBatchStatementExecutor createSimpleBufferedExecutor( JdbcDialect dialect, String database, @@ -222,22 +258,6 @@ private static JdbcBatchStatementExecutor createUpsertExecutor( isPrimaryKeyUpdated); } - private static JdbcBatchStatementExecutor createCopyInBufferStatementExecutor( - CopyManagerBatchStatementExecutor copyManagerBatchStatementExecutor) { - return new BufferedBatchStatementExecutor( - copyManagerBatchStatementExecutor, Function.identity()); - } - - private static CopyManagerBatchStatementExecutor createCopyInBatchStatementExecutor( - JdbcDialect dialect, String table, TableSchema tableSchema) { - String columns = - Arrays.stream(tableSchema.getFieldNames()) - .map(dialect::quoteIdentifier) - .collect(Collectors.joining(",", "(", ")")); - String copyInSql = String.format("COPY %s %s FROM STDIN WITH CSV", table, columns); - return new CopyManagerBatchStatementExecutor(copyInSql, tableSchema); - } - private static JdbcBatchStatementExecutor createInsertOnlyExecutor( JdbcDialect dialect, String database, @@ -352,7 +372,7 @@ private static JdbcBatchStatementExecutor createSimpleExecutor( rowConverter); } - static Function createKeyExtractor(int[] pkFields) { + public static Function createKeyExtractor(int[] pkFields) { return row -> { Object[] fields = new Object[pkFields.length]; for (int i = 0; i < pkFields.length; i++) { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java index 6ec44d92f84..011dcc421a8 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java @@ -239,6 +239,16 @@ default String getRowExistsStatement( Optional getUpsertStatement( String database, String tableName, String[] fieldNames, String[] uniqueKeyFields); + default Optional getMergeStatement( + String sourceSQL, + String database, + String tableName, + String[] fieldNames, + String[] uniqueKeyFields, + boolean isPrimaryKeyUpdated) { + return Optional.empty(); + } + /** * Different dialects optimize their PreparedStatement * diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/CopyManagerBatchStatementExecutor.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresCopyBatchStatementExecutor.java similarity index 53% rename from seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/CopyManagerBatchStatementExecutor.java rename to seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresCopyBatchStatementExecutor.java index b485d39de1e..1112d6d3a8e 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/CopyManagerBatchStatementExecutor.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresCopyBatchStatementExecutor.java @@ -14,8 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor; +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql; + +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; @@ -23,66 +25,80 @@ import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException; -import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.CopyBatchStatementExecutor; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVPrinter; +import org.apache.commons.csv.QuoteMode; + +import org.postgresql.PGConnection; + +import com.google.auto.service.AutoService; +import lombok.Getter; import java.io.IOException; +import java.io.Reader; import java.io.StringReader; -import java.lang.reflect.InvocationTargetException; -import java.math.BigDecimal; import java.sql.Connection; import java.sql.SQLException; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; + +@AutoService(CopyBatchStatementExecutor.class) +public class PostgresCopyBatchStatementExecutor implements CopyBatchStatementExecutor { -public class CopyManagerBatchStatementExecutor implements JdbcBatchStatementExecutor { + private static final String COMMA = ","; + private static final String DOUBLE_QUOTE = Character.valueOf('"').toString(); + private static final char LF = '\n'; + private static final String EMPTY = ""; - private final String copySql; - private final TableSchema tableSchema; - CopyManagerProxy copyManagerProxy; - CSVFormat csvFormat = CSVFormat.POSTGRESQL_CSV; + protected String copySql; + @Getter protected TableSchema tableSchema; + private Connection connection; + CSVFormat csvFormat = + CSVFormat.DEFAULT + .builder() + .setDelimiter(COMMA) + .setEscape(null) + .setIgnoreEmptyLines(false) + .setQuote(null) + .setRecordSeparator(LF) + .setNullString(EMPTY) + .setQuoteMode(QuoteMode.ALL_NON_NULL) + .build(); CSVPrinter csvPrinter; - public CopyManagerBatchStatementExecutor(String copySql, TableSchema tableSchema) { - this.copySql = copySql; - this.tableSchema = tableSchema; - } + @Getter private boolean flushed = true; - public static void copyManagerProxyChecked(JdbcConnectionProvider connectionProvider) { - try (Connection connection = connectionProvider.getConnection()) { - new CopyManagerProxy(connection); - } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { - throw new JdbcConnectorException( - JdbcConnectorErrorCode.NO_SUPPORT_OPERATION_FAILED, - "unable to open CopyManager Operation in this JDBC writer. Please configure option use_copy_statement = false.", - e); - } catch (SQLException e) { - throw new JdbcConnectorException( - JdbcConnectorErrorCode.CREATE_DRIVER_FAILED, "unable to open JDBC writer", e); - } + @Override + public void init(TablePath tablePath, TableSchema tableSchema) { + JdbcDialect dialect = new PostgresDialect(); + this.tableSchema = tableSchema; + String tableName = dialect.extractTableName(tablePath); + String columns = + Arrays.stream(tableSchema.getFieldNames()) + .map(dialect::quoteIdentifier) + .collect(Collectors.joining(",", "(", ")")); + this.copySql = String.format("COPY %s %s FROM STDIN WITH CSV", tableName, columns); } @Override - public void prepareStatements(Connection connection) throws SQLException { + public void prepareStatements(Connection connection) { try { - this.copyManagerProxy = new CopyManagerProxy(connection); + this.connection = connection; this.csvPrinter = new CSVPrinter(new StringBuilder(), csvFormat); - } catch (NoSuchMethodException - | IllegalAccessException - | InvocationTargetException - | IOException e) { + } catch (IOException e) { throw new JdbcConnectorException( JdbcConnectorErrorCode.NO_SUPPORT_OPERATION_FAILED, - "unable to open CopyManager Operation in this JDBC writer. Please configure option use_copy_statement = false.", + "unable to open CopyManager Operation in this JDBC writer.", e); - } catch (SQLException e) { - throw new JdbcConnectorException( - JdbcConnectorErrorCode.CREATE_DRIVER_FAILED, "unable to open JDBC writer", e); } } @@ -90,12 +106,13 @@ public void prepareStatements(Connection connection) throws SQLException { public void addToBatch(SeaTunnelRow record) throws SQLException { try { this.csvPrinter.printRecord(toExtract(record)); + flushed = false; } catch (IOException e) { throw new RuntimeException(e); } } - private List toExtract(SeaTunnelRow record) { + protected List toExtract(SeaTunnelRow record) { SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType(); List csvRecord = new ArrayList<>(); for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields(); fieldIndex++) { @@ -106,49 +123,52 @@ private List toExtract(SeaTunnelRow record) { continue; } switch (seaTunnelDataType.getSqlType()) { - case STRING: - csvRecord.add((String) record.getField(fieldIndex)); - break; case BOOLEAN: - csvRecord.add((Boolean) record.getField(fieldIndex)); - break; case TINYINT: - csvRecord.add((Byte) record.getField(fieldIndex)); - break; case SMALLINT: - csvRecord.add((Short) record.getField(fieldIndex)); - break; case INT: - csvRecord.add((Integer) record.getField(fieldIndex)); - break; case BIGINT: - csvRecord.add((Long) record.getField(fieldIndex)); - break; case FLOAT: - csvRecord.add((Float) record.getField(fieldIndex)); - break; case DOUBLE: - csvRecord.add((Double) record.getField(fieldIndex)); - break; case DECIMAL: - csvRecord.add((BigDecimal) record.getField(fieldIndex)); + csvRecord.add(record.getField(fieldIndex)); break; case DATE: LocalDate localDate = (LocalDate) record.getField(fieldIndex); - csvRecord.add((java.sql.Date) java.sql.Date.valueOf(localDate)); + csvRecord.add(java.sql.Date.valueOf(localDate)); break; case TIME: LocalTime localTime = (LocalTime) record.getField(fieldIndex); - csvRecord.add((java.sql.Time) java.sql.Time.valueOf(localTime)); + csvRecord.add(java.sql.Time.valueOf(localTime)); break; case TIMESTAMP: LocalDateTime localDateTime = (LocalDateTime) record.getField(fieldIndex); - csvRecord.add((java.sql.Timestamp) java.sql.Timestamp.valueOf(localDateTime)); + csvRecord.add(java.sql.Timestamp.valueOf(localDateTime)); break; case BYTES: - csvRecord.add( - org.apache.commons.codec.binary.Base64.encodeBase64String( - (byte[]) record.getField(fieldIndex))); + StringBuilder hexString = new StringBuilder("\\x"); + for (byte b : (byte[]) record.getField(fieldIndex)) { + hexString.append(String.format("%02x", b)); + } + csvRecord.add(hexString.toString()); + break; + case STRING: + Object val = record.getField(fieldIndex); + if (val != null) { + String strVal = val.toString(); + boolean containsQuote = strVal.contains(DOUBLE_QUOTE); + if (strVal.contains(COMMA) || containsQuote) { + strVal = + containsQuote + ? strVal.replace( + DOUBLE_QUOTE, DOUBLE_QUOTE + DOUBLE_QUOTE) + : strVal; + strVal = DOUBLE_QUOTE + strVal + DOUBLE_QUOTE; + } + csvRecord.add(strVal); + } else { + csvRecord.add(null); + } break; case NULL: csvRecord.add(null); @@ -165,19 +185,24 @@ private List toExtract(SeaTunnelRow record) { return csvRecord; } + private long doCopy(String sql, Reader reader) throws SQLException, IOException { + PGConnection pgConnection = connection.unwrap(PGConnection.class); + return pgConnection.getCopyAPI().copyIn(sql, reader); + } + @Override public void executeBatch() throws SQLException { try { this.csvPrinter.flush(); - this.copyManagerProxy.doCopy( - copySql, new StringReader(this.csvPrinter.getOut().toString())); - } catch (InvocationTargetException | IllegalAccessException | IOException e) { + doCopy(copySql, new StringReader(this.csvPrinter.getOut().toString())); + } catch (SQLException | IOException e) { throw new JdbcConnectorException( CommonErrorCodeDeprecated.SQL_OPERATION_FAILED, "Sql command: " + copySql); } finally { try { this.csvPrinter.close(); this.csvPrinter = new CSVPrinter(new StringBuilder(), csvFormat); + flushed = true; } catch (Exception ignore) { } } @@ -185,11 +210,15 @@ public void executeBatch() throws SQLException { @Override public void closeStatements() throws SQLException { - this.copyManagerProxy = null; try { this.csvPrinter.close(); this.csvPrinter = null; } catch (Exception ignore) { } } + + @Override + public String dialectName() { + return DatabaseIdentifier.POSTGRESQL; + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java index b56930303d7..04bcc26f3d9 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java @@ -176,6 +176,46 @@ public Optional getUpsertStatement( return Optional.of(upsertSQL); } + @Override + public Optional getMergeStatement( + String sourceSQL, + String database, + String tableName, + String[] fieldNames, + String[] uniqueKeyFields, + boolean isPrimaryKeyUpdated) { + String uniqueColumns = + Arrays.stream(uniqueKeyFields) + .map(this::quoteIdentifier) + .collect(Collectors.joining(", ")); + String updateClause = + Arrays.stream(fieldNames) + .filter( + fieldName -> + isPrimaryKeyUpdated + || !Arrays.asList(uniqueKeyFields) + .contains(fieldName)) + .map( + fieldName -> + quoteIdentifier(fieldName) + + "=EXCLUDED." + + quoteIdentifier(fieldName)) + .collect(Collectors.joining(", ")); + String columns = + Arrays.stream(fieldNames) + .map(this::quoteIdentifier) + .collect(Collectors.joining(", ")); + String insertIntoBySQLStatement = + String.format( + "INSERT INTO %s (%s) %s", + tableIdentifier(database, tableName), columns, sourceSQL); + String mergeSQL = + String.format( + "%s ON CONFLICT (%s) DO UPDATE SET %s", + insertIntoBySQLStatement, uniqueColumns, updateClause); + return Optional.of(mergeSQL); + } + @Override public PreparedStatement creatPreparedStatement( Connection connection, String queryTemplate, int fetchSize) throws SQLException { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/CopyBatchStatementExecutor.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/CopyBatchStatementExecutor.java new file mode 100644 index 00000000000..865ead71e77 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/CopyBatchStatementExecutor.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor; + +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +public interface CopyBatchStatementExecutor extends JdbcBatchStatementExecutor { + String dialectName(); + + void init(TablePath tablePath, TableSchema tableSchema); + + boolean isFlushed(); +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/CopyBatchStatementExecutorFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/CopyBatchStatementExecutorFactory.java new file mode 100644 index 00000000000..df169508ac6 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/CopyBatchStatementExecutorFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException; + +import java.util.LinkedList; +import java.util.List; +import java.util.ServiceConfigurationError; +import java.util.ServiceLoader; + +public class CopyBatchStatementExecutorFactory { + public static CopyBatchStatementExecutor create(String dialectOrCompatibleMode) { + try { + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + final List result = new LinkedList<>(); + ServiceLoader.load(CopyBatchStatementExecutor.class, classLoader) + .iterator() + .forEachRemaining(result::add); + for (CopyBatchStatementExecutor executor : result) { + if (executor.dialectName().equalsIgnoreCase(dialectOrCompatibleMode)) { + return executor; + } + } + throw new UnsupportedOperationException( + String.format( + "Unsupported dialect or compatible mode: %s for copy batch statement executor.", + dialectOrCompatibleMode)); + } catch (ServiceConfigurationError e) { + throw new JdbcConnectorException( + JdbcConnectorErrorCode.NO_SUITABLE_DIALECT_FACTORY, + "Could not load service provider for jdbc copy batch statement executor factory.", + e); + } + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/CopyManagerProxy.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/CopyManagerProxy.java deleted file mode 100644 index 54d99e345d3..00000000000 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/CopyManagerProxy.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Reader; -import java.lang.reflect.Field; -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.lang.reflect.Proxy; -import java.sql.Connection; -import java.sql.SQLException; - -class CopyManagerProxy { - private static final Logger LOG = LoggerFactory.getLogger(CopyManagerProxy.class); - Object connection; - Object copyManager; - Class connectionClazz; - Class copyManagerClazz; - Method getCopyAPIMethod; - Method copyInMethod; - - CopyManagerProxy(Connection connection) - throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, - SQLException { - LOG.info("Proxy connection class: {}", connection.getClass().getName()); - this.connection = connection.unwrap(Connection.class); - LOG.info("Proxy unwrap connection class: {}", this.connection.getClass().getName()); - if (Proxy.isProxyClass(this.connection.getClass())) { - InvocationHandler handler = Proxy.getInvocationHandler(this.connection); - this.connection = getConnectionFromInvocationHandler(handler); - if (null == this.connection) { - throw new InvocationTargetException( - new NullPointerException("Proxy Connection is null.")); - } - LOG.info("Proxy connection class: {}", this.connection.getClass().getName()); - this.connectionClazz = this.connection.getClass(); - } else { - this.connectionClazz = this.connection.getClass(); - } - this.getCopyAPIMethod = this.connectionClazz.getMethod("getCopyAPI"); - this.copyManager = this.getCopyAPIMethod.invoke(this.connection); - this.copyManagerClazz = this.copyManager.getClass(); - this.copyInMethod = this.copyManagerClazz.getMethod("copyIn", String.class, Reader.class); - } - - long doCopy(String sql, Reader reader) - throws InvocationTargetException, IllegalAccessException { - return (long) this.copyInMethod.invoke(this.copyManager, sql, reader); - } - - private static Object getConnectionFromInvocationHandler(InvocationHandler handler) - throws IllegalAccessException { - Class handlerClass = handler.getClass(); - LOG.info("InvocationHandler class: {}", handlerClass.getName()); - for (Field declaredField : handlerClass.getDeclaredFields()) { - boolean tempAccessible = declaredField.isAccessible(); - if (!tempAccessible) { - declaredField.setAccessible(true); - } - Object handlerObject = declaredField.get(handler); - if (handlerObject instanceof Connection) { - if (!tempAccessible) { - declaredField.setAccessible(tempAccessible); - } - return handlerObject; - } else { - if (!tempAccessible) { - declaredField.setAccessible(tempAccessible); - } - } - } - return null; - } -} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/DynamicBufferedBatchStatementExecutor.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/DynamicBufferedBatchStatementExecutor.java new file mode 100644 index 00000000000..f04112a7d39 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/DynamicBufferedBatchStatementExecutor.java @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor; + +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.RowKind; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormatBuilder; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Optional; +import java.util.StringJoiner; +import java.util.UUID; +import java.util.function.Function; + +public class DynamicBufferedBatchStatementExecutor + implements JdbcBatchStatementExecutor { + + private final CopyBatchStatementExecutor copyBatchStatementExecutor; + + private CopyBatchStatementExecutor tmpCopyBatchStatementExecutor; + + private final JdbcBatchStatementExecutor bufferReducedBatchStatementExecutor; + + private final TablePath tablePath; + private TablePath tmpTablePath; + private final TableSchema tableSchema; + private final JdbcDialect dialect; + private final JdbcSinkConfig jdbcSinkConfig; + private final Function valueTransform; + + private Connection connection; + + private Function keyExtractor; + private boolean hasUpsertRowKind = false; + + private String dialectOrCompatibleMode; + + private final LinkedHashMap> buffer = + new LinkedHashMap<>(); + + public DynamicBufferedBatchStatementExecutor( + TablePath tablePath, + TableSchema tableSchema, + JdbcDialect dialect, + JdbcSinkConfig jdbcSinkConfig, + JdbcBatchStatementExecutor bufferReducedBatchStatementExecutor, + Function valueTransform) { + this.tablePath = tablePath; + this.tableSchema = tableSchema; + this.dialect = dialect; + this.jdbcSinkConfig = jdbcSinkConfig; + this.bufferReducedBatchStatementExecutor = bufferReducedBatchStatementExecutor; + this.valueTransform = valueTransform; + + if (jdbcSinkConfig.getJdbcConnectionConfig() != null + && StringUtils.isNotBlank( + jdbcSinkConfig.getJdbcConnectionConfig().getCompatibleMode())) { + dialectOrCompatibleMode = jdbcSinkConfig.getJdbcConnectionConfig().getCompatibleMode(); + } else { + dialectOrCompatibleMode = dialect.dialectName(); + } + this.copyBatchStatementExecutor = + CopyBatchStatementExecutorFactory.create(dialectOrCompatibleMode); + this.copyBatchStatementExecutor.init(tablePath, tableSchema); + + if (jdbcSinkConfig.getWriteMode().equals(JdbcSinkConfig.WriteMode.MERGE) + || jdbcSinkConfig.getWriteMode().equals(JdbcSinkConfig.WriteMode.COPY_MERGE)) { + if (jdbcSinkConfig.getPrimaryKeys() == null + || jdbcSinkConfig.getPrimaryKeys().isEmpty()) { + throw new RuntimeException( + "Primary key is not set, can not execute merge operation"); + } + if (StringUtils.isNotBlank(jdbcSinkConfig.getTempTableName())) { + this.tmpTablePath = + TablePath.of( + tablePath.getDatabaseName(), + tablePath.getSchemaName(), + jdbcSinkConfig.getTempTableName().toLowerCase()); + } else { + this.tmpTablePath = + TablePath.of( + tablePath.getDatabaseName(), + tablePath.getSchemaName(), + tablePath.getTableName() + "_tmp"); + } + this.tmpCopyBatchStatementExecutor = + createTmpCopyBufferedExecutor(tmpTablePath, tableSchema); + + List primaryKeys = jdbcSinkConfig.getPrimaryKeys(); + String[] pkNames = primaryKeys.toArray(new String[0]); + int[] pkFields = + Arrays.stream(pkNames) + .mapToInt(tableSchema.toPhysicalRowDataType()::indexOf) + .toArray(); + this.keyExtractor = JdbcOutputFormatBuilder.createKeyExtractor(pkFields); + } + } + + private CopyBatchStatementExecutor createTmpCopyBufferedExecutor( + TablePath tmpTablePath, TableSchema tableSchema) { + List tmpColumnsSchema = new ArrayList<>(tableSchema.getColumns()); + tmpColumnsSchema.add( + new PhysicalColumn( + jdbcSinkConfig.getTempColumnBatchCode(), + BasicType.STRING_TYPE, + null, + null, + false, + null, + null)); + tmpColumnsSchema.add( + new PhysicalColumn( + jdbcSinkConfig.getTempColumnRowKind(), + BasicType.INT_TYPE, + null, + null, + false, + null, + null)); + TableSchema tmpTableSchema = + new TableSchema( + tmpColumnsSchema, + tableSchema.getPrimaryKey(), + tableSchema.getConstraintKeys()); + CopyBatchStatementExecutor copyBatchStatementExecutor = + CopyBatchStatementExecutorFactory.create(dialectOrCompatibleMode); + copyBatchStatementExecutor.init(tmpTablePath, tmpTableSchema); + return copyBatchStatementExecutor; + } + + @Override + public void prepareStatements(Connection connection) throws SQLException { + this.connection = connection; + this.copyBatchStatementExecutor.prepareStatements(connection); + if (this.tmpCopyBatchStatementExecutor != null) { + this.tmpCopyBatchStatementExecutor.prepareStatements(connection); + } + if (this.bufferReducedBatchStatementExecutor != null) { + this.bufferReducedBatchStatementExecutor.prepareStatements(connection); + } + } + + @Override + public void addToBatch(SeaTunnelRow record) throws SQLException { + if (jdbcSinkConfig.getWriteMode().equals(JdbcSinkConfig.WriteMode.COPY) + || jdbcSinkConfig.isUseCopyStatement()) { + if (!RowKind.INSERT.equals(record.getRowKind())) { + throw new RuntimeException("Only support INSERT row kind when writeMode is COPY"); + } + copyBatchStatementExecutor.addToBatch(record); + } else { + if (RowKind.UPDATE_BEFORE.equals(record.getRowKind())) { + hasUpsertRowKind = true; + return; + } + if ((jdbcSinkConfig.getWriteMode().equals(JdbcSinkConfig.WriteMode.COPY_MERGE) + || jdbcSinkConfig + .getWriteMode() + .equals(JdbcSinkConfig.WriteMode.COPY_SQL)) + && !hasUpsertRowKind + && RowKind.INSERT.equals(record.getRowKind())) { + copyBatchStatementExecutor.addToBatch(record); + return; + } + if (!RowKind.INSERT.equals(record.getRowKind())) { + hasUpsertRowKind = true; + } + if (bufferReducedBatchStatementExecutor != null) { + bufferReducedBatchStatementExecutor.addToBatch(record); + return; + } + // reduce row to buffer + if (keyExtractor == null) { + throw new RuntimeException( + "Primary key is not set, can not execute merge operation"); + } + SeaTunnelRow key = keyExtractor.apply(record); + boolean changeFlag = + RowKind.INSERT.equals(record.getRowKind()) + || RowKind.UPDATE_AFTER.equals(record.getRowKind()); + SeaTunnelRow value = valueTransform.apply(record); + buffer.put(key, Pair.of(changeFlag, value)); + } + } + + @Override + public void executeBatch() throws SQLException { + if (jdbcSinkConfig.getWriteMode().equals(JdbcSinkConfig.WriteMode.COPY) + || jdbcSinkConfig.isUseCopyStatement()) { + copyBatchStatementExecutor.executeBatch(); + return; + } + if ((jdbcSinkConfig.getWriteMode().equals(JdbcSinkConfig.WriteMode.COPY_MERGE) + || jdbcSinkConfig.getWriteMode().equals(JdbcSinkConfig.WriteMode.COPY_SQL)) + && !copyBatchStatementExecutor.isFlushed()) { + // handle batch first + copyBatchStatementExecutor.executeBatch(); + } + + if (bufferReducedBatchStatementExecutor != null) { + bufferReducedBatchStatementExecutor.executeBatch(); + return; + } + + boolean originAutoCommit = connection.getAutoCommit(); + connection.setAutoCommit(false); + try { + String batchCode = UUID.randomUUID().toString().replace("-", ""); + // copy to tmp table + for (Pair pariRow : buffer.values()) { + SeaTunnelRow row = pariRow.getRight(); + Object[] fields = row.getFields(); + Object[] newFields = new Object[fields.length + 2]; + System.arraycopy(fields, 0, newFields, 0, fields.length); + newFields[newFields.length - 2] = batchCode; + newFields[newFields.length - 1] = (int) row.getRowKind().toByteValue(); + SeaTunnelRow newRow = new SeaTunnelRow(newFields); + tmpCopyBatchStatementExecutor.addToBatch(newRow); + } + tmpCopyBatchStatementExecutor.executeBatch(); + + mergeTempTableToTargetTable(batchCode); + + connection.commit(); + buffer.clear(); + } catch (Exception e) { + connection.rollback(); + throw e; + } finally { + connection.setAutoCommit(originAutoCommit); + } + } + + private void mergeTempTableToTargetTable(String batchCode) throws SQLException { + // handle buffer for merge + PrimaryKey primaryKey = tableSchema.getPrimaryKey(); + List primaryKeyColumns = primaryKey.getColumnNames(); + StringJoiner condition = new StringJoiner(" AND "); + for (String primaryKeyColumn : primaryKeyColumns) { + condition.add(primaryKeyColumn + "=tmp." + primaryKeyColumn); + } + String table = dialect.extractTableName(tablePath); + String tmpTable = dialect.extractTableName(tmpTablePath); + String deleteSQL = + String.format( + "DELETE FROM %s WHERE EXISTS (SELECT 1 FROM %s tmp WHERE tmp.%s=? AND tmp.%s=? AND %s)", + dialect.tableIdentifier(jdbcSinkConfig.getDatabase(), table), + dialect.tableIdentifier(jdbcSinkConfig.getDatabase(), tmpTable), + jdbcSinkConfig.getTempColumnBatchCode(), + jdbcSinkConfig.getTempColumnRowKind(), + condition); + try (PreparedStatement pStmt = connection.prepareStatement(deleteSQL)) { + pStmt.setString(1, batchCode); + pStmt.setInt(2, RowKind.DELETE.toByteValue()); + pStmt.executeUpdate(); + } + + // handle upsert row kind + final List primaryKeys = jdbcSinkConfig.getPrimaryKeys(); + StringJoiner baseColumns = new StringJoiner(", "); + for (String fieldName : tableSchema.getFieldNames()) { + baseColumns.add(dialect.quoteIdentifier(fieldName)); + } + String sourceSQL = + String.format( + "SELECT %s FROM %s WHERE %s = ? AND %s != ? ", + baseColumns, + dialect.tableIdentifier(jdbcSinkConfig.getDatabase(), tmpTable), + jdbcSinkConfig.getTempColumnBatchCode(), + jdbcSinkConfig.getTempColumnRowKind()); + Optional mergeStatement = + dialect.getMergeStatement( + sourceSQL, + jdbcSinkConfig.getDatabase(), + table, + tableSchema.getFieldNames(), + primaryKeys.toArray(new String[0]), + jdbcSinkConfig.isPrimaryKeyUpdated()); + if (!mergeStatement.isPresent()) { + throw new UnsupportedOperationException( + String.format( + "unsupported merge statement for dialect: %s", dialect.dialectName())); + } + try (PreparedStatement pStmt = connection.prepareStatement(mergeStatement.get())) { + pStmt.setString(1, batchCode); + pStmt.setInt(2, RowKind.DELETE.toByteValue()); + pStmt.executeUpdate(); + } + + // delete temp date from tmp table + String deleteTmpSQL = + String.format( + "DELETE FROM %s WHERE %s=?", + dialect.tableIdentifier(jdbcSinkConfig.getDatabase(), tmpTable), + jdbcSinkConfig.getTempColumnBatchCode()); + try (PreparedStatement pStmt = connection.prepareStatement(deleteTmpSQL)) { + pStmt.setString(1, batchCode); + pStmt.executeUpdate(); + } + } + + @Override + public void closeStatements() throws SQLException { + if (!buffer.isEmpty()) { + executeBatch(); + } + copyBatchStatementExecutor.closeStatements(); + if (tmpCopyBatchStatementExecutor != null) { + tmpCopyBatchStatementExecutor.closeStatements(); + } + if (bufferReducedBatchStatementExecutor != null) { + bufferReducedBatchStatementExecutor.closeStatements(); + } + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java index 25fc39005de..30f4810ea1e 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java @@ -32,10 +32,15 @@ import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSink; import org.apache.seatunnel.api.table.catalog.Catalog; import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.ConstraintKey; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException; import org.apache.seatunnel.api.table.schema.SchemaChangeType; +import org.apache.seatunnel.api.table.type.BasicType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.iris.IrisCatalog; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.iris.savemode.IrisSaveModeHandler; @@ -58,6 +63,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Optional; @@ -266,6 +273,16 @@ public Optional getSaveModeHandler() { config.get(JdbcOptions.CUSTOM_SQL), jdbcSinkConfig.isCreateIndex())); } + + TablePath tempTablePath = null; + CatalogTable tempCatalogTable = null; + JdbcSinkConfig.WriteMode writeMode = config.get(JdbcOptions.WRITE_MODE); + if (writeMode == JdbcSinkConfig.WriteMode.MERGE + || writeMode == JdbcSinkConfig.WriteMode.COPY_MERGE) { + tempTablePath = getTempTablePath(tablePath); + tempCatalogTable = getTempCatalogTable(tempTablePath); + } + return Optional.of( new JdbcSaveModeHandler( schemaSaveMode, @@ -273,6 +290,8 @@ public Optional getSaveModeHandler() { catalog, tablePath, catalogTable, + tempTablePath, + tempCatalogTable, config.get(JdbcOptions.CUSTOM_SQL), jdbcSinkConfig.isCreateIndex())); } catch (Exception e) { @@ -283,6 +302,53 @@ public Optional getSaveModeHandler() { return Optional.empty(); } + private CatalogTable getTempCatalogTable(TablePath tempTablePath) { + CatalogTable tempCatalogTable; + String batchCodeColumnName = config.get(JdbcOptions.TEMP_COLUMN_BATCH_CODE); + String rowKindColumnName = config.get(JdbcOptions.TEMP_COLUMN_ROW_KIND); + + List tempTableColumns = new ArrayList<>(catalogTable.getTableSchema().getColumns()); + Column stBatchCodeColumn = + new PhysicalColumn( + batchCodeColumnName, BasicType.STRING_TYPE, 63L, null, false, "", null); + Column stRowKindColumn = + new PhysicalColumn( + rowKindColumnName, BasicType.INT_TYPE, null, null, false, 0, null); + tempTableColumns.add(stBatchCodeColumn); + tempTableColumns.add(stRowKindColumn); + ConstraintKey.ConstraintKeyColumn stBatchCodeColumnKey = + ConstraintKey.ConstraintKeyColumn.of( + batchCodeColumnName, ConstraintKey.ColumnSortType.ASC); + ConstraintKey constraintKey = + ConstraintKey.of( + ConstraintKey.ConstraintType.INDEX_KEY, + "idx_" + tempTablePath.getTableName() + "_stbc", + Collections.singletonList(stBatchCodeColumnKey)); + TableSchema tempTableSchema = + new TableSchema(tempTableColumns, null, Collections.singletonList(constraintKey)); + tempCatalogTable = + CatalogTable.of( + TableIdentifier.of(catalogTable.getCatalogName(), tempTablePath), + tempTableSchema, + new HashMap<>(), + new ArrayList<>(), + null); + return tempCatalogTable; + } + + private TablePath getTempTablePath(TablePath tablePath) { + TablePath tempTablePath = null; + JdbcSinkConfig.WriteMode writeMode = config.get(JdbcOptions.WRITE_MODE); + if (writeMode == JdbcSinkConfig.WriteMode.MERGE + || writeMode == JdbcSinkConfig.WriteMode.COPY_MERGE) { + String tempTableName = jdbcSinkConfig.getTempTableName(); + tempTablePath = + TablePath.of( + tablePath.getDatabaseName(), tablePath.getSchemaName(), tempTableName); + } + return tempTablePath; + } + private Optional getCatalog() { if (StringUtils.isBlank(jdbcSinkConfig.getDatabase())) { return Optional.empty(); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java index 3af53c88398..6191b8e25ed 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java @@ -71,9 +71,13 @@ import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.SCHEMA_SAVE_MODE; import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.SUPPORT_UPSERT_BY_QUERY_PRIMARY_KEY_EXIST; import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.TABLE; +import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.TEMP_COLUMN_BATCH_CODE; +import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.TEMP_COLUMN_ROW_KIND; +import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.TEMP_TABLE_NAME; import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.TRANSACTION_TIMEOUT_SEC; import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.URL; import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.USER; +import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.WRITE_MODE; import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.XA_DATA_SOURCE_CLASS_NAME; @AutoService(Factory.class) @@ -269,7 +273,8 @@ public OptionRule optionRule() { SUPPORT_UPSERT_BY_QUERY_PRIMARY_KEY_EXIST, PRIMARY_KEYS, COMPATIBLE_MODE, - MULTI_TABLE_SINK_REPLICA) + MULTI_TABLE_SINK_REPLICA, + WRITE_MODE) .conditional( IS_EXACTLY_ONCE, true, @@ -280,6 +285,18 @@ public OptionRule optionRule() { .conditional(GENERATE_SINK_SQL, true, DATABASE) .conditional(GENERATE_SINK_SQL, false, QUERY) .conditional(DATA_SAVE_MODE, DataSaveMode.CUSTOM_PROCESSING, CUSTOM_SQL) + .conditional( + WRITE_MODE, + JdbcSinkConfig.WriteMode.MERGE, + TEMP_TABLE_NAME, + TEMP_COLUMN_BATCH_CODE, + TEMP_COLUMN_ROW_KIND) + .conditional( + WRITE_MODE, + JdbcSinkConfig.WriteMode.COPY_MERGE, + TEMP_TABLE_NAME, + TEMP_COLUMN_BATCH_CODE, + TEMP_COLUMN_ROW_KIND) .build(); } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/savemode/JdbcSaveModeHandler.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/savemode/JdbcSaveModeHandler.java index 87a2b7114db..05f8154f0cf 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/savemode/JdbcSaveModeHandler.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/savemode/JdbcSaveModeHandler.java @@ -30,21 +30,43 @@ public class JdbcSaveModeHandler extends DefaultSaveModeHandler { public boolean createIndex; + private final TablePath tempTablePath; + + private final CatalogTable tempCatalogTable; + public JdbcSaveModeHandler( SchemaSaveMode schemaSaveMode, DataSaveMode dataSaveMode, Catalog catalog, TablePath tablePath, CatalogTable catalogTable, + TablePath tempTablePath, + CatalogTable tempCatalogTable, String customSql, boolean createIndex) { super(schemaSaveMode, dataSaveMode, catalog, tablePath, catalogTable, customSql); this.createIndex = createIndex; + this.tempTablePath = tempTablePath; + this.tempCatalogTable = tempCatalogTable; } @Override protected void createTable() { - super.createTablePreCheck(); + createTablePreCheck(tablePath, catalog, catalogTable); catalog.createTable(tablePath, catalogTable, true, createIndex); } + + @Override + protected void createSchemaWhenNotExist() { + if (!tableExists()) { + createTable(); + } + if (tempTablePath != null && tempCatalogTable != null) { + if (!catalog.tableExists(tempTablePath)) { + DefaultSaveModeHandler.createTablePreCheck( + tempTablePath, catalog, tempCatalogTable); + catalog.createTable(tempTablePath, tempCatalogTable, true, true); + } + } + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/savemode/JdbcTempTableSaveModeHandler.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/savemode/JdbcTempTableSaveModeHandler.java new file mode 100644 index 00000000000..5d727a1da16 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/savemode/JdbcTempTableSaveModeHandler.java @@ -0,0 +1,73 @@ +/// * +// * Licensed to the Apache Software Foundation (ASF) under one or more +// * contributor license agreements. See the NOTICE file distributed with +// * this work for additional information regarding copyright ownership. +// * The ASF licenses this file to You under the Apache License, Version 2.0 +// * (the "License"); you may not use this file except in compliance with +// * the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +// +// package org.apache.seatunnel.connectors.seatunnel.jdbc.sink.savemode; +// +// import org.apache.seatunnel.api.sink.DataSaveMode; +// import org.apache.seatunnel.api.sink.DefaultSaveModeHandler; +// import org.apache.seatunnel.api.sink.SaveModeHandler; +// import org.apache.seatunnel.api.sink.SchemaSaveMode; +// import org.apache.seatunnel.api.table.catalog.Catalog; +// import org.apache.seatunnel.api.table.catalog.CatalogTable; +// import org.apache.seatunnel.api.table.catalog.TablePath; +// +// import lombok.Getter; +// import lombok.extern.slf4j.Slf4j; +// +// @Slf4j +// public class JdbcTempTableSaveModeHandler extends JdbcSaveModeHandler implements SaveModeHandler +// { +// @Getter private final TablePath tempTablePath; +// +// @Getter private final CatalogTable tempCatalogTable; +// +// public JdbcTempTableSaveModeHandler( +// SchemaSaveMode schemaSaveMode, +// DataSaveMode dataSaveMode, +// Catalog catalog, +// TablePath tablePath, +// CatalogTable catalogTable, +// TablePath tempTablePath, +// CatalogTable tempCatalogTable, +// String customSql, +// boolean createIndex) { +// super( +// schemaSaveMode, +// dataSaveMode, +// catalog, +// tablePath, +// catalogTable, +// customSql, +// createIndex); +// this.tempTablePath = tempTablePath; +// this.tempCatalogTable = tempCatalogTable; +// } +// +// @Override +// protected void createSchemaWhenNotExist() { +// if (!tableExists()) { +// createTable(); +// } +// if (tempTablePath != null && tempCatalogTable != null) { +// if (!catalog.tableExists(tempTablePath)) { +// DefaultSaveModeHandler.createTablePreCheck( +// tempTablePath, catalog, tempCatalogTable); +// catalog.createTable(tempTablePath, tempCatalogTable, true, true); +// } +// } +// } +// } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java index 916f3be8ef7..c76b0183712 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java @@ -75,10 +75,11 @@ public class JdbcPostgresIT extends TestSuiteBase implements TestResource { private static final List PG_CONFIG_FILE_LIST = Lists.newArrayList( "/jdbc_postgres_source_and_sink.conf", - "/jdbc_postgres_source_and_sink_copy_stmt.conf", "/jdbc_postgres_source_and_sink_parallel.conf", "/jdbc_postgres_source_and_sink_parallel_upper_lower.conf", - "/jdbc_postgres_source_and_sink_xa.conf"); + "/jdbc_postgres_source_and_sink_xa.conf", + "/jdbc_postgres_source_and_sink_copy.conf", + "/jdbc_postgres_source_and_sink_copy_sql.conf"); private PostgreSQLContainer POSTGRESQL_CONTAINER; private static final String PG_SOURCE_DDL = "CREATE TABLE IF NOT EXISTS pg_e2e_source_table (\n" @@ -339,7 +340,8 @@ public void testAutoGenerateSQL(TestContainer container) CONFIG_FILE + " job run failed in " + container.getClass().getSimpleName() - + "."); + + ", cause: " + + execResult.getStderr()); Assertions.assertIterableEquals(querySql(SOURCE_SQL), querySql(SINK_SQL)); } finally { executeSQL("truncate table pg_e2e_sink_table"); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_copy.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_copy.conf new file mode 100644 index 00000000000..3159f656ec3 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_copy.conf @@ -0,0 +1,49 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source{ + jdbc{ + url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF" + driver = "org.postgresql.Driver" + user = "test" + password = "test" + query ="""select gid, uuid_col, text_col, varchar_col, char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col, double_precision_col, + smallserial_col, serial_col, bigserial_col, date_col, timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums, multipoint, + multilinestring, multipolygon, geometrycollection, geog, json_col, jsonb_col,xml_col from pg_e2e_source_table""" + partition_column = "varchar_col" + partition_num = 2 + } +} + + +sink { + Jdbc { + driver = org.postgresql.Driver + url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF&stringtype=unspecified" + user = test + password = test + generate_sink_sql = true + database = test + table = public.pg_e2e_sink_table + write_mode = copy + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_copy_stmt.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_copy_sql.conf similarity index 98% rename from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_copy_stmt.conf rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_copy_sql.conf index d36c4f351da..d00b1452c05 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_copy_stmt.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_copy_sql.conf @@ -44,7 +44,7 @@ sink { generate_sink_sql = true database = test table = public.pg_e2e_sink_table - use_copy_statement = true + write_mode = copy_sql primary_keys = ["gid"] } } \ No newline at end of file