Skip to content

Commit 9ff8808

Browse files
committed
[FLINK-38522][cdc connector mysql] centralise ssl connection config, add ssl socketFactory to ensure consistency with io.debezium code.
1 parent 71c82cf commit 9ff8808

7 files changed

Lines changed: 264 additions & 50 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ protected void setSystemProperty(String property, Field field, boolean showValue
233233
*
234234
* @return the session variables that are related to sessions ssl version
235235
*/
236-
protected String getSessionVariableForSslVersion() {
236+
public String getSessionVariableForSslVersion() {
237237
final String sslVersion = "Ssl_version";
238238
LOGGER.debug("Reading MySQL Session variable for Ssl Version");
239239
Map<String, String> sessionVariables =

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java

Lines changed: 120 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@
2727
import com.github.shyiko.mysql.binlog.event.EventData;
2828
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
2929
import com.github.shyiko.mysql.binlog.event.RotateEventData;
30+
import com.github.shyiko.mysql.binlog.network.DefaultSSLSocketFactory;
3031
import com.github.shyiko.mysql.binlog.network.SSLMode;
32+
import com.github.shyiko.mysql.binlog.network.SSLSocketFactory;
3133
import io.debezium.config.Configuration;
3234
import io.debezium.connector.mysql.MySqlConnection;
3335
import io.debezium.connector.mysql.MySqlConnectorConfig;
@@ -47,7 +49,20 @@
4749
import org.slf4j.Logger;
4850
import org.slf4j.LoggerFactory;
4951

52+
import javax.net.ssl.KeyManager;
53+
import javax.net.ssl.KeyManagerFactory;
54+
import javax.net.ssl.SSLContext;
55+
import javax.net.ssl.TrustManager;
56+
import javax.net.ssl.TrustManagerFactory;
57+
import javax.net.ssl.X509TrustManager;
58+
5059
import java.io.IOException;
60+
import java.security.GeneralSecurityException;
61+
import java.security.KeyStore;
62+
import java.security.KeyStoreException;
63+
import java.security.NoSuchAlgorithmException;
64+
import java.security.UnrecoverableKeyException;
65+
import java.security.cert.X509Certificate;
5166
import java.sql.SQLException;
5267
import java.util.ArrayList;
5368
import java.util.HashMap;
@@ -57,6 +72,8 @@
5772
import java.util.concurrent.ArrayBlockingQueue;
5873
import java.util.function.Predicate;
5974

75+
import static io.debezium.util.Strings.isNullOrEmpty;
76+
6077
/** Utilities related to Debezium. */
6178
public class DebeziumUtils {
6279
private static final String QUOTED_CHARACTER = "`";
@@ -94,13 +111,27 @@ public static MySqlConnection createMySqlConnection(
94111
}
95112

96113
/** Creates a new {@link BinaryLogClient} for consuming mysql binlog. */
97-
public static BinaryLogClient createBinaryClient(Configuration dbzConfiguration) {
114+
public static BinaryLogClient createBinaryClient(
115+
Configuration dbzConfiguration, MySqlConnection connection) {
98116
final MySqlConnectorConfig connectorConfig = new MySqlConnectorConfig(dbzConfiguration);
99-
return new BinaryLogClient(
100-
connectorConfig.hostname(),
101-
connectorConfig.port(),
102-
connectorConfig.username(),
103-
connectorConfig.password());
117+
BinaryLogClient client =
118+
new BinaryLogClient(
119+
connectorConfig.hostname(),
120+
connectorConfig.port(),
121+
connectorConfig.username(),
122+
connectorConfig.password());
123+
SSLMode sslMode = sslModeFor(connectorConfig.sslMode());
124+
if (sslMode != null) {
125+
client.setSSLMode(sslMode);
126+
}
127+
if (connectorConfig.sslModeEnabled()) {
128+
SSLSocketFactory sslSocketFactory =
129+
getBinlogSslSocketFactory(connectorConfig, connection);
130+
if (sslSocketFactory != null) {
131+
client.setSslSocketFactory(sslSocketFactory);
132+
}
133+
}
134+
return client;
104135
}
105136

106137
/** Creates a new {@link MySqlDatabaseSchema} to monitor the latest MySql database schemas. */
@@ -252,17 +283,92 @@ static SSLMode sslModeFor(MySqlConnectorConfig.SecureConnectionMode mode) {
252283
}
253284
}
254285

286+
// see
287+
// flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource#getBinlogSslSocketFactory
288+
static SSLSocketFactory getBinlogSslSocketFactory(
289+
MySqlConnectorConfig connectorConfig, MySqlConnection connection) {
290+
String acceptedTlsVersion = connection.getSessionVariableForSslVersion();
291+
if (!isNullOrEmpty(acceptedTlsVersion)) {
292+
SSLMode sslMode = sslModeFor(connectorConfig.sslMode());
293+
LOG.info(
294+
"Enable ssl {} mode for connector {}",
295+
sslMode,
296+
connectorConfig.getLogicalName());
297+
298+
final char[] keyPasswordArray = connection.connectionConfig().sslKeyStorePassword();
299+
final String keyFilename = connection.connectionConfig().sslKeyStore();
300+
final char[] trustPasswordArray = connection.connectionConfig().sslTrustStorePassword();
301+
final String trustFilename = connection.connectionConfig().sslTrustStore();
302+
KeyManager[] keyManagers = null;
303+
if (keyFilename != null) {
304+
try {
305+
KeyStore ks = connection.loadKeyStore(keyFilename, keyPasswordArray);
306+
307+
KeyManagerFactory kmf = KeyManagerFactory.getInstance("NewSunX509");
308+
kmf.init(ks, keyPasswordArray);
309+
310+
keyManagers = kmf.getKeyManagers();
311+
} catch (KeyStoreException
312+
| NoSuchAlgorithmException
313+
| UnrecoverableKeyException e) {
314+
throw new FlinkRuntimeException("Could not load keystore", e);
315+
}
316+
}
317+
TrustManager[] trustManagers;
318+
try {
319+
KeyStore ks = null;
320+
if (trustFilename != null) {
321+
ks = connection.loadKeyStore(trustFilename, trustPasswordArray);
322+
}
323+
324+
if (ks == null && (sslMode == SSLMode.PREFERRED || sslMode == SSLMode.REQUIRED)) {
325+
trustManagers =
326+
new TrustManager[] {
327+
new X509TrustManager() {
328+
329+
@Override
330+
public void checkClientTrusted(
331+
X509Certificate[] x509Certificates, String s) {}
332+
333+
@Override
334+
public void checkServerTrusted(
335+
X509Certificate[] x509Certificates, String s) {}
336+
337+
@Override
338+
public X509Certificate[] getAcceptedIssuers() {
339+
return new X509Certificate[0];
340+
}
341+
}
342+
};
343+
} else {
344+
TrustManagerFactory tmf =
345+
TrustManagerFactory.getInstance(
346+
TrustManagerFactory.getDefaultAlgorithm());
347+
tmf.init(ks);
348+
trustManagers = tmf.getTrustManagers();
349+
}
350+
} catch (KeyStoreException | NoSuchAlgorithmException e) {
351+
throw new FlinkRuntimeException("Could not load truststore", e);
352+
}
353+
// DBZ-1208 Resembles the logic from the upstream BinaryLogClient, only that
354+
// the accepted TLS version is passed to the constructed factory
355+
final KeyManager[] finalKMS = keyManagers;
356+
return new DefaultSSLSocketFactory(acceptedTlsVersion) {
357+
358+
@Override
359+
protected void initSSLContext(SSLContext sc) throws GeneralSecurityException {
360+
sc.init(finalKMS, trustManagers, null);
361+
}
362+
};
363+
}
364+
365+
return null;
366+
}
367+
255368
public static BinlogOffset findBinlogOffset(
256369
long targetMs, MySqlConnection connection, MySqlSourceConfig mySqlSourceConfig) {
257-
MySqlConnection.MySqlConnectionConfiguration config = connection.connectionConfig();
258370
BinaryLogClient client =
259-
new BinaryLogClient(
260-
config.hostname(), config.port(), config.username(), config.password());
261-
SSLMode sslMode = sslModeFor(config.sslMode());
262-
if (sslMode != null) {
263-
client.setSSLMode(sslMode);
264-
}
265-
371+
createBinaryClient(mySqlSourceConfig.getDbzConfiguration(), connection);
266372
if (mySqlSourceConfig.getServerIdRange() != null) {
267373
client.setServerId(mySqlSourceConfig.getServerIdRange().getStartServerId());
268374
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import com.github.shyiko.mysql.binlog.event.Event;
4040
import com.github.shyiko.mysql.binlog.event.EventType;
4141
import io.debezium.connector.base.ChangeEventQueue;
42+
import io.debezium.connector.mysql.MySqlConnection;
4243
import io.debezium.connector.mysql.MySqlStreamingChangeEventSourceMetrics;
4344
import io.debezium.pipeline.DataChangeEvent;
4445
import io.debezium.relational.TableId;
@@ -96,12 +97,15 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
9697
private static final long READER_CLOSE_TIMEOUT = 30L;
9798

9899
public BinlogSplitReader(MySqlSourceConfig sourceConfig, int subtaskId) {
99-
this(
100-
new StatefulTaskContext(
101-
sourceConfig,
102-
createBinaryClient(sourceConfig.getDbzConfiguration()),
103-
createMySqlConnection(sourceConfig)),
104-
subtaskId);
100+
this(createStatefulTaskContext(sourceConfig), subtaskId);
101+
}
102+
103+
private static StatefulTaskContext createStatefulTaskContext(MySqlSourceConfig sourceConfig) {
104+
MySqlConnection connection = createMySqlConnection(sourceConfig);
105+
return new StatefulTaskContext(
106+
sourceConfig,
107+
createBinaryClient(sourceConfig.getDbzConfiguration(), connection),
108+
connection);
105109
}
106110

107111
public BinlogSplitReader(StatefulTaskContext statefulTaskContext, int subtaskId) {

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737

3838
import io.debezium.config.Configuration;
3939
import io.debezium.connector.base.ChangeEventQueue;
40+
import io.debezium.connector.mysql.MySqlConnection;
4041
import io.debezium.connector.mysql.MySqlConnectorConfig;
4142
import io.debezium.connector.mysql.MySqlOffsetContext;
4243
import io.debezium.connector.mysql.MySqlStreamingChangeEventSourceMetrics;
@@ -99,13 +100,15 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecords, MySqlS
99100

100101
public SnapshotSplitReader(
101102
MySqlSourceConfig sourceConfig, int subtaskId, SnapshotPhaseHooks hooks) {
102-
this(
103-
new StatefulTaskContext(
104-
sourceConfig,
105-
createBinaryClient(sourceConfig.getDbzConfiguration()),
106-
createMySqlConnection(sourceConfig)),
107-
subtaskId,
108-
hooks);
103+
this(createStatefulTaskContext(sourceConfig), subtaskId, hooks);
104+
}
105+
106+
private static StatefulTaskContext createStatefulTaskContext(MySqlSourceConfig sourceConfig) {
107+
MySqlConnection connection = createMySqlConnection(sourceConfig);
108+
return new StatefulTaskContext(
109+
sourceConfig,
110+
createBinaryClient(sourceConfig.getDbzConfiguration(), connection),
111+
connection);
109112
}
110113

111114
public SnapshotSplitReader(

0 commit comments

Comments
 (0)