Skip to content

Commit 7355d76

Browse files
haruki-830yuxiqian.yxq
andauthored
[FLINK-39758] Fix DECIMAL OOB in SchemaMergingUtils (#4419)
Co-authored-by: yuxiqian.yxq <yuxiqian.yxq@alibaba-inc.com>
1 parent 15288b3 commit 7355d76

4 files changed

Lines changed: 191 additions & 85 deletions

File tree

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -435,15 +435,7 @@ static DataType mergeDecimalType(DataType lType, DataType rType) {
435435
lhsDecimal.getPrecision() - lhsDecimal.getScale(),
436436
rhsDecimal.getPrecision() - rhsDecimal.getScale());
437437
int resultScale = Math.max(lhsDecimal.getScale(), rhsDecimal.getScale());
438-
Preconditions.checkArgument(
439-
resultIntDigits + resultScale <= DecimalType.MAX_PRECISION,
440-
String.format(
441-
"Failed to merge %s and %s type into DECIMAL. %d precision digits required, %d available",
442-
lType,
443-
rType,
444-
resultIntDigits + resultScale,
445-
DecimalType.MAX_PRECISION));
446-
return DataTypes.DECIMAL(resultIntDigits + resultScale, resultScale);
438+
return createDecimalBounded(resultIntDigits + resultScale, resultScale);
447439
} else if (lType instanceof DecimalType && rType.is(DataTypeFamily.EXACT_NUMERIC)) {
448440
// Merge decimal and int
449441
return mergeExactNumericsIntoDecimal((DecimalType) lType, rType);
@@ -461,11 +453,7 @@ private static DataType mergeExactNumericsIntoDecimal(
461453
Math.max(
462454
decimalType.getPrecision(),
463455
decimalType.getScale() + getNumericPrecision(otherType));
464-
if (resultPrecision <= DecimalType.MAX_PRECISION) {
465-
return DataTypes.DECIMAL(resultPrecision, decimalType.getScale());
466-
} else {
467-
return DataTypes.STRING();
468-
}
456+
return createDecimalBounded(resultPrecision, decimalType.getScale());
469457
}
470458

471459
@VisibleForTesting
@@ -935,4 +923,13 @@ private static Map<Class<? extends DataType>, List<DataType>> getTypeMergingTree
935923
mergingTree.put(VariantType.class, ImmutableList.of(stringType));
936924
return mergingTree;
937925
}
926+
927+
static DecimalType createDecimalBounded(int precision, int scale) {
928+
if (precision > DecimalType.MAX_PRECISION) {
929+
int lossDigits = precision - DecimalType.MAX_PRECISION;
930+
return DataTypes.DECIMAL(precision - lossDigits, scale - lossDigits);
931+
} else {
932+
return DataTypes.DECIMAL(precision, scale);
933+
}
934+
}
938935
}

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@
5151
import java.util.stream.Collectors;
5252
import java.util.stream.IntStream;
5353

54+
import static org.apache.flink.cdc.common.utils.SchemaMergingUtils.createDecimalBounded;
55+
5456
/** Utils for {@link Schema} to perform the ability of evolution. */
5557
@PublicEvolving
5658
public class SchemaUtils {
@@ -575,15 +577,7 @@ public static DataType inferWiderType(DataType lType, DataType rType) {
575577
lhsDecimal.getPrecision() - lhsDecimal.getScale(),
576578
rhsDecimal.getPrecision() - rhsDecimal.getScale());
577579
int resultScale = Math.max(lhsDecimal.getScale(), rhsDecimal.getScale());
578-
Preconditions.checkArgument(
579-
resultIntDigits + resultScale <= DecimalType.MAX_PRECISION,
580-
String.format(
581-
"Failed to merge %s and %s type into DECIMAL. %d precision digits required, %d available",
582-
lType,
583-
rType,
584-
resultIntDigits + resultScale,
585-
DecimalType.MAX_PRECISION));
586-
mergedType = DataTypes.DECIMAL(resultIntDigits + resultScale, resultScale);
580+
mergedType = createDecimalBounded(resultIntDigits + resultScale, resultScale);
587581
} else if (lType instanceof DecimalType && rType.is(DataTypeFamily.EXACT_NUMERIC)) {
588582
// Merge decimal and int
589583
mergedType = mergeExactNumericsIntoDecimal((DecimalType) lType, rType);
@@ -608,12 +602,7 @@ private static DataType mergeExactNumericsIntoDecimal(
608602
Math.max(
609603
decimalType.getPrecision(),
610604
decimalType.getScale() + getNumericPrecision(otherType));
611-
Preconditions.checkArgument(
612-
resultPrecision <= DecimalType.MAX_PRECISION,
613-
String.format(
614-
"Failed to merge %s and %s type into DECIMAL. %d precision digits required, %d available",
615-
decimalType, otherType, resultPrecision, DecimalType.MAX_PRECISION));
616-
return DataTypes.DECIMAL(resultPrecision, decimalType.getScale());
605+
return createDecimalBounded(resultPrecision, decimalType.getScale());
617606
}
618607

619608
@Deprecated

flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -314,21 +314,15 @@ void testInferWiderType() {
314314
.isEqualTo(DataTypes.DECIMAL(12, 4));
315315

316316
// Test overflow decimal conversions
317-
Assertions.assertThatThrownBy(
318-
() ->
319-
SchemaUtils.inferWiderType(
320-
DataTypes.DECIMAL(5, 5), DataTypes.DECIMAL(38, 0)))
321-
.isExactlyInstanceOf(IllegalArgumentException.class)
322-
.hasMessage(
323-
"Failed to merge DECIMAL(5, 5) NOT NULL and DECIMAL(38, 0) NOT NULL type into DECIMAL. 43 precision digits required, 38 available");
317+
Assertions.assertThat(
318+
SchemaUtils.inferWiderType(
319+
DataTypes.DECIMAL(5, 5), DataTypes.DECIMAL(38, 0)))
320+
.isEqualTo(DataTypes.DECIMAL(38, 0));
324321

325-
Assertions.assertThatThrownBy(
326-
() ->
327-
SchemaUtils.inferWiderType(
328-
DataTypes.DECIMAL(38, 0), DataTypes.DECIMAL(5, 5)))
329-
.isExactlyInstanceOf(IllegalArgumentException.class)
330-
.hasMessage(
331-
"Failed to merge DECIMAL(38, 0) NOT NULL and DECIMAL(5, 5) NOT NULL type into DECIMAL. 43 precision digits required, 38 available");
322+
Assertions.assertThat(
323+
SchemaUtils.inferWiderType(
324+
DataTypes.DECIMAL(38, 0), DataTypes.DECIMAL(5, 5)))
325+
.isEqualTo(DataTypes.DECIMAL(38, 0));
332326

333327
// Test merging with nullability
334328
Assertions.assertThat(

0 commit comments

Comments
 (0)