Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,15 @@ public class JdbcExecutionOptions implements Serializable {
private final long batchIntervalMs;
private final int batchSize;
private final int maxRetries;
private final boolean bulkInsertEnabled;

private JdbcExecutionOptions(long batchIntervalMs, int batchSize, int maxRetries) {
private JdbcExecutionOptions(
long batchIntervalMs, int batchSize, int maxRetries, boolean bulkInsertEnabled) {
Preconditions.checkArgument(maxRetries >= 0);
this.batchIntervalMs = batchIntervalMs;
this.batchSize = batchSize;
this.maxRetries = maxRetries;
this.bulkInsertEnabled = bulkInsertEnabled;
}

public long getBatchIntervalMs() {
Expand All @@ -53,6 +56,16 @@ public int getMaxRetries() {
return maxRetries;
}

/**
* Returns whether the dialect's bulk insert optimization is enabled. The selected dialect must
* implement {@link
* org.apache.flink.connector.jdbc.core.database.dialect.JdbcBulkInsertDialect}; otherwise the
* sink fails fast at build time.
*/
public boolean isBulkInsertEnabled() {
return bulkInsertEnabled;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -64,12 +77,13 @@ public boolean equals(Object o) {
JdbcExecutionOptions that = (JdbcExecutionOptions) o;
return batchIntervalMs == that.batchIntervalMs
&& batchSize == that.batchSize
&& maxRetries == that.maxRetries;
&& maxRetries == that.maxRetries
&& bulkInsertEnabled == that.bulkInsertEnabled;
}

@Override
public int hashCode() {
return Objects.hash(batchIntervalMs, batchSize, maxRetries);
return Objects.hash(batchIntervalMs, batchSize, maxRetries, bulkInsertEnabled);
}

public static Builder builder() {
Expand All @@ -86,6 +100,7 @@ public static final class Builder {
private long intervalMs = DEFAULT_INTERVAL_MILLIS;
private int size = DEFAULT_SIZE;
private int maxRetries = DEFAULT_MAX_RETRY_TIMES;
private boolean bulkInsertEnabled = false;

public Builder withBatchSize(int size) {
this.size = size;
Expand All @@ -102,8 +117,21 @@ public Builder withMaxRetries(int maxRetries) {
return this;
}

/**
* Enable or disable the dialect's bulk insert optimization. The selected dialect must
* implement {@link
* org.apache.flink.connector.jdbc.core.database.dialect.JdbcBulkInsertDialect}; when
* enabled with a dialect that does not, the sink fails fast at build time.
*
* <p>Default is {@code false}.
*/
public Builder withBulkInsertEnabled(boolean enabled) {
this.bulkInsertEnabled = enabled;
return this;
}

public JdbcExecutionOptions build() {
return new JdbcExecutionOptions(intervalMs, size, maxRetries);
return new JdbcExecutionOptions(intervalMs, size, maxRetries, bulkInsertEnabled);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.jdbc.core.database.dialect;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;

import java.sql.Date;
import java.sql.Time;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.Optional;

/**
* Capability interface for dialects that support database-specific bulk insert optimizations (e.g.,
* PostgreSQL's {@code UNNEST}). A dialect that does not implement this interface cannot be used
* with bulk insert mode; the sink will fail fast at build time rather than silently falling back.
*/
@PublicEvolving
public interface JdbcBulkInsertDialect extends JdbcDialect {

/**
* Generate a batch insert statement using the database's bulk insert optimization.
*
* @param tableName the target table name
* @param fieldNames array of field names
* @param fieldTypes array of database type names for each field
* @return Optional containing the optimized batch insert SQL, or empty if not applicable for
* the given inputs (e.g., no fields)
*/
Optional<String> getBatchInsertStatement(
String tableName, String[] fieldNames, String[] fieldTypes);

/**
* Generate a batch upsert statement using the database's bulk insert optimization with conflict
* handling.
*
* @param tableName the target table name
* @param fieldNames array of all field names
* @param fieldTypes array of database type names for each field
* @param uniqueKeyFields array of unique key field names for conflict detection
* @return Optional containing the optimized batch upsert SQL, or empty if not applicable
*/
Optional<String> getBatchUpsertStatement(
String tableName, String[] fieldNames, String[] fieldTypes, String[] uniqueKeyFields);

/**
* Database-specific type name used for array creation (e.g., {@code
* Connection.createArrayOf(typeName, values)} and SQL type casting).
*
* @param logicalType the Flink logical type
* @return the database-specific type name for array operations
* @throws UnsupportedOperationException if the type is not supported for array operations
*/
String getArrayTypeName(LogicalType logicalType);

/**
* Extract a value from {@link RowData} and convert it to a JDBC-compatible Java object suitable
* for {@link java.sql.Connection#createArrayOf(String, Object[])}. Dialects may override this
* to handle database-specific types (e.g., PostgreSQL {@code JSONB}, {@code UUID}).
*/
default Object toJdbcValue(RowData row, int pos, LogicalType type) {
if (row.isNullAt(pos)) {
return null;
}

switch (type.getTypeRoot()) {
case BOOLEAN:
return row.getBoolean(pos);
case TINYINT:
return (short) row.getByte(pos);
case SMALLINT:
return row.getShort(pos);
case INTEGER:
return row.getInt(pos);
case BIGINT:
return row.getLong(pos);
case FLOAT:
return row.getFloat(pos);
case DOUBLE:
return row.getDouble(pos);
case DECIMAL:
DecimalType decimalType = (DecimalType) type;
return row.getDecimal(pos, decimalType.getPrecision(), decimalType.getScale())
.toBigDecimal();
case CHAR:
case VARCHAR:
return row.getString(pos).toString();
case DATE:
return Date.valueOf(LocalDate.ofEpochDay(row.getInt(pos)));
case TIME_WITHOUT_TIME_ZONE:
return Time.valueOf(LocalTime.ofNanoOfDay(row.getInt(pos) * 1_000_000L));
case TIMESTAMP_WITHOUT_TIME_ZONE:
TimestampType tsType = (TimestampType) type;
return row.getTimestamp(pos, tsType.getPrecision()).toTimestamp();
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
LocalZonedTimestampType lztsType = (LocalZonedTimestampType) type;
return row.getTimestamp(pos, lztsType.getPrecision()).toTimestamp();
case VARBINARY:
return row.getBinary(pos);
default:
throw new UnsupportedOperationException(
String.format(
"Type %s is not supported for bulk insert. "
+ "Please disable it by setting 'sink.bulk-insert.enabled' = 'false'.",
type));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,16 @@ public class JdbcConnectorOptions {
.defaultValue(3)
.withDescription("The max retry times if writing records to database failed.");

public static final ConfigOption<Boolean> SINK_BULK_INSERT_ENABLED =
ConfigOptions.key("sink.bulk-insert.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Enable the dialect's bulk insert optimization for batch writes (e.g., "
+ "PostgreSQL's UNNEST). The selected dialect must implement "
+ "JdbcBulkInsertDialect; otherwise the sink fails fast at build time. "
+ "Default is false.");

public static final ConfigOption<FilterHandlingPolicy> FILTER_HANDLING_POLICY =
ConfigOptions.key("filter.handling.policy")
.enumType(FilterHandlingPolicy.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SCAN_PARTITION_UPPER_BOUND;
import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL;
import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SINK_BULK_INSERT_ENABLED;
import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SINK_MAX_RETRIES;
import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SINK_PARALLELISM;
import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.TABLE_NAME;
Expand Down Expand Up @@ -185,6 +186,7 @@ private JdbcExecutionOptions getJdbcExecutionOptions(ReadableConfig config) {
builder.withBatchSize(config.get(SINK_BUFFER_FLUSH_MAX_ROWS));
builder.withBatchIntervalMs(config.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
builder.withMaxRetries(config.get(SINK_MAX_RETRIES));
builder.withBulkInsertEnabled(config.get(SINK_BULK_INSERT_ENABLED));
return builder.build();
}

Expand Down Expand Up @@ -258,6 +260,7 @@ public Set<ConfigOption<?>> optionalOptions() {
optionalOptions.add(SINK_BUFFER_FLUSH_MAX_ROWS);
optionalOptions.add(SINK_BUFFER_FLUSH_INTERVAL);
optionalOptions.add(SINK_MAX_RETRIES);
optionalOptions.add(SINK_BULK_INSERT_ENABLED);
optionalOptions.add(SINK_PARALLELISM);
optionalOptions.add(MAX_RETRY_TIMEOUT);
optionalOptions.add(LookupOptions.CACHE_TYPE);
Expand Down
Loading