Skip to content

Commit b03fd05

Browse files
authored
[Pick] Pick flink's date data type (#591)
1 parent 31e10ed commit b03fd05

1 file changed

Lines changed: 9 additions & 0 deletions

File tree

flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,16 @@
3131
import org.apache.flink.table.data.RowData;
3232
import org.apache.flink.table.types.DataType;
3333
import org.apache.flink.table.types.logical.LogicalType;
34+
import org.apache.flink.table.types.logical.LogicalTypeRoot;
3435
import org.apache.flink.types.RowKind;
3536
import org.apache.flink.util.CollectionUtil;
3637
import org.slf4j.Logger;
3738
import org.slf4j.LoggerFactory;
3839

3940
import java.io.IOException;
4041
import java.nio.charset.StandardCharsets;
42+
import java.sql.Date;
43+
import java.time.LocalDate;
4144
import java.util.ArrayList;
4245
import java.util.Arrays;
4346
import java.util.HashMap;
@@ -80,6 +83,7 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> {
8083
private final String[] fieldNames;
8184
private final boolean jsonFormat;
8285
private final RowData.FieldGetter[] fieldGetters;
86+
private final LogicalType[] logicalTypes;
8387
private final List batch = new ArrayList<>();
8488
private long batchBytes = 0L;
8589
private String fieldDelimiter;
@@ -111,6 +115,7 @@ public DorisDynamicOutputFormat(DorisOptions option,
111115

112116

113117
handleStreamloadProp();
118+
this.logicalTypes = logicalTypes;
114119
this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
115120
for (int i = 0; i < logicalTypes.length; i++) {
116121
fieldGetters[i] = createFieldGetter(logicalTypes[i], i);
@@ -244,6 +249,10 @@ private void addBatch(T row) {
244249
StringJoiner value = new StringJoiner(this.fieldDelimiter);
245250
for (int i = 0; i < rowData.getArity() && i < fieldGetters.length; ++i) {
246251
Object field = fieldGetters[i].getFieldOrNull(rowData);
252+
// Compatible date types
253+
if(field != null && LogicalTypeRoot.DATE.equals(logicalTypes[i].getTypeRoot())) {
254+
field = Date.valueOf(LocalDate.ofEpochDay((int) field));
255+
}
247256
if (jsonFormat) {
248257
String data = field != null ? field.toString() : null;
249258
valueMap.put(this.fieldNames[i], data);

0 commit comments

Comments
 (0)