Skip to content

Commit 8f0052c

Browse files
authored
[Improve](cdc)(MongoDB) optimize type inference to avoid unnecessary decimal conversion (#596)
1 parent b8b7d6c commit 8f0052c

2 files changed

Lines changed: 74 additions & 1 deletion

File tree

flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,13 @@ private String determineDorisType(String fieldName, Object value) {
106106
&& existingField.getTypeString().equals(DorisType.STRING))) {
107107
return DorisType.STRING;
108108
}
109+
if (existingField != null) {
110+
String existingType = existingField.getTypeString();
111+
if (isDowngrade(existingType, dorisType)) {
112+
return existingType;
113+
}
114+
}
115+
109116
// Check and process for decimal types
110117
DecimalJudgement decimalJudgement = judgeDecimalField(fieldName, dorisType);
111118
if (DecimalJudgement.needProcessing(decimalJudgement)) {
@@ -118,6 +125,27 @@ private String determineDorisType(String fieldName, Object value) {
118125
return dorisType;
119126
}
120127

128+
/**
129+
* Check whether the newType is a downgrade from the existingType. Currently only considers
130+
* numeric types: TINYINT < SMALLINT < INT < BIGINT.
131+
*/
132+
private boolean isDowngrade(String existingType, String newType) {
133+
List<String> typeHierarchy =
134+
Arrays.asList(
135+
DorisType.TINYINT, DorisType.SMALLINT, DorisType.INT, DorisType.BIGINT);
136+
137+
int existingIndex = typeHierarchy.indexOf(existingType);
138+
int newIndex = typeHierarchy.indexOf(newType);
139+
140+
return newIndex != -1 && existingIndex != -1 && newIndex < existingIndex;
141+
}
142+
143+
/**
144+
* Determine whether the field should be treated as a decimal type: - If the existing type is
145+
* already decimal and current value is also decimal, return CERTAIN_DECIMAL. - If the field's
146+
* type is convertible (e.g., INT, BIGINT) and the current value is decimal or double, return
147+
* CONVERT_TO_DECIMAL. - Otherwise, no decimal processing is needed.
148+
*/
121149
private DecimalJudgement judgeDecimalField(String fieldName, String dorisType) {
122150
FieldSchema existingField = fields.get(fieldName);
123151
if (existingField == null) {
@@ -127,7 +155,7 @@ private DecimalJudgement judgeDecimalField(String fieldName, String dorisType) {
127155
boolean isDecimal = dorisType.startsWith(DorisType.DECIMAL);
128156
if (existDecimal && isDecimal) {
129157
return DecimalJudgement.CERTAIN_DECIMAL;
130-
} else if (CONVERT_TYPE.contains(dorisType)) {
158+
} else if (existDecimal && CONVERT_TYPE.contains(dorisType)) {
131159
return DecimalJudgement.CONVERT_TO_DECIMAL;
132160
}
133161
return DecimalJudgement.NOT_DECIMAL;

flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,4 +189,49 @@ public void replaceDecimalTypeIfNeededTest6() throws Exception {
189189
}
190190
}
191191
}
192+
193+
@Test
194+
public void testIntFieldNotConvertedToDecimal() throws Exception {
195+
ArrayList<Document> documents = new ArrayList<>();
196+
documents.add(new Document("fields1", Integer.MAX_VALUE));
197+
documents.add(new Document("fields1", 1234567));
198+
documents.add(new Document("fields1", Integer.MIN_VALUE));
199+
200+
MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST", "test_table", "");
201+
FieldSchema fieldSchema = mongoDBSchema.getFields().get("fields1");
202+
203+
assertEquals("INT", fieldSchema.getTypeString());
204+
}
205+
206+
@Test
207+
public void testBigIntFieldNotConvertedToDecimal() throws Exception {
208+
ArrayList<Document> documents = new ArrayList<>();
209+
documents.add(new Document("fields1", Long.MAX_VALUE));
210+
documents.add(new Document("fields1", 12233720368541346L));
211+
documents.add(new Document("fields1", Long.MIN_VALUE));
212+
213+
MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST", "test_table", "");
214+
Map<String, FieldSchema> fields = mongoDBSchema.getFields();
215+
216+
for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
217+
String fieldName = entry.getKey();
218+
FieldSchema fieldSchema = entry.getValue();
219+
if (fieldName.equals("fields1")) {
220+
assertEquals("BIGINT", fieldSchema.getTypeString());
221+
}
222+
}
223+
}
224+
225+
@Test
226+
public void testMixedIntAndBigIntFieldsShouldConvertToBigInt() throws Exception {
227+
ArrayList<Document> documents = new ArrayList<>();
228+
documents.add(new Document("fields1", 2147483647));
229+
documents.add(new Document("fields1", 9223372036854775807L));
230+
documents.add(new Document("fields1", 12234));
231+
232+
MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST", "test_table", "");
233+
FieldSchema fieldSchema = mongoDBSchema.getFields().get("fields1");
234+
235+
assertEquals("BIGINT", fieldSchema.getTypeString());
236+
}
192237
}

0 commit comments

Comments
 (0)