Skip to content

Commit 4a86d9f

Browse files
committed
heartbeat records; merge outstanding PRs
1 parent af9e1fd commit 4a86d9f

13 files changed

Lines changed: 660 additions & 23 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,9 @@ public MySqlPipelineRecordEmitter(
9292
super(
9393
debeziumDeserializationSchema,
9494
sourceReaderMetrics,
95-
sourceConfig.isIncludeSchemaChanges());
95+
sourceConfig.isIncludeSchemaChanges(),
96+
false, // Explicitly disable transaction metadata events
97+
false); // Explicitly disable heartbeat events
9698
this.debeziumDeserializationSchema = debeziumDeserializationSchema;
9799
this.sourceConfig = sourceConfig;
98100
this.alreadySendCreateTableTables = new HashSet<>();

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.flink.cdc.connectors.mysql.debezium;
1919

20+
import com.github.shyiko.mysql.binlog.network.SSLMode;
2021
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
2122
import org.apache.flink.cdc.connectors.mysql.source.connection.JdbcConnectionFactory;
2223
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
@@ -242,12 +243,26 @@ private static Map<String, String> querySystemVariables(
242243
return variables;
243244
}
244245

246+
static SSLMode sslModeFor(MySqlConnectorConfig.SecureConnectionMode mode) {
247+
try {
248+
return mode == null ? null : SSLMode.valueOf(mode.name());
249+
} catch (IllegalArgumentException e) {
250+
throw new FlinkRuntimeException(
251+
String.format("Invalid SecureConnectionMode provided: %s ", mode.name()), e);
252+
}
253+
}
254+
245255
public static BinlogOffset findBinlogOffset(
246256
long targetMs, MySqlConnection connection, MySqlSourceConfig mySqlSourceConfig) {
247257
MySqlConnection.MySqlConnectionConfiguration config = connection.connectionConfig();
248258
BinaryLogClient client =
249259
new BinaryLogClient(
250260
config.hostname(), config.port(), config.username(), config.password());
261+
SSLMode sslMode = sslModeFor(config.sslMode());
262+
if (sslMode != null) {
263+
client.setSSLMode(sslMode);
264+
}
265+
251266
if (mySqlSourceConfig.getServerIdRange() != null) {
252267
client.setServerId(mySqlSourceConfig.getServerIdRange().getStartServerId());
253268
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,9 @@ public static <T> MySqlSourceBuilder<T> builder() {
132132
new MySqlRecordEmitter<>(
133133
deserializationSchema,
134134
sourceReaderMetrics,
135-
sourceConfig.isIncludeSchemaChanges()));
135+
sourceConfig.isIncludeSchemaChanges(),
136+
sourceConfig.isIncludeTransactionMetadataEvents(),
137+
sourceConfig.isIncludeHeartbeatEvents()));
136138
}
137139

138140
MySqlSource(

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,12 @@ public MySqlSourceBuilder<T> includeSchemaChanges(boolean includeSchemaChanges)
198198
return this;
199199
}
200200

201+
/** Whether the {@link MySqlSource} should output the transaction metadata events or not. */
202+
public MySqlSourceBuilder<T> includeTransactionMetadataEvents(boolean includeTransactionMetadataEvents) {
203+
this.configFactory.includeTransactionMetadataEvents(includeTransactionMetadataEvents);
204+
return this;
205+
}
206+
201207
/** Whether the {@link MySqlSource} should scan the newly added tables or not. */
202208
public MySqlSourceBuilder<T> scanNewlyAddedTableEnabled(boolean scanNewlyAddedTableEnabled) {
203209
this.configFactory.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled);
@@ -237,6 +243,18 @@ public MySqlSourceBuilder<T> heartbeatInterval(Duration heartbeatInterval) {
237243
return this;
238244
}
239245

246+
/**
247+
* The interval of heartbeat event and whether to emit heartbeat events.
248+
*
249+
* @param heartbeatInterval the interval of heartbeat event
250+
* @param includeHeartbeatEvents whether to emit heartbeat events
251+
*/
252+
public MySqlSourceBuilder<T> heartbeatInterval(Duration heartbeatInterval, boolean includeHeartbeatEvents) {
253+
this.configFactory.heartbeatInterval(heartbeatInterval);
254+
this.configFactory.includeHeartbeatEvents(includeHeartbeatEvents);
255+
return this;
256+
}
257+
240258
/**
241259
* Whether to skip backfill in snapshot reading phase.
242260
*

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ public class MySqlSourceConfig implements Serializable {
6262
private final double distributionFactorUpper;
6363
private final double distributionFactorLower;
6464
private final boolean includeSchemaChanges;
65+
private final boolean includeTransactionMetadataEvents;
66+
private final boolean includeHeartbeatEvents;
6567
private final boolean scanNewlyAddedTableEnabled;
6668
private final boolean closeIdleReaders;
6769
private final Properties jdbcProperties;
@@ -99,6 +101,8 @@ public class MySqlSourceConfig implements Serializable {
99101
double distributionFactorUpper,
100102
double distributionFactorLower,
101103
boolean includeSchemaChanges,
104+
boolean includeTransactionMetadataEvents,
105+
boolean includeHeartbeatEvents,
102106
boolean scanNewlyAddedTableEnabled,
103107
boolean closeIdleReaders,
104108
Properties dbzProperties,
@@ -128,6 +132,8 @@ public class MySqlSourceConfig implements Serializable {
128132
this.distributionFactorUpper = distributionFactorUpper;
129133
this.distributionFactorLower = distributionFactorLower;
130134
this.includeSchemaChanges = includeSchemaChanges;
135+
this.includeTransactionMetadataEvents = includeTransactionMetadataEvents;
136+
this.includeHeartbeatEvents = includeHeartbeatEvents;
131137
this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
132138
this.closeIdleReaders = closeIdleReaders;
133139
this.dbzProperties = checkNotNull(dbzProperties);
@@ -227,6 +233,14 @@ public boolean isIncludeSchemaChanges() {
227233
return includeSchemaChanges;
228234
}
229235

236+
public boolean isIncludeTransactionMetadataEvents() {
237+
return includeTransactionMetadataEvents;
238+
}
239+
240+
public boolean isIncludeHeartbeatEvents() {
241+
return includeHeartbeatEvents;
242+
}
243+
230244
public boolean isScanNewlyAddedTableEnabled() {
231245
return scanNewlyAddedTableEnabled;
232246
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ public class MySqlSourceConfigFactory implements Serializable {
6363
private double distributionFactorLower =
6464
MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue();
6565
private boolean includeSchemaChanges = false;
66+
private boolean includeTransactionMetadataEvents = false;
67+
private boolean includeHeartbeatEvents = false;
6668
private boolean scanNewlyAddedTableEnabled = false;
6769
private boolean closeIdleReaders = false;
6870
private Properties jdbcProperties;
@@ -235,6 +237,18 @@ public MySqlSourceConfigFactory includeSchemaChanges(boolean includeSchemaChange
235237
return this;
236238
}
237239

240+
/** Whether the {@link MySqlSource} should output the transaction metadata events or not. */
241+
public MySqlSourceConfigFactory includeTransactionMetadataEvents(boolean includeTransactionMetadataEvents) {
242+
this.includeTransactionMetadataEvents = includeTransactionMetadataEvents;
243+
return this;
244+
}
245+
246+
/** Whether the {@link MySqlSource} should output the heartbeat events or not. */
247+
public MySqlSourceConfigFactory includeHeartbeatEvents(boolean includeHeartbeatEvents) {
248+
this.includeHeartbeatEvents = includeHeartbeatEvents;
249+
return this;
250+
}
251+
238252
/** Whether the {@link MySqlSource} should scan the newly added tables or not. */
239253
public MySqlSourceConfigFactory scanNewlyAddedTableEnabled(boolean scanNewlyAddedTableEnabled) {
240254
this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
@@ -359,6 +373,8 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) {
359373
// Note: the includeSchemaChanges parameter is used to control emitting the schema record,
360374
// only DataStream API program need to emit the schema record, the Table API need not
361375
props.setProperty("include.schema.changes", String.valueOf(true));
376+
// enable transaction metadata if includeTransactionMetadataEvents is true
377+
props.setProperty("provide.transaction.metadata", String.valueOf(includeTransactionMetadataEvents));
362378
// disable the offset flush totally
363379
props.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE));
364380
// disable tombstones
@@ -412,6 +428,8 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) {
412428
distributionFactorUpper,
413429
distributionFactorLower,
414430
includeSchemaChanges,
431+
includeTransactionMetadataEvents,
432+
includeHeartbeatEvents,
415433
scanNewlyAddedTableEnabled,
416434
closeIdleReaders,
417435
props,

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,21 @@ public class MySqlRecordEmitter<T> implements RecordEmitter<SourceRecords, T, My
5353
private final DebeziumDeserializationSchema<T> debeziumDeserializationSchema;
5454
private final MySqlSourceReaderMetrics sourceReaderMetrics;
5555
private final boolean includeSchemaChanges;
56+
private final boolean includeTransactionMetadataEvents;
57+
private final boolean includeHeartbeatEvents;
5658
private final OutputCollector<T> outputCollector;
5759

5860
public MySqlRecordEmitter(
5961
DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
6062
MySqlSourceReaderMetrics sourceReaderMetrics,
61-
boolean includeSchemaChanges) {
63+
boolean includeSchemaChanges,
64+
boolean includeTransactionMetadataEvents,
65+
boolean includeHeartbeatEvents) {
6266
this.debeziumDeserializationSchema = debeziumDeserializationSchema;
6367
this.sourceReaderMetrics = sourceReaderMetrics;
6468
this.includeSchemaChanges = includeSchemaChanges;
69+
this.includeTransactionMetadataEvents = includeTransactionMetadataEvents;
70+
this.includeHeartbeatEvents = includeHeartbeatEvents;
6571
this.outputCollector = new OutputCollector<>();
6672
}
6773

@@ -102,6 +108,14 @@ protected void processElement(
102108
emitElement(element, output);
103109
} else if (RecordUtils.isHeartbeatEvent(element)) {
104110
updateStartingOffsetForSplit(splitState, element);
111+
if (includeHeartbeatEvents) {
112+
emitElement(element, output);
113+
}
114+
} else if (RecordUtils.isTransactionMetadataEvent(element)) {
115+
updateStartingOffsetForSplit(splitState, element);
116+
if (includeTransactionMetadataEvents) {
117+
emitElement(element, output);
118+
}
105119
} else {
106120
// unknown element
107121
LOG.info("Meet unknown element {}, just skip.", element);

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ private RecordUtils() {}
7373
"io.debezium.connector.mysql.SchemaChangeKey";
7474
public static final String SCHEMA_HEARTBEAT_EVENT_KEY_NAME =
7575
"io.debezium.connector.common.Heartbeat";
76+
public static final String SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME =
77+
"io.debezium.connector.common.TransactionMetadataKey";
7678
private static final DocumentReader DOCUMENT_READER = DocumentReader.defaultReader();
7779

7880
/** Converts a {@link ResultSet} row to an array of Objects. */
@@ -339,6 +341,18 @@ public static boolean isHeartbeatEvent(SourceRecord record) {
339341
&& SCHEMA_HEARTBEAT_EVENT_KEY_NAME.equalsIgnoreCase(valueSchema.name());
340342
}
341343

344+
/**
345+
* Check whether the given source record is a transaction metadata event (BEGIN or END).
346+
*
347+
* <p>Transaction events are emitted by Debezium to mark transaction boundaries when
348+
* provide.transaction.metadata is enabled.
349+
*/
350+
public static boolean isTransactionMetadataEvent(SourceRecord record) {
351+
Schema keySchema = record.keySchema();
352+
return keySchema != null
353+
&& SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name());
354+
}
355+
342356
/**
343357
* Return the finished snapshot split information.
344358
*

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtilsTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
package org.apache.flink.cdc.connectors.mysql.debezium;
1919

20+
import com.github.shyiko.mysql.binlog.network.SSLMode;
21+
import io.debezium.connector.mysql.MySqlConnectorConfig;
22+
import java.util.stream.Stream;
2023
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
2124
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
2225
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
@@ -28,6 +31,9 @@
2831
import java.time.Duration;
2932
import java.time.ZoneId;
3033
import java.util.Properties;
34+
import org.junit.jupiter.params.ParameterizedTest;
35+
import org.junit.jupiter.params.provider.Arguments;
36+
import org.junit.jupiter.params.provider.MethodSource;
3137

3238
/** Tests for {@link org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils}. */
3339
class DebeziumUtilsTest {
@@ -89,4 +95,24 @@ private void assertJdbcUrl(String expected, String actual) {
8995
String[] actualParam = actual.split("&");
9096
Assertions.assertThat(actualParam).containsExactlyInAnyOrder(expectedParam);
9197
}
98+
99+
@ParameterizedTest
100+
@MethodSource("sslModeProvider")
101+
void testSslModeConversion(MySqlConnectorConfig.SecureConnectionMode input, SSLMode expected) {
102+
SSLMode actual = DebeziumUtils.sslModeFor(input);
103+
Assertions.assertThat(actual).isEqualTo(expected);
104+
}
105+
static Stream<Arguments> sslModeProvider() {
106+
return Stream.of(
107+
Arguments.of(MySqlConnectorConfig.SecureConnectionMode.DISABLED, SSLMode.DISABLED),
108+
Arguments.of(
109+
MySqlConnectorConfig.SecureConnectionMode.PREFERRED, SSLMode.PREFERRED),
110+
Arguments.of(MySqlConnectorConfig.SecureConnectionMode.REQUIRED, SSLMode.REQUIRED),
111+
Arguments.of(
112+
MySqlConnectorConfig.SecureConnectionMode.VERIFY_CA, SSLMode.VERIFY_CA),
113+
Arguments.of(
114+
MySqlConnectorConfig.SecureConnectionMode.VERIFY_IDENTITY,
115+
SSLMode.VERIFY_IDENTITY));
116+
}
117+
92118
}

0 commit comments

Comments
 (0)