Skip to content

Commit 1de9849

Browse files
suhwan-cheon authored and ThorneANN committed
[FLINK-38247][MySQL] Handle BIGINT UNSIGNED overflow correctly in PreparedStatement. (apache#4117)
1 parent ffb8b14 commit 1de9849

3 files changed

Lines changed: 235 additions & 8 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import io.debezium.jdbc.JdbcConnection;
2323
import io.debezium.relational.TableId;
2424

25+
import java.math.BigDecimal;
26+
import java.math.BigInteger;
2527
import java.sql.Connection;
2628
import java.sql.PreparedStatement;
2729
import java.sql.SQLException;
@@ -76,6 +78,24 @@ public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
7678
});
7779
}
7880

81+
// PreparedStatement#setObject method will be converted to long type when handling bigint
82+
// unsigned, which poses a data overflow issue for values exceeding Long.MAX_VALUE.
83+
// Therefore, we need to convert to BigDecimal when the value is outside the long range
84+
public static void setSafeObject(PreparedStatement ps, int parameterIndex, Object value)
85+
throws SQLException {
86+
if (value instanceof BigInteger) {
87+
BigInteger bigIntValue = (BigInteger) value;
88+
if (bigIntValue.compareTo(BigInteger.valueOf(Long.MAX_VALUE)) > 0
89+
|| bigIntValue.compareTo(BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
90+
ps.setBigDecimal(parameterIndex, new BigDecimal(bigIntValue));
91+
} else {
92+
ps.setObject(parameterIndex, bigIntValue.longValueExact());
93+
}
94+
} else {
95+
ps.setObject(parameterIndex, value);
96+
}
97+
}
98+
7999
public static Object queryMin(
80100
JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound)
81101
throws SQLException {
@@ -85,7 +105,7 @@ public static Object queryMin(
85105
quote(columnName), quote(tableId), quote(columnName));
86106
return jdbc.prepareQueryAndMap(
87107
minQuery,
88-
ps -> ps.setObject(1, excludedLowerBound),
108+
ps -> setSafeObject(ps, 1, excludedLowerBound),
89109
rs -> {
90110
if (!rs.next()) {
91111
// this should never happen
@@ -118,7 +138,7 @@ public static Object queryNextChunkMax(
118138
chunkSize);
119139
return jdbc.prepareQueryAndMap(
120140
query,
121-
ps -> ps.setObject(1, includedLowerBound),
141+
ps -> setSafeObject(ps, 1, includedLowerBound),
122142
rs -> {
123143
if (!rs.next()) {
124144
// this should never happen
@@ -204,18 +224,18 @@ public static PreparedStatement readTableSplitDataStatement(
204224
}
205225
if (isFirstSplit) {
206226
for (int i = 0; i < primaryKeyNum; i++) {
207-
statement.setObject(i + 1, splitEnd[i]);
208-
statement.setObject(i + 1 + primaryKeyNum, splitEnd[i]);
227+
setSafeObject(statement, i + 1, splitEnd[i]);
228+
setSafeObject(statement, i + 1 + primaryKeyNum, splitEnd[i]);
209229
}
210230
} else if (isLastSplit) {
211231
for (int i = 0; i < primaryKeyNum; i++) {
212-
statement.setObject(i + 1, splitStart[i]);
232+
setSafeObject(statement, i + 1, splitStart[i]);
213233
}
214234
} else {
215235
for (int i = 0; i < primaryKeyNum; i++) {
216-
statement.setObject(i + 1, splitStart[i]);
217-
statement.setObject(i + 1 + primaryKeyNum, splitEnd[i]);
218-
statement.setObject(i + 1 + 2 * primaryKeyNum, splitEnd[i]);
236+
setSafeObject(statement, i + 1, splitStart[i]);
237+
setSafeObject(statement, i + 1 + primaryKeyNum, splitEnd[i]);
238+
setSafeObject(statement, i + 1 + 2 * primaryKeyNum, splitEnd[i]);
219239
}
220240
}
221241
return statement;

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.apache.flink.table.catalog.ObjectPath;
5757
import org.apache.flink.table.catalog.ResolvedSchema;
5858
import org.apache.flink.table.catalog.UniqueConstraint;
59+
import org.apache.flink.table.data.DecimalData;
5960
import org.apache.flink.table.data.RowData;
6061
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
6162
import org.apache.flink.table.types.DataType;
@@ -1447,6 +1448,99 @@ private boolean hasNextData(final CloseableIterator<?> iterator)
14471448
}
14481449
}
14491450

1451+
@Test
1452+
void testUnsignedBigintPrimaryKeyChunking() throws Exception {
1453+
customDatabase.createAndInitialize();
1454+
1455+
String db = customDatabase.getDatabaseName();
1456+
String table = "unsigned_bigint_pk";
1457+
try (MySqlConnection connection = getConnection()) {
1458+
connection.setAutoCommit(false);
1459+
String createSql =
1460+
String.format(
1461+
"CREATE TABLE %s.%s (\n"
1462+
+ " `order_id` BIGINT UNSIGNED NOT NULL,\n"
1463+
+ " `desc` VARCHAR(512) NOT NULL,\n"
1464+
+ " PRIMARY KEY (`order_id`)\n"
1465+
+ ") ENGINE=InnoDB DEFAULT CHARSET=utf8;",
1466+
StatementUtils.quote(db), StatementUtils.quote(table));
1467+
// Insert sample data including values near UNSIGNED BIGINT max
1468+
String insertSql =
1469+
String.format(
1470+
"INSERT INTO %s.%s (`order_id`, `desc`) VALUES "
1471+
+ "(1, 'flink'),(2, 'flink'),(3, 'flink'),(4, 'flink'),(5, 'flink'),"
1472+
+ "(6, 'flink'),(7, 'flink'),(8, 'flink'),(9, 'flink'),(10, 'flink'),"
1473+
+ "(11, 'flink'),(12, 'flink'),"
1474+
+ "(18446744073709551604, 'flink'),(18446744073709551605, 'flink'),"
1475+
+ "(18446744073709551606, 'flink'),(18446744073709551607, 'flink'),"
1476+
+ "(18446744073709551608, 'flink'),(18446744073709551609, 'flink'),"
1477+
+ "(18446744073709551610, 'flink'),(18446744073709551611, 'flink'),"
1478+
+ "(18446744073709551612, 'flink'),(18446744073709551613, 'flink'),"
1479+
+ "(18446744073709551614, 'flink'),(18446744073709551615, 'flink');",
1480+
StatementUtils.quote(db), StatementUtils.quote(table));
1481+
// Drop if exists to be idempotent across runs, then create and insert
1482+
connection.execute(
1483+
String.format(
1484+
"DROP TABLE IF EXISTS %s.%s;",
1485+
StatementUtils.quote(db), StatementUtils.quote(table)),
1486+
createSql,
1487+
insertSql);
1488+
connection.commit();
1489+
}
1490+
1491+
// Build a source reading only the unsigned_bigint_pk table
1492+
DataType dataType =
1493+
DataTypes.ROW(
1494+
DataTypes.FIELD("order_id", DataTypes.DECIMAL(20, 0)),
1495+
DataTypes.FIELD("desc", DataTypes.STRING()));
1496+
LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType);
1497+
InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(logicalType);
1498+
RowDataDebeziumDeserializeSchema deserializer =
1499+
RowDataDebeziumDeserializeSchema.newBuilder()
1500+
.setPhysicalRowType((RowType) dataType.getLogicalType())
1501+
.setResultTypeInfo(typeInfo)
1502+
.build();
1503+
1504+
MySqlSource<RowData> source =
1505+
MySqlSource.<RowData>builder()
1506+
.hostname(MYSQL_CONTAINER.getHost())
1507+
.port(MYSQL_CONTAINER.getDatabasePort())
1508+
.username(customDatabase.getUsername())
1509+
.password(customDatabase.getPassword())
1510+
.serverTimeZone("UTC")
1511+
.databaseList(db)
1512+
.tableList(db + "." + table)
1513+
.deserializer(deserializer)
1514+
.startupOptions(StartupOptions.initial())
1515+
.chunkKeyColumn(new ObjectPath(db, table), "order_id")
1516+
.splitSize(2)
1517+
.build();
1518+
1519+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
1520+
env.setParallelism(1);
1521+
try (CloseableIterator<RowData> it =
1522+
env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
1523+
.executeAndCollect()) {
1524+
// Expect 24 records as inserted above
1525+
List<String> result = fetchRowData(it, 24, this::stringifyUnsignedPkRow);
1526+
// Validate a couple of boundary values exist to ensure chunking across unsigned range
1527+
// works
1528+
assertThat(result)
1529+
.contains(
1530+
"+I[1, flink]",
1531+
"+I[12, flink]",
1532+
"+I[18446744073709551604, flink]",
1533+
"+I[18446744073709551615, flink]");
1534+
}
1535+
}
1536+
1537+
private String stringifyUnsignedPkRow(RowData row) {
1538+
DecimalData decimal = row.getDecimal(0, 20, 0);
1539+
String orderId = decimal.toBigDecimal().toPlainString();
1540+
String desc = row.getString(1).toString();
1541+
return "+I[" + orderId + ", " + desc + "]";
1542+
}
1543+
14501544
/**
14511545
* A {@link DebeziumDeserializationSchema} implementation which sleep given milliseconds after
14521546
* deserialize per record, this class is designed for test.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.mysql.source.utils;
19+
20+
import org.junit.jupiter.api.Test;
21+
22+
import java.lang.reflect.Proxy;
23+
import java.math.BigDecimal;
24+
import java.math.BigInteger;
25+
import java.sql.PreparedStatement;
26+
import java.sql.SQLException;
27+
import java.util.HashMap;
28+
import java.util.Map;
29+
30+
import static org.assertj.core.api.Assertions.assertThat;
31+
32+
/** Unit test for {@link org.apache.flink.cdc.connectors.mysql.source.utils.StatementUtils}. */
33+
class StatementUtilsTest {
34+
35+
@Test
36+
void testSetSafeObjectConvertsBigIntegerToBigDecimal() throws SQLException {
37+
Map<String, Object> invocationDetails = new HashMap<>();
38+
PreparedStatement psProxy = createPreparedStatementProxy(invocationDetails);
39+
40+
// Create a BigInteger value that exceeds Long.MAX_VALUE
41+
BigInteger bigIntValue = new BigInteger("9223372036854775808"); // Long.MAX_VALUE + 1
42+
BigDecimal expectedBigDecimal = new BigDecimal(bigIntValue);
43+
44+
// Use the safe method
45+
StatementUtils.setSafeObject(psProxy, 1, bigIntValue);
46+
47+
// Assert that it correctly used setBigDecimal with the converted BigDecimal value
48+
assertThat(invocationDetails.get("methodName")).isEqualTo("setBigDecimal");
49+
assertThat(invocationDetails.get("value")).isInstanceOf(BigDecimal.class);
50+
assertThat(invocationDetails.get("value")).isEqualTo(expectedBigDecimal);
51+
}
52+
53+
@Test
54+
void testSetSafeObjectHandlesLargeBigIntegerValues() throws SQLException {
55+
Map<String, Object> invocationDetails = new HashMap<>();
56+
PreparedStatement psProxy = createPreparedStatementProxy(invocationDetails);
57+
58+
// Test with BIGINT UNSIGNED max value
59+
BigInteger maxUnsignedBigInt = new BigInteger("18446744073709551615"); // 2^64 - 1
60+
BigDecimal expectedBigDecimal = new BigDecimal(maxUnsignedBigInt);
61+
62+
StatementUtils.setSafeObject(psProxy, 1, maxUnsignedBigInt);
63+
64+
assertThat(invocationDetails.get("methodName")).isEqualTo("setBigDecimal");
65+
assertThat(invocationDetails.get("value")).isEqualTo(expectedBigDecimal);
66+
}
67+
68+
@Test
69+
void testSetSafeObjectHandlesRegularValues() throws SQLException {
70+
Map<String, Object> invocationDetails = new HashMap<>();
71+
PreparedStatement psProxy = createPreparedStatementProxy(invocationDetails);
72+
73+
// Test with a common Long
74+
StatementUtils.setSafeObject(psProxy, 1, 123L);
75+
assertThat(invocationDetails.get("methodName")).isEqualTo("setObject");
76+
assertThat(invocationDetails.get("value")).isEqualTo(123L);
77+
invocationDetails.clear();
78+
79+
// Test with a String
80+
StatementUtils.setSafeObject(psProxy, 2, "test");
81+
assertThat(invocationDetails.get("methodName")).isEqualTo("setObject");
82+
assertThat(invocationDetails.get("value")).isEqualTo("test");
83+
invocationDetails.clear();
84+
85+
// Test with an Integer
86+
StatementUtils.setSafeObject(psProxy, 3, 456);
87+
assertThat(invocationDetails.get("methodName")).isEqualTo("setObject");
88+
assertThat(invocationDetails.get("value")).isEqualTo(456);
89+
invocationDetails.clear();
90+
91+
// Test with null
92+
StatementUtils.setSafeObject(psProxy, 4, null);
93+
assertThat(invocationDetails.get("methodName")).isEqualTo("setObject");
94+
assertThat(invocationDetails.get("value")).isNull();
95+
}
96+
97+
private PreparedStatement createPreparedStatementProxy(Map<String, Object> invocationDetails) {
98+
return (PreparedStatement)
99+
Proxy.newProxyInstance(
100+
StatementUtilsTest.class.getClassLoader(),
101+
new Class<?>[] {PreparedStatement.class},
102+
(proxy, method, args) -> {
103+
String methodName = method.getName();
104+
if (methodName.equals("setObject")
105+
|| methodName.equals("setBigDecimal")) {
106+
invocationDetails.put("methodName", methodName);
107+
invocationDetails.put("parameterIndex", args[0]);
108+
invocationDetails.put("value", args[1]);
109+
}
110+
return null;
111+
});
112+
}
113+
}

0 commit comments

Comments
 (0)