Skip to content

Commit dbe41e7

Browse files
jinkachychenhongyu05
and
chenhongyu05
authored
[Feature][Jdbc] Add String type column split Support by charset-based splitting algorithm (#9002)
Co-authored-by: chenhongyu05 <[email protected]>
1 parent 3b54837 commit dbe41e7

File tree

18 files changed

+1055
-54
lines changed

18 files changed

+1055
-54
lines changed

Diff for: docs/en/connector-v2/source/Jdbc.md

+5-3
Large diffs are not rendered by default.

Diff for: seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java

+19-5
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222
import org.apache.seatunnel.api.sink.DataSaveMode;
2323
import org.apache.seatunnel.api.sink.SchemaSaveMode;
2424
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
25+
import org.apache.seatunnel.connectors.seatunnel.jdbc.source.StringSplitMode;
2526

26-
import java.math.BigDecimal;
2727
import java.util.List;
2828
import java.util.Map;
2929

@@ -171,14 +171,14 @@ public interface JdbcOptions {
171171
.noDefaultValue()
172172
.withDescription("partition column");
173173

174-
Option<BigDecimal> PARTITION_UPPER_BOUND =
174+
Option<String> PARTITION_UPPER_BOUND =
175175
Options.key("partition_upper_bound")
176-
.bigDecimalType()
176+
.stringType()
177177
.noDefaultValue()
178178
.withDescription("partition upper bound");
179-
Option<BigDecimal> PARTITION_LOWER_BOUND =
179+
Option<String> PARTITION_LOWER_BOUND =
180180
Options.key("partition_lower_bound")
181-
.bigDecimalType()
181+
.stringType()
182182
.noDefaultValue()
183183
.withDescription("partition lower bound");
184184
Option<Integer> PARTITION_NUM =
@@ -225,4 +225,18 @@ public interface JdbcOptions {
225225
.mapType()
226226
.noDefaultValue()
227227
.withDescription("additional connection configuration parameters");
228+
229+
Option<StringSplitMode> STRING_SPLIT_MODE =
230+
Options.key("split.string_split_mode")
231+
.enumType(StringSplitMode.class)
232+
.defaultValue(StringSplitMode.SAMPLE)
233+
.withDescription(
234+
"Supports different string splitting algorithms. By default, `sample` is used to determine the split by sampling the string value. You can switch to `charset_based` to enable charset-based string splitting algorithm. When set to `charset_based`, the algorithm assumes characters of partition_column are within ASCII range 32-126, which covers most character-based splitting scenarios.");
235+
236+
Option<String> STRING_SPLIT_MODE_COLLATE =
237+
Options.key("split.string_split_mode_collate")
238+
.stringType()
239+
.noDefaultValue()
240+
.withDescription(
241+
"Specifies the collation to use when string_split_mode is set to `charset_based` and the table has a special collation. If not specified, the database's default collation will be used.");
228242
}

Diff for: seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.seatunnel.connectors.seatunnel.jdbc.config;
1919

2020
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
21+
import org.apache.seatunnel.connectors.seatunnel.jdbc.source.StringSplitMode;
2122

2223
import lombok.Builder;
2324
import lombok.Data;
@@ -44,6 +45,10 @@ public class JdbcSourceConfig implements Serializable {
4445
private int splitInverseSamplingRate;
4546
private boolean decimalTypeNarrowing;
4647

48+
private StringSplitMode stringSplitMode;
49+
50+
private String stringSplitModeCollate;
51+
4752
public static JdbcSourceConfig of(ReadonlyConfig config) {
4853
JdbcSourceConfig.Builder builder = JdbcSourceConfig.builder();
4954
builder.jdbcConnectionConfig(JdbcConnectionConfig.of(config));
@@ -55,7 +60,8 @@ public static JdbcSourceConfig of(ReadonlyConfig config) {
5560
config.getOptional(JdbcOptions.QUERY).isPresent()
5661
&& config.getOptional(JdbcOptions.PARTITION_COLUMN).isPresent();
5762
builder.useDynamicSplitter(!isOldVersion);
58-
63+
builder.stringSplitMode(config.get(JdbcOptions.STRING_SPLIT_MODE));
64+
builder.stringSplitModeCollate(config.get(JdbcOptions.STRING_SPLIT_MODE_COLLATE));
5965
builder.splitSize(config.get(JdbcSourceOptions.SPLIT_SIZE));
6066
builder.splitEvenDistributionFactorUpperBound(
6167
config.get(JdbcSourceOptions.SPLIT_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND));

Diff for: seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceTableConfig.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import lombok.experimental.Tolerate;
2828

2929
import java.io.Serializable;
30-
import java.math.BigDecimal;
3130
import java.util.Collections;
3231
import java.util.HashSet;
3332
import java.util.List;
@@ -53,10 +52,10 @@ public class JdbcSourceTableConfig implements Serializable {
5352
private Integer partitionNumber;
5453

5554
@JsonProperty("partition_lower_bound")
56-
private BigDecimal partitionStart;
55+
private String partitionStart;
5756

5857
@JsonProperty("partition_upper_bound")
59-
private BigDecimal partitionEnd;
58+
private String partitionEnd;
6059

6160
@JsonProperty("use_select_count")
6261
private Boolean useSelectCount;

Diff for: seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java

+40
Original file line numberDiff line numberDiff line change
@@ -814,4 +814,44 @@ default boolean isSpecialDefaultValue(Object defaultValue, String sourceDialectN
814814
default String quotesDefaultValue(Object defaultValue) {
815815
return "'" + defaultValue + "'";
816816
}
817+
818+
default String getCollationSequence(Connection connection, String collate) {
819+
StringBuilder sb = new StringBuilder();
820+
String getDual = dualTable();
821+
String baseQuery = "SELECT char_val FROM (";
822+
StringBuilder unionQuery = new StringBuilder();
823+
for (int i = 32; i <= 126; i++) {
824+
if (i > 32) unionQuery.append(" UNION ALL ");
825+
unionQuery.append("SELECT ? AS char_val ").append(getDual);
826+
}
827+
String sortedQuery =
828+
baseQuery + unionQuery + ") ndi_tmp_chars ORDER BY " + getCollateSql(collate);
829+
log.info("sortedCollationQuery is " + sortedQuery);
830+
PreparedStatement preparedStatement;
831+
try {
832+
preparedStatement = connection.prepareStatement(sortedQuery);
833+
for (int i = 32; i <= 126; i++) {
834+
log.debug("setString " + (i - 32) + " => " + (char) i);
835+
preparedStatement.setString(i - 32 + 1, String.valueOf((char) i));
836+
}
837+
838+
ResultSet resultSet = preparedStatement.executeQuery();
839+
while (resultSet.next()) {
840+
sb.append(resultSet.getString("char_val"));
841+
}
842+
return sb.toString();
843+
} catch (SQLException e) {
844+
throw new RuntimeException(e);
845+
}
846+
}
847+
848+
default String getCollateSql(String collate) {
849+
String getCollate =
850+
StringUtils.isNotBlank(collate) ? "char_val COLLATE " + collate : "char_val";
851+
return getCollate;
852+
}
853+
854+
default String dualTable() {
855+
return "";
856+
}
817857
}

Diff for: seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2Dialect.java

+5
Original file line numberDiff line numberDiff line change
@@ -98,4 +98,9 @@ public Optional<String> getUpsertStatement(
9898

9999
return Optional.of(mergeStatement);
100100
}
101+
102+
@Override
103+
public String dualTable() {
104+
return " FROM SYSIBM.SYSDUMMY1 ";
105+
}
101106
}

Diff for: seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java

+20
Original file line numberDiff line numberDiff line change
@@ -482,4 +482,24 @@ private boolean columnIsNullable(Connection connection, TablePath tablePath, Str
482482
return rs.getString("NULLABLE").equals("Y");
483483
}
484484
}
485+
486+
@Override
487+
public String dualTable() {
488+
return " FROM dual ";
489+
}
490+
491+
@Override
492+
public String getCollateSql(String collate) {
493+
if (StringUtils.isNotBlank(collate)) {
494+
StringBuilder sql = new StringBuilder();
495+
sql.append("NLSSORT(")
496+
.append("char_val")
497+
.append(", 'NLS_SORT=")
498+
.append(collate)
499+
.append("')");
500+
return sql.toString();
501+
} else {
502+
return "char_val";
503+
}
504+
}
485505
}

Diff for: seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialect.java

+20
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
2323
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
2424

25+
import org.apache.commons.lang3.StringUtils;
26+
2527
import java.util.Arrays;
2628
import java.util.List;
2729
import java.util.Optional;
@@ -107,4 +109,22 @@ public Optional<String> getUpsertStatement(
107109

108110
return Optional.of(upsertSQL);
109111
}
112+
113+
/**
114+
* <a
115+
* href="https://docs.vertica.com/23.4.x/en/sql-reference/functions/data-type-specific-functions/string-functions/collation/">vertica-collation</a>
116+
*
117+
* @param collate
118+
* @return
119+
*/
120+
@Override
121+
public String getCollateSql(String collate) {
122+
if (StringUtils.isNotBlank(collate)) {
123+
StringBuilder sql = new StringBuilder();
124+
sql.append("COLLATION(").append("char_val").append(", '").append(collate).append("')");
125+
return sql.toString();
126+
} else {
127+
return "char_val";
128+
}
129+
}
110130
}

Diff for: seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java

+10
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,16 @@ public synchronized void close() {
8585
}
8686
}
8787

88+
protected static String filterOutUppercase(String str) {
89+
StringBuilder sb = new StringBuilder();
90+
for (char c : str.toCharArray()) {
91+
if (!Character.isUpperCase(c)) {
92+
sb.append(c);
93+
}
94+
}
95+
return sb.toString();
96+
}
97+
8898
public Collection<JdbcSourceSplit> generateSplits(JdbcSourceTable table) throws Exception {
8999
log.info("Start splitting table {} into chunks...", table.getTablePath());
90100
long start = System.currentTimeMillis();

0 commit comments

Comments
 (0)