diff --git a/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java b/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java index 12fa8581b..fe13f293e 100644 --- a/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java +++ b/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.NullNode; import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.exception.DorisRuntimeException; import org.apache.doris.flink.sink.writer.ChangeEvent; import org.apache.doris.flink.sink.writer.serializer.DorisRecord; import org.apache.doris.flink.sink.writer.serializer.jsondebezium.CdcDataChange; @@ -127,6 +128,13 @@ public Map extractBeforeRow(JsonNode record) { @Override public Map extractAfterRow(JsonNode recordRoot) { JsonNode dataNode = recordRoot.get(FIELD_DATA); + if (dataNode != null && dataNode.isTextual()) { + try { + dataNode = objectMapper.readTree(dataNode.asText()); + } catch (IOException e) { + throw new DorisRuntimeException("Failed to parse fullDocument JSON", e); + } + } return JsonNodeExtractUtil.extractAfterRow(dataNode, objectMapper); }