Skip to content

Commit d5917d0

Browse files
committed
[Feature](mongodb-cdc) Add date.timezone option for MongoDB to Doris time conversion
Introduces a new date.timezone configuration option (default UTC) for mongodb-sync-database, allowing users to specify the target timezone when converting MongoDB date/timestamp fields to Doris DATETIME. Key changes: 1. Add DATE_TIMEZONE ConfigOption to MongoDBDatabaseSync. 2. Handle Debezium's default ISO 8601 string serialization for BSON Date (e.g. "2024-06-04T08:03:37Z") via Instant.parse() in formatSpecialFieldData(). 3. Replace hardcoded Asia/Shanghai in MongoDateConverter with configurable timezone passed through JsonDebeziumChangeContext (Serializable).
1 parent dd4b7ef commit d5917d0

5 files changed

Lines changed: 96 additions & 11 deletions

File tree

flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public class JsonDebeziumChangeContext implements Serializable {
4848
private final String targetTablePrefix;
4949
private final String targetTableSuffix;
5050
private TableNameConverter tableNameConverter;
51+
private String timeZone;
5152

5253
public JsonDebeziumChangeContext(
5354
DorisOptions dorisOptions,
@@ -106,6 +107,38 @@ public JsonDebeziumChangeContext(
106107
this.tableNameConverter = tableNameConverter;
107108
}
108109

110+
/**
 * Creates a change context that also carries a time zone ID.
 *
 * <p>Every argument except {@code timeZone} is forwarded unchanged to the 13-argument
 * constructor; {@code timeZone} is then stored on this instance exactly as given.
 *
 * @param timeZone time zone ID to store; it is not validated or defaulted here
 */
public JsonDebeziumChangeContext(
        DorisOptions dorisOptions,
        Map<String, String> tableMapping,
        String sourceTableName,
        String targetDatabase,
        DorisTableConfig dorisTableConfig,
        ObjectMapper objectMapper,
        Pattern pattern,
        String lineDelimiter,
        boolean ignoreUpdateBefore,
        String targetTablePrefix,
        String targetTableSuffix,
        boolean enableDelete,
        TableNameConverter tableNameConverter,
        String timeZone) {
    // Delegate all shared state to the existing constructor so there is one initialisation path.
    this(dorisOptions, tableMapping, sourceTableName, targetDatabase, dorisTableConfig,
            objectMapper, pattern, lineDelimiter, ignoreUpdateBefore, targetTablePrefix,
            targetTableSuffix, enableDelete, tableNameConverter);
    this.timeZone = timeZone;
}
141+
109142
public DorisOptions getDorisOptions() {
110143
return dorisOptions;
111144
}
@@ -169,4 +202,8 @@ public DorisTableConfig getDorisTableConf() {
169202
public void setTableNameConverter(TableNameConverter tableNameConverter) {
170203
this.tableNameConverter = tableNameConverter;
171204
}
205+
206+
public String getTimeZone() {
207+
return timeZone;
208+
}
172209
}

flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,12 @@ public class MongoDBDatabaseSync extends DatabaseSync {
8686
.noDefaultValue()
8787
.withDescription("Table name of the Mongo database to monitor.");
8888

89+
public static final ConfigOption<String> DATE_TIMEZONE =
90+
ConfigOptions.key("date.timezone")
91+
.stringType()
92+
.defaultValue("UTC")
93+
.withDescription(
94+
"The time zone used to convert MongoDB date/timestamp fields. Defaults to UTC.");
8995
/**
 * Creates the MongoDB database sync tool; the constructor body itself performs no work.
 *
 * @throws SQLException declared on the signature; presumably propagated from the superclass
 *     constructor (confirm against {@code DatabaseSync})
 */
public MongoDBDatabaseSync() throws SQLException {
}
9096

9197
@Override
@@ -222,6 +228,9 @@ public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
222228
default:
223229
throw new IllegalArgumentException("Unsupported startup mode: " + startupMode);
224230
}
231+
232+
MongoDateConverter.setTimeZone(config.get(DATE_TIMEZONE));
233+
225234
MongoDBSource<String> mongoDBSource = mongoDBSourceBuilder.deserializer(schema).build();
226235
return env.fromSource(mongoDBSource, WatermarkStrategy.noWatermarks(), "MongoDB Source");
227236
}
@@ -243,6 +252,7 @@ public DorisRecordSerializer<String> buildSchemaSerializer(
243252
.setTargetTablePrefix(tablePrefix)
244253
.setTargetTableSuffix(tableSuffix)
245254
.setTableNameConverter(converter)
255+
.setTimeZone(config.get(DATE_TIMEZONE))
246256
.build();
247257
}
248258

flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDateConverter.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,34 @@
1717

1818
package org.apache.doris.flink.tools.cdc.mongodb;
1919

20-
import org.apache.doris.flink.tools.cdc.DatabaseSyncConfig;
21-
2220
import java.time.Instant;
2321
import java.time.LocalDateTime;
2422
import java.time.ZoneId;
2523
import java.time.format.DateTimeFormatter;
2624

2725
/**
 * Converts epoch-millisecond timestamps into {@code yyyy-MM-dd HH:mm:ss.SSSSSS} strings rendered
 * in a chosen time zone.
 *
 * <p>A process-wide default zone can be set through {@link #setTimeZone(String)}; it starts as
 * {@code "UTC"}. NOTE(review): this default is plain static state, so it is only visible inside
 * the JVM where it was set. Code that may run in another process should use
 * {@link #convertTimestampToString(long, String)} with an explicit zone.
 */
public class MongoDateConverter {
    /** Zone ID used when no zone, or a null/blank one, is supplied. */
    private static final String DEFAULT_TIME_ZONE = "UTC";

    /** Output pattern: date-time with a six-digit fraction of a second. */
    private static final String DATETIME_MICRO_PATTERN = "yyyy-MM-dd HH:mm:ss.SSSSSS";

    /**
     * Shared formatter. {@link DateTimeFormatter} is immutable and thread-safe, so one instance
     * is enough and no per-thread copy is needed.
     */
    private static final DateTimeFormatter DATE_FORMATTER =
            DateTimeFormatter.ofPattern(DATETIME_MICRO_PATTERN);

    /** Process-wide default zone ID; volatile so an update is visible to other threads. */
    private static volatile String timeZone = DEFAULT_TIME_ZONE;

    /**
     * Sets the process-wide default time zone ID.
     *
     * @param timeZone zone ID such as {@code "UTC"} or {@code "Asia/Shanghai"}; a null or blank
     *     value resets the default to {@code "UTC"}
     */
    public static void setTimeZone(String timeZone) {
        MongoDateConverter.timeZone = normalizeTimeZone(timeZone);
    }

    /**
     * Returns the process-wide default time zone ID.
     *
     * @return the current default zone ID; never null
     */
    public static String getTimeZone() {
        return timeZone;
    }

    /**
     * Formats an epoch-millisecond timestamp using the process-wide default time zone.
     *
     * @param timestamp milliseconds since 1970-01-01T00:00:00Z
     * @return the formatted local date-time in the default zone
     */
    public static String convertTimestampToString(long timestamp) {
        return convertTimestampToString(timestamp, timeZone);
    }

    /**
     * Formats an epoch-millisecond timestamp as a local date-time in the given time zone.
     *
     * @param timestamp milliseconds since 1970-01-01T00:00:00Z
     * @param timeZone zone ID accepted by {@link ZoneId#of(String)}; null or blank falls back to
     *     {@code "UTC"} instead of failing
     * @return the formatted local date-time, e.g. {@code 1970-01-01 00:00:00.000000}
     * @throws java.time.DateTimeException if a non-blank zone ID is invalid or unknown
     */
    public static String convertTimestampToString(long timestamp, String timeZone) {
        ZoneId zone = ZoneId.of(normalizeTimeZone(timeZone));
        LocalDateTime localDateTime =
                LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), zone);
        return DATE_FORMATTER.format(localDateTime);
    }

    /** Maps a null or blank zone ID to the default; any other value is returned unchanged. */
    private static String normalizeTimeZone(String candidate) {
        // trim().isEmpty() rather than isBlank() keeps this compatible with Java 8.
        if (candidate == null || candidate.trim().isEmpty()) {
            return DEFAULT_TIME_ZONE;
        }
        return candidate;
    }
}

flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements DorisRecordSerialize
6464
private String targetTablePrefix;
6565
private String targetTableSuffix;
6666
private TableNameConverter tableNameConverter;
67+
private String timeZone;
6768

6869
public MongoDBJsonDebeziumSchemaSerializer(
6970
DorisOptions dorisOptions,
@@ -75,7 +76,8 @@ public MongoDBJsonDebeziumSchemaSerializer(
7576
String targetDatabase,
7677
String targetTablePrefix,
7778
String targetTableSuffix,
78-
TableNameConverter tableNameConverter) {
79+
TableNameConverter tableNameConverter,
80+
String timeZone) {
7981
this.dorisOptions = dorisOptions;
8082
this.pattern = pattern;
8183
this.sourceTableName = sourceTableName;
@@ -89,6 +91,7 @@ public MongoDBJsonDebeziumSchemaSerializer(
8991
this.targetTablePrefix = targetTablePrefix;
9092
this.targetTableSuffix = targetTableSuffix;
9193
this.tableNameConverter = tableNameConverter;
94+
this.timeZone = timeZone;
9295
if (executionOptions != null) {
9396
this.lineDelimiter =
9497
executionOptions
@@ -114,8 +117,9 @@ private void init() {
114117
ignoreUpdateBefore,
115118
targetTablePrefix,
116119
targetTableSuffix,
117-
enableDelete);
118-
changeContext.setTableNameConverter(tableNameConverter);
120+
enableDelete,
121+
tableNameConverter,
122+
timeZone);
119123
this.dataChange = new MongoJsonDebeziumDataChange(changeContext);
120124
this.schemaChange = new MongoJsonDebeziumSchemaChange(changeContext);
121125
}
@@ -149,6 +153,7 @@ public static class Builder {
149153
private String targetTablePrefix = "";
150154
private String targetTableSuffix = "";
151155
private TableNameConverter tableNameConverter;
156+
private String timeZone;
152157

153158
public MongoDBJsonDebeziumSchemaSerializer.Builder setDorisOptions(
154159
DorisOptions dorisOptions) {
@@ -216,6 +221,11 @@ public MongoDBJsonDebeziumSchemaSerializer.Builder setTableNameConverter(
216221
return this;
217222
}
218223

224+
public MongoDBJsonDebeziumSchemaSerializer.Builder setTimeZone(String timeZone) {
225+
this.timeZone = timeZone;
226+
return this;
227+
}
228+
219229
public MongoDBJsonDebeziumSchemaSerializer build() {
220230
return new MongoDBJsonDebeziumSchemaSerializer(
221231
dorisOptions,
@@ -227,7 +237,8 @@ public MongoDBJsonDebeziumSchemaSerializer build() {
227237
targetDatabase,
228238
targetTablePrefix,
229239
targetTableSuffix,
230-
tableNameConverter);
240+
tableNameConverter,
241+
timeZone);
231242
}
232243
}
233244
}

flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
import org.slf4j.LoggerFactory;
4444

4545
import java.io.IOException;
46+
import java.time.Instant;
47+
import java.time.format.DateTimeParseException;
4648
import java.util.Arrays;
4749
import java.util.HashMap;
4850
import java.util.HashSet;
@@ -174,7 +176,8 @@ private void formatSpecialFieldData(JsonNode logData) {
174176
: jsonNode.asLong();
175177
String formattedDate =
176178
MongoDateConverter.convertTimestampToString(
177-
timestamp);
179+
timestamp,
180+
changeContext.getTimeZone());
178181
((ObjectNode) logData).put(fieldName, formattedDate);
179182
break;
180183
case DECIMAL_FIELD:
@@ -189,6 +192,18 @@ private void formatSpecialFieldData(JsonNode logData) {
189192
break;
190193
}
191194
}
195+
} else if (fieldNode.isTextual()) {
196+
String text = fieldNode.asText();
197+
try {
198+
Instant instant = Instant.parse(text);
199+
String formattedDate =
200+
MongoDateConverter.convertTimestampToString(
201+
instant.toEpochMilli(),
202+
changeContext.getTimeZone());
203+
((ObjectNode) logData).put(fieldName, formattedDate);
204+
} catch (DateTimeParseException e) {
205+
// not an ISO date string, ignore
206+
}
192207
}
193208
});
194209
}

0 commit comments

Comments
 (0)