|
31 | 31 | import org.apache.flink.table.data.RowData; |
32 | 32 | import org.apache.flink.table.types.DataType; |
33 | 33 | import org.apache.flink.table.types.logical.LogicalType; |
| 34 | +import org.apache.flink.table.types.logical.LogicalTypeRoot; |
34 | 35 | import org.apache.flink.types.RowKind; |
35 | 36 | import org.apache.flink.util.CollectionUtil; |
36 | 37 | import org.slf4j.Logger; |
37 | 38 | import org.slf4j.LoggerFactory; |
38 | 39 |
|
39 | 40 | import java.io.IOException; |
40 | 41 | import java.nio.charset.StandardCharsets; |
| 42 | +import java.sql.Date; |
| 43 | +import java.time.LocalDate; |
41 | 44 | import java.util.ArrayList; |
42 | 45 | import java.util.Arrays; |
43 | 46 | import java.util.HashMap; |
@@ -80,6 +83,7 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> { |
80 | 83 | private final String[] fieldNames; |
81 | 84 | private final boolean jsonFormat; |
82 | 85 | private final RowData.FieldGetter[] fieldGetters; |
| 86 | + private final LogicalType[] logicalTypes; |
83 | 87 | private final List batch = new ArrayList<>(); |
84 | 88 | private long batchBytes = 0L; |
85 | 89 | private String fieldDelimiter; |
@@ -111,6 +115,7 @@ public DorisDynamicOutputFormat(DorisOptions option, |
111 | 115 |
|
112 | 116 |
|
113 | 117 | handleStreamloadProp(); |
| 118 | + this.logicalTypes = logicalTypes; |
114 | 119 | this.fieldGetters = new RowData.FieldGetter[logicalTypes.length]; |
115 | 120 | for (int i = 0; i < logicalTypes.length; i++) { |
116 | 121 | fieldGetters[i] = createFieldGetter(logicalTypes[i], i); |
@@ -244,6 +249,11 @@ private void addBatch(T row) { |
244 | 249 | StringJoiner value = new StringJoiner(this.fieldDelimiter); |
245 | 250 | for (int i = 0; i < rowData.getArity() && i < fieldGetters.length; ++i) { |
246 | 251 | Object field = fieldGetters[i].getFieldOrNull(rowData); |
| 252 | + LogicalType logicalType = logicalTypes[i]; |
| 253 | + // Compatible date types |
| 254 | + if(LogicalTypeRoot.DATE.equals(logicalType.getTypeRoot())) { |
| 255 | + field = Date.valueOf(LocalDate.ofEpochDay((int) field)); |
| 256 | + } |
247 | 257 | if (jsonFormat) { |
248 | 258 | String data = field != null ? field.toString() : null; |
249 | 259 | valueMap.put(this.fieldNames[i], data); |
|
0 commit comments