diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java index 03a26dbfe..993dc6514 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java @@ -31,6 +31,7 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.types.RowKind; import org.apache.flink.util.CollectionUtil; import org.slf4j.Logger; @@ -38,6 +39,8 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.sql.Date; +import java.time.LocalDate; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -80,6 +83,7 @@ public class DorisDynamicOutputFormat extends RichOutputFormat { private final String[] fieldNames; private final boolean jsonFormat; private final RowData.FieldGetter[] fieldGetters; + private final LogicalType[] logicalTypes; private final List batch = new ArrayList<>(); private long batchBytes = 0L; private String fieldDelimiter; @@ -111,6 +115,7 @@ public DorisDynamicOutputFormat(DorisOptions option, handleStreamloadProp(); + this.logicalTypes = logicalTypes; this.fieldGetters = new RowData.FieldGetter[logicalTypes.length]; for (int i = 0; i < logicalTypes.length; i++) { fieldGetters[i] = createFieldGetter(logicalTypes[i], i); @@ -244,6 +249,10 @@ private void addBatch(T row) { StringJoiner value = new StringJoiner(this.fieldDelimiter); for (int i = 0; i < rowData.getArity() && i < fieldGetters.length; ++i) { Object field = fieldGetters[i].getFieldOrNull(rowData); + // Compatible date types + if(field != null && LogicalTypeRoot.DATE.equals(logicalTypes[i].getTypeRoot())) { + field = Date.valueOf(LocalDate.ofEpochDay((int) field)); + } if (jsonFormat) { String data = field != null ? field.toString() : null; valueMap.put(this.fieldNames[i], data);