diff --git a/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java b/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java index b77ec0e3e..f0ac79ffa 100644 --- a/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java +++ b/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java @@ -86,6 +86,18 @@ public class MongoDBDatabaseSync extends DatabaseSync { .noDefaultValue() .withDescription("Table name of the Mongo database to monitor."); + public static final ConfigOption SPLIT_META_GROUP_SIZE = + ConfigOptions.key("split.meta.group.size") + .intType() + .noDefaultValue() + .withDescription("The meta group size of split."); + + public static final ConfigOption CLOSE_IDLE_READERS = + ConfigOptions.key("close.idle.readers") + .booleanType() + .defaultValue(false) + .withDescription("Whether to close idle readers."); + public MongoDBDatabaseSync() throws SQLException {} @Override @@ -222,6 +234,19 @@ public DataStreamSource buildCdcSource(StreamExecutionEnvironment env) { default: throw new IllegalArgumentException("Unsupported startup mode: " + startupMode); } + + config.getOptional(MongoDBSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB) + .ifPresent(mongoDBSourceBuilder::splitSizeMB); + config.getOptional(MongoDBSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SAMPLES) + .ifPresent(mongoDBSourceBuilder::samplesPerChunk); + config.getOptional(SPLIT_META_GROUP_SIZE).ifPresent(mongoDBSourceBuilder::splitMetaGroupSize); + config.getOptional(CLOSE_IDLE_READERS) + .ifPresent(mongoDBSourceBuilder::closeIdleReaders); + config.getOptional(MongoDBSourceOptions.FULL_DOCUMENT_PRE_POST_IMAGE) + .ifPresent(mongoDBSourceBuilder::scanFullChangelog); + config.getOptional(MongoDBSourceOptions.SCAN_NO_CURSOR_TIMEOUT) + .ifPresent(mongoDBSourceBuilder::disableCursorTimeout); + MongoDBSource mongoDBSource = mongoDBSourceBuilder.deserializer(schema).build(); return env.fromSource(mongoDBSource, WatermarkStrategy.noWatermarks(), "MongoDB Source"); }