Skip to content

Commit 04f63ae

Browse files
chengcongchinaMrart
authored andcommitted
[FLINK-39209][doris] Fix time data type serialiazation when sink to doris with pipeline connector (apache#4312)
1 parent 841f2a8 commit 04f63ae

5 files changed

Lines changed: 69 additions & 11 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,13 @@ public class DorisEventSerializer implements DorisRecordSerializer<Event> {
6767
public static final DateTimeFormatter DATE_TIME_FORMATTER =
6868
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
6969

70+
/** Format TIME type data without precision. */
71+
public static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("HH:mm:ss");
72+
73+
/** Format TIME type data with millisecond precision. */
74+
public static final DateTimeFormatter TIME_WITH_MILLISECOND_FORMATTER =
75+
DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
76+
7077
/** ZoneId from pipeline config to support timestamp with local time zone. */
7178
public final ZoneId pipelineZoneId;
7279

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisRowConverter.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,18 @@ static SerializationConverter createExternalConverter(DataType type, ZoneId pipe
117117
final int zonedP = ((ZonedTimestampType) type).getPrecision();
118118
return (index, val) -> val.getTimestamp(index, zonedP).toTimestamp();
119119
case TIME_WITHOUT_TIME_ZONE:
120-
return (index, val) -> val.getTime(index).toLocalTime();
120+
return (index, val) -> {
121+
int precision = DataTypeChecks.getPrecision(type);
122+
if (precision == 0) {
123+
return val.getTime(index)
124+
.toLocalTime()
125+
.format(DorisEventSerializer.TIME_FORMATTER);
126+
} else {
127+
return val.getTime(index)
128+
.toLocalTime()
129+
.format(DorisEventSerializer.TIME_WITH_MILLISECOND_FORMATTER);
130+
}
131+
};
121132
case ARRAY:
122133
return (index, val) -> convertArrayData(val.getArray(index), type);
123134
case MAP:

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializerTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.flink.cdc.common.configuration.Configuration;
2121
import org.apache.flink.cdc.common.data.DateData;
22+
import org.apache.flink.cdc.common.data.TimeData;
2223
import org.apache.flink.cdc.common.data.TimestampData;
2324
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
2425
import org.apache.flink.cdc.common.event.CreateTableEvent;
@@ -41,6 +42,7 @@
4142
import java.time.Instant;
4243
import java.time.LocalDate;
4344
import java.time.LocalDateTime;
45+
import java.time.LocalTime;
4446
import java.time.ZoneId;
4547
import java.util.HashMap;
4648
import java.util.Map;
@@ -67,6 +69,38 @@ public class DorisEventSerializerTest {
6769
private static final BinaryRecordDataGenerator RECORD_DATA_GENERATOR =
6870
new BinaryRecordDataGenerator(((RowType) SCHEMA.toRowDataType()));
6971

72+
@Test
73+
public void testDataChangeEventWithTimeDataType() throws IOException {
74+
Schema schema =
75+
Schema.newBuilder()
76+
.physicalColumn("id_", DataTypes.BIGINT().notNull())
77+
.physicalColumn("time_0_", DataTypes.TIME(0))
78+
.physicalColumn("time_3_", DataTypes.TIME(3))
79+
.primaryKey("id_")
80+
.build();
81+
BinaryRecordDataGenerator generator =
82+
new BinaryRecordDataGenerator(((RowType) schema.toRowDataType()));
83+
CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_ID, schema);
84+
DataChangeEvent dataChangeEvent =
85+
DataChangeEvent.insertEvent(
86+
TABLE_ID,
87+
generator.generate(
88+
new Object[] {
89+
1L,
90+
TimeData.fromLocalTime(LocalTime.of(19, 43, 17)),
91+
TimeData.fromLocalTime(LocalTime.of(21, 45, 3, 123000000)),
92+
}));
93+
94+
dorisEventSerializer = new DorisEventSerializer(ZoneId.of("UTC"), new Configuration());
95+
dorisEventSerializer.serialize(createTableEvent);
96+
DorisRecord dorisRecord = dorisEventSerializer.serialize(dataChangeEvent);
97+
JsonNode jsonNode = objectMapper.readTree(dorisRecord.getRow());
98+
99+
Assertions.assertThat(jsonNode.get("id_").asLong()).isEqualTo(1L);
100+
Assertions.assertThat(jsonNode.get("time_0_").asText()).isEqualTo("19:43:17");
101+
Assertions.assertThat(jsonNode.get("time_3_").asText()).isEqualTo("21:45:03.123");
102+
}
103+
70104
@Test
71105
public void testDataChangeEventWithDateTimePartitionColumn() throws IOException {
72106
Map<String, String> configMap = new HashMap<>();

0 commit comments

Comments
 (0)