Skip to content

Commit 88263de

Browse files
committed
[Feature](mongodb-cdc) Expose split-related CDC options for MongoDB
Expose additional MongoDB CDC source options that were previously not configurable in mongodb-sync-database: - scan.incremental.snapshot.chunk.size.mb - scan.incremental.snapshot.chunk.samples - split.meta.group.size - close.idle.readers - scan.full-changelog - scan.cursor.no-timeout These options allow users to control snapshot chunk granularity and reader behavior, which is especially useful for large collections.
1 parent dd4b7ef commit 88263de

1 file changed

Lines changed: 25 additions & 0 deletions

File tree

flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,18 @@ public class MongoDBDatabaseSync extends DatabaseSync {
8686
.noDefaultValue()
8787
.withDescription("Table name of the Mongo database to monitor.");
8888

89+
public static final ConfigOption<Integer> SPLIT_META_GROUP_SIZE =
90+
ConfigOptions.key("split.meta.group.size")
91+
.intType()
92+
.noDefaultValue()
93+
.withDescription("The meta group size of split.");
94+
95+
public static final ConfigOption<Boolean> CLOSE_IDLE_READERS =
96+
ConfigOptions.key("close.idle.readers")
97+
.booleanType()
98+
.defaultValue(false)
99+
.withDescription("Whether to close idle readers.");
100+
89101
public MongoDBDatabaseSync() throws SQLException {}
90102

91103
@Override
@@ -222,6 +234,19 @@ public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
222234
default:
223235
throw new IllegalArgumentException("Unsupported startup mode: " + startupMode);
224236
}
237+
238+
config.getOptional(MongoDBSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB)
239+
.ifPresent(mongoDBSourceBuilder::splitSizeMB);
240+
config.getOptional(MongoDBSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SAMPLES)
241+
.ifPresent(mongoDBSourceBuilder::samplesPerChunk);
242+
config.getOptional(SPLIT_META_GROUP_SIZE).ifPresent(mongoDBSourceBuilder::splitMetaGroupSize);
243+
config.getOptional(CLOSE_IDLE_READERS)
244+
.ifPresent(mongoDBSourceBuilder::closeIdleReaders);
245+
config.getOptional(MongoDBSourceOptions.FULL_DOCUMENT_PRE_POST_IMAGE)
246+
.ifPresent(mongoDBSourceBuilder::scanFullChangelog);
247+
config.getOptional(MongoDBSourceOptions.SCAN_NO_CURSOR_TIMEOUT)
248+
.ifPresent(mongoDBSourceBuilder::disableCursorTimeout);
249+
225250
MongoDBSource<String> mongoDBSource = mongoDBSourceBuilder.deserializer(schema).build();
226251
return env.fromSource(mongoDBSource, WatermarkStrategy.noWatermarks(), "MongoDB Source");
227252
}

0 commit comments

Comments
 (0)