Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class JsonDebeziumChangeContext implements Serializable {
private final String targetTablePrefix;
private final String targetTableSuffix;
private TableNameConverter tableNameConverter;
private String timeZone;

public JsonDebeziumChangeContext(
DorisOptions dorisOptions,
Expand Down Expand Up @@ -106,6 +107,38 @@ public JsonDebeziumChangeContext(
this.tableNameConverter = tableNameConverter;
}

public JsonDebeziumChangeContext(
DorisOptions dorisOptions,
Map<String, String> tableMapping,
String sourceTableName,
String targetDatabase,
DorisTableConfig dorisTableConfig,
ObjectMapper objectMapper,
Pattern pattern,
String lineDelimiter,
boolean ignoreUpdateBefore,
String targetTablePrefix,
String targetTableSuffix,
boolean enableDelete,
TableNameConverter tableNameConverter,
String timeZone) {
this(
dorisOptions,
tableMapping,
sourceTableName,
targetDatabase,
dorisTableConfig,
objectMapper,
pattern,
lineDelimiter,
ignoreUpdateBefore,
targetTablePrefix,
targetTableSuffix,
enableDelete,
tableNameConverter);
this.timeZone = timeZone;
}

public DorisOptions getDorisOptions() {
return dorisOptions;
}
Expand Down Expand Up @@ -169,4 +202,8 @@ public DorisTableConfig getDorisTableConf() {
public void setTableNameConverter(TableNameConverter tableNameConverter) {
this.tableNameConverter = tableNameConverter;
}

public String getTimeZone() {
return timeZone;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ public class MongoDBDatabaseSync extends DatabaseSync {
.noDefaultValue()
.withDescription("Table name of the Mongo database to monitor.");

public static final ConfigOption<String> DATE_TIMEZONE =
ConfigOptions.key("date.timezone")
.stringType()
.defaultValue("UTC")
.withDescription(
"The time zone used to convert MongoDB date/timestamp fields. Defaults to UTC.");
public MongoDBDatabaseSync() throws SQLException {}

@Override
Expand Down Expand Up @@ -222,6 +228,9 @@ public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
default:
throw new IllegalArgumentException("Unsupported startup mode: " + startupMode);
}

MongoDateConverter.setTimeZone(config.get(DATE_TIMEZONE));

MongoDBSource<String> mongoDBSource = mongoDBSourceBuilder.deserializer(schema).build();
return env.fromSource(mongoDBSource, WatermarkStrategy.noWatermarks(), "MongoDB Source");
}
Expand All @@ -243,6 +252,7 @@ public DorisRecordSerializer<String> buildSchemaSerializer(
.setTargetTablePrefix(tablePrefix)
.setTargetTableSuffix(tableSuffix)
.setTableNameConverter(converter)
.setTimeZone(config.get(DATE_TIMEZONE))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,34 @@

package org.apache.doris.flink.tools.cdc.mongodb;

import org.apache.doris.flink.tools.cdc.DatabaseSyncConfig;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class MongoDateConverter {
private static final String DEFAULT_TIME_ZONE = "UTC";
private static volatile String timeZone = DEFAULT_TIME_ZONE;

private static final ThreadLocal<DateTimeFormatter> dateFormatterThreadLocal =
ThreadLocal.withInitial(
() -> DateTimeFormatter.ofPattern(DatabaseSyncConfig.DATETIME_MICRO_FORMAT));
ThreadLocal.withInitial(() -> DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"));

public static void setTimeZone(String timeZone) {
MongoDateConverter.timeZone = timeZone;
}

public static String getTimeZone() {
return timeZone;
}

public static String convertTimestampToString(long timestamp) {
return convertTimestampToString(timestamp, timeZone);
}

public static String convertTimestampToString(long timestamp, String timeZone) {
Instant instant = Instant.ofEpochMilli(timestamp);
LocalDateTime localDateTime =
LocalDateTime.ofInstant(instant, ZoneId.of(DatabaseSyncConfig.TIME_ZONE_SHANGHAI));
LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.of(timeZone));
return dateFormatterThreadLocal.get().format(localDateTime);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements DorisRecordSerialize
private String targetTablePrefix;
private String targetTableSuffix;
private TableNameConverter tableNameConverter;
private String timeZone;

public MongoDBJsonDebeziumSchemaSerializer(
DorisOptions dorisOptions,
Expand All @@ -75,7 +76,8 @@ public MongoDBJsonDebeziumSchemaSerializer(
String targetDatabase,
String targetTablePrefix,
String targetTableSuffix,
TableNameConverter tableNameConverter) {
TableNameConverter tableNameConverter,
String timeZone) {
this.dorisOptions = dorisOptions;
this.pattern = pattern;
this.sourceTableName = sourceTableName;
Expand All @@ -89,6 +91,7 @@ public MongoDBJsonDebeziumSchemaSerializer(
this.targetTablePrefix = targetTablePrefix;
this.targetTableSuffix = targetTableSuffix;
this.tableNameConverter = tableNameConverter;
this.timeZone = timeZone;
if (executionOptions != null) {
this.lineDelimiter =
executionOptions
Expand All @@ -114,8 +117,9 @@ private void init() {
ignoreUpdateBefore,
targetTablePrefix,
targetTableSuffix,
enableDelete);
changeContext.setTableNameConverter(tableNameConverter);
enableDelete,
tableNameConverter,
timeZone);
this.dataChange = new MongoJsonDebeziumDataChange(changeContext);
this.schemaChange = new MongoJsonDebeziumSchemaChange(changeContext);
}
Expand Down Expand Up @@ -149,6 +153,7 @@ public static class Builder {
private String targetTablePrefix = "";
private String targetTableSuffix = "";
private TableNameConverter tableNameConverter;
private String timeZone;

public MongoDBJsonDebeziumSchemaSerializer.Builder setDorisOptions(
DorisOptions dorisOptions) {
Expand Down Expand Up @@ -216,6 +221,11 @@ public MongoDBJsonDebeziumSchemaSerializer.Builder setTableNameConverter(
return this;
}

public MongoDBJsonDebeziumSchemaSerializer.Builder setTimeZone(String timeZone) {
this.timeZone = timeZone;
return this;
}

public MongoDBJsonDebeziumSchemaSerializer build() {
return new MongoDBJsonDebeziumSchemaSerializer(
dorisOptions,
Expand All @@ -227,7 +237,8 @@ public MongoDBJsonDebeziumSchemaSerializer build() {
targetDatabase,
targetTablePrefix,
targetTableSuffix,
tableNameConverter);
tableNameConverter,
timeZone);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -174,7 +176,8 @@ private void formatSpecialFieldData(JsonNode logData) {
: jsonNode.asLong();
String formattedDate =
MongoDateConverter.convertTimestampToString(
timestamp);
timestamp,
changeContext.getTimeZone());
((ObjectNode) logData).put(fieldName, formattedDate);
break;
case DECIMAL_FIELD:
Expand All @@ -189,6 +192,18 @@ private void formatSpecialFieldData(JsonNode logData) {
break;
}
}
} else if (fieldNode.isTextual()) {
String text = fieldNode.asText();
try {
Instant instant = Instant.parse(text);
String formattedDate =
MongoDateConverter.convertTimestampToString(
instant.toEpochMilli(),
changeContext.getTimeZone());
((ObjectNode) logData).put(fieldName, formattedDate);
} catch (DateTimeParseException e) {
// not an ISO date string, ignore
}
}
});
}
Expand Down