Skip to content

Commit 5b5eb88

Browse files
committed
[FLINK-38779] Move TableIdRouter to flink-cdc-common package
1 parent 693b561 commit 5b5eb88

10 files changed

Lines changed: 43 additions & 65 deletions

File tree

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouter.java renamed to flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/TableIdRouter.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.flink.cdc.runtime.operators.schema.common;
18+
package org.apache.flink.cdc.common.route;
1919

2020
import org.apache.flink.api.java.tuple.Tuple3;
21+
import org.apache.flink.cdc.common.annotation.PublicEvolving;
2122
import org.apache.flink.cdc.common.event.TableId;
22-
import org.apache.flink.cdc.common.route.RouteRule;
23+
import org.apache.flink.cdc.common.schema.Selectors;
2324

2425
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
2526
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheLoader;
@@ -44,6 +45,7 @@
4445
* Calculates how upstream data change events should be dispatched to downstream tables. Returns one
4546
* or many destination Table IDs based on provided routing rules.
4647
*/
48+
@PublicEvolving
4749
public class TableIdRouter {
4850

4951
private static final Logger LOG = LoggerFactory.getLogger(TableIdRouter.class);
@@ -54,7 +56,21 @@ public class TableIdRouter {
5456

5557
private static final String DOT_PLACEHOLDER = "_dot_placeholder_";
5658

57-
private static Pattern validateTableListToRegExpPattern(String tables) {
59+
/**
60+
* Currently, The supported regular syntax is not exactly the same in {@link Selectors}.
61+
*
62+
* <p>The main discrepancies are :
63+
*
64+
* <p>1) {@link Selectors} use {@code ,} to split table names instead of `|`.
65+
*
66+
* <p>2) If there is a need to use a dot ({@code .}) in a regular expression to match any
67+
* character, it is necessary to escape the dot with a backslash.
68+
*
69+
* <p>3) The unescaped {@code .} is used as the separator of database and table name. When
70+
* converting to Debezium style, it is expected to be escaped to match the dot ({@code .})
71+
* literally instead of the meta-character.
72+
*/
73+
public static String convertTableListToRegExpPattern(String tables) {
5874
LOG.info("Rewriting CDC style table capture list: {}", tables);
5975

6076
// In CDC-style table matching, table names could be separated by `,` character.
@@ -86,7 +102,7 @@ private static Pattern validateTableListToRegExpPattern(String tables) {
86102
unescapedTablesWithDbTblSeparator.replace(DOT_PLACEHOLDER, ".");
87103
LOG.info("Final standard RegExp table capture list: {}", standardRegExpTableCaptureList);
88104

89-
return Pattern.compile(standardRegExpTableCaptureList);
105+
return standardRegExpTableCaptureList;
90106
}
91107

92108
public TableIdRouter(List<RouteRule> routingRules) {
@@ -95,7 +111,7 @@ public TableIdRouter(List<RouteRule> routingRules) {
95111
try {
96112
routes.add(
97113
new Tuple3<>(
98-
validateTableListToRegExpPattern(rule.sourceTable),
114+
Pattern.compile(convertTableListToRegExpPattern(rule.sourceTable)),
99115
rule.sinkTable,
100116
rule.replaceSymbol));
101117
} catch (PatternSyntaxException e) {

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java

Lines changed: 2 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.apache.flink.cdc.common.source.DataSource;
3131
import org.apache.flink.cdc.common.utils.StringUtils;
3232
import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSource;
33-
import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions;
3433
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
3534
import org.apache.flink.cdc.connectors.mysql.source.config.ServerIdRange;
3635
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
@@ -45,7 +44,6 @@
4544

4645
import com.mysql.cj.conf.PropertyKey;
4746
import io.debezium.relational.RelationalDatabaseConnectorConfig;
48-
import io.debezium.relational.Tables;
4947
import org.slf4j.Logger;
5048
import org.slf4j.LoggerFactory;
5149

@@ -63,6 +61,7 @@
6361
import java.util.Set;
6462
import java.util.stream.Collectors;
6563

64+
import static org.apache.flink.cdc.common.route.TableIdRouter.convertTableListToRegExpPattern;
6665
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
6766
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
6867
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CHUNK_META_GROUP_SIZE;
@@ -231,7 +230,7 @@ public DataSource createDataSource(Context context) {
231230
}
232231

233232
if (scanBinlogNewlyAddedTableEnabled) {
234-
String newTables = validateTableAndReturnDebeziumStyle(tables);
233+
String newTables = convertTableListToRegExpPattern(tables);
235234
configFactory.tableList(newTables);
236235
configFactory.excludeTableList(tablesExclude);
237236

@@ -516,58 +515,6 @@ && doubleCompare(distributionFactorLower, 1.0d) <= 0,
516515

517516
private static final String DOT_PLACEHOLDER = "_$dot_placeholder$_";
518517

519-
/**
520-
* Currently, The supported regular syntax is not exactly the same in {@link Selectors} and
521-
* {@link Tables.TableFilter}.
522-
*
523-
* <p>The main distinction are :
524-
*
525-
* <p>1) {@link Selectors} use {@code ,} to split table names and {@link Tables.TableFilter} use
526-
* `|` to split table names.
527-
*
528-
* <p>2) If there is a need to use a dot ({@code .}) in a regular expression to match any
529-
* character, it is necessary to escape the dot with a backslash, refer to {@link
530-
* MySqlDataSourceOptions#TABLES}.
531-
*
532-
* <p>3) The unescaped {@code .} is used as the separator of database and table name. When
533-
* converting to Debezium style, it is expected to be escaped to match the dot ({@code .})
534-
* literally instead of the meta-character.
535-
*/
536-
private String validateTableAndReturnDebeziumStyle(String tables) {
537-
LOG.info("Rewriting CDC style table capture list: {}", tables);
538-
539-
// In CDC-style table matching, table names could be separated by `,` character.
540-
// Convert it to `|` as it's standard RegEx syntax.
541-
tables =
542-
Arrays.stream(tables.split(",")).map(String::trim).collect(Collectors.joining("|"));
543-
LOG.info("Expression after replacing comma with vert separator: {}", tables);
544-
545-
// Essentially, we're just trying to swap escaped `\\.` and unescaped `.`.
546-
// In our table matching syntax, `\\.` means RegEx token matcher and `.` means database &
547-
// table name separator.
548-
// On the contrary, while we're matching TableId string, `\\.` means matching the "dot"
549-
// literal and `.` is the meta-character.
550-
551-
// Step 1: escape the dot with a backslash, but keep it as a placeholder (like `$`).
552-
// For example, `db\.*.tbl\.*` => `db$*.tbl$*`
553-
String unescapedTables = tables.replace("\\.", DOT_PLACEHOLDER);
554-
LOG.info("Expression after unescaping dots as RegEx meta-character: {}", unescapedTables);
555-
556-
// Step 2: replace all remaining dots (`.`) to quoted version (`\.`), as a separator between
557-
// database and table names.
558-
// For example, `db$*.tbl$*` => `db$*\.tbl$*`
559-
String unescapedTablesWithDbTblSeparator = unescapedTables.replace(".", "\\.");
560-
LOG.info("Re-escaping dots as TableId delimiter: {}", unescapedTablesWithDbTblSeparator);
561-
562-
// Step 3: restore placeholder to normal RegEx matcher (`.`)
563-
// For example, `db$*\.tbl$*` => `db.*\.tbl.*`
564-
String debeziumStyleTableCaptureList =
565-
unescapedTablesWithDbTblSeparator.replace(DOT_PLACEHOLDER, ".");
566-
LOG.info("Final Debezium-style table capture list: {}", debeziumStyleTableCaptureList);
567-
568-
return debeziumStyleTableCaptureList;
569-
}
570-
571518
/** Replaces the default timezone placeholder with session timezone, if applicable. */
572519
private static ZoneId getServerTimeZone(Configuration config) {
573520
final String serverTimeZone = config.get(SERVER_TIME_ZONE);

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.flink.cdc.common.event.SchemaChangeEventWithPreSchema;
2929
import org.apache.flink.cdc.common.event.TableId;
3030
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
31+
import org.apache.flink.cdc.common.route.TableIdRouter;
3132
import org.apache.flink.cdc.common.schema.Column;
3233
import org.apache.flink.cdc.common.schema.Schema;
3334
import org.apache.flink.cdc.common.sink.MetadataApplier;

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.flink.cdc.common.event.TableId;
2323
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
2424
import org.apache.flink.cdc.common.route.RouteRule;
25+
import org.apache.flink.cdc.common.route.TableIdRouter;
2526
import org.apache.flink.cdc.common.schema.Schema;
2627
import org.apache.flink.cdc.common.sink.MetadataApplier;
2728
import org.apache.flink.cdc.runtime.operators.schema.common.event.FlushSuccessEvent;

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@
2626
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
2727
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
2828
import org.apache.flink.cdc.common.route.RouteRule;
29+
import org.apache.flink.cdc.common.route.TableIdRouter;
2930
import org.apache.flink.cdc.common.schema.Schema;
3031
import org.apache.flink.cdc.common.utils.SchemaUtils;
3132
import org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils;
3233
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator;
33-
import org.apache.flink.cdc.runtime.operators.schema.common.TableIdRouter;
3434
import org.apache.flink.cdc.runtime.operators.schema.common.metrics.SchemaOperatorMetrics;
3535
import org.apache.flink.cdc.runtime.operators.schema.distributed.event.SchemaChangeRequest;
3636
import org.apache.flink.cdc.runtime.operators.schema.distributed.event.SchemaChangeResponse;

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@
2525
import org.apache.flink.cdc.common.event.TableId;
2626
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
2727
import org.apache.flink.cdc.common.route.RouteRule;
28+
import org.apache.flink.cdc.common.route.TableIdRouter;
2829
import org.apache.flink.cdc.common.schema.Schema;
2930
import org.apache.flink.cdc.common.sink.MetadataApplier;
3031
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator;
3132
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaManager;
32-
import org.apache.flink.cdc.runtime.operators.schema.common.TableIdRouter;
3333
import org.apache.flink.streaming.api.graph.StreamConfig;
3434
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
3535
import org.apache.flink.streaming.api.operators.ChainingStrategy;

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@
2626
import org.apache.flink.cdc.common.event.TableId;
2727
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
2828
import org.apache.flink.cdc.common.route.RouteRule;
29+
import org.apache.flink.cdc.common.route.TableIdRouter;
2930
import org.apache.flink.cdc.common.schema.Schema;
3031
import org.apache.flink.cdc.common.utils.SchemaUtils;
3132
import org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils;
3233
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator;
33-
import org.apache.flink.cdc.runtime.operators.schema.common.TableIdRouter;
3434
import org.apache.flink.cdc.runtime.operators.schema.common.metrics.SchemaOperatorMetrics;
3535
import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeRequest;
3636
import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeResponse;

flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouterTest.java renamed to flink-cdc-runtime/src/test/java/org/apache/flink/cdc/common/route/TableIdRouterTest.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.flink.cdc.runtime.operators.schema.common;
18+
package org.apache.flink.cdc.common.route;
1919

2020
import org.apache.flink.cdc.common.event.TableId;
21-
import org.apache.flink.cdc.common.route.RouteRule;
21+
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaTestBase;
2222

2323
import org.junit.jupiter.api.Test;
2424

@@ -40,6 +40,17 @@ private static List<String> testRoute(String tableId) {
4040
.collect(Collectors.toList());
4141
}
4242

43+
private static String testConvert(String input) {
44+
return TableIdRouter.convertTableListToRegExpPattern(input);
45+
}
46+
47+
@Test
48+
void testConvertingDebeziumTableIdToStandardRegex() {
49+
assertThat(testConvert("foo.bar")).isEqualTo("foo\\.bar");
50+
assertThat(testConvert("foo.bar,foo.baz")).isEqualTo("foo\\.bar|foo\\.baz");
51+
assertThat(testConvert("db.\\.*")).isEqualTo("db\\..*");
52+
}
53+
4354
@Test
4455
void testImplicitRoute() {
4556
assertThat(testRoute("db_0.table_1")).containsExactlyInAnyOrder("db_0.table_1");

flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivatorTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.flink.cdc.common.event.TruncateTableEvent;
2929
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
3030
import org.apache.flink.cdc.common.route.RouteRule;
31+
import org.apache.flink.cdc.common.route.TableIdRouter;
3132
import org.apache.flink.cdc.common.schema.Column;
3233
import org.apache.flink.cdc.common.schema.Schema;
3334
import org.apache.flink.cdc.common.sink.MetadataApplier;

flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaTestBase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.flink.cdc.common.event.Event;
2424
import org.apache.flink.cdc.common.event.TableId;
2525
import org.apache.flink.cdc.common.route.RouteRule;
26+
import org.apache.flink.cdc.common.route.TableIdRouter;
2627
import org.apache.flink.cdc.common.types.DataType;
2728
import org.apache.flink.cdc.common.types.DataTypes;
2829
import org.apache.flink.cdc.runtime.partitioning.PartitioningEvent;

0 commit comments

Comments
 (0)