|
25 | 25 | import org.apache.flink.table.catalog.ObjectPath; |
26 | 26 |
|
27 | 27 | import io.debezium.config.CommonConnectorConfig; |
28 | | -import org.slf4j.Logger; |
29 | | -import org.slf4j.LoggerFactory; |
30 | 28 |
|
31 | 29 | import java.io.Serializable; |
32 | 30 | import java.time.Duration; |
|
37 | 35 | import java.util.Map; |
38 | 36 | import java.util.Properties; |
39 | 37 | import java.util.UUID; |
40 | | -import java.util.stream.Collectors; |
41 | 38 |
|
42 | 39 | import static org.apache.flink.cdc.connectors.mysql.source.utils.EnvironmentUtils.checkSupportCheckpointsAfterTasksFinished; |
43 | 40 | import static org.apache.flink.util.Preconditions.checkNotNull; |
|
47 | 44 | public class MySqlSourceConfigFactory implements Serializable { |
48 | 45 |
|
49 | 46 | private static final long serialVersionUID = 1L; |
50 | | - private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceConfigFactory.class); |
51 | 47 |
|
52 | 48 | private int port = 3306; // default 3306 port |
53 | 49 | private String hostname; |
@@ -477,47 +473,4 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) { |
477 | 473 | useLegacyJsonFormat, |
478 | 474 | assignUnboundedChunkFirst); |
479 | 475 | } |
480 | | - |
481 | | - /** |
482 | | - * Convert Flink CDC style table pattern to Debezium style. |
483 | | - * |
484 | | - * <p>In CDC-style table matching, table names are separated by commas and use `\.` for regex |
485 | | - * matching. In Debezium style, table names are separated by pipes and use `.` for regex |
486 | | - * matching while `\.` is used as database.table separator. |
487 | | - * |
488 | | - * <p>Examples: |
489 | | - * |
490 | | - * <ul> |
491 | | - * <li>{@code "db1.table_\.*,db2.user_\.*"} -> {@code "db1\.table_.*|db2\.user_.*"} |
492 | | - * <li>{@code "test_db.orders"} -> {@code "test_db\.orders"} |
493 | | - * </ul> |
494 | | - * |
495 | | - * @param tables Flink CDC style table pattern |
496 | | - * @return Debezium style table pattern |
497 | | - */ |
498 | | - private static String convertToDebeziumStyle(String tables) { |
499 | | - LOG.debug("Converting table pattern to Debezium style: {}", tables); |
500 | | - |
501 | | - // Step 1: Replace comma separator with pipe (OR semantics) |
502 | | - tables = |
503 | | - Arrays.stream(tables.split(",")).map(String::trim).collect(Collectors.joining("|")); |
504 | | - LOG.debug("After replacing comma with pipe separator: {}", tables); |
505 | | - |
506 | | - // Step 2: Replace escaped dot \. with placeholder |
507 | | - // In Flink CDC, \. means a literal dot in regex, in Debezium it should be . (any |
508 | | - // character). |
509 | | - String unescapedTables = tables.replace("\\.", "$"); |
510 | | - LOG.debug("After unescaping dots as RegEx meta-character: {}", unescapedTables); |
511 | | - |
512 | | - // Step 3: Replace unescaped dot . with \. |
513 | | - // In Flink CDC, unescaped . is database.table separator, in Debezium it should be \. |
514 | | - String escapedTables = unescapedTables.replace(".", "\\."); |
515 | | - LOG.debug("After escaping dots as separator: {}", escapedTables); |
516 | | - |
517 | | - // Step 4: Restore placeholder to regular dot . |
518 | | - String debeziumStyle = escapedTables.replace("$", "."); |
519 | | - LOG.debug("Final Debezium-style table pattern: {}", debeziumStyle); |
520 | | - |
521 | | - return debeziumStyle; |
522 | | - } |
523 | 476 | } |
0 commit comments