|
23 | 23 | import org.apache.flink.cdc.connectors.mysql.table.StartupOptions; |
24 | 24 | import org.apache.flink.table.catalog.ObjectPath; |
25 | 25 |
|
| 26 | +import io.debezium.config.CommonConnectorConfig; |
| 27 | + |
26 | 28 | import java.io.Serializable; |
27 | 29 | import java.time.Duration; |
28 | 30 | import java.time.ZoneId; |
@@ -64,6 +66,7 @@ public class MySqlSourceConfigFactory implements Serializable { |
64 | 66 | MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(); |
65 | 67 | private boolean includeSchemaChanges = false; |
66 | 68 | private boolean includeHeartbeatEvents = false; |
| 69 | + private boolean includeTransactionMetadataEvents = false; |
67 | 70 | private boolean scanNewlyAddedTableEnabled = false; |
68 | 71 | private boolean closeIdleReaders = false; |
69 | 72 | private Properties jdbcProperties; |
@@ -242,6 +245,13 @@ public MySqlSourceConfigFactory includeHeartbeatEvents(boolean includeHeartbeatE |
242 | 245 | return this; |
243 | 246 | } |
244 | 247 |
|
| 248 | + /** Whether the {@link MySqlSource} should output the transaction metadata events or not. */ |
| 249 | + public MySqlSourceConfigFactory includeTransactionMetadataEvents( |
| 250 | + boolean includeTransactionMetadataEvents) { |
| 251 | + this.includeTransactionMetadataEvents = includeTransactionMetadataEvents; |
| 252 | + return this; |
| 253 | + } |
| 254 | + |
245 | 255 | /** Whether the {@link MySqlSource} should scan the newly added tables or not. */ |
246 | 256 | public MySqlSourceConfigFactory scanNewlyAddedTableEnabled(boolean scanNewlyAddedTableEnabled) { |
247 | 257 | this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled; |
@@ -366,6 +376,10 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) { |
366 | 376 | // Note: the includeSchemaChanges parameter is used to control emitting the schema record, |
367 | 377 | // only DataStream API program need to emit the schema record, the Table API need not |
368 | 378 | props.setProperty("include.schema.changes", String.valueOf(true)); |
| 379 | + // enable transaction metadata if includeTransactionMetadataEvents is true |
| 380 | + props.setProperty( |
| 381 | + CommonConnectorConfig.PROVIDE_TRANSACTION_METADATA.name(), |
| 382 | + String.valueOf(includeTransactionMetadataEvents)); |
369 | 383 | // disable the offset flush totally |
370 | 384 | props.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE)); |
371 | 385 | // disable tombstones |
@@ -420,6 +434,7 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) { |
420 | 434 | distributionFactorLower, |
421 | 435 | includeSchemaChanges, |
422 | 436 | includeHeartbeatEvents, |
| 437 | + includeTransactionMetadataEvents, |
423 | 438 | scanNewlyAddedTableEnabled, |
424 | 439 | closeIdleReaders, |
425 | 440 | props, |
|
0 commit comments