Skip to content

Commit 0f55773

Browse files
committed
[FLINK-38236][mongodb] Add metadata support for full document in MongoDB CDC connector
1 parent aac7d8d commit 0f55773

4 files changed

Lines changed: 32 additions & 2 deletions

File tree

docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,11 @@ MongoDB 的更改事件记录在消息之前没有更新。因此,我们只能
383383
<td>TIMESTAMP_LTZ(3) NOT NULL</td>
384384
<td>它指示在数据库中进行更改的时间。 <br>如果记录是从表的快照而不是改变流中读取的,该值将始终为0。</td>
385385
</tr>
386+
<tr>
387+
<td>full_document</td>
388+
<td>STRING</td>
389+
<td>变更事件的完整文档 JSON 字符串原始数据。对于 insert 事件,这是新文档。对于 update 事件,这是更新后的完整文档。对于 delete 事件,该值为 null。</td>
390+
</tr>
386391
<tr>
387392
<td>row_kind</td>
388393
<td>STRING NOT NULL</td>
@@ -398,6 +403,7 @@ CREATE TABLE products (
398403
db_name STRING METADATA FROM 'database_name' VIRTUAL,
399404
collection_name STRING METADATA FROM 'collection_name' VIRTUAL,
400405
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
406+
raw_data STRING METADATA FROM 'full_document' VIRTUAL,
401407
operation STRING METADATA FROM 'row_kind' VIRTUAL,
402408
_id STRING, // 必须声明
403409
name STRING,

docs/content/docs/connectors/flink-sources/mongodb-cdc.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,11 @@ The following format metadata can be exposed as read-only (VIRTUAL) columns in a
408408
<td>TIMESTAMP_LTZ(3) NOT NULL</td>
409409
<td>It indicates the time that the change was made in the database. <br>If the record is read from snapshot of the table instead of the change stream, the value is always 0.</td>
410410
</tr>
411+
<tr>
412+
<td>full_document</td>
413+
<td>STRING</td>
414+
<td>The full document of the change event as a JSON string raw data. For insert events, this is the new document. For update events, this is the full document after the update. For delete events, this is null.</td>
415+
</tr>
411416
<tr>
412417
<td>row_kind</td>
413418
<td>STRING NOT NULL</td>
@@ -424,6 +429,7 @@ CREATE TABLE products (
424429
db_name STRING METADATA FROM 'database_name' VIRTUAL,
425430
collection_name STRING METADATA FROM 'collection_name' VIRTUAL,
426431
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
432+
raw_data STRING METADATA FROM 'full_document' VIRTUAL,
427433
operation STRING METADATA FROM 'row_kind' VIRTUAL,
428434
_id STRING, // must be declared
429435
name STRING,

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBReadableMetadata.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,21 @@ public Object read(SourceRecord record) {
8585
}
8686
}),
8787

88+
/** It indicates the full document as string raw data. */
89+
FULL_DOCUMENT(
90+
"full_document",
91+
DataTypes.STRING().nullable(),
92+
new MetadataConverter() {
93+
private static final long serialVersionUID = 1L;
94+
95+
@Override
96+
public Object read(SourceRecord record) {
97+
Struct value = (Struct) record.value();
98+
String fullDocString = value.getString(MongoDBEnvelope.FULL_DOCUMENT_FIELD);
99+
return fullDocString != null ? StringData.fromString(fullDocString) : null;
100+
}
101+
}),
102+
88103
/**
89104
* It indicates the row kind of the changelog. '+I' means INSERT message, '-D' means DELETE
90105
* message, '-U' means UPDATE_BEFORE message and '+U' means UPDATE_AFTER message

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ class MongoDBTableFactoryTest {
8989
Column.metadata("time", DataTypes.TIMESTAMP_LTZ(3), "op_ts", true),
9090
Column.metadata(
9191
"_database_name", DataTypes.STRING(), "database_name", true),
92+
Column.metadata(
93+
"_full_document", DataTypes.STRING(), "full_document", true),
9294
Column.metadata("_row_kind", DataTypes.STRING(), "row_kind", true)),
9395
Collections.emptyList(),
9496
UniqueConstraint.primaryKey("pk", Collections.singletonList("_id")));
@@ -227,7 +229,7 @@ void testMetadataColumns() {
227229
DynamicTableSource actualSource = createTableSource(SCHEMA_WITH_METADATA, properties);
228230
MongoDBTableSource mongoDBSource = (MongoDBTableSource) actualSource;
229231
mongoDBSource.applyReadableMetadata(
230-
Arrays.asList("op_ts", "database_name", "row_kind"),
232+
Arrays.asList("op_ts", "database_name", "full_document", "row_kind"),
231233
SCHEMA_WITH_METADATA.toSourceRowDataType());
232234
actualSource = mongoDBSource.copy();
233235

@@ -262,7 +264,8 @@ void testMetadataColumns() {
262264
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue());
263265

264266
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
265-
expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name", "row_kind");
267+
expectedSource.metadataKeys =
268+
Arrays.asList("op_ts", "database_name", "full_document", "row_kind");
266269

267270
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
268271

0 commit comments

Comments
 (0)