diff --git a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java index c98111893..d117c7933 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java +++ b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java @@ -48,6 +48,7 @@ public class JsonDebeziumChangeContext implements Serializable { private final String targetTablePrefix; private final String targetTableSuffix; private TableNameConverter tableNameConverter; + private String timeZone; public JsonDebeziumChangeContext( DorisOptions dorisOptions, @@ -106,6 +107,38 @@ public JsonDebeziumChangeContext( this.tableNameConverter = tableNameConverter; } + public JsonDebeziumChangeContext( + DorisOptions dorisOptions, + Map tableMapping, + String sourceTableName, + String targetDatabase, + DorisTableConfig dorisTableConfig, + ObjectMapper objectMapper, + Pattern pattern, + String lineDelimiter, + boolean ignoreUpdateBefore, + String targetTablePrefix, + String targetTableSuffix, + boolean enableDelete, + TableNameConverter tableNameConverter, + String timeZone) { + this( + dorisOptions, + tableMapping, + sourceTableName, + targetDatabase, + dorisTableConfig, + objectMapper, + pattern, + lineDelimiter, + ignoreUpdateBefore, + targetTablePrefix, + targetTableSuffix, + enableDelete, + tableNameConverter); + this.timeZone = timeZone; + } + public DorisOptions getDorisOptions() { return dorisOptions; } @@ -169,4 +202,8 @@ public DorisTableConfig getDorisTableConf() { public void setTableNameConverter(TableNameConverter tableNameConverter) { this.tableNameConverter = tableNameConverter; } + + public String getTimeZone() { + return timeZone; + } } diff --git a/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java b/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java index b77ec0e3e..2e90fa6f9 100644 --- a/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java +++ b/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java @@ -86,6 +86,12 @@ public class MongoDBDatabaseSync extends DatabaseSync { .noDefaultValue() .withDescription("Table name of the Mongo database to monitor."); + public static final ConfigOption DATE_TIMEZONE = + ConfigOptions.key("date.timezone") + .stringType() + .defaultValue("UTC") + .withDescription( + "The time zone used to convert MongoDB date/timestamp fields. Defaults to UTC."); public MongoDBDatabaseSync() throws SQLException {} @Override @@ -222,6 +228,9 @@ public DataStreamSource buildCdcSource(StreamExecutionEnvironment env) { default: throw new IllegalArgumentException("Unsupported startup mode: " + startupMode); } + + MongoDateConverter.setTimeZone(config.get(DATE_TIMEZONE)); + MongoDBSource mongoDBSource = mongoDBSourceBuilder.deserializer(schema).build(); return env.fromSource(mongoDBSource, WatermarkStrategy.noWatermarks(), "MongoDB Source"); } @@ -243,6 +252,7 @@ public DorisRecordSerializer buildSchemaSerializer( .setTargetTablePrefix(tablePrefix) .setTargetTableSuffix(tableSuffix) .setTableNameConverter(converter) + .setTimeZone(config.get(DATE_TIMEZONE)) .build(); } diff --git a/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDateConverter.java b/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDateConverter.java index 614fa092a..878e9d153 100644 --- a/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDateConverter.java +++ b/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDateConverter.java @@ -17,22 +17,34 @@ package org.apache.doris.flink.tools.cdc.mongodb; -import org.apache.doris.flink.tools.cdc.DatabaseSyncConfig; - import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; public class MongoDateConverter { + private static final String DEFAULT_TIME_ZONE = "UTC"; + private static volatile String timeZone = DEFAULT_TIME_ZONE; + private static final ThreadLocal dateFormatterThreadLocal = - ThreadLocal.withInitial( - () -> DateTimeFormatter.ofPattern(DatabaseSyncConfig.DATETIME_MICRO_FORMAT)); + ThreadLocal.withInitial(() -> DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS")); + + public static void setTimeZone(String timeZone) { + MongoDateConverter.timeZone = timeZone; + } + + public static String getTimeZone() { + return timeZone; + } public static String convertTimestampToString(long timestamp) { + return convertTimestampToString(timestamp, timeZone); + } + + public static String convertTimestampToString(long timestamp, String timeZone) { Instant instant = Instant.ofEpochMilli(timestamp); - LocalDateTime localDateTime = - LocalDateTime.ofInstant(instant, ZoneId.of(DatabaseSyncConfig.TIME_ZONE_SHANGHAI)); + LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.of(timeZone)); return dateFormatterThreadLocal.get().format(localDateTime); } + } diff --git a/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java b/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java index dec06e91b..2c52f6002 100644 --- a/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java +++ b/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java @@ -64,6 +64,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements DorisRecordSerialize private String targetTablePrefix; private String targetTableSuffix; private TableNameConverter tableNameConverter; + private String timeZone; public MongoDBJsonDebeziumSchemaSerializer( DorisOptions dorisOptions, @@ -75,7 +76,8 @@ public MongoDBJsonDebeziumSchemaSerializer( String targetDatabase, String targetTablePrefix, String targetTableSuffix, - TableNameConverter tableNameConverter) { + TableNameConverter tableNameConverter, + String timeZone) { this.dorisOptions = dorisOptions; this.pattern = pattern; this.sourceTableName = sourceTableName; @@ -89,6 +91,7 @@ public MongoDBJsonDebeziumSchemaSerializer( this.targetTablePrefix = targetTablePrefix; this.targetTableSuffix = targetTableSuffix; this.tableNameConverter = tableNameConverter; + this.timeZone = timeZone; if (executionOptions != null) { this.lineDelimiter = executionOptions @@ -114,8 +117,9 @@ private void init() { ignoreUpdateBefore, targetTablePrefix, targetTableSuffix, - enableDelete); - changeContext.setTableNameConverter(tableNameConverter); + enableDelete, + tableNameConverter, + timeZone); this.dataChange = new MongoJsonDebeziumDataChange(changeContext); this.schemaChange = new MongoJsonDebeziumSchemaChange(changeContext); } @@ -149,6 +153,7 @@ public static class Builder { private String targetTablePrefix = ""; private String targetTableSuffix = ""; private TableNameConverter tableNameConverter; + private String timeZone; public MongoDBJsonDebeziumSchemaSerializer.Builder setDorisOptions( DorisOptions dorisOptions) { @@ -216,6 +221,11 @@ public MongoDBJsonDebeziumSchemaSerializer.Builder setTableNameConverter( return this; } + public MongoDBJsonDebeziumSchemaSerializer.Builder setTimeZone(String timeZone) { + this.timeZone = timeZone; + return this; + } + public MongoDBJsonDebeziumSchemaSerializer build() { return new MongoDBJsonDebeziumSchemaSerializer( dorisOptions, @@ -227,7 +237,8 @@ public MongoDBJsonDebeziumSchemaSerializer build() { targetDatabase, targetTablePrefix, targetTableSuffix, - tableNameConverter); + tableNameConverter, + timeZone); } } } diff --git a/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java b/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java index cfcf53f21..3a835f1ad 100644 --- a/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java +++ b/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java @@ -43,6 +43,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.time.Instant; +import java.time.format.DateTimeParseException; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; @@ -174,7 +176,8 @@ private void formatSpecialFieldData(JsonNode logData) { : jsonNode.asLong(); String formattedDate = MongoDateConverter.convertTimestampToString( - timestamp); + timestamp, + changeContext.getTimeZone()); ((ObjectNode) logData).put(fieldName, formattedDate); break; case DECIMAL_FIELD: @@ -189,6 +192,18 @@ private void formatSpecialFieldData(JsonNode logData) { break; } } + } else if (fieldNode.isTextual()) { + String text = fieldNode.asText(); + try { + Instant instant = Instant.parse(text); + String formattedDate = + MongoDateConverter.convertTimestampToString( + instant.toEpochMilli(), + changeContext.getTimeZone()); + ((ObjectNode) logData).put(fieldName, formattedDate); + } catch (DateTimeParseException e) { + // not an ISO date string, ignore + } } }); }