Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ protected void processElement(
emitElement(element, output);
} else if (RecordUtils.isHeartbeatEvent(element)) {
updateStartingOffsetForSplit(splitState, element);
} else if (RecordUtils.isTransactionMetadataEvent(element)) {
Comment thread
tejanshrana marked this conversation as resolved.
updateStartingOffsetForSplit(splitState, element);
emitElement(element, output);
} else {
// unknown element
LOG.info("Meet unknown element {}, just skip.", element);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ private RecordUtils() {}
"io.debezium.connector.mysql.SchemaChangeKey";
public static final String SCHEMA_HEARTBEAT_EVENT_KEY_NAME =
"io.debezium.connector.common.Heartbeat";
public static final String SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME =
"io.debezium.connector.common.TransactionMetadataKey";
private static final DocumentReader DOCUMENT_READER = DocumentReader.defaultReader();

/** Converts a {@link ResultSet} row to an array of Objects. */
Expand Down Expand Up @@ -339,6 +341,18 @@ public static boolean isHeartbeatEvent(SourceRecord record) {
&& SCHEMA_HEARTBEAT_EVENT_KEY_NAME.equalsIgnoreCase(valueSchema.name());
}

/**
* Check whether the given source record is a transaction metadata event (BEGIN or END).
*
* <p>Transaction events are emitted by Debezium to mark transaction boundaries when
* provide.transaction.metadata is enabled.
*/
public static boolean isTransactionMetadataEvent(SourceRecord record) {
Schema keySchema = record.keySchema();
return keySchema != null
&& SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name());
}

/**
* Return the finished snapshot split information.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplitState;
import org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords;
import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
Expand All @@ -36,11 +37,17 @@
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.SchemaNameAdjuster;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import static io.debezium.config.CommonConnectorConfig.TRANSACTION_TOPIC;
import static io.debezium.connector.mysql.MySqlConnectorConfig.SERVER_NAME;
Expand Down Expand Up @@ -105,14 +112,201 @@ public TypeInformation<Void> getProducedType() {
false);
}

@Test
void testTransactionBeginEventHandling() throws Exception {
// Create a transaction BEGIN event
SourceRecord transactionBeginEvent = createTransactionMetadataEvent("BEGIN", "tx-123", 100L);

// Verify it's detected as a transaction metadata event
Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(transactionBeginEvent))
.isTrue();

// Create emitter and split state
AtomicInteger emittedRecordsCount = new AtomicInteger(0);
MySqlRecordEmitter<String> recordEmitter = createRecordEmitterWithCounter(emittedRecordsCount);
MySqlBinlogSplitState splitState = createBinlogSplitState();

BinlogOffset offsetBeforeEmit = splitState.getStartingOffset();

// Emit the transaction BEGIN event
TestingReaderOutput<String> readerOutput = new TestingReaderOutput<>();
recordEmitter.emitRecord(
SourceRecords.fromSingleRecord(transactionBeginEvent),
readerOutput,
splitState);

// Verify the offset was updated
BinlogOffset expectedOffset = RecordUtils.getBinlogPosition(transactionBeginEvent);
Assertions.assertThat(splitState.getStartingOffset())
.isNotNull()
.isNotEqualTo(offsetBeforeEmit)
.isEqualByComparingTo(expectedOffset);

// Verify the event was emitted
Assertions.assertThat(emittedRecordsCount.get()).isEqualTo(1);
}

@Test
void testTransactionEndEventHandling() throws Exception {
// Create a transaction END event
SourceRecord transactionEndEvent = createTransactionMetadataEvent("END", "tx-123", 200L);

// Verify it's detected as a transaction metadata event
Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(transactionEndEvent))
.isTrue();

// Create emitter and split state
AtomicInteger emittedRecordsCount = new AtomicInteger(0);
MySqlRecordEmitter<String> recordEmitter = createRecordEmitterWithCounter(emittedRecordsCount);
MySqlBinlogSplitState splitState = createBinlogSplitState();

// Emit the transaction END event
TestingReaderOutput<String> readerOutput = new TestingReaderOutput<>();
recordEmitter.emitRecord(
SourceRecords.fromSingleRecord(transactionEndEvent),
readerOutput,
splitState);

// Verify the offset was updated
BinlogOffset expectedOffset = RecordUtils.getBinlogPosition(transactionEndEvent);
Assertions.assertThat(splitState.getStartingOffset())
.isNotNull()
.isEqualByComparingTo(expectedOffset);

// Verify the event was emitted
Assertions.assertThat(emittedRecordsCount.get()).isEqualTo(1);
}


@Test
void testNonTransactionEventNotDetected() {
// Create a regular data change event
Schema keySchema = SchemaBuilder.struct()
.field("id", Schema.INT32_SCHEMA)
.build();
Schema valueSchema = SchemaBuilder.struct()
.field("op", Schema.STRING_SCHEMA)
.build();

Struct key = new Struct(keySchema).put("id", 1);
Struct value = new Struct(valueSchema).put("op", "c");

Map<String, Object> offset = new HashMap<>();
offset.put("file", "mysql-bin.000001");
offset.put("pos", 100L);

SourceRecord dataRecord = new SourceRecord(
Collections.singletonMap("server", "mysql"),
offset,
"test.table",
keySchema,
key,
valueSchema,
value);

// Verify it's NOT detected as a transaction metadata event
Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(dataRecord)).isFalse();
}

@Test
void testTransactionEventWithoutKeySchemaNotDetected() {
// Create a record without a key schema (should not be detected as transaction event)
Schema valueSchema = SchemaBuilder.struct()
.name(RecordUtils.SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME)
.field("status", Schema.STRING_SCHEMA)
.build();

Struct value = new Struct(valueSchema).put("status", "BEGIN");

Map<String, Object> offset = new HashMap<>();
offset.put("file", "mysql-bin.000001");
offset.put("pos", 100L);

SourceRecord record = new SourceRecord(
Collections.singletonMap("server", "mysql"),
offset,
"transaction.topic",
null, // No key schema
null,
valueSchema,
value);

// Verify it's NOT detected as a transaction metadata event
Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(record)).isFalse();
}

private MySqlBinlogSplitState createBinlogSplitState() {
return new MySqlBinlogSplitState(
new MySqlBinlogSplit(
"binlog-split",
BinlogOffset.ofEarliest(),
BinlogOffset.ofNonStopping(),
Collections.emptyList(),
Collections.emptyMap(),
0));
new MySqlBinlogSplit(
"binlog-split",
BinlogOffset.ofEarliest(),
BinlogOffset.ofNonStopping(),
Collections.emptyList(),
Collections.emptyMap(),
0));
}

/**
* Helper method to create a MySqlRecordEmitter that counts emitted records.
*/
private MySqlRecordEmitter<String> createRecordEmitterWithCounter(AtomicInteger counter) {
return new MySqlRecordEmitter<>(
new DebeziumDeserializationSchema<>() {
@Override
public void deserialize(SourceRecord record, Collector<String> out) {
counter.incrementAndGet();
out.collect("transaction-event");
}

@Override
public TypeInformation<String> getProducedType() {
return TypeInformation.of(String.class);
}
},
new MySqlSourceReaderMetrics(
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()),
false);
}

private SourceRecord createTransactionMetadataEvent(
String status, String transactionId, long position) {
Schema keySchema = SchemaBuilder.struct()
.name(RecordUtils.SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME)
.field("id", Schema.STRING_SCHEMA)
.build();

Schema valueSchema = SchemaBuilder.struct()
.name("io.debezium.connector.common.TransactionMetadataValue")
.field("status", Schema.STRING_SCHEMA)
.field("id", Schema.STRING_SCHEMA)
.field("event_count", Schema.OPTIONAL_INT64_SCHEMA)
.field("ts_ms", Schema.INT64_SCHEMA)
.build();

Struct key = new Struct(keySchema).put("id", transactionId);

Struct value = new Struct(valueSchema)
.put("status", status)
.put("id", transactionId)
.put("ts_ms", System.currentTimeMillis());

if ("END".equals(status)) {
value.put("event_count", 5L);
}

Map<String, Object> offset = new HashMap<>();
offset.put("file", "mysql-bin.000001");
offset.put("pos", position);
offset.put("transaction_id", transactionId);

return new SourceRecord(
Collections.singletonMap("server", "mysql_binlog_source"),
offset,
"mysql_binlog_source.transaction",
keySchema,
key,
valueSchema,
value);
}

}
Loading