From fb2e77e6ac07968a8a6fea0ed092aea6135e2f34 Mon Sep 17 00:00:00 2001 From: Tejansh Rana Date: Tue, 4 Nov 2025 15:50:37 +0000 Subject: [PATCH 1/7] Support for transaction boundaries in mysql connector --- .../source/reader/MySqlRecordEmitter.java | 3 + .../mysql/source/utils/RecordUtils.java | 14 ++ .../source/reader/MySqlRecordEmitterTest.java | 208 +++++++++++++++++- 3 files changed, 218 insertions(+), 7 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java index 449e7f608f2..06801a659b7 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java @@ -102,6 +102,9 @@ protected void processElement( emitElement(element, output); } else if (RecordUtils.isHeartbeatEvent(element)) { updateStartingOffsetForSplit(splitState, element); + } else if (RecordUtils.isTransactionMetadataEvent(element)) { + updateStartingOffsetForSplit(splitState, element); + emitElement(element, output); } else { // unknown element LOG.info("Meet unknown element {}, just skip.", element); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java index e6848f1c4c8..2a323cc51fa 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java @@ -76,6 +76,8 @@ private RecordUtils() {} "io.debezium.connector.mysql.SchemaChangeKey"; public static final String SCHEMA_HEARTBEAT_EVENT_KEY_NAME = "io.debezium.connector.common.Heartbeat"; + public static final String SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME = + "io.debezium.connector.common.TransactionMetadataKey"; private static final DocumentReader DOCUMENT_READER = DocumentReader.defaultReader(); /** Converts a {@link ResultSet} row to an array of Objects. */ @@ -342,6 +344,18 @@ public static boolean isHeartbeatEvent(SourceRecord record) { && SCHEMA_HEARTBEAT_EVENT_KEY_NAME.equalsIgnoreCase(valueSchema.name()); } + /** + * Check whether the given source record is a transaction metadata event (BEGIN or END). + * + *

Transaction events are emitted by Debezium to mark transaction boundaries when + * provide.transaction.metadata is enabled. + */ + public static boolean isTransactionMetadataEvent(SourceRecord record) { + Schema keySchema = record.keySchema(); + return keySchema != null + && SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name()); + } + /** * Return the finished snapshot split information. * diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java index 5553b1ba8ce..4c530641426 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java @@ -23,6 +23,7 @@ import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit; import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplitState; import org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords; +import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils; import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; @@ -36,11 +37,17 @@ import io.debezium.relational.TableId; import io.debezium.schema.TopicSelector; import io.debezium.util.SchemaNameAdjuster; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import static io.debezium.config.CommonConnectorConfig.TRANSACTION_TOPIC; import static io.debezium.connector.mysql.MySqlConnectorConfig.SERVER_NAME; @@ -105,14 +112,201 @@ public TypeInformation getProducedType() { false); } + @Test + void testTransactionBeginEventHandling() throws Exception { + // Create a transaction BEGIN event + SourceRecord transactionBeginEvent = createTransactionMetadataEvent("BEGIN", "tx-123", 100L); + + // Verify it's detected as a transaction metadata event + Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(transactionBeginEvent)) + .isTrue(); + + // Create emitter and split state + AtomicInteger emittedRecordsCount = new AtomicInteger(0); + MySqlRecordEmitter recordEmitter = createRecordEmitterWithCounter(emittedRecordsCount); + MySqlBinlogSplitState splitState = createBinlogSplitState(); + + BinlogOffset offsetBeforeEmit = splitState.getStartingOffset(); + + // Emit the transaction BEGIN event + TestingReaderOutput readerOutput = new TestingReaderOutput<>(); + recordEmitter.emitRecord( + SourceRecords.fromSingleRecord(transactionBeginEvent), + readerOutput, + splitState); + + // Verify the offset was updated + BinlogOffset expectedOffset = RecordUtils.getBinlogPosition(transactionBeginEvent); + Assertions.assertThat(splitState.getStartingOffset()) + .isNotNull() + .isNotEqualTo(offsetBeforeEmit) + .isEqualByComparingTo(expectedOffset); + + // Verify the event was emitted + Assertions.assertThat(emittedRecordsCount.get()).isEqualTo(1); + } + + @Test + void testTransactionEndEventHandling() throws Exception { + // Create a transaction END event + SourceRecord transactionEndEvent = createTransactionMetadataEvent("END", "tx-123", 200L); + + // Verify it's detected as a transaction metadata event + Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(transactionEndEvent)) + .isTrue(); + + // Create emitter and split state + AtomicInteger emittedRecordsCount = new AtomicInteger(0); + MySqlRecordEmitter recordEmitter = createRecordEmitterWithCounter(emittedRecordsCount); + MySqlBinlogSplitState splitState = createBinlogSplitState(); + + // Emit the transaction END event + TestingReaderOutput readerOutput = new TestingReaderOutput<>(); + recordEmitter.emitRecord( + SourceRecords.fromSingleRecord(transactionEndEvent), + readerOutput, + splitState); + + // Verify the offset was updated + BinlogOffset expectedOffset = RecordUtils.getBinlogPosition(transactionEndEvent); + Assertions.assertThat(splitState.getStartingOffset()) + .isNotNull() + .isEqualByComparingTo(expectedOffset); + + // Verify the event was emitted + Assertions.assertThat(emittedRecordsCount.get()).isEqualTo(1); + } + + + @Test + void testNonTransactionEventNotDetected() { + // Create a regular data change event + Schema keySchema = SchemaBuilder.struct() + .field("id", Schema.INT32_SCHEMA) + .build(); + Schema valueSchema = SchemaBuilder.struct() + .field("op", Schema.STRING_SCHEMA) + .build(); + + Struct key = new Struct(keySchema).put("id", 1); + Struct value = new Struct(valueSchema).put("op", "c"); + + Map offset = new HashMap<>(); + offset.put("file", "mysql-bin.000001"); + offset.put("pos", 100L); + + SourceRecord dataRecord = new SourceRecord( + Collections.singletonMap("server", "mysql"), + offset, + "test.table", + keySchema, + key, + valueSchema, + value); + + // Verify it's NOT detected as a transaction metadata event + Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(dataRecord)).isFalse(); + } + + @Test + void testTransactionEventWithoutKeySchemaNotDetected() { + // Create a record without a key schema (should not be detected as transaction event) + Schema valueSchema = SchemaBuilder.struct() + .name(RecordUtils.SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME) + .field("status", Schema.STRING_SCHEMA) + .build(); + + Struct value = new Struct(valueSchema).put("status", "BEGIN"); + + Map offset = new HashMap<>(); + offset.put("file", "mysql-bin.000001"); + offset.put("pos", 100L); + + SourceRecord record = new SourceRecord( + Collections.singletonMap("server", "mysql"), + offset, + "transaction.topic", + null, // No key schema + null, + valueSchema, + value); + + // Verify it's NOT detected as a transaction metadata event + Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(record)).isFalse(); + } + private MySqlBinlogSplitState createBinlogSplitState() { return new MySqlBinlogSplitState( - new MySqlBinlogSplit( - "binlog-split", - BinlogOffset.ofEarliest(), - BinlogOffset.ofNonStopping(), - Collections.emptyList(), - Collections.emptyMap(), - 0)); + new MySqlBinlogSplit( + "binlog-split", + BinlogOffset.ofEarliest(), + BinlogOffset.ofNonStopping(), + Collections.emptyList(), + Collections.emptyMap(), + 0)); } + + /** + * Helper method to create a MySqlRecordEmitter that counts emitted records. + */ + private MySqlRecordEmitter createRecordEmitterWithCounter(AtomicInteger counter) { + return new MySqlRecordEmitter<>( + new DebeziumDeserializationSchema<>() { + @Override + public void deserialize(SourceRecord record, Collector out) { + counter.incrementAndGet(); + out.collect("transaction-event"); + } + + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(String.class); + } + }, + new MySqlSourceReaderMetrics( + UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()), + false); + } + + private SourceRecord createTransactionMetadataEvent( + String status, String transactionId, long position) { + Schema keySchema = SchemaBuilder.struct() + .name(RecordUtils.SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME) + .field("id", Schema.STRING_SCHEMA) + .build(); + + Schema valueSchema = SchemaBuilder.struct() + .name("io.debezium.connector.common.TransactionMetadataValue") + .field("status", Schema.STRING_SCHEMA) + .field("id", Schema.STRING_SCHEMA) + .field("event_count", Schema.OPTIONAL_INT64_SCHEMA) + .field("ts_ms", Schema.INT64_SCHEMA) + .build(); + + Struct key = new Struct(keySchema).put("id", transactionId); + + Struct value = new Struct(valueSchema) + .put("status", status) + .put("id", transactionId) + .put("ts_ms", System.currentTimeMillis()); + + if ("END".equals(status)) { + value.put("event_count", 5L); + } + + Map offset = new HashMap<>(); + offset.put("file", "mysql-bin.000001"); + offset.put("pos", position); + offset.put("transaction_id", transactionId); + + return new SourceRecord( + Collections.singletonMap("server", "mysql_binlog_source"), + offset, + "mysql_binlog_source.transaction", + keySchema, + key, + valueSchema, + value); + } + } From fbd294e263aa4cf3613e0f45fbc162a6a364c697 Mon Sep 17 00:00:00 2001 From: Tejansh Rana Date: Mon, 24 Nov 2025 21:14:32 +0000 Subject: [PATCH 2/7] add option to enable/disable metadata events --- .../source/reader/MySqlPipelineRecordEmitter.java | 3 ++- .../cdc/connectors/mysql/source/MySqlSource.java | 3 ++- .../connectors/mysql/source/MySqlSourceBuilder.java | 6 ++++++ .../mysql/source/config/MySqlSourceConfig.java | 7 +++++++ .../source/config/MySqlSourceConfigFactory.java | 10 ++++++++++ .../mysql/source/reader/MySqlRecordEmitter.java | 12 +++++++++--- .../mysql/source/reader/MySqlRecordEmitterTest.java | 4 +++- .../mysql/source/reader/MySqlSourceReaderTest.java | 5 +++-- 8 files changed, 42 insertions(+), 8 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java index c946b9e29ce..2bfee92ee8e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java @@ -92,7 +92,8 @@ public MySqlPipelineRecordEmitter( super( debeziumDeserializationSchema, sourceReaderMetrics, - sourceConfig.isIncludeSchemaChanges()); + sourceConfig.isIncludeSchemaChanges(), + false); // Explicitly disable transaction metadata events this.debeziumDeserializationSchema = debeziumDeserializationSchema; this.sourceConfig = sourceConfig; this.alreadySendCreateTableTables = new HashSet<>(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java index cb06cc45a6d..b5acaf95bf8 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java @@ -131,7 +131,8 @@ public static MySqlSourceBuilder builder() { new MySqlRecordEmitter<>( deserializationSchema, sourceReaderMetrics, - sourceConfig.isIncludeSchemaChanges())); + sourceConfig.isIncludeSchemaChanges(), + sourceConfig.isIncludeTransactionMetadataEvents())); } MySqlSource( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java index caf316d1b4a..fbda06bccd9 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java @@ -198,6 +198,12 @@ public MySqlSourceBuilder includeSchemaChanges(boolean includeSchemaChanges) return this; } + /** Whether the {@link MySqlSource} should output the transaction metadata events or not. */ + public MySqlSourceBuilder includeTransactionMetadataEvents(boolean includeTransactionMetadataEvents) { + this.configFactory.includeTransactionMetadataEvents(includeTransactionMetadataEvents); + return this; + } + /** Whether the {@link MySqlSource} should scan the newly added tables or not. */ public MySqlSourceBuilder scanNewlyAddedTableEnabled(boolean scanNewlyAddedTableEnabled) { this.configFactory.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java index 260a7cd2b5d..c0adf7eb214 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java @@ -62,6 +62,7 @@ public class MySqlSourceConfig implements Serializable { private final double distributionFactorUpper; private final double distributionFactorLower; private final boolean includeSchemaChanges; + private final boolean includeTransactionMetadataEvents; private final boolean scanNewlyAddedTableEnabled; private final boolean closeIdleReaders; private final Properties jdbcProperties; @@ -99,6 +100,7 @@ public class MySqlSourceConfig implements Serializable { double distributionFactorUpper, double distributionFactorLower, boolean includeSchemaChanges, + boolean includeTransactionMetadataEvents, boolean scanNewlyAddedTableEnabled, boolean closeIdleReaders, Properties dbzProperties, @@ -128,6 +130,7 @@ public class MySqlSourceConfig implements Serializable { this.distributionFactorUpper = distributionFactorUpper; this.distributionFactorLower = distributionFactorLower; this.includeSchemaChanges = includeSchemaChanges; + this.includeTransactionMetadataEvents = includeTransactionMetadataEvents; this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled; this.closeIdleReaders = closeIdleReaders; this.dbzProperties = checkNotNull(dbzProperties); @@ -227,6 +230,10 @@ public boolean isIncludeSchemaChanges() { return includeSchemaChanges; } + public boolean isIncludeTransactionMetadataEvents() { + return includeTransactionMetadataEvents; + } + public boolean isScanNewlyAddedTableEnabled() { return scanNewlyAddedTableEnabled; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java index 427115edea7..8715c8b6f61 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java @@ -63,6 +63,7 @@ public class MySqlSourceConfigFactory implements Serializable { private double distributionFactorLower = MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(); private boolean includeSchemaChanges = false; + private boolean includeTransactionMetadataEvents = false; private boolean scanNewlyAddedTableEnabled = false; private boolean closeIdleReaders = false; private Properties jdbcProperties; @@ -235,6 +236,12 @@ public MySqlSourceConfigFactory includeSchemaChanges(boolean includeSchemaChange return this; } + /** Whether the {@link MySqlSource} should output the transaction metadata events or not. */ + public MySqlSourceConfigFactory includeTransactionMetadataEvents(boolean includeTransactionMetadataEvents) { + this.includeTransactionMetadataEvents = includeTransactionMetadataEvents; + return this; + } + /** Whether the {@link MySqlSource} should scan the newly added tables or not. */ public MySqlSourceConfigFactory scanNewlyAddedTableEnabled(boolean scanNewlyAddedTableEnabled) { this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled; @@ -359,6 +366,8 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) { // Note: the includeSchemaChanges parameter is used to control emitting the schema record, // only DataStream API program need to emit the schema record, the Table API need not props.setProperty("include.schema.changes", String.valueOf(true)); + // enable transaction metadata if includeTransactionMetadataEvents is true + props.setProperty("provide.transaction.metadata", String.valueOf(includeTransactionMetadataEvents)); // disable the offset flush totally props.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE)); // disable tombstones @@ -412,6 +421,7 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) { distributionFactorUpper, distributionFactorLower, includeSchemaChanges, + includeTransactionMetadataEvents, scanNewlyAddedTableEnabled, closeIdleReaders, props, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java index 06801a659b7..6979a4486b1 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java @@ -53,15 +53,18 @@ public class MySqlRecordEmitter implements RecordEmitter debeziumDeserializationSchema; private final MySqlSourceReaderMetrics sourceReaderMetrics; private final boolean includeSchemaChanges; + private final boolean includeTransactionMetadataEvents; private final OutputCollector outputCollector; public MySqlRecordEmitter( DebeziumDeserializationSchema debeziumDeserializationSchema, MySqlSourceReaderMetrics sourceReaderMetrics, - boolean includeSchemaChanges) { + boolean includeSchemaChanges, + boolean includeTransactionMetadataEvents) { this.debeziumDeserializationSchema = debeziumDeserializationSchema; this.sourceReaderMetrics = sourceReaderMetrics; this.includeSchemaChanges = includeSchemaChanges; + this.includeTransactionMetadataEvents = includeTransactionMetadataEvents; this.outputCollector = new OutputCollector<>(); } @@ -102,9 +105,12 @@ protected void processElement( emitElement(element, output); } else if (RecordUtils.isHeartbeatEvent(element)) { updateStartingOffsetForSplit(splitState, element); + //emitElement(element, output); } else if (RecordUtils.isTransactionMetadataEvent(element)) { - updateStartingOffsetForSplit(splitState, element); - emitElement(element, output); + updateStartingOffsetForSplit(splitState, element); + if (includeTransactionMetadataEvents) { + emitElement(element, output); + } } else { // unknown element LOG.info("Meet unknown element {}, just skip.", element); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java index 4c530641426..07a0d35d532 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java @@ -109,6 +109,7 @@ public TypeInformation getProducedType() { }, new MySqlSourceReaderMetrics( UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()), + false, false); } @@ -265,7 +266,8 @@ public TypeInformation getProducedType() { }, new MySqlSourceReaderMetrics( UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()), - false); + false, + true); } private SourceRecord createTransactionMetadataEvent( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java index 7b8dfdcdbb2..509e45872a0 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java @@ -573,7 +573,8 @@ private MySqlSourceReader createReader( : new MySqlRecordEmitter<>( new ForwardDeserializeSchema(), new MySqlSourceReaderMetrics(readerContext.metricGroup()), - configuration.isIncludeSchemaChanges()); + configuration.isIncludeSchemaChanges(), + configuration.isIncludeTransactionMetadataEvents()); final MySqlSourceReaderContext mySqlSourceReaderContext = new MySqlSourceReaderContext(readerContext); return new MySqlSourceReader<>( @@ -740,7 +741,7 @@ public MysqlLimitedRecordEmitter( MySqlSourceReaderMetrics sourceReaderMetrics, boolean includeSchemaChanges, int limit) { - super(debeziumDeserializationSchema, sourceReaderMetrics, includeSchemaChanges); + super(debeziumDeserializationSchema, sourceReaderMetrics, includeSchemaChanges, false); this.debeziumDeserializationSchema = debeziumDeserializationSchema; this.sourceReaderMetrics = sourceReaderMetrics; this.includeSchemaChanges = includeSchemaChanges; From 46581dc111242b6b058a70957d7a8e66b7523484 Mon Sep 17 00:00:00 2001 From: Tejansh Rana Date: Mon, 24 Nov 2025 21:32:29 +0000 Subject: [PATCH 3/7] test updates --- .../source/reader/MySqlRecordEmitterTest.java | 193 ++++++++++++++++-- 1 file changed, 172 insertions(+), 21 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java index 07a0d35d532..38b54c61f36 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java @@ -96,40 +96,98 @@ record -> { private MySqlRecordEmitter createRecordEmitter() { return new MySqlRecordEmitter<>( - new DebeziumDeserializationSchema() { - @Override - public void deserialize(SourceRecord record, Collector out) { - throw new UnsupportedOperationException(); - } + new DebeziumDeserializationSchema<>() { + @Override + public void deserialize(SourceRecord record, Collector out) { + throw new UnsupportedOperationException(); + } - @Override - public TypeInformation getProducedType() { - return TypeInformation.of(Void.class); - } - }, + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(Void.class); + } + }, new MySqlSourceReaderMetrics( UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()), false, false); } + @Test + void testTransactionMetadataEventsDisabledByDefault() throws Exception { + SourceRecord transactionBeginEvent = createTransactionMetadataEvent("BEGIN", "tx-123", 100L); + + Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(transactionBeginEvent)) + .isTrue(); + + AtomicInteger emittedRecordsCount = new AtomicInteger(0); + MySqlRecordEmitter recordEmitter = createRecordEmitterWithTransactionConfig(emittedRecordsCount, false); + MySqlBinlogSplitState splitState = createBinlogSplitState(); + + BinlogOffset offsetBeforeEmit = splitState.getStartingOffset(); + + TestingReaderOutput readerOutput = new TestingReaderOutput<>(); + recordEmitter.emitRecord( + SourceRecords.fromSingleRecord(transactionBeginEvent), + readerOutput, + splitState); + + // Verify the offset was updated (this should always happen) + BinlogOffset expectedOffset = RecordUtils.getBinlogPosition(transactionBeginEvent); + Assertions.assertThat(splitState.getStartingOffset()) + .isNotNull() + .isNotEqualTo(offsetBeforeEmit) + .isEqualByComparingTo(expectedOffset); + + // Verify the event was NOT emitted (because includeTransactionMetadataEvents=false) + Assertions.assertThat(emittedRecordsCount.get()).isEqualTo(0); + Assertions.assertThat(readerOutput.getEmittedRecords()).isEmpty(); + } + + @Test + void testTransactionMetadataEventsEnabledExplicitly() throws Exception { + SourceRecord transactionBeginEvent = createTransactionMetadataEvent("BEGIN", "tx-456", 150L); + + Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(transactionBeginEvent)) + .isTrue(); + + AtomicInteger emittedRecordsCount = new AtomicInteger(0); + MySqlRecordEmitter recordEmitter = createRecordEmitterWithTransactionConfig(emittedRecordsCount, true); + MySqlBinlogSplitState splitState = createBinlogSplitState(); + + BinlogOffset offsetBeforeEmit = splitState.getStartingOffset(); + + TestingReaderOutput readerOutput = new TestingReaderOutput<>(); + recordEmitter.emitRecord( + SourceRecords.fromSingleRecord(transactionBeginEvent), + readerOutput, + splitState); + + // Verify the offset was updated + BinlogOffset expectedOffset = RecordUtils.getBinlogPosition(transactionBeginEvent); + Assertions.assertThat(splitState.getStartingOffset()) + .isNotNull() + .isNotEqualTo(offsetBeforeEmit) + .isEqualByComparingTo(expectedOffset); + + // Verify the event was emitted (because includeTransactionMetadataEvents=true) + Assertions.assertThat(emittedRecordsCount.get()).isEqualTo(1); + Assertions.assertThat(readerOutput.getEmittedRecords()).hasSize(1); + } + @Test void testTransactionBeginEventHandling() throws Exception { - // Create a transaction BEGIN event SourceRecord transactionBeginEvent = createTransactionMetadataEvent("BEGIN", "tx-123", 100L); - // Verify it's detected as a transaction metadata event Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(transactionBeginEvent)) .isTrue(); - // Create emitter and split state AtomicInteger emittedRecordsCount = new AtomicInteger(0); MySqlRecordEmitter recordEmitter = createRecordEmitterWithCounter(emittedRecordsCount); MySqlBinlogSplitState splitState = createBinlogSplitState(); BinlogOffset offsetBeforeEmit = splitState.getStartingOffset(); - // Emit the transaction BEGIN event TestingReaderOutput readerOutput = new TestingReaderOutput<>(); recordEmitter.emitRecord( SourceRecords.fromSingleRecord(transactionBeginEvent), @@ -149,19 +207,15 @@ void testTransactionBeginEventHandling() throws Exception { @Test void testTransactionEndEventHandling() throws Exception { - // Create a transaction END event SourceRecord transactionEndEvent = createTransactionMetadataEvent("END", "tx-123", 200L); - // Verify it's detected as a transaction metadata event Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(transactionEndEvent)) .isTrue(); - // Create emitter and split state AtomicInteger emittedRecordsCount = new AtomicInteger(0); MySqlRecordEmitter recordEmitter = createRecordEmitterWithCounter(emittedRecordsCount); MySqlBinlogSplitState splitState = createBinlogSplitState(); - // Emit the transaction END event TestingReaderOutput readerOutput = new TestingReaderOutput<>(); recordEmitter.emitRecord( SourceRecords.fromSingleRecord(transactionEndEvent), @@ -181,7 +235,6 @@ void testTransactionEndEventHandling() throws Exception { @Test void testNonTransactionEventNotDetected() { - // Create a regular data change event Schema keySchema = SchemaBuilder.struct() .field("id", Schema.INT32_SCHEMA) .build(); @@ -211,7 +264,6 @@ void testNonTransactionEventNotDetected() { @Test void testTransactionEventWithoutKeySchemaNotDetected() { - // Create a record without a key schema (should not be detected as transaction event) Schema valueSchema = SchemaBuilder.struct() .name(RecordUtils.SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME) .field("status", Schema.STRING_SCHEMA) @@ -236,6 +288,64 @@ void testTransactionEventWithoutKeySchemaNotDetected() { Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(record)).isFalse(); } + @Test + void testMultipleTransactionEventsWithDisabledConfig() throws Exception { + SourceRecord beginEvent = createTransactionMetadataEvent("BEGIN", "tx-789", 300L); + SourceRecord endEvent = createTransactionMetadataEvent("END", "tx-789", 400L); + + AtomicInteger emittedRecordsCount = new AtomicInteger(0); + MySqlRecordEmitter recordEmitter = createRecordEmitterWithTransactionConfig(emittedRecordsCount, false); + MySqlBinlogSplitState splitState = createBinlogSplitState(); + + TestingReaderOutput readerOutput = new TestingReaderOutput<>(); + + recordEmitter.emitRecord( + SourceRecords.fromSingleRecord(beginEvent), + readerOutput, + splitState); + + recordEmitter.emitRecord( + SourceRecords.fromSingleRecord(endEvent), + readerOutput, + splitState); + + // Verify offsets were updated but no events were emitted + BinlogOffset expectedOffset = RecordUtils.getBinlogPosition(endEvent); + Assertions.assertThat(splitState.getStartingOffset()) + .isNotNull() + .isEqualByComparingTo(expectedOffset); + + // Verify no events were emitted (because includeTransactionMetadataEvents=false) + Assertions.assertThat(emittedRecordsCount.get()).isEqualTo(0); + Assertions.assertThat(readerOutput.getEmittedRecords()).isEmpty(); + } + + @Test + void testMixedEventsWithTransactionMetadataDisabled() throws Exception { + SourceRecord transactionEvent = createTransactionMetadataEvent("BEGIN", "tx-mixed", 500L); + SourceRecord dataEvent = createDataChangeEvent("test.table", 501L); + + AtomicInteger emittedRecordsCount = new AtomicInteger(0); + MySqlRecordEmitter recordEmitter = createRecordEmitterWithTransactionConfig(emittedRecordsCount, false); + MySqlBinlogSplitState splitState = createBinlogSplitState(); + + TestingReaderOutput readerOutput = new TestingReaderOutput<>(); + + recordEmitter.emitRecord( + SourceRecords.fromSingleRecord(transactionEvent), + readerOutput, + splitState); + + recordEmitter.emitRecord( + SourceRecords.fromSingleRecord(dataEvent), + readerOutput, + splitState); + + // Verify only data event was emitted (count=1, not 2) + Assertions.assertThat(emittedRecordsCount.get()).isEqualTo(1); + Assertions.assertThat(readerOutput.getEmittedRecords()).hasSize(1); + } + private MySqlBinlogSplitState createBinlogSplitState() { return new MySqlBinlogSplitState( new MySqlBinlogSplit( @@ -251,6 +361,13 @@ private MySqlBinlogSplitState createBinlogSplitState() { * Helper method to create a MySqlRecordEmitter that counts emitted records. */ private MySqlRecordEmitter createRecordEmitterWithCounter(AtomicInteger counter) { + return createRecordEmitterWithTransactionConfig(counter, true); + } + + /** + * Helper method to create a MySqlRecordEmitter with configurable transaction metadata events. + */ + private MySqlRecordEmitter createRecordEmitterWithTransactionConfig(AtomicInteger counter, boolean includeTransactionMetadataEvents) { return new MySqlRecordEmitter<>( new DebeziumDeserializationSchema<>() { @Override @@ -267,7 +384,7 @@ public TypeInformation getProducedType() { new MySqlSourceReaderMetrics( UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()), false, - true); + includeTransactionMetadataEvents); } private SourceRecord createTransactionMetadataEvent( @@ -311,4 +428,38 @@ private SourceRecord createTransactionMetadataEvent( value); } + private SourceRecord createDataChangeEvent(String topicName, long position) { + Schema keySchema = SchemaBuilder.struct() + .field("id", Schema.INT32_SCHEMA) + .build(); + Schema valueSchema = SchemaBuilder.struct() + .field("op", Schema.STRING_SCHEMA) + .field("after", SchemaBuilder.struct() + .field("id", Schema.INT32_SCHEMA) + .field("name", Schema.STRING_SCHEMA) + .optional()) + .build(); + + Struct key = new Struct(keySchema).put("id", 1); + Struct after = new Struct(valueSchema.field("after").schema()) + .put("id", 1) + .put("name", "test"); + Struct value = new Struct(valueSchema) + .put("op", "c") + .put("after", after); + + Map offset = new HashMap<>(); + offset.put("file", "mysql-bin.000001"); + offset.put("pos", position); + + return new SourceRecord( + Collections.singletonMap("server", "mysql"), + offset, + topicName, + keySchema, + key, + valueSchema, + value); + } + } From c6ed5f3988ee268c8caf49ed58c973443a35b0fd Mon Sep 17 00:00:00 2001 From: Tejansh Rana Date: Mon, 24 Nov 2025 21:56:07 +0000 Subject: [PATCH 4/7] clean up --- .../cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java | 1 - 1 file changed, 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java index 6979a4486b1..02d35380903 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java @@ -105,7 +105,6 @@ protected void processElement( emitElement(element, output); } else if (RecordUtils.isHeartbeatEvent(element)) { updateStartingOffsetForSplit(splitState, element); - //emitElement(element, output); } else if (RecordUtils.isTransactionMetadataEvent(element)) { updateStartingOffsetForSplit(splitState, element); if (includeTransactionMetadataEvents) { From aa0a858d1b8758c5ccbf2b4dc4527527aeecacfc Mon Sep 17 00:00:00 2001 From: Tejansh Rana Date: Tue, 16 Dec 2025 10:51:43 +0000 Subject: [PATCH 5/7] formatting fixes --- .../reader/MySqlPipelineRecordEmitter.java | 2 +- .../mysql/source/MySqlSourceBuilder.java | 3 +- .../config/MySqlSourceConfigFactory.java | 6 +- .../mysql/source/utils/RecordUtils.java | 4 +- .../source/reader/MySqlRecordEmitterTest.java | 251 +++++++++--------- 5 files changed, 129 insertions(+), 137 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java index 2bfee92ee8e..ac26e587392 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java @@ -93,7 +93,7 @@ public MySqlPipelineRecordEmitter( debeziumDeserializationSchema, sourceReaderMetrics, sourceConfig.isIncludeSchemaChanges(), - false); // Explicitly disable transaction metadata events + false); // Explicitly disable transaction metadata events this.debeziumDeserializationSchema = debeziumDeserializationSchema; this.sourceConfig = sourceConfig; this.alreadySendCreateTableTables = new HashSet<>(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java index fbda06bccd9..b1b1a00b937 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java @@ -199,7 +199,8 @@ public MySqlSourceBuilder includeSchemaChanges(boolean includeSchemaChanges) } /** Whether the {@link MySqlSource} should output the transaction metadata events or not. */ - public MySqlSourceBuilder includeTransactionMetadataEvents(boolean includeTransactionMetadataEvents) { + public MySqlSourceBuilder includeTransactionMetadataEvents( + boolean includeTransactionMetadataEvents) { this.configFactory.includeTransactionMetadataEvents(includeTransactionMetadataEvents); return this; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java index 8715c8b6f61..eb57e315447 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java @@ -237,7 +237,8 @@ public MySqlSourceConfigFactory includeSchemaChanges(boolean includeSchemaChange } /** Whether the {@link MySqlSource} should output the transaction metadata events or not. */ - public MySqlSourceConfigFactory includeTransactionMetadataEvents(boolean includeTransactionMetadataEvents) { + public MySqlSourceConfigFactory includeTransactionMetadataEvents( + boolean includeTransactionMetadataEvents) { this.includeTransactionMetadataEvents = includeTransactionMetadataEvents; return this; } @@ -367,7 +368,8 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) { // only DataStream API program need to emit the schema record, the Table API need not props.setProperty("include.schema.changes", String.valueOf(true)); // enable transaction metadata if includeTransactionMetadataEvents is true - props.setProperty("provide.transaction.metadata", String.valueOf(includeTransactionMetadataEvents)); + props.setProperty( + "provide.transaction.metadata", String.valueOf(includeTransactionMetadataEvents)); // disable the offset flush totally props.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE)); // disable tombstones diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java index 6bdad7f3c5c..bf4d4f29f56 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java @@ -348,9 +348,9 @@ public static boolean isHeartbeatEvent(SourceRecord record) { * provide.transaction.metadata is enabled. */ public static boolean isTransactionMetadataEvent(SourceRecord record) { - Schema keySchema = record.keySchema(); + Schema keySchema = record.keySchema(); return keySchema != null - && SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name()); + && SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name()); } /** diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java index 38b54c61f36..952b16eb256 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java @@ -96,17 +96,17 @@ record -> { private MySqlRecordEmitter createRecordEmitter() { return new MySqlRecordEmitter<>( - new DebeziumDeserializationSchema<>() { - @Override - public void deserialize(SourceRecord record, Collector out) { - throw new UnsupportedOperationException(); - } - - @Override - public TypeInformation getProducedType() { - return TypeInformation.of(Void.class); - } - }, + new DebeziumDeserializationSchema<>() { + @Override + public void deserialize(SourceRecord record, Collector out) { + throw new UnsupportedOperationException(); + } + + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(Void.class); + } + }, new MySqlSourceReaderMetrics( UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()), false, @@ -115,22 +115,22 @@ public TypeInformation getProducedType() { @Test void testTransactionMetadataEventsDisabledByDefault() throws Exception { - SourceRecord transactionBeginEvent = createTransactionMetadataEvent("BEGIN", "tx-123", 100L); + SourceRecord transactionBeginEvent = + createTransactionMetadataEvent("BEGIN", "tx-123", 100L); Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(transactionBeginEvent)) .isTrue(); AtomicInteger emittedRecordsCount = new AtomicInteger(0); - MySqlRecordEmitter recordEmitter = createRecordEmitterWithTransactionConfig(emittedRecordsCount, false); + MySqlRecordEmitter recordEmitter = + createRecordEmitterWithTransactionConfig(emittedRecordsCount, false); MySqlBinlogSplitState splitState = createBinlogSplitState(); BinlogOffset offsetBeforeEmit = splitState.getStartingOffset(); TestingReaderOutput readerOutput = new TestingReaderOutput<>(); recordEmitter.emitRecord( - SourceRecords.fromSingleRecord(transactionBeginEvent), - readerOutput, - splitState); + SourceRecords.fromSingleRecord(transactionBeginEvent), readerOutput, splitState); // Verify the offset was updated (this should always happen) BinlogOffset expectedOffset = RecordUtils.getBinlogPosition(transactionBeginEvent); @@ -146,22 +146,22 @@ void testTransactionMetadataEventsDisabledByDefault() throws Exception { @Test void testTransactionMetadataEventsEnabledExplicitly() throws Exception { - SourceRecord transactionBeginEvent = createTransactionMetadataEvent("BEGIN", "tx-456", 150L); + SourceRecord transactionBeginEvent = + createTransactionMetadataEvent("BEGIN", "tx-456", 150L); Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(transactionBeginEvent)) .isTrue(); AtomicInteger emittedRecordsCount = new AtomicInteger(0); - MySqlRecordEmitter recordEmitter = createRecordEmitterWithTransactionConfig(emittedRecordsCount, true); + MySqlRecordEmitter recordEmitter = + createRecordEmitterWithTransactionConfig(emittedRecordsCount, true); MySqlBinlogSplitState splitState = createBinlogSplitState(); BinlogOffset offsetBeforeEmit = splitState.getStartingOffset(); TestingReaderOutput readerOutput = new TestingReaderOutput<>(); recordEmitter.emitRecord( - SourceRecords.fromSingleRecord(transactionBeginEvent), - readerOutput, - splitState); + SourceRecords.fromSingleRecord(transactionBeginEvent), readerOutput, splitState); // Verify the offset was updated BinlogOffset expectedOffset = RecordUtils.getBinlogPosition(transactionBeginEvent); @@ -177,22 +177,22 @@ void testTransactionMetadataEventsEnabledExplicitly() throws Exception { @Test void testTransactionBeginEventHandling() throws Exception { - SourceRecord transactionBeginEvent = createTransactionMetadataEvent("BEGIN", "tx-123", 100L); + SourceRecord transactionBeginEvent = + createTransactionMetadataEvent("BEGIN", "tx-123", 100L); Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(transactionBeginEvent)) .isTrue(); AtomicInteger emittedRecordsCount = new AtomicInteger(0); - MySqlRecordEmitter recordEmitter = createRecordEmitterWithCounter(emittedRecordsCount); + MySqlRecordEmitter recordEmitter = + createRecordEmitterWithCounter(emittedRecordsCount); MySqlBinlogSplitState splitState = createBinlogSplitState(); BinlogOffset offsetBeforeEmit = splitState.getStartingOffset(); TestingReaderOutput readerOutput = new TestingReaderOutput<>(); recordEmitter.emitRecord( - SourceRecords.fromSingleRecord(transactionBeginEvent), - readerOutput, - splitState); + SourceRecords.fromSingleRecord(transactionBeginEvent), readerOutput, splitState); // Verify the offset was updated BinlogOffset expectedOffset = RecordUtils.getBinlogPosition(transactionBeginEvent); @@ -209,18 +209,16 @@ void testTransactionBeginEventHandling() throws Exception { void testTransactionEndEventHandling() throws Exception { SourceRecord transactionEndEvent = createTransactionMetadataEvent("END", "tx-123", 200L); - Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(transactionEndEvent)) - .isTrue(); + Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(transactionEndEvent)).isTrue(); AtomicInteger emittedRecordsCount = new AtomicInteger(0); - MySqlRecordEmitter recordEmitter = createRecordEmitterWithCounter(emittedRecordsCount); + MySqlRecordEmitter recordEmitter = + createRecordEmitterWithCounter(emittedRecordsCount); MySqlBinlogSplitState splitState = createBinlogSplitState(); TestingReaderOutput readerOutput = new TestingReaderOutput<>(); recordEmitter.emitRecord( - SourceRecords.fromSingleRecord(transactionEndEvent), - readerOutput, - splitState); + SourceRecords.fromSingleRecord(transactionEndEvent), readerOutput, splitState); // Verify the offset was updated BinlogOffset expectedOffset = RecordUtils.getBinlogPosition(transactionEndEvent); @@ -232,15 +230,10 @@ void testTransactionEndEventHandling() throws Exception { Assertions.assertThat(emittedRecordsCount.get()).isEqualTo(1); } - @Test void testNonTransactionEventNotDetected() { - Schema keySchema = SchemaBuilder.struct() - .field("id", Schema.INT32_SCHEMA) - .build(); - Schema valueSchema = SchemaBuilder.struct() - .field("op", Schema.STRING_SCHEMA) - .build(); + Schema keySchema = SchemaBuilder.struct().field("id", Schema.INT32_SCHEMA).build(); + Schema valueSchema = SchemaBuilder.struct().field("op", Schema.STRING_SCHEMA).build(); Struct key = new Struct(keySchema).put("id", 1); Struct value = new Struct(valueSchema).put("op", "c"); @@ -249,14 +242,15 @@ void testNonTransactionEventNotDetected() { offset.put("file", "mysql-bin.000001"); offset.put("pos", 100L); - SourceRecord dataRecord = new SourceRecord( - Collections.singletonMap("server", "mysql"), - offset, - "test.table", - keySchema, - key, - valueSchema, - value); + SourceRecord dataRecord = + new SourceRecord( + Collections.singletonMap("server", "mysql"), + offset, + "test.table", + keySchema, + key, + valueSchema, + value); // Verify it's NOT detected as a transaction metadata event Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(dataRecord)).isFalse(); @@ -264,10 +258,11 @@ void testNonTransactionEventNotDetected() { @Test void testTransactionEventWithoutKeySchemaNotDetected() { - Schema valueSchema = SchemaBuilder.struct() - .name(RecordUtils.SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME) - .field("status", Schema.STRING_SCHEMA) - .build(); + Schema valueSchema = + SchemaBuilder.struct() + .name(RecordUtils.SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME) + .field("status", Schema.STRING_SCHEMA) + .build(); Struct value = new Struct(valueSchema).put("status", "BEGIN"); @@ -275,14 +270,15 @@ void testTransactionEventWithoutKeySchemaNotDetected() { offset.put("file", "mysql-bin.000001"); offset.put("pos", 100L); - SourceRecord record = new SourceRecord( - Collections.singletonMap("server", "mysql"), - offset, - "transaction.topic", - null, // No key schema - null, - valueSchema, - value); + SourceRecord record = + new SourceRecord( + Collections.singletonMap("server", "mysql"), + offset, + "transaction.topic", + null, // No key schema + null, + valueSchema, + value); // Verify it's NOT detected as a transaction metadata event Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(record)).isFalse(); @@ -294,20 +290,17 @@ void testMultipleTransactionEventsWithDisabledConfig() throws Exception { SourceRecord endEvent = createTransactionMetadataEvent("END", "tx-789", 400L); AtomicInteger emittedRecordsCount = new AtomicInteger(0); - MySqlRecordEmitter recordEmitter = createRecordEmitterWithTransactionConfig(emittedRecordsCount, false); + MySqlRecordEmitter recordEmitter = + createRecordEmitterWithTransactionConfig(emittedRecordsCount, false); MySqlBinlogSplitState splitState = createBinlogSplitState(); TestingReaderOutput readerOutput = new TestingReaderOutput<>(); - + recordEmitter.emitRecord( - SourceRecords.fromSingleRecord(beginEvent), - readerOutput, - splitState); + SourceRecords.fromSingleRecord(beginEvent), readerOutput, splitState); recordEmitter.emitRecord( - SourceRecords.fromSingleRecord(endEvent), - readerOutput, - splitState); + SourceRecords.fromSingleRecord(endEvent), readerOutput, splitState); // Verify offsets were updated but no events were emitted BinlogOffset expectedOffset = RecordUtils.getBinlogPosition(endEvent); @@ -320,26 +313,23 @@ void testMultipleTransactionEventsWithDisabledConfig() throws Exception { Assertions.assertThat(readerOutput.getEmittedRecords()).isEmpty(); } - @Test + @Test void testMixedEventsWithTransactionMetadataDisabled() throws Exception { SourceRecord transactionEvent = createTransactionMetadataEvent("BEGIN", "tx-mixed", 500L); SourceRecord dataEvent = createDataChangeEvent("test.table", 501L); AtomicInteger emittedRecordsCount = new AtomicInteger(0); - MySqlRecordEmitter recordEmitter = createRecordEmitterWithTransactionConfig(emittedRecordsCount, false); + MySqlRecordEmitter recordEmitter = + createRecordEmitterWithTransactionConfig(emittedRecordsCount, false); MySqlBinlogSplitState splitState = createBinlogSplitState(); TestingReaderOutput readerOutput = new TestingReaderOutput<>(); recordEmitter.emitRecord( - SourceRecords.fromSingleRecord(transactionEvent), - readerOutput, - splitState); + SourceRecords.fromSingleRecord(transactionEvent), readerOutput, splitState); recordEmitter.emitRecord( - SourceRecords.fromSingleRecord(dataEvent), - readerOutput, - splitState); + SourceRecords.fromSingleRecord(dataEvent), readerOutput, splitState); // Verify only data event was emitted (count=1, not 2) Assertions.assertThat(emittedRecordsCount.get()).isEqualTo(1); @@ -348,18 +338,16 @@ void testMixedEventsWithTransactionMetadataDisabled() throws Exception { private MySqlBinlogSplitState createBinlogSplitState() { return new MySqlBinlogSplitState( - new MySqlBinlogSplit( - "binlog-split", - BinlogOffset.ofEarliest(), - BinlogOffset.ofNonStopping(), - Collections.emptyList(), - Collections.emptyMap(), - 0)); + new MySqlBinlogSplit( + "binlog-split", + BinlogOffset.ofEarliest(), + BinlogOffset.ofNonStopping(), + Collections.emptyList(), + Collections.emptyMap(), + 0)); } - /** - * Helper method to create a MySqlRecordEmitter that counts emitted records. - */ + /** Helper method to create a MySqlRecordEmitter that counts emitted records. */ private MySqlRecordEmitter createRecordEmitterWithCounter(AtomicInteger counter) { return createRecordEmitterWithTransactionConfig(counter, true); } @@ -367,20 +355,21 @@ private MySqlRecordEmitter createRecordEmitterWithCounter(AtomicInteger /** * Helper method to create a MySqlRecordEmitter with configurable transaction metadata events. */ - private MySqlRecordEmitter createRecordEmitterWithTransactionConfig(AtomicInteger counter, boolean includeTransactionMetadataEvents) { + private MySqlRecordEmitter createRecordEmitterWithTransactionConfig( + AtomicInteger counter, boolean includeTransactionMetadataEvents) { return new MySqlRecordEmitter<>( - new DebeziumDeserializationSchema<>() { - @Override - public void deserialize(SourceRecord record, Collector out) { - counter.incrementAndGet(); - out.collect("transaction-event"); - } - - @Override - public TypeInformation getProducedType() { - return TypeInformation.of(String.class); - } - }, + new DebeziumDeserializationSchema<>() { + @Override + public void deserialize(SourceRecord record, Collector out) { + counter.incrementAndGet(); + out.collect("transaction-event"); + } + + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(String.class); + } + }, new MySqlSourceReaderMetrics( UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()), false, @@ -389,25 +378,28 @@ public TypeInformation getProducedType() { private SourceRecord createTransactionMetadataEvent( String status, String transactionId, long position) { - Schema keySchema = SchemaBuilder.struct() - .name(RecordUtils.SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME) - .field("id", Schema.STRING_SCHEMA) - .build(); - - Schema valueSchema = SchemaBuilder.struct() - .name("io.debezium.connector.common.TransactionMetadataValue") - .field("status", Schema.STRING_SCHEMA) - .field("id", Schema.STRING_SCHEMA) - .field("event_count", Schema.OPTIONAL_INT64_SCHEMA) - .field("ts_ms", Schema.INT64_SCHEMA) - .build(); + Schema keySchema = + SchemaBuilder.struct() + .name(RecordUtils.SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME) + .field("id", Schema.STRING_SCHEMA) + .build(); + + Schema valueSchema = + SchemaBuilder.struct() + .name("io.debezium.connector.common.TransactionMetadataValue") + .field("status", Schema.STRING_SCHEMA) + .field("id", Schema.STRING_SCHEMA) + .field("event_count", Schema.OPTIONAL_INT64_SCHEMA) + .field("ts_ms", Schema.INT64_SCHEMA) + .build(); Struct key = new Struct(keySchema).put("id", transactionId); - Struct value = new Struct(valueSchema) - .put("status", status) - .put("id", transactionId) - .put("ts_ms", System.currentTimeMillis()); + Struct value = + new Struct(valueSchema) + .put("status", status) + .put("id", transactionId) + .put("ts_ms", System.currentTimeMillis()); if ("END".equals(status)) { value.put("event_count", 5L); @@ -429,24 +421,22 @@ private SourceRecord createTransactionMetadataEvent( } private SourceRecord createDataChangeEvent(String topicName, long position) { - Schema keySchema = SchemaBuilder.struct() - .field("id", Schema.INT32_SCHEMA) - .build(); - Schema valueSchema = SchemaBuilder.struct() - .field("op", Schema.STRING_SCHEMA) - .field("after", SchemaBuilder.struct() - .field("id", Schema.INT32_SCHEMA) - .field("name", Schema.STRING_SCHEMA) - .optional()) - .build(); + Schema keySchema = SchemaBuilder.struct().field("id", Schema.INT32_SCHEMA).build(); + Schema valueSchema = + SchemaBuilder.struct() + .field("op", Schema.STRING_SCHEMA) + .field( + "after", + SchemaBuilder.struct() + .field("id", Schema.INT32_SCHEMA) + .field("name", Schema.STRING_SCHEMA) + .optional()) + .build(); Struct key = new Struct(keySchema).put("id", 1); - Struct after = new Struct(valueSchema.field("after").schema()) - .put("id", 1) - .put("name", "test"); - Struct value = new Struct(valueSchema) - .put("op", "c") - .put("after", after); + Struct after = + new Struct(valueSchema.field("after").schema()).put("id", 1).put("name", "test"); + Struct value = new Struct(valueSchema).put("op", "c").put("after", after); Map offset = new HashMap<>(); offset.put("file", "mysql-bin.000001"); @@ -461,5 +451,4 @@ private SourceRecord createDataChangeEvent(String topicName, long position) { valueSchema, value); } - } From 2f50a50d81edbab149904a59385b5098565e47ab Mon Sep 17 00:00:00 2001 From: Tejansh Rana Date: Tue, 23 Dec 2025 12:45:00 +0000 Subject: [PATCH 6/7] merge conflicts; java 8 fix; spotless fix --- .../mysql/source/reader/MySqlPipelineRecordEmitter.java | 2 +- .../connectors/mysql/source/config/MySqlSourceConfig.java | 3 ++- .../mysql/source/config/MySqlSourceConfigFactory.java | 5 +++-- .../mysql/source/reader/MySqlRecordEmitterTest.java | 8 +++++--- .../mysql/source/reader/MySqlSourceReaderTest.java | 2 +- 5 files changed, 12 insertions(+), 8 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java index d9e502872d4..eb9e658cf6f 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java @@ -93,7 +93,7 @@ public MySqlPipelineRecordEmitter( debeziumDeserializationSchema, sourceReaderMetrics, sourceConfig.isIncludeSchemaChanges(), - false, // Explicitly disable heartbeat events + false, // Explicitly disable heartbeat events false); // Explicitly disable transaction metadata events this.debeziumDeserializationSchema = debeziumDeserializationSchema; this.sourceConfig = sourceConfig; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java index 99dff91b646..cf456fcaed0 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java @@ -236,9 +236,10 @@ public boolean isIncludeSchemaChanges() { public boolean isIncludeHeartbeatEvents() { return includeHeartbeatEvents; } - + public boolean isIncludeTransactionMetadataEvents() { return includeTransactionMetadataEvents; + } public boolean isScanNewlyAddedTableEnabled() { return scanNewlyAddedTableEnabled; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java index 705aa41ed6d..2710aaa175f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java @@ -242,13 +242,14 @@ public MySqlSourceConfigFactory includeHeartbeatEvents(boolean includeHeartbeatE this.includeHeartbeatEvents = includeHeartbeatEvents; return this; } - + /** Whether the {@link MySqlSource} should output the transaction metadata events or not. */ public MySqlSourceConfigFactory includeTransactionMetadataEvents( boolean includeTransactionMetadataEvents) { this.includeTransactionMetadataEvents = includeTransactionMetadataEvents; + return this; } - + /** Whether the {@link MySqlSource} should scan the newly added tables or not. */ public MySqlSourceConfigFactory scanNewlyAddedTableEnabled(boolean scanNewlyAddedTableEnabled) { this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java index 952b16eb256..63150d99854 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java @@ -96,7 +96,7 @@ record -> { private MySqlRecordEmitter createRecordEmitter() { return new MySqlRecordEmitter<>( - new DebeziumDeserializationSchema<>() { + new DebeziumDeserializationSchema() { @Override public void deserialize(SourceRecord record, Collector out) { throw new UnsupportedOperationException(); @@ -110,7 +110,8 @@ public TypeInformation getProducedType() { new MySqlSourceReaderMetrics( UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()), false, - false); + false, + false); } @Test @@ -358,7 +359,7 @@ private MySqlRecordEmitter createRecordEmitterWithCounter(AtomicInteger private MySqlRecordEmitter createRecordEmitterWithTransactionConfig( AtomicInteger counter, boolean includeTransactionMetadataEvents) { return new MySqlRecordEmitter<>( - new DebeziumDeserializationSchema<>() { + new DebeziumDeserializationSchema() { @Override public void deserialize(SourceRecord record, Collector out) { counter.incrementAndGet(); @@ -373,6 +374,7 @@ public TypeInformation getProducedType() { new MySqlSourceReaderMetrics( UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()), false, + false, includeTransactionMetadataEvents); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java index 239cd9f3d59..be0f69e8697 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java @@ -742,7 +742,7 @@ public MysqlLimitedRecordEmitter( MySqlSourceReaderMetrics sourceReaderMetrics, boolean includeSchemaChanges, int limit) { - super(debeziumDeserializationSchema, sourceReaderMetrics, includeSchemaChanges, false); + super(debeziumDeserializationSchema, sourceReaderMetrics, includeSchemaChanges, false, false); this.debeziumDeserializationSchema = debeziumDeserializationSchema; this.sourceReaderMetrics = sourceReaderMetrics; this.includeSchemaChanges = includeSchemaChanges; From 240104f92827638c173e6d78e5bb9835c679c6f2 Mon Sep 17 00:00:00 2001 From: Tejansh Rana Date: Tue, 27 Jan 2026 13:19:04 +0000 Subject: [PATCH 7/7] review comment; formatting --- .../mysql/source/config/MySqlSourceConfigFactory.java | 5 ++++- .../mysql/source/reader/MySqlRecordEmitterTest.java | 2 +- .../mysql/source/reader/MySqlSourceReaderTest.java | 7 ++++++- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java index 2710aaa175f..569b62232db 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java @@ -23,6 +23,8 @@ import org.apache.flink.cdc.connectors.mysql.table.StartupOptions; import org.apache.flink.table.catalog.ObjectPath; +import io.debezium.config.CommonConnectorConfig; + import java.io.Serializable; import java.time.Duration; import java.time.ZoneId; @@ -376,7 +378,8 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) { props.setProperty("include.schema.changes", String.valueOf(true)); // enable transaction metadata if includeTransactionMetadataEvents is true props.setProperty( - "provide.transaction.metadata", String.valueOf(includeTransactionMetadataEvents)); + CommonConnectorConfig.PROVIDE_TRANSACTION_METADATA.name(), + String.valueOf(includeTransactionMetadataEvents)); // disable the offset flush totally props.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE)); // disable tombstones diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java index 63150d99854..22167e0e5cc 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java @@ -111,7 +111,7 @@ public TypeInformation getProducedType() { UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()), false, false, - false); + false); } @Test diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java index be0f69e8697..7c9ce5ba94b 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java @@ -742,7 +742,12 @@ public MysqlLimitedRecordEmitter( MySqlSourceReaderMetrics sourceReaderMetrics, boolean includeSchemaChanges, int limit) { - super(debeziumDeserializationSchema, sourceReaderMetrics, includeSchemaChanges, false, false); + super( + debeziumDeserializationSchema, + sourceReaderMetrics, + includeSchemaChanges, + false, + false); this.debeziumDeserializationSchema = debeziumDeserializationSchema; this.sourceReaderMetrics = sourceReaderMetrics; this.includeSchemaChanges = includeSchemaChanges;