@@ -96,40 +96,98 @@ record -> {
9696
9797 private MySqlRecordEmitter <Void > createRecordEmitter () {
9898 return new MySqlRecordEmitter <>(
99- new DebeziumDeserializationSchema <Void >() {
100- @ Override
101- public void deserialize (SourceRecord record , Collector <Void > out ) {
102- throw new UnsupportedOperationException ();
103- }
99+ new DebeziumDeserializationSchema <>() {
100+ @ Override
101+ public void deserialize (SourceRecord record , Collector <Void > out ) {
102+ throw new UnsupportedOperationException ();
103+ }
104104
105- @ Override
106- public TypeInformation <Void > getProducedType () {
107- return TypeInformation .of (Void .class );
108- }
109- },
105+ @ Override
106+ public TypeInformation <Void > getProducedType () {
107+ return TypeInformation .of (Void .class );
108+ }
109+ },
110110 new MySqlSourceReaderMetrics (
111111 UnregisteredMetricGroups .createUnregisteredOperatorMetricGroup ()),
112112 false ,
113113 false );
114114 }
115115
116+ @ Test
117+ void testTransactionMetadataEventsDisabledByDefault () throws Exception {
118+ SourceRecord transactionBeginEvent = createTransactionMetadataEvent ("BEGIN" , "tx-123" , 100L );
119+
120+ Assertions .assertThat (RecordUtils .isTransactionMetadataEvent (transactionBeginEvent ))
121+ .isTrue ();
122+
123+ AtomicInteger emittedRecordsCount = new AtomicInteger (0 );
124+ MySqlRecordEmitter <String > recordEmitter = createRecordEmitterWithTransactionConfig (emittedRecordsCount , false );
125+ MySqlBinlogSplitState splitState = createBinlogSplitState ();
126+
127+ BinlogOffset offsetBeforeEmit = splitState .getStartingOffset ();
128+
129+ TestingReaderOutput <String > readerOutput = new TestingReaderOutput <>();
130+ recordEmitter .emitRecord (
131+ SourceRecords .fromSingleRecord (transactionBeginEvent ),
132+ readerOutput ,
133+ splitState );
134+
135+ // Verify the offset was updated (this should always happen)
136+ BinlogOffset expectedOffset = RecordUtils .getBinlogPosition (transactionBeginEvent );
137+ Assertions .assertThat (splitState .getStartingOffset ())
138+ .isNotNull ()
139+ .isNotEqualTo (offsetBeforeEmit )
140+ .isEqualByComparingTo (expectedOffset );
141+
142+ // Verify the event was NOT emitted (because includeTransactionMetadataEvents=false)
143+ Assertions .assertThat (emittedRecordsCount .get ()).isEqualTo (0 );
144+ Assertions .assertThat (readerOutput .getEmittedRecords ()).isEmpty ();
145+ }
146+
147+ @ Test
148+ void testTransactionMetadataEventsEnabledExplicitly () throws Exception {
149+ SourceRecord transactionBeginEvent = createTransactionMetadataEvent ("BEGIN" , "tx-456" , 150L );
150+
151+ Assertions .assertThat (RecordUtils .isTransactionMetadataEvent (transactionBeginEvent ))
152+ .isTrue ();
153+
154+ AtomicInteger emittedRecordsCount = new AtomicInteger (0 );
155+ MySqlRecordEmitter <String > recordEmitter = createRecordEmitterWithTransactionConfig (emittedRecordsCount , true );
156+ MySqlBinlogSplitState splitState = createBinlogSplitState ();
157+
158+ BinlogOffset offsetBeforeEmit = splitState .getStartingOffset ();
159+
160+ TestingReaderOutput <String > readerOutput = new TestingReaderOutput <>();
161+ recordEmitter .emitRecord (
162+ SourceRecords .fromSingleRecord (transactionBeginEvent ),
163+ readerOutput ,
164+ splitState );
165+
166+ // Verify the offset was updated
167+ BinlogOffset expectedOffset = RecordUtils .getBinlogPosition (transactionBeginEvent );
168+ Assertions .assertThat (splitState .getStartingOffset ())
169+ .isNotNull ()
170+ .isNotEqualTo (offsetBeforeEmit )
171+ .isEqualByComparingTo (expectedOffset );
172+
173+ // Verify the event was emitted (because includeTransactionMetadataEvents=true)
174+ Assertions .assertThat (emittedRecordsCount .get ()).isEqualTo (1 );
175+ Assertions .assertThat (readerOutput .getEmittedRecords ()).hasSize (1 );
176+ }
177+
116178 @ Test
117179 void testTransactionBeginEventHandling () throws Exception {
118- // Create a transaction BEGIN event
119180 SourceRecord transactionBeginEvent = createTransactionMetadataEvent ("BEGIN" , "tx-123" , 100L );
120181
121- // Verify it's detected as a transaction metadata event
122182 Assertions .assertThat (RecordUtils .isTransactionMetadataEvent (transactionBeginEvent ))
123183 .isTrue ();
124184
125- // Create emitter and split state
126185 AtomicInteger emittedRecordsCount = new AtomicInteger (0 );
127186 MySqlRecordEmitter <String > recordEmitter = createRecordEmitterWithCounter (emittedRecordsCount );
128187 MySqlBinlogSplitState splitState = createBinlogSplitState ();
129188
130189 BinlogOffset offsetBeforeEmit = splitState .getStartingOffset ();
131190
132- // Emit the transaction BEGIN event
133191 TestingReaderOutput <String > readerOutput = new TestingReaderOutput <>();
134192 recordEmitter .emitRecord (
135193 SourceRecords .fromSingleRecord (transactionBeginEvent ),
@@ -149,19 +207,15 @@ void testTransactionBeginEventHandling() throws Exception {
149207
150208 @ Test
151209 void testTransactionEndEventHandling () throws Exception {
152- // Create a transaction END event
153210 SourceRecord transactionEndEvent = createTransactionMetadataEvent ("END" , "tx-123" , 200L );
154211
155- // Verify it's detected as a transaction metadata event
156212 Assertions .assertThat (RecordUtils .isTransactionMetadataEvent (transactionEndEvent ))
157213 .isTrue ();
158214
159- // Create emitter and split state
160215 AtomicInteger emittedRecordsCount = new AtomicInteger (0 );
161216 MySqlRecordEmitter <String > recordEmitter = createRecordEmitterWithCounter (emittedRecordsCount );
162217 MySqlBinlogSplitState splitState = createBinlogSplitState ();
163218
164- // Emit the transaction END event
165219 TestingReaderOutput <String > readerOutput = new TestingReaderOutput <>();
166220 recordEmitter .emitRecord (
167221 SourceRecords .fromSingleRecord (transactionEndEvent ),
@@ -181,7 +235,6 @@ void testTransactionEndEventHandling() throws Exception {
181235
182236 @ Test
183237 void testNonTransactionEventNotDetected () {
184- // Create a regular data change event
185238 Schema keySchema = SchemaBuilder .struct ()
186239 .field ("id" , Schema .INT32_SCHEMA )
187240 .build ();
@@ -211,7 +264,6 @@ void testNonTransactionEventNotDetected() {
211264
212265 @ Test
213266 void testTransactionEventWithoutKeySchemaNotDetected () {
214- // Create a record without a key schema (should not be detected as transaction event)
215267 Schema valueSchema = SchemaBuilder .struct ()
216268 .name (RecordUtils .SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME )
217269 .field ("status" , Schema .STRING_SCHEMA )
@@ -236,6 +288,64 @@ void testTransactionEventWithoutKeySchemaNotDetected() {
236288 Assertions .assertThat (RecordUtils .isTransactionMetadataEvent (record )).isFalse ();
237289 }
238290
291+ @ Test
292+ void testMultipleTransactionEventsWithDisabledConfig () throws Exception {
293+ SourceRecord beginEvent = createTransactionMetadataEvent ("BEGIN" , "tx-789" , 300L );
294+ SourceRecord endEvent = createTransactionMetadataEvent ("END" , "tx-789" , 400L );
295+
296+ AtomicInteger emittedRecordsCount = new AtomicInteger (0 );
297+ MySqlRecordEmitter <String > recordEmitter = createRecordEmitterWithTransactionConfig (emittedRecordsCount , false );
298+ MySqlBinlogSplitState splitState = createBinlogSplitState ();
299+
300+ TestingReaderOutput <String > readerOutput = new TestingReaderOutput <>();
301+
302+ recordEmitter .emitRecord (
303+ SourceRecords .fromSingleRecord (beginEvent ),
304+ readerOutput ,
305+ splitState );
306+
307+ recordEmitter .emitRecord (
308+ SourceRecords .fromSingleRecord (endEvent ),
309+ readerOutput ,
310+ splitState );
311+
312+ // Verify offsets were updated but no events were emitted
313+ BinlogOffset expectedOffset = RecordUtils .getBinlogPosition (endEvent );
314+ Assertions .assertThat (splitState .getStartingOffset ())
315+ .isNotNull ()
316+ .isEqualByComparingTo (expectedOffset );
317+
318+ // Verify no events were emitted (because includeTransactionMetadataEvents=false)
319+ Assertions .assertThat (emittedRecordsCount .get ()).isEqualTo (0 );
320+ Assertions .assertThat (readerOutput .getEmittedRecords ()).isEmpty ();
321+ }
322+
323+ @ Test
324+ void testMixedEventsWithTransactionMetadataDisabled () throws Exception {
325+ SourceRecord transactionEvent = createTransactionMetadataEvent ("BEGIN" , "tx-mixed" , 500L );
326+ SourceRecord dataEvent = createDataChangeEvent ("test.table" , 501L );
327+
328+ AtomicInteger emittedRecordsCount = new AtomicInteger (0 );
329+ MySqlRecordEmitter <String > recordEmitter = createRecordEmitterWithTransactionConfig (emittedRecordsCount , false );
330+ MySqlBinlogSplitState splitState = createBinlogSplitState ();
331+
332+ TestingReaderOutput <String > readerOutput = new TestingReaderOutput <>();
333+
334+ recordEmitter .emitRecord (
335+ SourceRecords .fromSingleRecord (transactionEvent ),
336+ readerOutput ,
337+ splitState );
338+
339+ recordEmitter .emitRecord (
340+ SourceRecords .fromSingleRecord (dataEvent ),
341+ readerOutput ,
342+ splitState );
343+
344+ // Verify only data event was emitted (count=1, not 2)
345+ Assertions .assertThat (emittedRecordsCount .get ()).isEqualTo (1 );
346+ Assertions .assertThat (readerOutput .getEmittedRecords ()).hasSize (1 );
347+ }
348+
239349 private MySqlBinlogSplitState createBinlogSplitState () {
240350 return new MySqlBinlogSplitState (
241351 new MySqlBinlogSplit (
@@ -251,6 +361,13 @@ private MySqlBinlogSplitState createBinlogSplitState() {
251361 * Helper method to create a MySqlRecordEmitter that counts emitted records.
252362 */
253363 private MySqlRecordEmitter <String > createRecordEmitterWithCounter (AtomicInteger counter ) {
364+ return createRecordEmitterWithTransactionConfig (counter , true );
365+ }
366+
367+ /**
368+ * Helper method to create a MySqlRecordEmitter with configurable transaction metadata events.
369+ */
370+ private MySqlRecordEmitter <String > createRecordEmitterWithTransactionConfig (AtomicInteger counter , boolean includeTransactionMetadataEvents ) {
254371 return new MySqlRecordEmitter <>(
255372 new DebeziumDeserializationSchema <>() {
256373 @ Override
@@ -267,7 +384,7 @@ public TypeInformation<String> getProducedType() {
267384 new MySqlSourceReaderMetrics (
268385 UnregisteredMetricGroups .createUnregisteredOperatorMetricGroup ()),
269386 false ,
270- true );
387+ includeTransactionMetadataEvents );
271388 }
272389
273390 private SourceRecord createTransactionMetadataEvent (
@@ -311,4 +428,38 @@ private SourceRecord createTransactionMetadataEvent(
311428 value );
312429 }
313430
431+ private SourceRecord createDataChangeEvent (String topicName , long position ) {
432+ Schema keySchema = SchemaBuilder .struct ()
433+ .field ("id" , Schema .INT32_SCHEMA )
434+ .build ();
435+ Schema valueSchema = SchemaBuilder .struct ()
436+ .field ("op" , Schema .STRING_SCHEMA )
437+ .field ("after" , SchemaBuilder .struct ()
438+ .field ("id" , Schema .INT32_SCHEMA )
439+ .field ("name" , Schema .STRING_SCHEMA )
440+ .optional ())
441+ .build ();
442+
443+ Struct key = new Struct (keySchema ).put ("id" , 1 );
444+ Struct after = new Struct (valueSchema .field ("after" ).schema ())
445+ .put ("id" , 1 )
446+ .put ("name" , "test" );
447+ Struct value = new Struct (valueSchema )
448+ .put ("op" , "c" )
449+ .put ("after" , after );
450+
451+ Map <String , Object > offset = new HashMap <>();
452+ offset .put ("file" , "mysql-bin.000001" );
453+ offset .put ("pos" , position );
454+
455+ return new SourceRecord (
456+ Collections .singletonMap ("server" , "mysql" ),
457+ offset ,
458+ topicName ,
459+ keySchema ,
460+ key ,
461+ valueSchema ,
462+ value );
463+ }
464+
314465}
0 commit comments