Skip to content

[Feature][Jdbc] Add String type column split Support by charset-based splitting algorithm #9002

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Mar 26, 2025
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
79ea2a6
[Feature][Jdbc] Add String type column split Support by charset-based…
Mar 18, 2025
e77b661
[Feature][Jdbc] Add String type column split Support by charset-based…
Mar 18, 2025
336c4d3
[Feature][Jdbc] Add String type column split Support by charset-based…
Mar 20, 2025
97698b0
[Feature][Jdbc] Add String type column split Support by charset-based…
Mar 20, 2025
c8e6f04
[Feature][Jdbc] Add String type column split Support by charset-based…
Mar 21, 2025
f245da8
[Feature][Jdbc] Add String type column split Support by charset-based…
Mar 21, 2025
cc32cad
[Feature][Jdbc] Add String type column split Support by charset-based…
Mar 21, 2025
b78bdbd
[Feature][Jdbc] Add String type column split Support by charset-based…
Mar 24, 2025
e7c438b
[Feature][Jdbc] Add String type column split Support by charset-based…
Mar 24, 2025
a6de31e
[Feature][Jdbc] Add String type column split Support by charset-based…
Mar 24, 2025
9cb25b8
[Feature][Jdbc] Add String type column split Support by charset-based…
Mar 24, 2025
d5ed2a5
[Feature][Jdbc] Add String type column split Support by charset-based…
Mar 24, 2025
54ed6d9
[Feature][Jdbc] Add String type column split Support by charset-based…
Mar 24, 2025
35b42a4
[Feature][Jdbc] Add String type column split Support by charset-based…
Mar 24, 2025
0eeec9f
[Feature][Jdbc] Add String type column split Support by charset-based…
Mar 24, 2025
ba9ecf1
[Feature][Jdbc] Add String type column split Support by charset-based…
Mar 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions docs/en/connector-v2/source/Jdbc.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
import org.apache.seatunnel.connectors.seatunnel.jdbc.source.StringSplitMode;

import java.math.BigDecimal;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -171,14 +171,14 @@ public interface JdbcOptions {
.noDefaultValue()
.withDescription("partition column");

Option<BigDecimal> PARTITION_UPPER_BOUND =
Option<String> PARTITION_UPPER_BOUND =
Options.key("partition_upper_bound")
.bigDecimalType()
.stringType()
.noDefaultValue()
.withDescription("partition upper bound");
Option<BigDecimal> PARTITION_LOWER_BOUND =
Option<String> PARTITION_LOWER_BOUND =
Options.key("partition_lower_bound")
.bigDecimalType()
.stringType()
.noDefaultValue()
.withDescription("partition lower bound");
Option<Integer> PARTITION_NUM =
Expand Down Expand Up @@ -225,4 +225,18 @@ public interface JdbcOptions {
.mapType()
.noDefaultValue()
.withDescription("additional connection configuration parameters");

Option<StringSplitMode> STRING_SPLIT_MODE =
Options.key("split.string_split_mode")
.enumType(StringSplitMode.class)
.defaultValue(StringSplitMode.SAMPLE)
.withDescription(
"Supports different string splitting algorithms. By default, `sample` is used to determine the split by sampling the string value. You can switch to `charset_based` to enable charset-based string splitting algorithm. When set to `charset_based`, the algorithm assumes characters of partition_column are within ASCII range 32-126, which covers most character-based splitting scenarios.");

Option<String> STRING_SPLIT_MODE_COLLATE =
Options.key("split.string_split_mode_collate")
.stringType()
.noDefaultValue()
.withDescription(
"Specifies the collation to use when string_split_mode is set to `charset_based` and the table has a special collation. If not specified, the database's default collation will be used.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.source.StringSplitMode;

import lombok.Builder;
import lombok.Data;
Expand All @@ -44,6 +45,10 @@ public class JdbcSourceConfig implements Serializable {
private int splitInverseSamplingRate;
private boolean decimalTypeNarrowing;

private StringSplitMode stringSplitMode;

private String stringSplitModeCollate;

public static JdbcSourceConfig of(ReadonlyConfig config) {
JdbcSourceConfig.Builder builder = JdbcSourceConfig.builder();
builder.jdbcConnectionConfig(JdbcConnectionConfig.of(config));
Expand All @@ -55,7 +60,8 @@ public static JdbcSourceConfig of(ReadonlyConfig config) {
config.getOptional(JdbcOptions.QUERY).isPresent()
&& config.getOptional(JdbcOptions.PARTITION_COLUMN).isPresent();
builder.useDynamicSplitter(!isOldVersion);

builder.stringSplitMode(config.get(JdbcOptions.STRING_SPLIT_MODE));
builder.stringSplitModeCollate(config.get(JdbcOptions.STRING_SPLIT_MODE_COLLATE));
builder.splitSize(config.get(JdbcSourceOptions.SPLIT_SIZE));
builder.splitEvenDistributionFactorUpperBound(
config.get(JdbcSourceOptions.SPLIT_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import lombok.experimental.Tolerate;

import java.io.Serializable;
import java.math.BigDecimal;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
Expand All @@ -53,10 +52,10 @@ public class JdbcSourceTableConfig implements Serializable {
private Integer partitionNumber;

@JsonProperty("partition_lower_bound")
private BigDecimal partitionStart;
private String partitionStart;

@JsonProperty("partition_upper_bound")
private BigDecimal partitionEnd;
private String partitionEnd;

@JsonProperty("use_select_count")
private Boolean useSelectCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -814,4 +814,44 @@ default boolean isSpecialDefaultValue(Object defaultValue, String sourceDialectN
default String quotesDefaultValue(Object defaultValue) {
return "'" + defaultValue + "'";
}

default String getCollationSequence(Connection connection, String collate) {
StringBuilder sb = new StringBuilder();
String getDual = dualTable();
String baseQuery = "SELECT char_val FROM (";
StringBuilder unionQuery = new StringBuilder();
for (int i = 32; i <= 126; i++) {
if (i > 32) unionQuery.append(" UNION ALL ");
unionQuery.append("SELECT ? AS char_val ").append(getDual);
}
String sortedQuery =
baseQuery + unionQuery + ") ndi_tmp_chars ORDER BY " + getCollateSql(collate);
log.info("sortedCollationQuery is " + sortedQuery);
PreparedStatement preparedStatement;
try {
preparedStatement = connection.prepareStatement(sortedQuery);
for (int i = 32; i <= 126; i++) {
log.debug("setString " + (i - 32) + " => " + (char) i);
preparedStatement.setString(i - 32 + 1, String.valueOf((char) i));
}

ResultSet resultSet = preparedStatement.executeQuery();
while (resultSet.next()) {
sb.append(resultSet.getString("char_val"));
}
return sb.toString();
} catch (SQLException e) {
throw new RuntimeException(e);
}
}

default String getCollateSql(String collate) {
String getCollate =
StringUtils.isNotBlank(collate) ? "char_val COLLATE " + collate : "char_val";
return getCollate;
}

default String dualTable() {
return "";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,9 @@ public Optional<String> getUpsertStatement(

return Optional.of(mergeStatement);
}

@Override
public String dualTable() {
return " FROM SYSIBM.SYSDUMMY1 ";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -482,4 +482,24 @@ private boolean columnIsNullable(Connection connection, TablePath tablePath, Str
return rs.getString("NULLABLE").equals("Y");
}
}

@Override
public String dualTable() {
return " FROM dual ";
}

@Override
public String getCollateSql(String collate) {
if (StringUtils.isNotBlank(collate)) {
StringBuilder sql = new StringBuilder();
sql.append("NLSSORT(")
.append("char_val")
.append(", 'NLS_SORT=")
.append(collate)
.append("')");
return sql.toString();
} else {
return "char_val";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;

import org.apache.commons.lang3.StringUtils;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -107,4 +109,22 @@ public Optional<String> getUpsertStatement(

return Optional.of(upsertSQL);
}

/**
* <a
* href="https://docs.vertica.com/23.4.x/en/sql-reference/functions/data-type-specific-functions/string-functions/collation/">vertica-collation</a>
*
* @param collate
* @return
*/
@Override
public String getCollateSql(String collate) {
if (StringUtils.isNotBlank(collate)) {
StringBuilder sql = new StringBuilder();
sql.append("COLLATION(").append("char_val").append(", '").append(collate).append("')");
return sql.toString();
} else {
return "char_val";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,16 @@ public synchronized void close() {
}
}

protected static String filterOutUppercase(String str) {
StringBuilder sb = new StringBuilder();
for (char c : str.toCharArray()) {
if (!Character.isUpperCase(c)) {
sb.append(c);
}
}
return sb.toString();
}

public Collection<JdbcSourceSplit> generateSplits(JdbcSourceTable table) throws Exception {
log.info("Start splitting table {} into chunks...", table.getTablePath());
long start = System.currentTimeMillis();
Expand Down
Loading
Loading