Skip to content

Commit d36ba85

Browse files
authored
[Fix][Connector-V2] Fix partitioning column selection logic (#10319)
1 parent fd79656 commit d36ba85

File tree

2 files changed

+100
-21
lines changed

2 files changed

+100
-21
lines changed

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ public Collection<SnapshotSplit> generateSplits(TableId tableId) {
6767
long start = System.currentTimeMillis();
6868

6969
Column splitColumn = getSplitColumn(jdbc, dialect, tableId);
70+
log.info(
71+
"Chosen split column {} for table {}",
72+
splitColumn != null ? splitColumn.name() : "null",
73+
tableId);
7074
List<SnapshotSplit> splits = new ArrayList<>();
7175
if (splitColumn == null) {
7276
if (sourceConfig.isExactlyOnce()) {
@@ -429,15 +433,11 @@ protected Column getSplitColumn(
429433

430434
Optional<PrimaryKey> primaryKey = dialect.getPrimaryKey(jdbc, tableId);
431435
if (primaryKey.isPresent()) {
432-
List<String> pkColumns = primaryKey.get().getColumnNames();
433-
434-
for (String pkColumn : pkColumns) {
435-
Column column = table.columnWithName(pkColumn);
436-
if (isEvenlySplitColumn(column)) {
437-
splitColumn = columnComparable(splitColumn, column);
438-
if (sqlTypePriority(splitColumn) == 1) {
439-
return splitColumn;
440-
}
436+
Column firstColumn = table.columnWithName(primaryKey.get().getColumnNames().get(0));
437+
if (isEvenlySplitColumn(firstColumn)) {
438+
splitColumn = columnComparable(splitColumn, firstColumn);
439+
if (sqlTypePriority(splitColumn) == 1) {
440+
return splitColumn;
441441
}
442442
}
443443
} else {
@@ -447,15 +447,12 @@ protected Column getSplitColumn(
447447
List<ConstraintKey> uniqueKeys = dialect.getUniqueKeys(jdbc, tableId);
448448
if (!uniqueKeys.isEmpty()) {
449449
for (ConstraintKey uniqueKey : uniqueKeys) {
450-
List<ConstraintKey.ConstraintKeyColumn> uniqueKeyColumns =
451-
uniqueKey.getColumnNames();
452-
for (ConstraintKey.ConstraintKeyColumn uniqueKeyColumn : uniqueKeyColumns) {
453-
Column column = table.columnWithName(uniqueKeyColumn.getColumnName());
454-
if (isEvenlySplitColumn(column)) {
455-
splitColumn = columnComparable(splitColumn, column);
456-
if (sqlTypePriority(splitColumn) == 1) {
457-
return splitColumn;
458-
}
450+
Column firstColumn =
451+
table.columnWithName(uniqueKey.getColumnNames().get(0).getColumnName());
452+
if (isEvenlySplitColumn(firstColumn)) {
453+
splitColumn = columnComparable(splitColumn, firstColumn);
454+
if (sqlTypePriority(splitColumn) == 1) {
455+
return splitColumn;
459456
}
460457
}
461458
}

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/jdbc/source/JdbcSourceChunkSplitterTest.java

Lines changed: 85 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,36 @@
4848
import java.util.List;
4949
import java.util.Optional;
5050

51-
public class JdbcSourceChunkSplitterTest {
51+
class JdbcSourceChunkSplitterTest {
5252

5353
@Test
54-
public void splitColumnTest() throws SQLException {
54+
void splitColumnTest() throws SQLException {
5555
TestJdbcSourceChunkSplitter testJdbcSourceChunkSplitter =
5656
new TestJdbcSourceChunkSplitter(null, new TestSourceDialect());
5757
Column splitColumn =
5858
testJdbcSourceChunkSplitter.getSplitColumn(
5959
null, new TestSourceDialect(), new TableId("", "", ""));
60-
Assertions.assertEquals(splitColumn.typeName(), "tinyint");
60+
Assertions.assertEquals("varchar", splitColumn.typeName());
61+
}
62+
63+
@Test
64+
void splitColumnTestWithUniqueKey() throws SQLException {
65+
TestJdbcSourceChunkSplitter testJdbcSourceChunkSplitter =
66+
new TestJdbcSourceChunkSplitter(null, new TestSourceDialectWithUniqueKey());
67+
Column splitColumn =
68+
testJdbcSourceChunkSplitter.getSplitColumn(
69+
null, new TestSourceDialectWithUniqueKey(), new TableId("", "", ""));
70+
Assertions.assertEquals("bigint", splitColumn.typeName());
71+
}
72+
73+
@Test
74+
void splitColumnTestWithUniqueKey_2() throws SQLException {
75+
TestJdbcSourceChunkSplitter testJdbcSourceChunkSplitter =
76+
new TestJdbcSourceChunkSplitter(null, new TestSourceDialectWithUniqueKey_2());
77+
Column splitColumn =
78+
testJdbcSourceChunkSplitter.getSplitColumn(
79+
null, new TestSourceDialectWithUniqueKey_2(), new TableId("", "", ""));
80+
Assertions.assertEquals("int", splitColumn.typeName());
6181
}
6282

6383
private class TestJdbcSourceChunkSplitter extends AbstractJdbcSourceChunkSplitter {
@@ -247,4 +267,66 @@ public List<ConstraintKey> getUniqueKeys(JdbcConnection jdbcConnection, TableId
247267
return new ArrayList<ConstraintKey>();
248268
}
249269
}
270+
271+
private class TestSourceDialectWithUniqueKey extends TestSourceDialect {
272+
273+
@Override
274+
public Optional<PrimaryKey> getPrimaryKey(JdbcConnection jdbcConnection, TableId tableId)
275+
throws SQLException {
276+
return Optional.of(PrimaryKey.of("pkName", Arrays.asList("bigint_col")));
277+
}
278+
279+
@Override
280+
public List<ConstraintKey> getUniqueKeys(JdbcConnection jdbcConnection, TableId tableId)
281+
throws SQLException {
282+
List<ConstraintKey> keys = new ArrayList<>();
283+
284+
keys.add(
285+
ConstraintKey.of(
286+
ConstraintKey.ConstraintType.UNIQUE_KEY,
287+
"uk_1",
288+
Arrays.asList(
289+
ConstraintKey.ConstraintKeyColumn.of(
290+
"string_col", ConstraintKey.ColumnSortType.ASC),
291+
ConstraintKey.ConstraintKeyColumn.of(
292+
"int", ConstraintKey.ColumnSortType.ASC))));
293+
294+
return keys;
295+
}
296+
}
297+
298+
private class TestSourceDialectWithUniqueKey_2 extends TestSourceDialect {
299+
300+
@Override
301+
public Optional<PrimaryKey> getPrimaryKey(JdbcConnection jdbcConnection, TableId tableId)
302+
throws SQLException {
303+
return Optional.of(PrimaryKey.of("pkName", Arrays.asList("bigint_col")));
304+
}
305+
306+
@Override
307+
public List<ConstraintKey> getUniqueKeys(JdbcConnection jdbcConnection, TableId tableId)
308+
throws SQLException {
309+
List<ConstraintKey> keys = new ArrayList<>();
310+
311+
keys.add(
312+
ConstraintKey.of(
313+
ConstraintKey.ConstraintType.UNIQUE_KEY,
314+
"uk_1",
315+
Arrays.asList(
316+
ConstraintKey.ConstraintKeyColumn.of(
317+
"string_col", ConstraintKey.ColumnSortType.ASC))));
318+
319+
keys.add(
320+
ConstraintKey.of(
321+
ConstraintKey.ConstraintType.UNIQUE_KEY,
322+
"uk_2",
323+
Arrays.asList(
324+
ConstraintKey.ConstraintKeyColumn.of(
325+
"int", ConstraintKey.ColumnSortType.ASC),
326+
ConstraintKey.ConstraintKeyColumn.of(
327+
"smallint", ConstraintKey.ColumnSortType.ASC))));
328+
329+
return keys;
330+
}
331+
}
250332
}

0 commit comments

Comments
 (0)