Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ public MySqlPipelineRecordEmitter(
debeziumDeserializationSchema,
sourceReaderMetrics,
sourceConfig.isIncludeSchemaChanges(),
false); // Explicitly disable heartbeat events
false, // Explicitly disable heartbeat events
false); // Explicitly disable transaction metadata events
this.debeziumDeserializationSchema = debeziumDeserializationSchema;
this.sourceConfig = sourceConfig;
this.alreadySendCreateTableTables = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ public static <T> MySqlSourceBuilder<T> builder() {
deserializationSchema,
sourceReaderMetrics,
sourceConfig.isIncludeSchemaChanges(),
sourceConfig.isIncludeHeartbeatEvents()));
sourceConfig.isIncludeHeartbeatEvents(),
sourceConfig.isIncludeTransactionMetadataEvents()));
}

MySqlSource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,13 @@ public MySqlSourceBuilder<T> includeSchemaChanges(boolean includeSchemaChanges)
return this;
}

/** Whether the {@link MySqlSource} should output the transaction metadata events or not. */
public MySqlSourceBuilder<T> includeTransactionMetadataEvents(
boolean includeTransactionMetadataEvents) {
this.configFactory.includeTransactionMetadataEvents(includeTransactionMetadataEvents);
return this;
}

/** Whether the {@link MySqlSource} should scan the newly added tables or not. */
public MySqlSourceBuilder<T> scanNewlyAddedTableEnabled(boolean scanNewlyAddedTableEnabled) {
this.configFactory.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class MySqlSourceConfig implements Serializable {
private final double distributionFactorLower;
private final boolean includeSchemaChanges;
private final boolean includeHeartbeatEvents;
private final boolean includeTransactionMetadataEvents;
private final boolean scanNewlyAddedTableEnabled;
private final boolean closeIdleReaders;
private final Properties jdbcProperties;
Expand Down Expand Up @@ -101,6 +102,7 @@ public class MySqlSourceConfig implements Serializable {
double distributionFactorLower,
boolean includeSchemaChanges,
boolean includeHeartbeatEvents,
boolean includeTransactionMetadataEvents,
boolean scanNewlyAddedTableEnabled,
boolean closeIdleReaders,
Properties dbzProperties,
Expand Down Expand Up @@ -131,6 +133,7 @@ public class MySqlSourceConfig implements Serializable {
this.distributionFactorLower = distributionFactorLower;
this.includeSchemaChanges = includeSchemaChanges;
this.includeHeartbeatEvents = includeHeartbeatEvents;
this.includeTransactionMetadataEvents = includeTransactionMetadataEvents;
this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
this.closeIdleReaders = closeIdleReaders;
this.dbzProperties = checkNotNull(dbzProperties);
Expand Down Expand Up @@ -234,6 +237,10 @@ public boolean isIncludeHeartbeatEvents() {
return includeHeartbeatEvents;
}

public boolean isIncludeTransactionMetadataEvents() {
return includeTransactionMetadataEvents;
}

public boolean isScanNewlyAddedTableEnabled() {
return scanNewlyAddedTableEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.table.catalog.ObjectPath;

import io.debezium.config.CommonConnectorConfig;

import java.io.Serializable;
import java.time.Duration;
import java.time.ZoneId;
Expand Down Expand Up @@ -64,6 +66,7 @@ public class MySqlSourceConfigFactory implements Serializable {
MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue();
private boolean includeSchemaChanges = false;
private boolean includeHeartbeatEvents = false;
private boolean includeTransactionMetadataEvents = false;
private boolean scanNewlyAddedTableEnabled = false;
private boolean closeIdleReaders = false;
private Properties jdbcProperties;
Expand Down Expand Up @@ -242,6 +245,13 @@ public MySqlSourceConfigFactory includeHeartbeatEvents(boolean includeHeartbeatE
return this;
}

/** Whether the {@link MySqlSource} should output the transaction metadata events or not. */
public MySqlSourceConfigFactory includeTransactionMetadataEvents(
boolean includeTransactionMetadataEvents) {
this.includeTransactionMetadataEvents = includeTransactionMetadataEvents;
return this;
}

/** Whether the {@link MySqlSource} should scan the newly added tables or not. */
public MySqlSourceConfigFactory scanNewlyAddedTableEnabled(boolean scanNewlyAddedTableEnabled) {
this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
Expand Down Expand Up @@ -366,6 +376,10 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) {
// Note: the includeSchemaChanges parameter is used to control emitting the schema record,
// only DataStream API program need to emit the schema record, the Table API need not
props.setProperty("include.schema.changes", String.valueOf(true));
// enable transaction metadata if includeTransactionMetadataEvents is true
props.setProperty(
CommonConnectorConfig.PROVIDE_TRANSACTION_METADATA.name(),
String.valueOf(includeTransactionMetadataEvents));
// disable the offset flush totally
props.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE));
// disable tombstones
Expand Down Expand Up @@ -420,6 +434,7 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) {
distributionFactorLower,
includeSchemaChanges,
includeHeartbeatEvents,
includeTransactionMetadataEvents,
scanNewlyAddedTableEnabled,
closeIdleReaders,
props,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,20 @@ public class MySqlRecordEmitter<T> implements RecordEmitter<SourceRecords, T, My
private final MySqlSourceReaderMetrics sourceReaderMetrics;
private final boolean includeSchemaChanges;
private final boolean includeHeartbeatEvents;
private final boolean includeTransactionMetadataEvents;
private final OutputCollector<T> outputCollector;

public MySqlRecordEmitter(
DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
MySqlSourceReaderMetrics sourceReaderMetrics,
boolean includeSchemaChanges,
boolean includeHeartbeatEvents) {
boolean includeHeartbeatEvents,
boolean includeTransactionMetadataEvents) {
this.debeziumDeserializationSchema = debeziumDeserializationSchema;
this.sourceReaderMetrics = sourceReaderMetrics;
this.includeSchemaChanges = includeSchemaChanges;
this.includeHeartbeatEvents = includeHeartbeatEvents;
this.includeTransactionMetadataEvents = includeTransactionMetadataEvents;
this.outputCollector = new OutputCollector<>();
}

Expand Down Expand Up @@ -108,6 +111,11 @@ protected void processElement(
if (includeHeartbeatEvents) {
emitElement(element, output);
}
} else if (RecordUtils.isTransactionMetadataEvent(element)) {
Comment thread
tejanshrana marked this conversation as resolved.
updateStartingOffsetForSplit(splitState, element);
if (includeTransactionMetadataEvents) {
emitElement(element, output);
}
} else {
// unknown element
LOG.info("Meet unknown element {}, just skip.", element);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ private RecordUtils() {}
"io.debezium.connector.mysql.SchemaChangeKey";
public static final String SCHEMA_HEARTBEAT_EVENT_KEY_NAME =
"io.debezium.connector.common.Heartbeat";
public static final String SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME =
"io.debezium.connector.common.TransactionMetadataKey";
private static final DocumentReader DOCUMENT_READER = DocumentReader.defaultReader();

/** Converts a {@link ResultSet} row to an array of Objects. */
Expand Down Expand Up @@ -339,6 +341,18 @@ public static boolean isHeartbeatEvent(SourceRecord record) {
&& SCHEMA_HEARTBEAT_EVENT_KEY_NAME.equalsIgnoreCase(valueSchema.name());
}

/**
* Check whether the given source record is a transaction metadata event (BEGIN or END).
*
* <p>Transaction events are emitted by Debezium to mark transaction boundaries when
* provide.transaction.metadata is enabled.
*/
public static boolean isTransactionMetadataEvent(SourceRecord record) {
Schema keySchema = record.keySchema();
return keySchema != null
&& SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name());
}

/**
* Return the finished snapshot split information.
*
Expand Down
Loading
Loading