Skip to content

Commit cbfb679

Browse files
committed
[Fix](mongodb-cdc) Fix fullDocument JSON string parsing error
Debezium may serialize fullDocument as a JSON string (TextNode) instead of an ObjectNode. Added a check to parse the textual fullDocument before extracting row data, preventing ClassCastException or deserialization errors.
1 parent dd4b7ef commit cbfb679

1 file changed

Lines changed: 8 additions & 0 deletions

File tree

flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.fasterxml.jackson.databind.ObjectMapper;
2525
import com.fasterxml.jackson.databind.node.NullNode;
2626
import org.apache.doris.flink.cfg.DorisOptions;
27+
import org.apache.doris.flink.exception.DorisRuntimeException;
2728
import org.apache.doris.flink.sink.writer.ChangeEvent;
2829
import org.apache.doris.flink.sink.writer.serializer.DorisRecord;
2930
import org.apache.doris.flink.sink.writer.serializer.jsondebezium.CdcDataChange;
@@ -127,6 +128,13 @@ public Map<String, Object> extractBeforeRow(JsonNode record) {
127128
@Override
128129
public Map<String, Object> extractAfterRow(JsonNode recordRoot) {
129130
JsonNode dataNode = recordRoot.get(FIELD_DATA);
131+
if (dataNode != null && dataNode.isTextual()) {
132+
try {
133+
dataNode = objectMapper.readTree(dataNode.asText());
134+
} catch (IOException e) {
135+
throw new DorisRuntimeException("Failed to parse fullDocument JSON", e);
136+
}
137+
}
130138
return JsonNodeExtractUtil.extractAfterRow(dataNode, objectMapper);
131139
}
132140

0 commit comments

Comments
 (0)