Skip to content

Commit ccf9d4a

Browse files
committed
[FLINK-39610] Mongo CDC supports specifying individual SSL context
1 parent b94d7b2 commit ccf9d4a

29 files changed

Lines changed: 1581 additions & 18 deletions

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,4 +285,52 @@ public MongoDBSourceBuilder<T> assignUnboundedChunkFirst(boolean assignUnbounded
285285
public MongoDBSource<T> build() {
286286
return new MongoDBSource<>(configFactory, checkNotNull(deserializer));
287287
}
288+
289+
/** Whether the connector will use SSL to connect to MongoDB instances. */
290+
public MongoDBSourceBuilder<T> sslEnabled(boolean sslEnabled) {
291+
this.configFactory.sslEnabled(sslEnabled);
292+
return this;
293+
}
294+
295+
/** When SSL is enabled, controls whether strict hostname checking is disabled. */
296+
public MongoDBSourceBuilder<T> sslInvalidHostnameAllowed(boolean sslInvalidHostnameAllowed) {
297+
this.configFactory.sslInvalidHostnameAllowed(sslInvalidHostnameAllowed);
298+
return this;
299+
}
300+
301+
/** The location of the key store file for two-way authentication. */
302+
public MongoDBSourceBuilder<T> sslKeyStore(String sslKeyStore) {
303+
this.configFactory.sslKeyStore(sslKeyStore);
304+
return this;
305+
}
306+
307+
/** The password for the key store file. */
308+
public MongoDBSourceBuilder<T> sslKeyStorePassword(String sslKeyStorePassword) {
309+
this.configFactory.sslKeyStorePassword(sslKeyStorePassword);
310+
return this;
311+
}
312+
313+
/** The type of key store file. */
314+
public MongoDBSourceBuilder<T> sslKeyStoreType(String sslKeyStoreType) {
315+
this.configFactory.sslKeyStoreType(sslKeyStoreType);
316+
return this;
317+
}
318+
319+
/** The location of the trust store file for the server certificate verification. */
320+
public MongoDBSourceBuilder<T> sslTrustStore(String sslTrustStore) {
321+
this.configFactory.sslTrustStore(sslTrustStore);
322+
return this;
323+
}
324+
325+
/** The password for the trust store file. */
326+
public MongoDBSourceBuilder<T> sslTrustStorePassword(String sslTrustStorePassword) {
327+
this.configFactory.sslTrustStorePassword(sslTrustStorePassword);
328+
return this;
329+
}
330+
331+
/** The type of trust store file. */
332+
public MongoDBSourceBuilder<T> sslTrustStoreType(String sslTrustStoreType) {
333+
this.configFactory.sslTrustStoreType(sslTrustStoreType);
334+
return this;
335+
}
288336
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/config/MongoDBSourceConfig.java

Lines changed: 80 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,15 @@ public class MongoDBSourceConfig implements SourceConfig {
5757
private final boolean isScanNewlyAddedTableEnabled;
5858
private final boolean assignUnboundedChunkFirst;
5959

60+
private final boolean sslEnabled;
61+
private final boolean sslInvalidHostnameAllowed;
62+
@Nullable private final String sslKeyStore;
63+
@Nullable private final String sslKeyStorePassword;
64+
private final String sslKeyStoreType;
65+
@Nullable private final String sslTrustStore;
66+
@Nullable private final String sslTrustStorePassword;
67+
private final String sslTrustStoreType;
68+
6069
MongoDBSourceConfig(
6170
String scheme,
6271
String hosts,
@@ -79,7 +88,15 @@ public class MongoDBSourceConfig implements SourceConfig {
7988
boolean disableCursorTimeout,
8089
boolean skipSnapshotBackfill,
8190
boolean isScanNewlyAddedTableEnabled,
82-
boolean assignUnboundedChunkFirst) {
91+
boolean assignUnboundedChunkFirst,
92+
boolean sslEnabled,
93+
boolean sslInvalidHostnameAllowed,
94+
@Nullable String sslKeyStore,
95+
@Nullable String sslKeyStorePassword,
96+
String sslKeyStoreType,
97+
@Nullable String sslTrustStore,
98+
@Nullable String sslTrustStorePassword,
99+
String sslTrustStoreType) {
83100
this.scheme = checkNotNull(scheme);
84101
this.hosts = checkNotNull(hosts);
85102
this.username = username;
@@ -103,6 +120,14 @@ public class MongoDBSourceConfig implements SourceConfig {
103120
this.skipSnapshotBackfill = skipSnapshotBackfill;
104121
this.isScanNewlyAddedTableEnabled = isScanNewlyAddedTableEnabled;
105122
this.assignUnboundedChunkFirst = assignUnboundedChunkFirst;
123+
this.sslEnabled = sslEnabled;
124+
this.sslInvalidHostnameAllowed = sslInvalidHostnameAllowed;
125+
this.sslKeyStore = sslKeyStore;
126+
this.sslKeyStorePassword = sslKeyStorePassword;
127+
this.sslKeyStoreType = sslKeyStoreType;
128+
this.sslTrustStore = sslTrustStore;
129+
this.sslTrustStorePassword = sslTrustStorePassword;
130+
this.sslTrustStoreType = sslTrustStoreType;
106131
}
107132

108133
public String getScheme() {
@@ -207,6 +232,42 @@ public boolean isAssignUnboundedChunkFirst() {
207232
return assignUnboundedChunkFirst;
208233
}
209234

235+
public boolean isSslEnabled() {
236+
return sslEnabled;
237+
}
238+
239+
public boolean isSslInvalidHostnameAllowed() {
240+
return sslInvalidHostnameAllowed;
241+
}
242+
243+
@Nullable
244+
public String getSslKeyStore() {
245+
return sslKeyStore;
246+
}
247+
248+
@Nullable
249+
public String getSslKeyStorePassword() {
250+
return sslKeyStorePassword;
251+
}
252+
253+
public String getSslKeyStoreType() {
254+
return sslKeyStoreType;
255+
}
256+
257+
@Nullable
258+
public String getSslTrustStore() {
259+
return sslTrustStore;
260+
}
261+
262+
@Nullable
263+
public String getSslTrustStorePassword() {
264+
return sslTrustStorePassword;
265+
}
266+
267+
public String getSslTrustStoreType() {
268+
return sslTrustStoreType;
269+
}
270+
210271
@Override
211272
public boolean equals(Object o) {
212273
if (this == o) {
@@ -234,7 +295,15 @@ public boolean equals(Object o) {
234295
&& Objects.equals(collectionList, that.collectionList)
235296
&& Objects.equals(connectionString, that.connectionString)
236297
&& Objects.equals(skipSnapshotBackfill, that.skipSnapshotBackfill)
237-
&& Objects.equals(isScanNewlyAddedTableEnabled, that.isScanNewlyAddedTableEnabled);
298+
&& Objects.equals(isScanNewlyAddedTableEnabled, that.isScanNewlyAddedTableEnabled)
299+
&& sslEnabled == that.sslEnabled
300+
&& sslInvalidHostnameAllowed == that.sslInvalidHostnameAllowed
301+
&& Objects.equals(sslKeyStore, that.sslKeyStore)
302+
&& Objects.equals(sslKeyStorePassword, that.sslKeyStorePassword)
303+
&& Objects.equals(sslKeyStoreType, that.sslKeyStoreType)
304+
&& Objects.equals(sslTrustStore, that.sslTrustStore)
305+
&& Objects.equals(sslTrustStorePassword, that.sslTrustStorePassword)
306+
&& Objects.equals(sslTrustStoreType, that.sslTrustStoreType);
238307
}
239308

240309
@Override
@@ -258,6 +327,14 @@ public int hashCode() {
258327
samplesPerChunk,
259328
closeIdleReaders,
260329
skipSnapshotBackfill,
261-
isScanNewlyAddedTableEnabled);
330+
isScanNewlyAddedTableEnabled,
331+
sslEnabled,
332+
sslInvalidHostnameAllowed,
333+
sslKeyStore,
334+
sslKeyStorePassword,
335+
sslKeyStoreType,
336+
sslTrustStore,
337+
sslTrustStorePassword,
338+
sslTrustStoreType);
262339
}
263340
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/config/MongoDBSourceConfigFactory.java

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,17 @@ public class MongoDBSourceConfigFactory implements Factory<MongoDBSourceConfig>
6464
protected boolean scanNewlyAddedTableEnabled = false;
6565
protected boolean assignUnboundedChunkFirst = false;
6666

67+
private boolean sslEnabled = MongoDBSourceOptions.SSL_ENABLED.defaultValue();
68+
private boolean sslInvalidHostnameAllowed =
69+
MongoDBSourceOptions.SSL_INVALID_HOSTNAME_ALLOWED.defaultValue();
70+
private String sslKeyStore = MongoDBSourceOptions.SSL_KEYSTORE.defaultValue();
71+
private String sslKeyStorePassword = MongoDBSourceOptions.SSL_KEYSTORE_PASSWORD.defaultValue();
72+
private String sslKeyStoreType = MongoDBSourceOptions.SSL_KEYSTORE_TYPE.defaultValue();
73+
private String sslTrustStore = MongoDBSourceOptions.SSL_TRUSTSTORE.defaultValue();
74+
private String sslTrustStorePassword =
75+
MongoDBSourceOptions.SSL_TRUSTSTORE_PASSWORD.defaultValue();
76+
private String sslTrustStoreType = MongoDBSourceOptions.SSL_TRUSTSTORE_TYPE.defaultValue();
77+
6778
/** The protocol connected to MongoDB. For example mongodb or mongodb+srv. */
6879
public MongoDBSourceConfigFactory scheme(String scheme) {
6980
checkArgument(
@@ -280,6 +291,54 @@ public MongoDBSourceConfigFactory assignUnboundedChunkFirst(boolean assignUnboun
280291
return this;
281292
}
282293

294+
/** Whether the connector will use SSL to connect to MongoDB instances. */
295+
public MongoDBSourceConfigFactory sslEnabled(boolean sslEnabled) {
296+
this.sslEnabled = sslEnabled;
297+
return this;
298+
}
299+
300+
/** When SSL is enabled, controls whether strict hostname checking is disabled. */
301+
public MongoDBSourceConfigFactory sslInvalidHostnameAllowed(boolean sslInvalidHostnameAllowed) {
302+
this.sslInvalidHostnameAllowed = sslInvalidHostnameAllowed;
303+
return this;
304+
}
305+
306+
/** The location of the key store file for two-way authentication. */
307+
public MongoDBSourceConfigFactory sslKeyStore(String sslKeyStore) {
308+
this.sslKeyStore = sslKeyStore;
309+
return this;
310+
}
311+
312+
/** The password for the key store file. */
313+
public MongoDBSourceConfigFactory sslKeyStorePassword(String sslKeyStorePassword) {
314+
this.sslKeyStorePassword = sslKeyStorePassword;
315+
return this;
316+
}
317+
318+
/** The type of key store file. */
319+
public MongoDBSourceConfigFactory sslKeyStoreType(String sslKeyStoreType) {
320+
this.sslKeyStoreType = sslKeyStoreType;
321+
return this;
322+
}
323+
324+
/** The location of the trust store file for the server certificate verification. */
325+
public MongoDBSourceConfigFactory sslTrustStore(String sslTrustStore) {
326+
this.sslTrustStore = sslTrustStore;
327+
return this;
328+
}
329+
330+
/** The password for the trust store file. */
331+
public MongoDBSourceConfigFactory sslTrustStorePassword(String sslTrustStorePassword) {
332+
this.sslTrustStorePassword = sslTrustStorePassword;
333+
return this;
334+
}
335+
336+
/** The type of trust store file. */
337+
public MongoDBSourceConfigFactory sslTrustStoreType(String sslTrustStoreType) {
338+
this.sslTrustStoreType = sslTrustStoreType;
339+
return this;
340+
}
341+
283342
/** Creates a new {@link MongoDBSourceConfig} for the given subtask {@code subtaskId}. */
284343
@Override
285344
public MongoDBSourceConfig create(int subtaskId) {
@@ -306,6 +365,14 @@ public MongoDBSourceConfig create(int subtaskId) {
306365
disableCursorTimeout,
307366
skipSnapshotBackfill,
308367
scanNewlyAddedTableEnabled,
309-
assignUnboundedChunkFirst);
368+
assignUnboundedChunkFirst,
369+
sslEnabled,
370+
sslInvalidHostnameAllowed,
371+
sslKeyStore,
372+
sslKeyStorePassword,
373+
sslKeyStoreType,
374+
sslTrustStore,
375+
sslTrustStorePassword,
376+
sslTrustStoreType);
310377
}
311378
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/config/MongoDBSourceOptions.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,4 +178,68 @@ public class MongoDBSourceOptions {
178178
.defaultValue(true)
179179
.withDescription(
180180
"MongoDB server normally times out idle cursors after an inactivity period (10 minutes) to prevent excess memory use. Set this option to true to prevent that.");
181+
182+
// SSL/TLS options
183+
184+
public static final ConfigOption<Boolean> SSL_ENABLED =
185+
ConfigOptions.key("mongodb.ssl.enabled")
186+
.booleanType()
187+
.defaultValue(false)
188+
.withDescription(
189+
"Whether the connector will use SSL to connect to MongoDB instances.");
190+
191+
public static final ConfigOption<Boolean> SSL_INVALID_HOSTNAME_ALLOWED =
192+
ConfigOptions.key("mongodb.ssl.invalid.hostname.allowed")
193+
.booleanType()
194+
.defaultValue(false)
195+
.withDescription(
196+
"When SSL is enabled, this setting controls whether strict hostname checking is disabled "
197+
+ "during the connection phase. If true, the connection will not prevent man-in-the-middle attacks.");
198+
199+
public static final ConfigOption<String> SSL_KEYSTORE =
200+
ConfigOptions.key("mongodb.ssl.keystore")
201+
.stringType()
202+
.noDefaultValue()
203+
.withDescription(
204+
"The location of the key store file. "
205+
+ "This is optional and can be used for two-way authentication between the client and the MongoDB server.");
206+
207+
public static final ConfigOption<String> SSL_KEYSTORE_PASSWORD =
208+
ConfigOptions.key("mongodb.ssl.keystore.password")
209+
.stringType()
210+
.noDefaultValue()
211+
.withDescription(
212+
"The password for the key store file. "
213+
+ "This is optional and only needed if 'mongodb.ssl.keystore' is configured.");
214+
215+
public static final ConfigOption<String> SSL_KEYSTORE_TYPE =
216+
ConfigOptions.key("mongodb.ssl.keystore.type")
217+
.stringType()
218+
.defaultValue("PKCS12")
219+
.withDescription(
220+
"The type of key store file. "
221+
+ "This is optional and only needed if 'mongodb.ssl.keystore' is configured. Defaults to PKCS12.");
222+
223+
public static final ConfigOption<String> SSL_TRUSTSTORE =
224+
ConfigOptions.key("mongodb.ssl.truststore")
225+
.stringType()
226+
.noDefaultValue()
227+
.withDescription(
228+
"The location of the trust store file for the server certificate verification.");
229+
230+
public static final ConfigOption<String> SSL_TRUSTSTORE_PASSWORD =
231+
ConfigOptions.key("mongodb.ssl.truststore.password")
232+
.stringType()
233+
.noDefaultValue()
234+
.withDescription(
235+
"The password for the trust store file. "
236+
+ "Used to check the integrity of the truststore, and unlock the truststore.");
237+
238+
public static final ConfigOption<String> SSL_TRUSTSTORE_TYPE =
239+
ConfigOptions.key("mongodb.ssl.truststore.type")
240+
.stringType()
241+
.defaultValue("PKCS12")
242+
.withDescription(
243+
"The type of trust store file. "
244+
+ "This is optional and only needed if 'mongodb.ssl.truststore' is configured. Defaults to PKCS12.");
181245
}

0 commit comments

Comments
 (0)