Skip to content

Commit 00ccd53

Browse files
committed
[FLINK-38236][mongodb] Add metadata support for full document in MongoDB CDC connector
1 parent 0f55773 commit 00ccd53

2 files changed

Lines changed: 102 additions & 7 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java

Lines changed: 61 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -599,6 +599,7 @@ private void testMongoDBParallelSourceWithMetadataColumns(
599599
+ " database_name STRING METADATA VIRTUAL,"
600600
+ " collection_name STRING METADATA VIRTUAL,"
601601
+ " row_kind STRING METADATA VIRTUAL,"
602+
+ " full_document STRING METADATA FROM 'full_document' VIRTUAL,"
602603
+ " primary key (_id) not enforced"
603604
+ ") WITH ("
604605
+ " 'connector' = 'mongodb-cdc',"
@@ -653,16 +654,32 @@ private void testMongoDBParallelSourceWithMetadataColumns(
653654
TableResult tableResult =
654655
tEnv.executeSql(
655656
"select database_name, collection_name, row_kind, "
656-
+ "cid, name, address, phone_number from customers");
657+
+ "cid, name, address, phone_number, full_document from customers");
657658
CloseableIterator<Row> iterator = tableResult.collect();
658659
JobID jobId = tableResult.getJobClient().get().getJobID();
659660
List<String> expectedSnapshotData = new ArrayList<>();
660661
for (int i = 0; i < captureCustomerCollections.length; i++) {
661662
expectedSnapshotData.addAll(snapshotForSingleTable);
662663
}
663664

664-
assertEqualsInAnyOrder(
665-
expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));
665+
List<String> rawSnapshotData = fetchRows(iterator, expectedSnapshotData.size());
666+
667+
// Strip full_document (last column) for precise matching of other metadata columns
668+
List<String> snapshotWithoutFullDoc =
669+
rawSnapshotData.stream()
670+
.map(MongoDBFullChangelogITCase::stripLastColumn)
671+
.collect(Collectors.toList());
672+
assertEqualsInAnyOrder(expectedSnapshotData, snapshotWithoutFullDoc);
673+
674+
// Verify full_document is non-null and contains expected fields in snapshot records
675+
for (String row : rawSnapshotData) {
676+
Assertions.assertThat(row)
677+
.as("Snapshot full_document should contain 'cid'")
678+
.contains("\"cid\"");
679+
Assertions.assertThat(row)
680+
.as("Snapshot full_document should contain 'name'")
681+
.contains("\"name\"");
682+
}
666683

667684
// second step: check the change stream data
668685
for (String collectionName : captureCustomerCollections) {
@@ -693,11 +710,50 @@ private void testMongoDBParallelSourceWithMetadataColumns(
693710
for (int i = 0; i < captureCustomerCollections.length; i++) {
694711
expectedChangeStreamData.addAll(changeEventsForSingleTable);
695712
}
696-
List<String> actualChangeStreamData = fetchRows(iterator, expectedChangeStreamData.size());
697-
assertEqualsInAnyOrder(expectedChangeStreamData, actualChangeStreamData);
713+
List<String> rawChangeStreamData = fetchRows(iterator, expectedChangeStreamData.size());
714+
715+
// Strip full_document for precise matching of other metadata columns
716+
List<String> changeStreamWithoutFullDoc =
717+
rawChangeStreamData.stream()
718+
.map(MongoDBFullChangelogITCase::stripLastColumn)
719+
.collect(Collectors.toList());
720+
assertEqualsInAnyOrder(expectedChangeStreamData, changeStreamWithoutFullDoc);
721+
722+
// Verify full_document in change stream: non-null for +I/+U, null for -D
723+
for (String row : rawChangeStreamData) {
724+
if (row.startsWith("+I") || row.startsWith("+U")) {
725+
Assertions.assertThat(row)
726+
.as("Change stream full_document should contain 'name'")
727+
.contains("\"name\"");
728+
}
729+
if (row.startsWith("-D")) {
730+
Assertions.assertThat(row)
731+
.as("Delete event full_document should be null")
732+
.endsWith(", null]");
733+
}
734+
}
735+
698736
tableResult.getJobClient().get().cancel().get();
699737
}
700738

739+
/**
740+
* Strip the last column (full_document) from a Row.toString() formatted string. Row format:
741+
* "+I[col1, col2, ..., colN, full_document_json]" or "+I[col1, col2, ..., colN, null]".
742+
*/
743+
private static String stripLastColumn(String row) {
744+
// Find the last ", " before the full_document column value
745+
// The full_document is a JSON string starting with {" or null
746+
int lastJsonStart = row.lastIndexOf(", {\"");
747+
if (lastJsonStart > 0) {
748+
return row.substring(0, lastJsonStart) + "]";
749+
}
750+
int lastNullStart = row.lastIndexOf(", null]");
751+
if (lastNullStart > 0) {
752+
return row.substring(0, lastNullStart) + "]";
753+
}
754+
return row;
755+
}
756+
701757
private void testMongoDBParallelSource(
702758
MongoDBTestUtils.FailoverType failoverType,
703759
MongoDBTestUtils.FailoverPhase failoverPhase,

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,7 @@ void testMetadataColumns(boolean parallelismSnapshot) throws Exception {
471471
+ " weight DECIMAL(10,3),"
472472
+ " db_name STRING METADATA FROM 'database_name' VIRTUAL,"
473473
+ " collection_name STRING METADATA VIRTUAL,"
474+
+ " full_document STRING METADATA FROM 'full_document' VIRTUAL,"
474475
+ " PRIMARY KEY (_id) NOT ENFORCED"
475476
+ ") WITH ("
476477
+ " 'connector' = 'mongodb-cdc',"
@@ -497,6 +498,7 @@ void testMetadataColumns(boolean parallelismSnapshot) throws Exception {
497498
+ " weight DECIMAL(10,3),"
498499
+ " database_name STRING,"
499500
+ " collection_name STRING,"
501+
+ " full_document STRING,"
500502
+ " PRIMARY KEY (_id) NOT ENFORCED"
501503
+ ") WITH ("
502504
+ " 'connector' = 'values',"
@@ -549,6 +551,30 @@ void testMetadataColumns(boolean parallelismSnapshot) throws Exception {
549551

550552
waitForSinkSize("meta_sink", 16);
551553

554+
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("meta_sink");
555+
556+
// Verify original metadata columns (database_name, collection_name) by stripping
557+
// the full_document column (last column) from each row for precise matching
558+
List<String> actualWithoutFullDoc =
559+
actual.stream()
560+
.map(
561+
row -> {
562+
// Row format: +I(col1,col2,...,colN,full_document_json)
563+
// full_document is the last column, strip it
564+
int lastCommaBeforeJson = row.lastIndexOf(",{\"_id\"");
565+
if (lastCommaBeforeJson > 0) {
566+
return row.substring(0, lastCommaBeforeJson) + ")";
567+
}
568+
// For -D events, full_document is null
569+
int lastCommaBeforeNull = row.lastIndexOf(",null)");
570+
if (lastCommaBeforeNull > 0) {
571+
return row.substring(0, lastCommaBeforeNull) + ")";
572+
}
573+
return row;
574+
})
575+
.sorted()
576+
.collect(Collectors.toList());
577+
552578
List<String> expected =
553579
Stream.of(
554580
"+I(100000000000000000000101,scooter,Small 2-wheel scooter,3.140,%s,products)",
@@ -571,8 +597,21 @@ void testMetadataColumns(boolean parallelismSnapshot) throws Exception {
571597
.sorted()
572598
.collect(Collectors.toList());
573599

574-
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("meta_sink");
575-
Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
600+
Assertions.assertThat(actualWithoutFullDoc).containsExactlyInAnyOrderElementsOf(expected);
601+
602+
// Verify full_document metadata: non-null for all events, contains expected fields
603+
for (String row : actual) {
604+
Assertions.assertThat(row)
605+
.as("full_document should contain 'name' field")
606+
.contains("\"name\"");
607+
Assertions.assertThat(row)
608+
.as("full_document should contain 'description' field")
609+
.contains("\"description\"");
610+
Assertions.assertThat(row)
611+
.as("full_document should contain 'weight' field")
612+
.contains("\"weight\"");
613+
}
614+
576615
result.getJobClient().get().cancel().get();
577616
}
578617

0 commit comments

Comments
 (0)