Skip to content

Commit 97d81b6

Browse files
committed
fix decimal division failure
1 parent 34753fc commit 97d81b6

4 files changed

Lines changed: 31 additions & 19 deletions

File tree

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/CommonConverter.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
3434
import org.apache.flink.cdc.common.types.ArrayType;
3535
import org.apache.flink.cdc.common.types.DataType;
36+
import org.apache.flink.cdc.common.types.DecimalType;
3637
import org.apache.flink.cdc.common.types.MapType;
3738
import org.apache.flink.cdc.common.types.RowType;
3839
import org.apache.flink.cdc.common.types.VariantType;
@@ -153,13 +154,21 @@ static StringData convertToStringData(Object obj) {
153154
"Cannot convert " + obj + " of type " + obj.getClass() + " to STRING DATA.");
154155
}
155156

156-
static DecimalData convertToDecimalData(Object obj) {
157+
static DecimalData convertToDecimalData(Object obj, DecimalType decimalType) {
157158
if (obj instanceof DecimalData) {
158-
return (DecimalData) obj;
159+
DecimalData dd = (DecimalData) obj;
160+
// Re-convert to target precision and scale if different
161+
if (dd.precision() == decimalType.getPrecision()
162+
&& dd.scale() == decimalType.getScale()) {
163+
return dd;
164+
}
165+
return DecimalData.fromBigDecimal(
166+
dd.toBigDecimal(), decimalType.getPrecision(), decimalType.getScale());
159167
}
160168
if (obj instanceof BigDecimal) {
161169
BigDecimal bd = (BigDecimal) obj;
162-
return DecimalData.fromBigDecimal(bd, bd.precision(), bd.scale());
170+
return DecimalData.fromBigDecimal(
171+
bd, decimalType.getPrecision(), decimalType.getScale());
163172
}
164173
throw new RuntimeException(
165174
"Cannot convert " + obj + " of type " + obj.getClass() + " to DECIMAL DATA.");

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/InternalObjectConverter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public Function<Object, byte[]> visit(VarBinaryType varBinaryType) {
8787

8888
@Override
8989
public Function<Object, DecimalData> visit(DecimalType decimalType) {
90-
return CommonConverter::convertToDecimalData;
90+
return obj -> CommonConverter.convertToDecimalData(obj, decimalType);
9191
}
9292

9393
@Override

flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/InternalObjectConverterTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ void testConvertToDecimal() {
195195
.hasToString("4.2");
196196
assertThat(convertToInternal(new BigDecimal("-3.1415926"), DataTypes.DECIMAL(20, 10)))
197197
.isInstanceOf(DecimalData.class)
198-
.hasToString("-3.1415926");
198+
.hasToString("-3.1415926000");
199199

200200
assertThat(
201201
convertToInternal(

flink-cdc-composer/src/test/resources/specs/decimal.yaml

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -40,27 +40,29 @@
4040
- do: Multiply Op
4141
projection: |-
4242
id_
43+
decimal_10_0_, decimal_20_2_
4344
decimal_10_0_ * CAST(2 AS DECIMAL(1, 0)) AS comp_1
4445
decimal_20_2_ * CAST(2 AS DECIMAL(1, 0)) AS comp_2
4546
primary-key: id_
4647
non-null: 'true'
4748
expect: |-
48-
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`comp_1` DECIMAL(11, 0),`comp_2` DECIMAL(21, 2)}, primaryKeys=id_, options=()}
49-
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 2469135780, 246913578024691357.80], op=INSERT, meta=()}
50-
DataChangeEvent{tableId=foo.bar.baz, before=[1, 2469135780, 246913578024691357.80], after=[-1, -19753086420, -1975308642197530864.20], op=UPDATE, meta=()}
51-
DataChangeEvent{tableId=foo.bar.baz, before=[-1, -19753086420, -1975308642197530864.20], after=[], op=DELETE, meta=()}
49+
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`decimal_10_0_` DECIMAL(10, 0),`decimal_20_2_` DECIMAL(20, 2),`comp_1` DECIMAL(11, 0),`comp_2` DECIMAL(21, 2)}, primaryKeys=id_, options=()}
50+
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 1234567890, 123456789012345678.90, 2469135780, 246913578024691357.80], op=INSERT, meta=()}
51+
DataChangeEvent{tableId=foo.bar.baz, before=[1, 1234567890, 123456789012345678.90, 2469135780, 246913578024691357.80], after=[-1, -9876543210, -987654321098765432.10, -19753086420, -1975308642197530864.20], op=UPDATE, meta=()}
52+
DataChangeEvent{tableId=foo.bar.baz, before=[-1, -9876543210, -987654321098765432.10, -19753086420, -1975308642197530864.20], after=[], op=DELETE, meta=()}
5253
- do: Divide Op
5354
projection: |-
5455
id_
56+
decimal_10_0_, decimal_20_2_
5557
decimal_10_0_ / CAST(2 AS DECIMAL(1, 0)) AS comp_1
5658
decimal_20_2_ / CAST(2 AS DECIMAL(1, 0)) AS comp_2
5759
primary-key: id_
5860
non-null: 'true'
5961
expect: |-
60-
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`comp_1` DECIMAL(16, 6),`comp_2` DECIMAL(24, 6)}, primaryKeys=id_, options=()}
61-
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 617.283945, 6172839450617.283945], op=INSERT, meta=()}
62-
DataChangeEvent{tableId=foo.bar.baz, before=[1, 617.283945, 6172839450617.283945], after=[-1, -4938.271605, -49382716054938.271605], op=UPDATE, meta=()}
63-
DataChangeEvent{tableId=foo.bar.baz, before=[-1, -4938.271605, -49382716054938.271605], after=[], op=DELETE, meta=()}
62+
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`decimal_10_0_` DECIMAL(10, 0),`decimal_20_2_` DECIMAL(20, 2),`comp_1` DECIMAL(16, 6),`comp_2` DECIMAL(24, 6)}, primaryKeys=id_, options=()}
63+
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 1234567890, 123456789012345678.90, 617283945.000000, 61728394506172839.450000], op=INSERT, meta=()}
64+
DataChangeEvent{tableId=foo.bar.baz, before=[1, 1234567890, 123456789012345678.90, 617283945.000000, 61728394506172839.450000], after=[-1, -9876543210, -987654321098765432.10, -4938271605.000000, -493827160549382716.050000], op=UPDATE, meta=()}
65+
DataChangeEvent{tableId=foo.bar.baz, before=[-1, -9876543210, -987654321098765432.10, -4938271605.000000, -493827160549382716.050000], after=[], op=DELETE, meta=()}
6466
- do: Abs Op
6567
projection: |-
6668
id_
@@ -103,13 +105,14 @@
103105
- do: Round Op
104106
projection: |-
105107
id_
108+
decimal_10_0_, decimal_20_2_
106109
ROUND(decimal_10_0_, 1) AS comp_1
107110
ROUND(decimal_20_2_, 1) AS comp_2
108111
primary-key: id_
109112
expect: |-
110-
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`comp_1` DECIMAL(10, 0),`comp_2` DECIMAL(20, 1)}, primaryKeys=id_, options=()}
111-
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 12345678900, 123456789012345678.9], op=INSERT, meta=()}
112-
DataChangeEvent{tableId=foo.bar.baz, before=[1, 12345678900, 123456789012345678.9], after=[-1, -98765432100, -987654321098765432.1], op=UPDATE, meta=()}
113-
DataChangeEvent{tableId=foo.bar.baz, before=[-1, -98765432100, -987654321098765432.1], after=[], op=DELETE, meta=()}
114-
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
115-
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
113+
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`decimal_10_0_` DECIMAL(10, 0),`decimal_20_2_` DECIMAL(20, 2),`comp_1` DECIMAL(10, 0),`comp_2` DECIMAL(20, 1)}, primaryKeys=id_, options=()}
114+
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 1234567890, 123456789012345678.90, 1234567890, 123456789012345678.9], op=INSERT, meta=()}
115+
DataChangeEvent{tableId=foo.bar.baz, before=[1, 1234567890, 123456789012345678.90, 1234567890, 123456789012345678.9], after=[-1, -9876543210, -987654321098765432.10, -9876543210, -987654321098765432.1], op=UPDATE, meta=()}
116+
DataChangeEvent{tableId=foo.bar.baz, before=[-1, -9876543210, -987654321098765432.10, -9876543210, -987654321098765432.1], after=[], op=DELETE, meta=()}
117+
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null, null, null], op=INSERT, meta=()}
118+
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null, null, null], after=[], op=DELETE, meta=()}

0 commit comments

Comments
 (0)