Skip to content

Commit 51cd95a

Browse files
committed
[Fix](mongodb-cdc) Fix MongoDB connection options propagation and filter system collections
This commit fixes two issues: 1. connection-options not being passed to MongoDBSourceBuilder: The internal Flink CDC key is \"connection.options\" (with dot), but users naturally pass \"connection-options\" (with hyphen). Added getConnectionOptions() to try both keys for compatibility. Also added .scheme() and .connectionOptions() calls to MongoDBSourceBuilder which were previously missing. 2. Unauthorized errors on system collections: MongoDB system collections (e.g. system.profile) cannot be accessed without elevated privileges. Added a filter to skip collections starting with \"system.\" during schema inference.
1 parent dd4b7ef commit 51cd95a

1 file changed

Lines changed: 14 additions & 1 deletion

File tree

flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public List<SourceSchema> getSchemaList() throws Exception {
109109
config.get(MongoDBSourceOptions.PASSWORD),
110110
config.get(MongoDBSourceOptions.SCHEME),
111111
config.get(MongoDBSourceOptions.HOSTS),
112-
config.get(MongoDBSourceOptions.CONNECTION_OPTIONS))));
112+
getConnectionOptions())));
113113

114114
MongoClientSettings settings = settingsBuilder.build();
115115
Double samplePercent = config.get(MONGO_CDC_CREATE_SAMPLE_PERCENT);
@@ -119,6 +119,9 @@ public List<SourceSchema> getSchemaList() throws Exception {
119119
MongoDatabase mongoDatabase = mongoClient.getDatabase(databaseName);
120120
MongoIterable<String> collectionNames = mongoDatabase.listCollectionNames();
121121
for (String collectionName : collectionNames) {
122+
if (collectionName.startsWith("system.")) {
123+
continue;
124+
}
122125
if (!isSyncNeeded(collectionName)) {
123126
continue;
124127
}
@@ -181,6 +184,14 @@ private static String buildConnectionString(
181184
return sb.toString();
182185
}
183186

187+
private String getConnectionOptions() {
188+
String options = config.getString("connection-options", null);
189+
if (StringUtils.isEmpty(options)) {
190+
options = config.get(MongoDBSourceOptions.CONNECTION_OPTIONS);
191+
}
192+
return options;
193+
}
194+
184195
@Override
185196
public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
186197
String hosts = config.get(MongoDBSourceOptions.HOSTS);
@@ -203,6 +214,8 @@ public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
203214
.hosts(hosts)
204215
.username(username)
205216
.password(password)
217+
.scheme(config.get(MongoDBSourceOptions.SCHEME))
218+
.connectionOptions(getConnectionOptions())
206219
.databaseList(database)
207220
.collectionList(collection);
208221

0 commit comments

Comments
 (0)