diff --git a/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java b/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java index b77ec0e3e..91465c340 100644 --- a/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java +++ b/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java @@ -109,7 +109,7 @@ public List getSchemaList() throws Exception { config.get(MongoDBSourceOptions.PASSWORD), config.get(MongoDBSourceOptions.SCHEME), config.get(MongoDBSourceOptions.HOSTS), - config.get(MongoDBSourceOptions.CONNECTION_OPTIONS)))); + getConnectionOptions()))); MongoClientSettings settings = settingsBuilder.build(); Double samplePercent = config.get(MONGO_CDC_CREATE_SAMPLE_PERCENT); @@ -119,6 +119,9 @@ public List getSchemaList() throws Exception { MongoDatabase mongoDatabase = mongoClient.getDatabase(databaseName); MongoIterable collectionNames = mongoDatabase.listCollectionNames(); for (String collectionName : collectionNames) { + if (collectionName.startsWith("system.")) { + continue; + } if (!isSyncNeeded(collectionName)) { continue; } @@ -181,6 +184,14 @@ private static String buildConnectionString( return sb.toString(); } + private String getConnectionOptions() { + String options = config.getString("connection-options", null); + if (StringUtils.isEmpty(options)) { + options = config.get(MongoDBSourceOptions.CONNECTION_OPTIONS); + } + return options; + } + @Override public DataStreamSource buildCdcSource(StreamExecutionEnvironment env) { String hosts = config.get(MongoDBSourceOptions.HOSTS); @@ -203,6 +214,8 @@ public DataStreamSource buildCdcSource(StreamExecutionEnvironment env) { .hosts(hosts) .username(username) .password(password) + .scheme(config.get(MongoDBSourceOptions.SCHEME)) + .connectionOptions(getConnectionOptions()) .databaseList(database) .collectionList(collection);