diff --git a/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java b/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java index 9fdfdcaed..df19ce0d6 100644 --- a/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java +++ b/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java @@ -49,9 +49,11 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -100,6 +102,8 @@ public Connection getConnection() throws SQLException { @Override public List getSchemaList() throws Exception { String databaseName = config.get(MySqlSourceOptions.DATABASE_NAME); + Set chunkKeyTables = getChunkKeyTables(); + List skippedNoPrimaryKeyTables = new ArrayList<>(); List schemaList = new ArrayList<>(); try (Connection conn = getConnection()) { @@ -120,6 +124,14 @@ public List getSchemaList() throws Exception { SourceSchema sourceSchema = new MysqlSchema( metaData, tableCatalog, tableName, tableComment); + if (shouldSkipTableWithoutPrimaryKey( + tableCatalog, + tableName, + sourceSchema.primaryKeys.isEmpty(), + chunkKeyTables)) { + skippedNoPrimaryKeyTables.add(tableCatalog + "." + tableName); + continue; + } sourceSchema.setModel( !sourceSchema.primaryKeys.isEmpty() ? DataModel.UNIQUE @@ -131,9 +143,54 @@ public List getSchemaList() throws Exception { } } } + + if (!skippedNoPrimaryKeyTables.isEmpty()) { + LOG.warn( + "Skipping MySQL tables without primary key in incremental snapshot mode (no chunk key configured): {}. " + + "Configure '{}' for these tables if snapshot sync is required.", + skippedNoPrimaryKeyTables, + MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN.key()); + } + if (schemaList.isEmpty() && !skippedNoPrimaryKeyTables.isEmpty()) { + throw new IllegalStateException( + String.format( + "No MySQL tables left to synchronize: all matched tables are without primary key and no chunk key is configured in initial snapshot mode. " + + "Skipped tables: %s", + skippedNoPrimaryKeyTables)); + } return schemaList; } + boolean shouldSkipTableWithoutPrimaryKey( + String databaseName, + String tableName, + boolean hasNoPrimaryKey, + Set chunkKeyTables) { + if (!hasNoPrimaryKey) { + return false; + } + if (!isInitialSnapshotStartup()) { + return false; + } + return !chunkKeyTables.contains(databaseName + "." + tableName); + } + + private boolean isInitialSnapshotStartup() { + String startupMode = config.get(MySqlSourceOptions.SCAN_STARTUP_MODE); + if (StringUtils.isNullOrWhitespaceOnly(startupMode)) { + return true; + } + return DatabaseSyncConfig.SCAN_STARTUP_MODE_VALUE_INITIAL.equalsIgnoreCase(startupMode); + } + + private Set getChunkKeyTables() { + Set tables = new HashSet<>(); + for (ObjectPath objectPath : getChunkColumnMap().keySet()) { + tables.add(objectPath.getDatabaseName() + "." + objectPath.getObjectName()); + } + return tables; + } + @Override public DataStreamSource buildCdcSource(StreamExecutionEnvironment env) { MySqlSourceBuilder sourceBuilder = MySqlSource.builder(); diff --git a/flink-doris-connector/flink-doris-connector-flink1/src/test/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSyncTest.java b/flink-doris-connector/flink-doris-connector-flink1/src/test/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSyncTest.java new file mode 100644 index 000000000..95d248a73 --- /dev/null +++ b/flink-doris-connector/flink-doris-connector-flink1/src/test/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSyncTest.java @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.tools.cdc.mysql; + +import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions; +import org.apache.flink.configuration.Configuration; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +public class MysqlDatabaseSyncTest { + + @Test + public void testSkipTableWithoutPrimaryKeyInInitialSnapshot() throws Exception { + MysqlDatabaseSync sync = new MysqlDatabaseSync(); + Configuration config = new Configuration(); + config.setString(MySqlSourceOptions.SCAN_STARTUP_MODE.key(), "initial"); + sync.setConfig(config); + + boolean shouldSkip = + sync.shouldSkipTableWithoutPrimaryKey( + "test_db", "no_pk_table", true, Collections.emptySet()); + + Assert.assertTrue(shouldSkip); + } + + @Test + public void testDoNotSkipTableWithoutPrimaryKeyWhenChunkKeyConfigured() throws Exception { + MysqlDatabaseSync sync = new MysqlDatabaseSync(); + Configuration config = new Configuration(); + config.setString(MySqlSourceOptions.SCAN_STARTUP_MODE.key(), "initial"); + sync.setConfig(config); + + Set chunkKeyTables = new HashSet<>(); + chunkKeyTables.add("test_db.no_pk_table"); + + boolean shouldSkip = + sync.shouldSkipTableWithoutPrimaryKey( + "test_db", "no_pk_table", true, chunkKeyTables); + + Assert.assertFalse(shouldSkip); + } + + @Test + public void testDoNotSkipTableWithoutPrimaryKeyForNonInitialStartup() throws Exception { + MysqlDatabaseSync sync = new MysqlDatabaseSync(); + Configuration config = new Configuration(); + config.setString(MySqlSourceOptions.SCAN_STARTUP_MODE.key(), "latest-offset"); + sync.setConfig(config); + + boolean shouldSkip = + sync.shouldSkipTableWithoutPrimaryKey( + "test_db", "no_pk_table", true, Collections.emptySet()); + + Assert.assertFalse(shouldSkip); + } +}