Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Sets;

import java.util.Map;
Expand Down Expand Up @@ -61,6 +62,11 @@ public class DataSourceConfigValidator {
DataSourceConfigKeys.TABLE_EXCLUDE_COLUMNS_SUFFIX
);

private static final Set<String> ALLOW_LOAD_KEYS = ImmutableSortedSet.of(
DataSourceConfigKeys.LOAD_PROPERTIES + LoadCommand.MAX_FILTER_RATIO_PROPERTY,
DataSourceConfigKeys.LOAD_PROPERTIES + LoadCommand.STRICT_MODE
);

private static final String TABLE_LEVEL_PREFIX = DataSourceConfigKeys.TABLE + ".";

public static void validateSource(Map<String, String> input,
Expand Down Expand Up @@ -101,18 +107,24 @@ public static void validateSource(Map<String, String> input,
public static void validateTarget(Map<String, String> input) throws IllegalArgumentException {
for (Map.Entry<String, String> entry : input.entrySet()) {
String key = entry.getKey();
if (!key.startsWith(DataSourceConfigKeys.TABLE_PROPS_PREFIX)
&& !key.startsWith(DataSourceConfigKeys.LOAD_PROPERTIES)) {
throw new IllegalArgumentException("Not support target properties key " + key);
if (key.startsWith(DataSourceConfigKeys.TABLE_PROPS_PREFIX)) {
continue;
}

if (key.equals(DataSourceConfigKeys.LOAD_PROPERTIES + LoadCommand.MAX_FILTER_RATIO_PROPERTY)) {
try {
Double.parseDouble(entry.getValue());
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid value for key '" + key + "': " + entry.getValue());
if (key.startsWith(DataSourceConfigKeys.LOAD_PROPERTIES)) {
if (!ALLOW_LOAD_KEYS.contains(key)) {
throw new IllegalArgumentException("Unsupported load property: '" + key
+ "'. Supported keys: " + ALLOW_LOAD_KEYS);
}
if (key.equals(DataSourceConfigKeys.LOAD_PROPERTIES + LoadCommand.MAX_FILTER_RATIO_PROPERTY)) {
try {
Double.parseDouble(entry.getValue());
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid value for key '" + key + "': " + entry.getValue());
}
}
continue;
}
throw new IllegalArgumentException("Not support target properties key " + key);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,13 @@ private void checkUnmodifiableSourceProperties(Map<String, String> originSourceP
"The database property cannot be modified in ALTER JOB");
}

if (sourceProperties.containsKey(DataSourceConfigKeys.SCHEMA)) {
Preconditions.checkArgument(Objects.equals(
originSourceProperties.get(DataSourceConfigKeys.SCHEMA),
sourceProperties.get(DataSourceConfigKeys.SCHEMA)),
"The schema property cannot be modified in ALTER JOB");
}

if (sourceProperties.containsKey(DataSourceConfigKeys.INCLUDE_TABLES)) {
Preconditions.checkArgument(Objects.equals(
originSourceProperties.get(DataSourceConfigKeys.INCLUDE_TABLES),
Expand Down Expand Up @@ -301,7 +308,8 @@ private void checkUnmodifiableProperties(String originExecuteSql) throws Analysi
"The uri property cannot be modified in ALTER JOB");
break;
case "cdc_stream":
// type, jdbc_url, database, schema, and table identify the source and cannot be changed.
// snapshot_* are materialized into split metadata on first fetch and never re-read.
// slot_name / publication_name are fixed at create time to keep ownership stable.
// user, password, driver_url, driver_class, etc. are modifiable (credential rotation).
for (String unmodifiable : new String[] {
Expand All @@ -310,6 +318,9 @@ private void checkUnmodifiableProperties(String originExecuteSql) throws Analysi
DataSourceConfigKeys.DATABASE,
DataSourceConfigKeys.SCHEMA,
DataSourceConfigKeys.TABLE,
DataSourceConfigKeys.SNAPSHOT_SPLIT_KEY,
DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE,
DataSourceConfigKeys.SNAPSHOT_PARALLELISM,
DataSourceConfigKeys.SLOT_NAME,
DataSourceConfigKeys.PUBLICATION_NAME}) {
Preconditions.checkArgument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ suite("test_streaming_mysql_job_create_alter", "p0,external,mysql,external_docke
"user" = "root",
"password" = "123456",
"database" = "${mysqlDb}",
"include_tables" = "${table1}",
"include_tables" = "${table1}",
"offset" = "initial"
)
TO DATABASE ${currentDb} (
Expand All @@ -79,6 +79,27 @@ suite("test_streaming_mysql_job_create_alter", "p0,external,mysql,external_docke
exception "Not support target properties key table.create.properties1.replication_num"
}

// load.* keys outside the allow-list are silently dropped at runtime; reject at CREATE time
test {
sql """CREATE JOB ${jobName}
ON STREAMING
FROM MYSQL (
"jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
"driver_url" = "${driver_url}",
"driver_class" = "com.mysql.cj.jdbc.Driver",
"user" = "root",
"password" = "123456",
"database" = "${mysqlDb}",
"include_tables" = "${table1}",
"offset" = "initial"
)
TO DATABASE ${currentDb} (
"load.where" = "age > 0"
)
"""
exception "Unsupported load property: 'load.where'"
}

//error jdbc url format
test {
sql """CREATE JOB ${jobName}
Expand Down Expand Up @@ -350,6 +371,18 @@ suite("test_streaming_mysql_job_create_alter", "p0,external,mysql,external_docke
exception "The exclude_tables property cannot be modified in ALTER JOB"
}

// alter schema (PG-only source identity; MySQL does not use it, but the
// from-to check is source-type agnostic and must reject any schema change)
test {
sql """ALTER JOB ${jobName}
FROM MYSQL (
"schema" = "any_schema"
)
TO DATABASE ${currentDb}
"""
exception "The schema property cannot be modified in ALTER JOB"
}

// snapshot_parallelism is cached in BE reader's pollExecutor on first initialize;
// reject to avoid silent staleness
test {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,76 @@ suite("test_streaming_job_cdc_stream_mysql", "p0,external,mysql,external_docker,

qt_final_data """ SELECT * FROM ${currentDb}.${dorisTable} ORDER BY name """

// snapshot_split_key / snapshot_split_size / snapshot_parallelism are materialized
// into split metadata at CREATE and are never re-read; ALTER must reject them.
sql """PAUSE JOB where jobname = '${jobName}'"""
Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until({
def s = sql """select status from jobs("type"="insert") where Name='${jobName}'"""
s.size() == 1 && s.get(0).get(0) == "PAUSED"
})

test {
sql """
ALTER JOB ${jobName}
INSERT INTO ${currentDb}.${dorisTable} (name, age)
SELECT name, age FROM cdc_stream(
"type" = "mysql",
"jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
"driver_url" = "${driver_url}",
"driver_class" = "com.mysql.cj.jdbc.Driver",
"user" = "root",
"password" = "123456",
"database" = "${mysqlDb}",
"table" = "${mysqlTable}",
"offset" = "initial",
"snapshot_split_key" = "age"
)
"""
exception "snapshot_split_key"
}

test {
sql """
ALTER JOB ${jobName}
INSERT INTO ${currentDb}.${dorisTable} (name, age)
SELECT name, age FROM cdc_stream(
"type" = "mysql",
"jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
"driver_url" = "${driver_url}",
"driver_class" = "com.mysql.cj.jdbc.Driver",
"user" = "root",
"password" = "123456",
"database" = "${mysqlDb}",
"table" = "${mysqlTable}",
"offset" = "initial",
"snapshot_split_key" = "name",
"snapshot_split_size" = "2048"
)
"""
exception "snapshot_split_size"
}

test {
sql """
ALTER JOB ${jobName}
INSERT INTO ${currentDb}.${dorisTable} (name, age)
SELECT name, age FROM cdc_stream(
"type" = "mysql",
"jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
"driver_url" = "${driver_url}",
"driver_class" = "com.mysql.cj.jdbc.Driver",
"user" = "root",
"password" = "123456",
"database" = "${mysqlDb}",
"table" = "${mysqlTable}",
"offset" = "initial",
"snapshot_split_key" = "name",
"snapshot_parallelism" = "4"
)
"""
exception "snapshot_parallelism"
}

sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
sql """drop table if exists ${currentDb}.${dorisTable} force"""
}
Expand Down
Loading