diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java index 0e87828f42fc80..c4aad00d33a175 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableSortedSet; import com.google.common.collect.Sets; import java.util.Map; @@ -61,6 +62,11 @@ public class DataSourceConfigValidator { DataSourceConfigKeys.TABLE_EXCLUDE_COLUMNS_SUFFIX ); + private static final Set ALLOW_LOAD_KEYS = ImmutableSortedSet.of( + DataSourceConfigKeys.LOAD_PROPERTIES + LoadCommand.MAX_FILTER_RATIO_PROPERTY, + DataSourceConfigKeys.LOAD_PROPERTIES + LoadCommand.STRICT_MODE + ); + private static final String TABLE_LEVEL_PREFIX = DataSourceConfigKeys.TABLE + "."; public static void validateSource(Map input, @@ -101,18 +107,24 @@ public static void validateSource(Map input, public static void validateTarget(Map input) throws IllegalArgumentException { for (Map.Entry entry : input.entrySet()) { String key = entry.getKey(); - if (!key.startsWith(DataSourceConfigKeys.TABLE_PROPS_PREFIX) - && !key.startsWith(DataSourceConfigKeys.LOAD_PROPERTIES)) { - throw new IllegalArgumentException("Not support target properties key " + key); + if (key.startsWith(DataSourceConfigKeys.TABLE_PROPS_PREFIX)) { + continue; } - - if (key.equals(DataSourceConfigKeys.LOAD_PROPERTIES + LoadCommand.MAX_FILTER_RATIO_PROPERTY)) { - try { - Double.parseDouble(entry.getValue()); - } catch (NumberFormatException e) { - throw new IllegalArgumentException("Invalid value for key '" + key + "': " + entry.getValue()); + if (key.startsWith(DataSourceConfigKeys.LOAD_PROPERTIES)) { + if (!ALLOW_LOAD_KEYS.contains(key)) { + throw new IllegalArgumentException("Unsupported load property: '" + key + + "'. Supported keys: " + ALLOW_LOAD_KEYS); + } + if (key.equals(DataSourceConfigKeys.LOAD_PROPERTIES + LoadCommand.MAX_FILTER_RATIO_PROPERTY)) { + try { + Double.parseDouble(entry.getValue()); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid value for key '" + key + "': " + entry.getValue()); + } } + continue; } + throw new IllegalArgumentException("Not support target properties key " + key); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java index 7886ff65ee3c92..d75be51c457670 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java @@ -200,6 +200,13 @@ private void checkUnmodifiableSourceProperties(Map originSourceP "The database property cannot be modified in ALTER JOB"); } + if (sourceProperties.containsKey(DataSourceConfigKeys.SCHEMA)) { + Preconditions.checkArgument(Objects.equals( + originSourceProperties.get(DataSourceConfigKeys.SCHEMA), + sourceProperties.get(DataSourceConfigKeys.SCHEMA)), + "The schema property cannot be modified in ALTER JOB"); + } + if (sourceProperties.containsKey(DataSourceConfigKeys.INCLUDE_TABLES)) { Preconditions.checkArgument(Objects.equals( originSourceProperties.get(DataSourceConfigKeys.INCLUDE_TABLES), @@ -301,7 +308,8 @@ private void checkUnmodifiableProperties(String originExecuteSql) throws Analysi "The uri property cannot be modified in ALTER JOB"); break; case "cdc_stream": - // type, jdbc_url, database, schema, table identify the source and cannot be changed. + // type, jdbc_url, database, schema, and table identify the source and cannot be changed. + // snapshot_* are materialized into split metadata on first fetch and never re-read. // slot_name / publication_name are fixed at create time to keep ownership stable. // user, password, driver_url, driver_class, etc. are modifiable (credential rotation). for (String unmodifiable : new String[] { @@ -310,6 +318,9 @@ private void checkUnmodifiableProperties(String originExecuteSql) throws Analysi DataSourceConfigKeys.DATABASE, DataSourceConfigKeys.SCHEMA, DataSourceConfigKeys.TABLE, + DataSourceConfigKeys.SNAPSHOT_SPLIT_KEY, + DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE, + DataSourceConfigKeys.SNAPSHOT_PARALLELISM, DataSourceConfigKeys.SLOT_NAME, DataSourceConfigKeys.PUBLICATION_NAME}) { Preconditions.checkArgument( diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_create_alter.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_create_alter.groovy index 64cce88fb97963..a0dbd5e36cea6e 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_create_alter.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_create_alter.groovy @@ -69,7 +69,7 @@ suite("test_streaming_mysql_job_create_alter", "p0,external,mysql,external_docke "user" = "root", "password" = "123456", "database" = "${mysqlDb}", - "include_tables" = "${table1}", + "include_tables" = "${table1}", "offset" = "initial" ) TO DATABASE ${currentDb} ( @@ -79,6 +79,27 @@ suite("test_streaming_mysql_job_create_alter", "p0,external,mysql,external_docke exception "Not support target properties key table.create.properties1.replication_num" } + // load.* keys outside the allow-list are silently dropped at runtime; reject at CREATE time + test { + sql """CREATE JOB ${jobName} + ON STREAMING + FROM MYSQL ( + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "${mysqlDb}", + "include_tables" = "${table1}", + "offset" = "initial" + ) + TO DATABASE ${currentDb} ( + "load.where" = "age > 0" + ) + """ + exception "Unsupported load property: 'load.where'" + } + //error jdbc url format test { sql """CREATE JOB ${jobName} @@ -350,6 +371,18 @@ suite("test_streaming_mysql_job_create_alter", "p0,external,mysql,external_docke exception "The exclude_tables property cannot be modified in ALTER JOB" } + // alter schema (PG-only source identity; MySQL does not use it, but the + // from-to check is source-type agnostic and must reject any schema change) + test { + sql """ALTER JOB ${jobName} + FROM MYSQL ( + "schema" = "any_schema" + ) + TO DATABASE ${currentDb} + """ + exception "The schema property cannot be modified in ALTER JOB" + } + // snapshot_parallelism is cached in BE reader's pollExecutor on first initialize; // reject to avoid silent staleness test { diff --git a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_mysql.groovy b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_mysql.groovy index fdc50c76118701..f17140567c8e3e 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_mysql.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_mysql.groovy @@ -122,6 +122,76 @@ suite("test_streaming_job_cdc_stream_mysql", "p0,external,mysql,external_docker, qt_final_data """ SELECT * FROM ${currentDb}.${dorisTable} ORDER BY name """ + // snapshot_split_key / snapshot_split_size / snapshot_parallelism are materialized + // into split metadata at CREATE and are never re-read; ALTER must reject them. + sql """PAUSE JOB where jobname = '${jobName}'""" + Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until({ + def s = sql """select status from jobs("type"="insert") where Name='${jobName}'""" + s.size() == 1 && s.get(0).get(0) == "PAUSED" + }) + + test { + sql """ + ALTER JOB ${jobName} + INSERT INTO ${currentDb}.${dorisTable} (name, age) + SELECT name, age FROM cdc_stream( + "type" = "mysql", + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "${mysqlDb}", + "table" = "${mysqlTable}", + "offset" = "initial", + "snapshot_split_key" = "age" + ) + """ + exception "snapshot_split_key" + } + + test { + sql """ + ALTER JOB ${jobName} + INSERT INTO ${currentDb}.${dorisTable} (name, age) + SELECT name, age FROM cdc_stream( + "type" = "mysql", + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "${mysqlDb}", + "table" = "${mysqlTable}", + "offset" = "initial", + "snapshot_split_key" = "name", + "snapshot_split_size" = "2048" + ) + """ + exception "snapshot_split_size" + } + + test { + sql """ + ALTER JOB ${jobName} + INSERT INTO ${currentDb}.${dorisTable} (name, age) + SELECT name, age FROM cdc_stream( + "type" = "mysql", + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "${mysqlDb}", + "table" = "${mysqlTable}", + "offset" = "initial", + "snapshot_split_key" = "name", + "snapshot_parallelism" = "4" + ) + """ + exception "snapshot_parallelism" + } + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" sql """drop table if exists ${currentDb}.${dorisTable} force""" }