Skip to content

Commit 35afa46

Browse files
committed
FIX: change converting type to bigDecimal
1 parent eb61a68 commit 35afa46

2 files changed

Lines changed: 7 additions & 5 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.debezium.jdbc.JdbcConnection;
2323
import io.debezium.relational.TableId;
2424

25+
import java.math.BigDecimal;
2526
import java.math.BigInteger;
2627
import java.sql.Connection;
2728
import java.sql.PreparedStatement;
@@ -79,14 +80,14 @@ public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
7980

8081
// PreparedStatement#setObject method will be converted to long type when handling bigint
8182
// unsigned, which poses a data overflow issue.
82-
// Therefore, we need to handle the overflow issue by converting the long value to BigInteger.
83+
// Therefore, we need to handle the overflow issue by converting the bigint unsigned to BigDecimal.
8384
public static void setSafeObject(PreparedStatement ps, int parameterIndex, Object value)
8485
throws SQLException {
85-
if (value instanceof Long && (Long) value < 0L) {
86-
ps.setObject(parameterIndex, new BigInteger(Long.toUnsignedString((Long) value)));
87-
return;
86+
if (value instanceof BigInteger) {
87+
ps.setBigDecimal(parameterIndex, new BigDecimal((BigInteger) value));
88+
} else {
89+
ps.setObject(parameterIndex, value);
8890
}
89-
ps.setObject(parameterIndex, value);
9091
}
9192

9293
public static Object queryMin(

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1513,6 +1513,7 @@ void testUnsignedBigintPrimaryKeyChunking() throws Exception {
15131513
.deserializer(deserializer)
15141514
.startupOptions(StartupOptions.initial())
15151515
.chunkKeyColumn(new ObjectPath(db, table), "order_id")
1516+
.splitSize(2)
15161517
.build();
15171518

15181519
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

0 commit comments

Comments
 (0)