diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcExecutionOptions.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcExecutionOptions.java index c19677b07..76882c66f 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcExecutionOptions.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcExecutionOptions.java @@ -33,12 +33,15 @@ public class JdbcExecutionOptions implements Serializable { private final long batchIntervalMs; private final int batchSize; private final int maxRetries; + private final boolean bulkInsertEnabled; - private JdbcExecutionOptions(long batchIntervalMs, int batchSize, int maxRetries) { + private JdbcExecutionOptions( + long batchIntervalMs, int batchSize, int maxRetries, boolean bulkInsertEnabled) { Preconditions.checkArgument(maxRetries >= 0); this.batchIntervalMs = batchIntervalMs; this.batchSize = batchSize; this.maxRetries = maxRetries; + this.bulkInsertEnabled = bulkInsertEnabled; } public long getBatchIntervalMs() { @@ -53,6 +56,16 @@ public int getMaxRetries() { return maxRetries; } + /** + * Returns whether the dialect's bulk insert optimization is enabled. The selected dialect must + * implement {@link + * org.apache.flink.connector.jdbc.core.database.dialect.JdbcBulkInsertDialect}; otherwise the + * sink fails fast at build time. + */ + public boolean isBulkInsertEnabled() { + return bulkInsertEnabled; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -64,12 +77,13 @@ public boolean equals(Object o) { JdbcExecutionOptions that = (JdbcExecutionOptions) o; return batchIntervalMs == that.batchIntervalMs && batchSize == that.batchSize - && maxRetries == that.maxRetries; + && maxRetries == that.maxRetries + && bulkInsertEnabled == that.bulkInsertEnabled; } @Override public int hashCode() { - return Objects.hash(batchIntervalMs, batchSize, maxRetries); + return Objects.hash(batchIntervalMs, batchSize, maxRetries, bulkInsertEnabled); } public static Builder builder() { @@ -86,6 +100,7 @@ public static final class Builder { private long intervalMs = DEFAULT_INTERVAL_MILLIS; private int size = DEFAULT_SIZE; private int maxRetries = DEFAULT_MAX_RETRY_TIMES; + private boolean bulkInsertEnabled = false; public Builder withBatchSize(int size) { this.size = size; @@ -102,8 +117,21 @@ public Builder withMaxRetries(int maxRetries) { return this; } + /** + * Enable or disable the dialect's bulk insert optimization. The selected dialect must + * implement {@link + * org.apache.flink.connector.jdbc.core.database.dialect.JdbcBulkInsertDialect}; when + * enabled with a dialect that does not, the sink fails fast at build time. + * + *

Default is {@code false}. + */ + public Builder withBulkInsertEnabled(boolean enabled) { + this.bulkInsertEnabled = enabled; + return this; + } + public JdbcExecutionOptions build() { - return new JdbcExecutionOptions(intervalMs, size, maxRetries); + return new JdbcExecutionOptions(intervalMs, size, maxRetries, bulkInsertEnabled); } } } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/JdbcBulkInsertDialect.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/JdbcBulkInsertDialect.java new file mode 100644 index 000000000..b122a1a2d --- /dev/null +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/JdbcBulkInsertDialect.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.core.database.dialect; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.TimestampType; + +import java.sql.Date; +import java.sql.Time; +import java.time.LocalDate; +import java.time.LocalTime; +import java.util.Optional; + +/** + * Capability interface for dialects that support database-specific bulk insert optimizations (e.g., + * PostgreSQL's {@code UNNEST}). A dialect that does not implement this interface cannot be used + * with bulk insert mode; the sink will fail fast at build time rather than silently falling back. + */ +@PublicEvolving +public interface JdbcBulkInsertDialect extends JdbcDialect { + + /** + * Generate a batch insert statement using the database's bulk insert optimization. + * + * @param tableName the target table name + * @param fieldNames array of field names + * @param fieldTypes array of database type names for each field + * @return Optional containing the optimized batch insert SQL, or empty if not applicable for + * the given inputs (e.g., no fields) + */ + Optional getBatchInsertStatement( + String tableName, String[] fieldNames, String[] fieldTypes); + + /** + * Generate a batch upsert statement using the database's bulk insert optimization with conflict + * handling. + * + * @param tableName the target table name + * @param fieldNames array of all field names + * @param fieldTypes array of database type names for each field + * @param uniqueKeyFields array of unique key field names for conflict detection + * @return Optional containing the optimized batch upsert SQL, or empty if not applicable + */ + Optional getBatchUpsertStatement( + String tableName, String[] fieldNames, String[] fieldTypes, String[] uniqueKeyFields); + + /** + * Database-specific type name used for array creation (e.g., {@code + * Connection.createArrayOf(typeName, values)} and SQL type casting). + * + * @param logicalType the Flink logical type + * @return the database-specific type name for array operations + * @throws UnsupportedOperationException if the type is not supported for array operations + */ + String getArrayTypeName(LogicalType logicalType); + + /** + * Extract a value from {@link RowData} and convert it to a JDBC-compatible Java object suitable + * for {@link java.sql.Connection#createArrayOf(String, Object[])}. Dialects may override this + * to handle database-specific types (e.g., PostgreSQL {@code JSONB}, {@code UUID}). + */ + default Object toJdbcValue(RowData row, int pos, LogicalType type) { + if (row.isNullAt(pos)) { + return null; + } + + switch (type.getTypeRoot()) { + case BOOLEAN: + return row.getBoolean(pos); + case TINYINT: + return (short) row.getByte(pos); + case SMALLINT: + return row.getShort(pos); + case INTEGER: + return row.getInt(pos); + case BIGINT: + return row.getLong(pos); + case FLOAT: + return row.getFloat(pos); + case DOUBLE: + return row.getDouble(pos); + case DECIMAL: + DecimalType decimalType = (DecimalType) type; + return row.getDecimal(pos, decimalType.getPrecision(), decimalType.getScale()) + .toBigDecimal(); + case CHAR: + case VARCHAR: + return row.getString(pos).toString(); + case DATE: + return Date.valueOf(LocalDate.ofEpochDay(row.getInt(pos))); + case TIME_WITHOUT_TIME_ZONE: + return Time.valueOf(LocalTime.ofNanoOfDay(row.getInt(pos) * 1_000_000L)); + case TIMESTAMP_WITHOUT_TIME_ZONE: + TimestampType tsType = (TimestampType) type; + return row.getTimestamp(pos, tsType.getPrecision()).toTimestamp(); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + LocalZonedTimestampType lztsType = (LocalZonedTimestampType) type; + return row.getTimestamp(pos, lztsType.getPrecision()).toTimestamp(); + case VARBINARY: + return row.getBinary(pos); + default: + throw new UnsupportedOperationException( + String.format( + "Type %s is not supported for bulk insert. " + + "Please disable it by setting 'sink.bulk-insert.enabled' = 'false'.", + type)); + } + } +} diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcConnectorOptions.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcConnectorOptions.java index 37d5362ce..b4185d307 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcConnectorOptions.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcConnectorOptions.java @@ -179,6 +179,16 @@ public class JdbcConnectorOptions { .defaultValue(3) .withDescription("The max retry times if writing records to database failed."); + public static final ConfigOption SINK_BULK_INSERT_ENABLED = + ConfigOptions.key("sink.bulk-insert.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Enable the dialect's bulk insert optimization for batch writes (e.g., " + + "PostgreSQL's UNNEST). The selected dialect must implement " + + "JdbcBulkInsertDialect; otherwise the sink fails fast at build time. " + + "Default is false."); + public static final ConfigOption FILTER_HANDLING_POLICY = ConfigOptions.key("filter.handling.policy") .enumType(FilterHandlingPolicy.class) diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java index 8c82f455a..746358280 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java @@ -69,6 +69,7 @@ import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SCAN_PARTITION_UPPER_BOUND; import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL; import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS; +import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SINK_BULK_INSERT_ENABLED; import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SINK_MAX_RETRIES; import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SINK_PARALLELISM; import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.TABLE_NAME; @@ -185,6 +186,7 @@ private JdbcExecutionOptions getJdbcExecutionOptions(ReadableConfig config) { builder.withBatchSize(config.get(SINK_BUFFER_FLUSH_MAX_ROWS)); builder.withBatchIntervalMs(config.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis()); builder.withMaxRetries(config.get(SINK_MAX_RETRIES)); + builder.withBulkInsertEnabled(config.get(SINK_BULK_INSERT_ENABLED)); return builder.build(); } @@ -258,6 +260,7 @@ public Set> optionalOptions() { optionalOptions.add(SINK_BUFFER_FLUSH_MAX_ROWS); optionalOptions.add(SINK_BUFFER_FLUSH_INTERVAL); optionalOptions.add(SINK_MAX_RETRIES); + optionalOptions.add(SINK_BULK_INSERT_ENABLED); optionalOptions.add(SINK_PARALLELISM); optionalOptions.add(MAX_RETRY_TIMEOUT); optionalOptions.add(LookupOptions.CACHE_TYPE); diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcOutputFormatBuilder.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcOutputFormatBuilder.java index 493cc8d22..01e8c15e0 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcOutputFormatBuilder.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcOutputFormatBuilder.java @@ -19,6 +19,7 @@ package org.apache.flink.connector.jdbc.core.table.sink; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; +import org.apache.flink.connector.jdbc.core.database.dialect.JdbcBulkInsertDialect; import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialect; import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectConverter; import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider; @@ -28,6 +29,7 @@ import org.apache.flink.connector.jdbc.internal.executor.TableBufferedStatementExecutor; import org.apache.flink.connector.jdbc.internal.executor.TableInsertOrUpdateStatementExecutor; import org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor; +import org.apache.flink.connector.jdbc.internal.executor.TableUnnestStatementExecutor; import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions; import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions; import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement; @@ -37,8 +39,12 @@ import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.Serializable; import java.util.Arrays; +import java.util.Optional; import java.util.function.Function; import static org.apache.flink.table.data.RowData.createFieldGetter; @@ -49,6 +55,7 @@ public class JdbcOutputFormatBuilder implements Serializable { private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(JdbcOutputFormatBuilder.class); private InternalJdbcConnectionOptions jdbcOptions; private JdbcExecutionOptions executionOptions; @@ -86,19 +93,15 @@ public JdbcOutputFormatBuilder setFieldDataTypes(DataType[] fieldDataTypes) { Arrays.stream(fieldDataTypes) .map(DataType::getLogicalType) .toArray(LogicalType[]::new); + + boolean useBulkInsert = shouldUseBulkInsert(executionOptions, dmlOptions.getDialect()); + if (dmlOptions.getKeyFields().isPresent() && dmlOptions.getKeyFields().get().length > 0) { - // upsert query return new JdbcOutputFormat<>( new SimpleJdbcConnectionProvider(jdbcOptions), executionOptions, - () -> createBufferReduceExecutor(dmlOptions, logicalTypes)); + () -> createBufferReduceExecutor(dmlOptions, logicalTypes, useBulkInsert)); } else { - // append only query - final String sql = - dmlOptions - .getDialect() - .getInsertIntoStatement( - dmlOptions.getTableName(), dmlOptions.getFieldNames()); return new JdbcOutputFormat<>( new SimpleJdbcConnectionProvider(jdbcOptions), executionOptions, @@ -107,12 +110,29 @@ public JdbcOutputFormatBuilder setFieldDataTypes(DataType[] fieldDataTypes) { dmlOptions.getDialect(), dmlOptions.getFieldNames(), logicalTypes, - sql)); + dmlOptions.getTableName(), + useBulkInsert)); + } + } + + private static boolean shouldUseBulkInsert( + JdbcExecutionOptions executionOptions, JdbcDialect dialect) { + if (!executionOptions.isBulkInsertEnabled() || executionOptions.getBatchSize() <= 1) { + return false; } + if (!(dialect instanceof JdbcBulkInsertDialect)) { + throw new IllegalStateException( + String.format( + "Bulk insert is enabled but dialect '%s' does not implement " + + "JdbcBulkInsertDialect. Either switch to a dialect that supports it " + + "or set 'sink.bulk-insert.enabled' = 'false'.", + dialect.dialectName())); + } + return true; } private static JdbcBatchStatementExecutor createBufferReduceExecutor( - JdbcDmlOptions opt, LogicalType[] fieldTypes) { + JdbcDmlOptions opt, LogicalType[] fieldTypes, boolean useBulkInsert) { checkArgument(opt.getKeyFields().isPresent()); JdbcDialect dialect = opt.getDialect(); String tableName = opt.getTableName(); @@ -132,16 +152,21 @@ private static JdbcBatchStatementExecutor createBufferReduceExecutor( fieldTypes, pkFields, pkNames, - pkTypes), + pkTypes, + useBulkInsert), createDeleteExecutor(dialect, tableName, pkNames, pkTypes), createRowKeyExtractor(fieldTypes, pkFields)); } private static JdbcBatchStatementExecutor createSimpleBufferedExecutor( - JdbcDialect dialect, String[] fieldNames, LogicalType[] fieldTypes, String sql) { + JdbcDialect dialect, + String[] fieldNames, + LogicalType[] fieldTypes, + String tableName, + boolean useBulkInsert) { return new TableBufferedStatementExecutor( - createSimpleRowExecutor(dialect, fieldNames, fieldTypes, sql)); + createSimpleRowExecutor(dialect, fieldNames, fieldTypes, tableName, useBulkInsert)); } private static JdbcBatchStatementExecutor createUpsertRowExecutor( @@ -151,7 +176,28 @@ private static JdbcBatchStatementExecutor createUpsertRowExecutor( LogicalType[] fieldTypes, int[] pkFields, String[] pkNames, - LogicalType[] pkTypes) { + LogicalType[] pkTypes, + boolean useBulkInsert) { + + if (useBulkInsert) { + JdbcBulkInsertDialect bulkDialect = (JdbcBulkInsertDialect) dialect; + String[] fieldTypeNames = getFieldTypeNames(fieldTypes, bulkDialect); + Optional bulkSql = + bulkDialect.getBatchUpsertStatement( + tableName, fieldNames, fieldTypeNames, pkNames); + + if (bulkSql.isPresent()) { + return new TableUnnestStatementExecutor( + bulkSql.get(), RowType.of(fieldTypes), bulkDialect); + } else { + throw new IllegalStateException( + String.format( + "Bulk insert is enabled but dialect '%s' returned no upsert statement " + + "for the given inputs.", + dialect.dialectName())); + } + } + return dialect.getUpsertStatement(tableName, fieldNames, pkNames) .map(sql -> createSimpleRowExecutor(dialect, fieldNames, fieldTypes, sql)) .orElseGet( @@ -166,6 +212,35 @@ private static JdbcBatchStatementExecutor createUpsertRowExecutor( pkTypes)); } + private static JdbcBatchStatementExecutor createSimpleRowExecutor( + JdbcDialect dialect, + String[] fieldNames, + LogicalType[] fieldTypes, + String tableName, + boolean useBulkInsert) { + + if (useBulkInsert) { + JdbcBulkInsertDialect bulkDialect = (JdbcBulkInsertDialect) dialect; + String[] fieldTypeNames = getFieldTypeNames(fieldTypes, bulkDialect); + Optional bulkSql = + bulkDialect.getBatchInsertStatement(tableName, fieldNames, fieldTypeNames); + + if (bulkSql.isPresent()) { + return new TableUnnestStatementExecutor( + bulkSql.get(), RowType.of(fieldTypes), bulkDialect); + } else { + throw new IllegalStateException( + String.format( + "Bulk insert is enabled but dialect '%s' returned no insert statement " + + "for the given inputs.", + dialect.dialectName())); + } + } + + final String sql = dialect.getInsertIntoStatement(tableName, fieldNames); + return createSimpleRowExecutor(dialect, fieldNames, fieldTypes, sql); + } + private static JdbcBatchStatementExecutor createDeleteExecutor( JdbcDialect dialect, String tableName, String[] pkNames, LogicalType[] pkTypes) { String deleteSql = dialect.getDeleteStatement(tableName, pkNames); @@ -181,6 +256,15 @@ private static JdbcBatchStatementExecutor createSimpleRowExecutor( rowConverter); } + private static String[] getFieldTypeNames( + LogicalType[] fieldTypes, JdbcBulkInsertDialect dialect) { + String[] typeNames = new String[fieldTypes.length]; + for (int i = 0; i < fieldTypes.length; i++) { + typeNames[i] = dialect.getArrayTypeName(fieldTypes[i]); + } + return typeNames; + } + private static JdbcBatchStatementExecutor createInsertOrUpdateExecutor( JdbcDialect dialect, String tableName, diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableUnnestStatementExecutor.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableUnnestStatementExecutor.java new file mode 100644 index 000000000..a9eefdb33 --- /dev/null +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableUnnestStatementExecutor.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.internal.executor; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.core.database.dialect.JdbcBulkInsertDialect; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Array; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Table API specific bulk-insert batch statement executor. + * + *

Uses a {@link JdbcBulkInsertDialect} to generate the SQL and to convert column values into + * JDBC-compatible objects, which are then bound as SQL arrays (e.g., PostgreSQL's {@code UNNEST}). + * Compatible with both INSERT and UPSERT modes. + */ +@Internal +public class TableUnnestStatementExecutor implements JdbcBatchStatementExecutor { + + private static final Logger LOG = LoggerFactory.getLogger(TableUnnestStatementExecutor.class); + + private final String sql; + private final RowType rowType; + private final JdbcBulkInsertDialect dialect; + + private final List batch; + private transient PreparedStatement statement; + + public TableUnnestStatementExecutor( + String sql, RowType rowType, JdbcBulkInsertDialect dialect) { + this.sql = checkNotNull(sql); + this.rowType = checkNotNull(rowType); + this.dialect = checkNotNull(dialect); + this.batch = new ArrayList<>(); + } + + @Override + public void prepareStatements(Connection connection) throws SQLException { + this.statement = connection.prepareStatement(sql); + } + + @Override + public void addToBatch(RowData record) { + List fieldTypes = rowType.getChildren(); + Object[] values = new Object[fieldTypes.size()]; + + for (int i = 0; i < fieldTypes.size(); i++) { + values[i] = dialect.toJdbcValue(record, i, fieldTypes.get(i)); + } + + batch.add(values); + } + + @Override + public void executeBatch() throws SQLException { + if (batch.isEmpty()) { + return; + } + + try { + List arrays = bindArrays(); + try { + int updateCount = statement.executeUpdate(); + + if (LOG.isDebugEnabled()) { + LOG.debug( + "Bulk insert batch affected {} rows for {} records", + updateCount, + batch.size()); + } + } finally { + for (Array array : arrays) { + array.free(); + } + } + } finally { + batch.clear(); + } + } + + private List bindArrays() throws SQLException { + Connection conn = statement.getConnection(); + List fieldTypes = rowType.getChildren(); + int fieldCount = fieldTypes.size(); + List arrays = new ArrayList<>(fieldCount); + + for (int fieldIndex = 0; fieldIndex < fieldCount; fieldIndex++) { + Object[] columnValues = new Object[batch.size()]; + for (int rowIndex = 0; rowIndex < batch.size(); rowIndex++) { + columnValues[rowIndex] = batch.get(rowIndex)[fieldIndex]; + } + + LogicalType fieldType = fieldTypes.get(fieldIndex); + String arrayTypeName = dialect.getArrayTypeName(fieldType); + + Array sqlArray = conn.createArrayOf(arrayTypeName, columnValues); + arrays.add(sqlArray); + statement.setArray(fieldIndex + 1, sqlArray); + + if (LOG.isTraceEnabled()) { + LOG.trace( + "Bound array for field {} (type {}) with {} values", + fieldIndex, + arrayTypeName, + columnValues.length); + } + } + + return arrays; + } + + @Override + public String insertSql() { + return sql; + } + + @Override + public void closeStatements() throws SQLException { + if (statement != null) { + statement.close(); + statement = null; + } + } +} diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcOutputFormatTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcOutputFormatTest.java index af549ec6d..f9a7480ff 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcOutputFormatTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcOutputFormatTest.java @@ -595,4 +595,36 @@ void clearOutputTable() throws Exception { stat.execute("DELETE FROM " + OUTPUT_TABLE); } } + + @Test + void testBulkInsertEnabledOnNonCapableDialectFailsFast() { + InternalJdbcConnectionOptions jdbcOptions = + InternalJdbcConnectionOptions.builder() + .setDriverName(getMetadata().getDriverClass()) + .setDBUrl(getMetadata().getJdbcUrl()) + .setTableName(OUTPUT_TABLE) + .build(); + JdbcDmlOptions dmlOptions = + JdbcDmlOptions.builder() + .withTableName(jdbcOptions.getTableName()) + .withDialect(jdbcOptions.getDialect()) + .withFieldNames(fieldNames) + .build(); + + assertThatThrownBy( + () -> + new JdbcOutputFormatBuilder() + .setJdbcOptions(jdbcOptions) + .setFieldDataTypes(fieldDataTypes) + .setJdbcDmlOptions(dmlOptions) + .setJdbcExecutionOptions( + JdbcExecutionOptions.builder() + .withBatchSize(10) + .withBulkInsertEnabled(true) + .build()) + .build()) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Bulk insert is enabled") + .hasMessageContaining("JdbcBulkInsertDialect"); + } } diff --git a/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/dialect/PostgresDialect.java b/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/dialect/PostgresDialect.java index f42ba4807..f95f939ac 100644 --- a/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/dialect/PostgresDialect.java +++ b/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/dialect/PostgresDialect.java @@ -20,19 +20,22 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialect; +import org.apache.flink.connector.jdbc.core.database.dialect.JdbcBulkInsertDialect; +import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.RowType; import java.util.Arrays; import java.util.EnumSet; import java.util.HashSet; +import java.util.List; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; /** JDBC dialect for PostgreSQL. */ @Internal -public class PostgresDialect extends AbstractDialect { +public class PostgresDialect extends AbstractDialect implements JdbcBulkInsertDialect { private static final long serialVersionUID = 1L; @@ -134,4 +137,158 @@ public Set supportedTypes() { LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, LogicalTypeRoot.ARRAY); } + + /** + * Generate UNNEST-based batch insert statement for PostgreSQL. + * + *

Example: {@code INSERT INTO users (id, name) SELECT * FROM UNNEST(?::INTEGER[], + * ?::VARCHAR[]) AS t(id, name)}. + */ + @Override + public Optional getBatchInsertStatement( + String tableName, String[] fieldNames, String[] fieldTypes) { + + if (fieldNames.length == 0) { + return Optional.empty(); + } + + StringBuilder sql = new StringBuilder(); + sql.append("INSERT INTO ").append(quoteIdentifier(tableName)); + + sql.append(" ("); + for (int i = 0; i < fieldNames.length; i++) { + if (i > 0) { + sql.append(", "); + } + sql.append(quoteIdentifier(fieldNames[i])); + } + sql.append(")"); + + sql.append(" SELECT * FROM UNNEST("); + for (int i = 0; i < fieldNames.length; i++) { + if (i > 0) { + sql.append(", "); + } + sql.append("?::").append(extractBaseType(fieldTypes[i])).append("[]"); + } + + sql.append(") AS t("); + for (int i = 0; i < fieldNames.length; i++) { + if (i > 0) { + sql.append(", "); + } + sql.append(quoteIdentifier(fieldNames[i])); + } + sql.append(")"); + + return Optional.of(sql.toString()); + } + + /** + * Generate UNNEST-based batch upsert statement for PostgreSQL, extending the batch insert with + * {@code ON CONFLICT ... DO UPDATE SET} (or {@code DO NOTHING} when all fields are keys). + */ + @Override + public Optional getBatchUpsertStatement( + String tableName, String[] fieldNames, String[] fieldTypes, String[] uniqueKeyFields) { + + Optional batchInsert = getBatchInsertStatement(tableName, fieldNames, fieldTypes); + + if (!batchInsert.isPresent()) { + return Optional.empty(); + } + + StringBuilder sql = new StringBuilder(batchInsert.get()); + + sql.append(" ON CONFLICT ("); + for (int i = 0; i < uniqueKeyFields.length; i++) { + if (i > 0) { + sql.append(", "); + } + sql.append(quoteIdentifier(uniqueKeyFields[i])); + } + + Set keySet = new HashSet<>(Arrays.asList(uniqueKeyFields)); + List nonKeyFields = + Arrays.stream(fieldNames) + .filter(f -> !keySet.contains(f)) + .collect(Collectors.toList()); + + if (nonKeyFields.isEmpty()) { + sql.append(") DO NOTHING"); + } else { + sql.append(") DO UPDATE SET "); + for (int i = 0; i < nonKeyFields.size(); i++) { + if (i > 0) { + sql.append(", "); + } + String field = quoteIdentifier(nonKeyFields.get(i)); + sql.append(field).append("=EXCLUDED.").append(field); + } + } + + return Optional.of(sql.toString()); + } + + /** + * Strip array brackets and precision/scale modifiers from a SQL type name (e.g. {@code + * VARCHAR(255)[] -> VARCHAR}). Used for {@code createArrayOf()} and SQL casts. + */ + private String extractBaseType(String typeName) { + typeName = typeName.replaceAll("\\[\\]", "").trim(); + + int parenIndex = typeName.indexOf('('); + if (parenIndex > 0) { + typeName = typeName.substring(0, parenIndex).trim(); + } + + return typeName; + } + + /** + * PostgreSQL type name used by {@link java.sql.Connection#createArrayOf} and {@code + * ?::typename[]} casts. + */ + @Override + public String getArrayTypeName(LogicalType logicalType) { + switch (logicalType.getTypeRoot()) { + case BOOLEAN: + return "BOOLEAN"; + case TINYINT: + case SMALLINT: + return "SMALLINT"; + case INTEGER: + return "INTEGER"; + case BIGINT: + return "BIGINT"; + case FLOAT: + return "REAL"; + case DOUBLE: + return "DOUBLE PRECISION"; + case DECIMAL: + return "NUMERIC"; + case CHAR: + case VARCHAR: + return "VARCHAR"; + case DATE: + return "DATE"; + case TIME_WITHOUT_TIME_ZONE: + return "TIME"; + case TIMESTAMP_WITHOUT_TIME_ZONE: + return "TIMESTAMP"; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return "TIMESTAMPTZ"; + case VARBINARY: + return "BYTEA"; + default: + throw new UnsupportedOperationException( + String.format( + "Type %s is not supported for bulk insert. " + + "Supported types are: BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, " + + "FLOAT, DOUBLE, DECIMAL, CHAR, VARCHAR, DATE, TIME, TIMESTAMP, " + + "TIMESTAMP_LTZ, VARBINARY. " + + "Please disable it by setting 'sink.bulk-insert.enabled' = 'false'.", + logicalType)); + } + } } diff --git a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/dialect/PostgresDialectTest.java b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/dialect/PostgresDialectTest.java index 0ad1fb55f..715e97240 100644 --- a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/dialect/PostgresDialectTest.java +++ b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/dialect/PostgresDialectTest.java @@ -25,6 +25,7 @@ import java.util.Arrays; import java.util.List; +import java.util.Optional; import static org.assertj.core.api.Assertions.assertThat; @@ -82,4 +83,158 @@ void testUpsertStatement() { .isEqualTo( "INSERT INTO tbl(id, name, email, ts, field1, field_2, __field_3__) VALUES (:id, :name, :email, :ts, :field1, :field_2, :__field_3__) ON CONFLICT (id, name, email, ts, field1, field_2, __field_3__) DO NOTHING"); } + + @Test + void testBatchInsertStatementWithUnnest() { + PostgresDialect dialect = new PostgresDialect(); + final String tableName = "users"; + final String[] fieldNames = {"id", "name", "age"}; + final String[] fieldTypes = {"INTEGER", "VARCHAR", "INTEGER"}; + + Optional result = + dialect.getBatchInsertStatement(tableName, fieldNames, fieldTypes); + + assertThat(result).isPresent(); + String sql = result.get(); + + // Verify SQL structure + assertThat(sql).contains("INSERT INTO users"); + assertThat(sql).contains("(id, name, age)"); + assertThat(sql).contains("SELECT * FROM UNNEST"); + assertThat(sql).contains("?::INTEGER[]"); + assertThat(sql).contains("?::VARCHAR[]"); + assertThat(sql).contains("AS t(id, name, age)"); + + // Verify exact format + assertThat(sql) + .isEqualTo( + "INSERT INTO users (id, name, age) SELECT * FROM UNNEST(?::INTEGER[], ?::VARCHAR[], ?::INTEGER[]) AS t(id, name, age)"); + } + + @Test + void testBatchInsertStatementEmptyFields() { + PostgresDialect dialect = new PostgresDialect(); + final String tableName = "users"; + final String[] fieldNames = {}; + final String[] fieldTypes = {}; + + // Should return empty when no fields + Optional result = + dialect.getBatchInsertStatement(tableName, fieldNames, fieldTypes); + assertThat(result).isEmpty(); + } + + @Test + void testBatchUpsertStatementWithUnnest() { + PostgresDialect dialect = new PostgresDialect(); + final String tableName = "users"; + final String[] fieldNames = {"id", "name", "age"}; + final String[] fieldTypes = {"INTEGER", "VARCHAR", "INTEGER"}; + final String[] uniqueKeyFields = {"id"}; + + Optional result = + dialect.getBatchUpsertStatement(tableName, fieldNames, fieldTypes, uniqueKeyFields); + + assertThat(result).isPresent(); + String sql = result.get(); + + // Verify SQL structure + assertThat(sql).contains("INSERT INTO users"); + assertThat(sql).contains("SELECT * FROM UNNEST"); + assertThat(sql).contains("ON CONFLICT (id)"); + assertThat(sql).contains("DO UPDATE SET"); + assertThat(sql).contains("name=EXCLUDED.name"); + assertThat(sql).contains("age=EXCLUDED.age"); + + // Should not update key field + assertThat(sql).doesNotContain("id=EXCLUDED.id"); + } + + @Test + void testBatchUpsertStatementDoNothing() { + PostgresDialect dialect = new PostgresDialect(); + final String tableName = "users"; + final String[] fieldNames = {"id"}; + final String[] fieldTypes = {"INTEGER"}; + final String[] uniqueKeyFields = {"id"}; + + Optional result = + dialect.getBatchUpsertStatement(tableName, fieldNames, fieldTypes, uniqueKeyFields); + + assertThat(result).isPresent(); + String sql = result.get(); + + // When all fields are keys, should DO NOTHING + assertThat(sql).contains("ON CONFLICT (id) DO NOTHING"); + assertThat(sql).doesNotContain("DO UPDATE SET"); + } + + @Test + void testExtractBaseType() { + PostgresDialect dialect = new PostgresDialect(); + + // Test through getBatchInsertStatement which uses extractBaseType internally + Optional result; + + // VARCHAR(255) should become VARCHAR + result = + dialect.getBatchInsertStatement( + "t", new String[] {"col1"}, new String[] {"VARCHAR(255)"}); + assertThat(result).isPresent(); + assertThat(result.get()).contains("?::VARCHAR[]"); + + // NUMERIC(10,2) should become NUMERIC + result = + dialect.getBatchInsertStatement( + "t", new String[] {"col1"}, new String[] {"NUMERIC(10,2)"}); + assertThat(result).isPresent(); + assertThat(result.get()).contains("?::NUMERIC[]"); + + // TEXT[] should become TEXT + result = + dialect.getBatchInsertStatement( + "t", new String[] {"col1"}, new String[] {"TEXT[]"}); + assertThat(result).isPresent(); + assertThat(result.get()).contains("?::TEXT[]"); + } + + @Test + void testBatchStatementQueryPlanStability() { + PostgresDialect dialect = new PostgresDialect(); + final String tableName = "users"; + final String[] fieldNames = {"id", "name"}; + final String[] fieldTypes = {"INTEGER", "VARCHAR"}; + + // Get SQL multiple times (simulating different batch sizes at runtime) + Optional sql1 = dialect.getBatchInsertStatement(tableName, fieldNames, fieldTypes); + Optional sql2 = dialect.getBatchInsertStatement(tableName, fieldNames, fieldTypes); + Optional sql3 = dialect.getBatchInsertStatement(tableName, fieldNames, fieldTypes); + + // All should be present and IDENTICAL + assertThat(sql1).isPresent(); + assertThat(sql2).isPresent(); + assertThat(sql3).isPresent(); + assertThat(sql1.get()).isEqualTo(sql2.get()); + assertThat(sql2.get()).isEqualTo(sql3.get()); + + // This is the key benefit - same SQL string = single query plan in pg_stat_statements + // regardless of the number of rows in the batch (determined at runtime by array size) + } + + @Test + void testGetArrayTypeName() { + PostgresDialect dialect = new PostgresDialect(); + + // Test common types + assertThat(dialect.getArrayTypeName(new org.apache.flink.table.types.logical.BooleanType())) + .isEqualTo("BOOLEAN"); + assertThat(dialect.getArrayTypeName(new org.apache.flink.table.types.logical.IntType())) + .isEqualTo("INTEGER"); + assertThat(dialect.getArrayTypeName(new org.apache.flink.table.types.logical.BigIntType())) + .isEqualTo("BIGINT"); + assertThat(dialect.getArrayTypeName(new org.apache.flink.table.types.logical.VarCharType())) + .isEqualTo("VARCHAR"); + assertThat(dialect.getArrayTypeName(new org.apache.flink.table.types.logical.DoubleType())) + .isEqualTo("DOUBLE PRECISION"); + } } diff --git a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/table/PostgresDynamicTableSinkITCase.java b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/table/PostgresDynamicTableSinkITCase.java index 040c883eb..5c8ac48bb 100644 --- a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/table/PostgresDynamicTableSinkITCase.java +++ b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/table/PostgresDynamicTableSinkITCase.java @@ -21,7 +21,71 @@ import org.apache.flink.connector.jdbc.core.table.sink.JdbcDynamicTableSinkITCase; import org.apache.flink.connector.jdbc.postgres.PostgresTestBase; import org.apache.flink.connector.jdbc.postgres.database.dialect.PostgresDialect; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; -/** The Table Sink ITCase for {@link PostgresDialect}. */ +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * The Table Sink ITCase for {@link PostgresDialect}. + * + *

This test class inherits all parent tests which validate standard JDBC batching behavior. + * Additional tests are provided to specifically validate PostgreSQL UNNEST optimization. + */ class PostgresDynamicTableSinkITCase extends JdbcDynamicTableSinkITCase - implements PostgresTestBase {} + implements PostgresTestBase { + + /** + * Test UPSERT operations with UNNEST enabled. This validates that the UNNEST optimization + * produces correct results. Standard batching is already validated by the inherited parent + * test. + */ + @Test + void testUpsertWithUnnest() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().enableObjectReuse(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + String viewName = "testData"; + tEnv.createTemporaryView( + viewName, tEnv.fromValues(testData()).as("id", "num", "text", "ts")); + + String tableName = "upsertSinkUnnest"; + List options = + Arrays.asList( + "'sink.buffer-flush.max-rows' = '2'", + "'sink.buffer-flush.interval' = '0'", + "'sink.max-retries' = '0'", + "'sink.bulk-insert.enabled' = 'true'"); + + tEnv.executeSql( + upsertOutputTable.getCreateQueryForFlink(getMetadata(), tableName, options)); + + tEnv.executeSql( + String.format( + "INSERT INTO %s " + + " SELECT cnt, COUNT(len) AS lencnt, cTag, MAX(ts) AS ts " + + " FROM ( " + + " SELECT len, COUNT(id) as cnt, cTag, MAX(ts) AS ts " + + " FROM (SELECT id, CHAR_LENGTH(text) AS len, (CASE WHEN id > 0 THEN 1 ELSE 0 END) cTag, ts FROM %s) " + + " GROUP BY len, cTag " + + " ) " + + " GROUP BY cnt, cTag", + tableName, viewName)) + .await(); + + Map mapTestData = testDataMap(); + assertThat(upsertOutputTable.selectAllTable(getMetadata())) + .containsExactlyInAnyOrder( + Row.of(1L, 5L, 1, mapTestData.get(6).getField(3)), + Row.of(7L, 1L, 1, mapTestData.get(21).getField(3)), + Row.of(9L, 1L, 1, mapTestData.get(15).getField(3))); + } +}