Skip to content

Commit b8d8fba

Browse files
branch-4.1: [fix](streaming-job) reject silent-no-op ALTER keys and unsupported load.* properties #62680 (#62989)
Cherry-picked from #62680 Co-authored-by: wudi <wudi@selectdb.com>
1 parent f04c890 commit b8d8fba

4 files changed

Lines changed: 137 additions & 11 deletions

File tree

fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import com.fasterxml.jackson.databind.JsonNode;
2525
import com.fasterxml.jackson.databind.ObjectMapper;
26+
import com.google.common.collect.ImmutableSortedSet;
2627
import com.google.common.collect.Sets;
2728

2829
import java.util.Map;
@@ -61,6 +62,11 @@ public class DataSourceConfigValidator {
6162
DataSourceConfigKeys.TABLE_EXCLUDE_COLUMNS_SUFFIX
6263
);
6364

65+
private static final Set<String> ALLOW_LOAD_KEYS = ImmutableSortedSet.of(
66+
DataSourceConfigKeys.LOAD_PROPERTIES + LoadCommand.MAX_FILTER_RATIO_PROPERTY,
67+
DataSourceConfigKeys.LOAD_PROPERTIES + LoadCommand.STRICT_MODE
68+
);
69+
6470
private static final String TABLE_LEVEL_PREFIX = DataSourceConfigKeys.TABLE + ".";
6571

6672
public static void validateSource(Map<String, String> input,
@@ -101,18 +107,24 @@ public static void validateSource(Map<String, String> input,
101107
public static void validateTarget(Map<String, String> input) throws IllegalArgumentException {
102108
for (Map.Entry<String, String> entry : input.entrySet()) {
103109
String key = entry.getKey();
104-
if (!key.startsWith(DataSourceConfigKeys.TABLE_PROPS_PREFIX)
105-
&& !key.startsWith(DataSourceConfigKeys.LOAD_PROPERTIES)) {
106-
throw new IllegalArgumentException("Not support target properties key " + key);
110+
if (key.startsWith(DataSourceConfigKeys.TABLE_PROPS_PREFIX)) {
111+
continue;
107112
}
108-
109-
if (key.equals(DataSourceConfigKeys.LOAD_PROPERTIES + LoadCommand.MAX_FILTER_RATIO_PROPERTY)) {
110-
try {
111-
Double.parseDouble(entry.getValue());
112-
} catch (NumberFormatException e) {
113-
throw new IllegalArgumentException("Invalid value for key '" + key + "': " + entry.getValue());
113+
if (key.startsWith(DataSourceConfigKeys.LOAD_PROPERTIES)) {
114+
if (!ALLOW_LOAD_KEYS.contains(key)) {
115+
throw new IllegalArgumentException("Unsupported load property: '" + key
116+
+ "'. Supported keys: " + ALLOW_LOAD_KEYS);
117+
}
118+
if (key.equals(DataSourceConfigKeys.LOAD_PROPERTIES + LoadCommand.MAX_FILTER_RATIO_PROPERTY)) {
119+
try {
120+
Double.parseDouble(entry.getValue());
121+
} catch (NumberFormatException e) {
122+
throw new IllegalArgumentException("Invalid value for key '" + key + "': " + entry.getValue());
123+
}
114124
}
125+
continue;
115126
}
127+
throw new IllegalArgumentException("Not support target properties key " + key);
116128
}
117129
}
118130

fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,13 @@ private void checkUnmodifiableSourceProperties(Map<String, String> originSourceP
200200
"The database property cannot be modified in ALTER JOB");
201201
}
202202

203+
if (sourceProperties.containsKey(DataSourceConfigKeys.SCHEMA)) {
204+
Preconditions.checkArgument(Objects.equals(
205+
originSourceProperties.get(DataSourceConfigKeys.SCHEMA),
206+
sourceProperties.get(DataSourceConfigKeys.SCHEMA)),
207+
"The schema property cannot be modified in ALTER JOB");
208+
}
209+
203210
if (sourceProperties.containsKey(DataSourceConfigKeys.INCLUDE_TABLES)) {
204211
Preconditions.checkArgument(Objects.equals(
205212
originSourceProperties.get(DataSourceConfigKeys.INCLUDE_TABLES),
@@ -301,7 +308,8 @@ private void checkUnmodifiableProperties(String originExecuteSql) throws Analysi
301308
"The uri property cannot be modified in ALTER JOB");
302309
break;
303310
case "cdc_stream":
304-
// type, jdbc_url, database, schema, table identify the source and cannot be changed.
311+
// type, jdbc_url, database, schema, and table identify the source and cannot be changed.
312+
// snapshot_* are materialized into split metadata on first fetch and never re-read.
305313
// slot_name / publication_name are fixed at create time to keep ownership stable.
306314
// user, password, driver_url, driver_class, etc. are modifiable (credential rotation).
307315
for (String unmodifiable : new String[] {
@@ -310,6 +318,9 @@ private void checkUnmodifiableProperties(String originExecuteSql) throws Analysi
310318
DataSourceConfigKeys.DATABASE,
311319
DataSourceConfigKeys.SCHEMA,
312320
DataSourceConfigKeys.TABLE,
321+
DataSourceConfigKeys.SNAPSHOT_SPLIT_KEY,
322+
DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE,
323+
DataSourceConfigKeys.SNAPSHOT_PARALLELISM,
313324
DataSourceConfigKeys.SLOT_NAME,
314325
DataSourceConfigKeys.PUBLICATION_NAME}) {
315326
Preconditions.checkArgument(

regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_create_alter.groovy

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ suite("test_streaming_mysql_job_create_alter", "p0,external,mysql,external_docke
6969
"user" = "root",
7070
"password" = "123456",
7171
"database" = "${mysqlDb}",
72-
"include_tables" = "${table1}",
72+
"include_tables" = "${table1}",
7373
"offset" = "initial"
7474
)
7575
TO DATABASE ${currentDb} (
@@ -79,6 +79,27 @@ suite("test_streaming_mysql_job_create_alter", "p0,external,mysql,external_docke
7979
exception "Not support target properties key table.create.properties1.replication_num"
8080
}
8181

82+
// load.* keys outside the allow-list are silently dropped at runtime; reject at CREATE time
83+
test {
84+
sql """CREATE JOB ${jobName}
85+
ON STREAMING
86+
FROM MYSQL (
87+
"jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
88+
"driver_url" = "${driver_url}",
89+
"driver_class" = "com.mysql.cj.jdbc.Driver",
90+
"user" = "root",
91+
"password" = "123456",
92+
"database" = "${mysqlDb}",
93+
"include_tables" = "${table1}",
94+
"offset" = "initial"
95+
)
96+
TO DATABASE ${currentDb} (
97+
"load.where" = "age > 0"
98+
)
99+
"""
100+
exception "Unsupported load property: 'load.where'"
101+
}
102+
82103
//error jdbc url format
83104
test {
84105
sql """CREATE JOB ${jobName}
@@ -350,6 +371,18 @@ suite("test_streaming_mysql_job_create_alter", "p0,external,mysql,external_docke
350371
exception "The exclude_tables property cannot be modified in ALTER JOB"
351372
}
352373

374+
// alter schema (PG-only source identity; MySQL does not use it, but the
375+
// from-to check is source-type agnostic and must reject any schema change)
376+
test {
377+
sql """ALTER JOB ${jobName}
378+
FROM MYSQL (
379+
"schema" = "any_schema"
380+
)
381+
TO DATABASE ${currentDb}
382+
"""
383+
exception "The schema property cannot be modified in ALTER JOB"
384+
}
385+
353386
// snapshot_parallelism is cached in BE reader's pollExecutor on first initialize;
354387
// reject to avoid silent staleness
355388
test {

regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_mysql.groovy

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,76 @@ suite("test_streaming_job_cdc_stream_mysql", "p0,external,mysql,external_docker,
122122

123123
qt_final_data """ SELECT * FROM ${currentDb}.${dorisTable} ORDER BY name """
124124

125+
// snapshot_split_key / snapshot_split_size / snapshot_parallelism are materialized
126+
// into split metadata at CREATE and are never re-read; ALTER must reject them.
127+
sql """PAUSE JOB where jobname = '${jobName}'"""
128+
Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until({
129+
def s = sql """select status from jobs("type"="insert") where Name='${jobName}'"""
130+
s.size() == 1 && s.get(0).get(0) == "PAUSED"
131+
})
132+
133+
test {
134+
sql """
135+
ALTER JOB ${jobName}
136+
INSERT INTO ${currentDb}.${dorisTable} (name, age)
137+
SELECT name, age FROM cdc_stream(
138+
"type" = "mysql",
139+
"jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
140+
"driver_url" = "${driver_url}",
141+
"driver_class" = "com.mysql.cj.jdbc.Driver",
142+
"user" = "root",
143+
"password" = "123456",
144+
"database" = "${mysqlDb}",
145+
"table" = "${mysqlTable}",
146+
"offset" = "initial",
147+
"snapshot_split_key" = "age"
148+
)
149+
"""
150+
exception "snapshot_split_key"
151+
}
152+
153+
test {
154+
sql """
155+
ALTER JOB ${jobName}
156+
INSERT INTO ${currentDb}.${dorisTable} (name, age)
157+
SELECT name, age FROM cdc_stream(
158+
"type" = "mysql",
159+
"jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
160+
"driver_url" = "${driver_url}",
161+
"driver_class" = "com.mysql.cj.jdbc.Driver",
162+
"user" = "root",
163+
"password" = "123456",
164+
"database" = "${mysqlDb}",
165+
"table" = "${mysqlTable}",
166+
"offset" = "initial",
167+
"snapshot_split_key" = "name",
168+
"snapshot_split_size" = "2048"
169+
)
170+
"""
171+
exception "snapshot_split_size"
172+
}
173+
174+
test {
175+
sql """
176+
ALTER JOB ${jobName}
177+
INSERT INTO ${currentDb}.${dorisTable} (name, age)
178+
SELECT name, age FROM cdc_stream(
179+
"type" = "mysql",
180+
"jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
181+
"driver_url" = "${driver_url}",
182+
"driver_class" = "com.mysql.cj.jdbc.Driver",
183+
"user" = "root",
184+
"password" = "123456",
185+
"database" = "${mysqlDb}",
186+
"table" = "${mysqlTable}",
187+
"offset" = "initial",
188+
"snapshot_split_key" = "name",
189+
"snapshot_parallelism" = "4"
190+
)
191+
"""
192+
exception "snapshot_parallelism"
193+
}
194+
125195
sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
126196
sql """drop table if exists ${currentDb}.${dorisTable} force"""
127197
}

0 commit comments

Comments
 (0)