[Improve](streaming job) support custom table name mapping for CDC streaming job #61317
JNSimba wants to merge 2 commits into apache:master from
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
/review
Pull request overview
Adds support for streaming CDC jobs (Postgres) to map upstream source table names to different Doris target table names via per-table config (table.<src>.target_table), including schema-change DDL routing and regression coverage.
Changes:
- Introduce per-table config key constants and validation for `table.<tableName>.<suffix>` (adds the `target_table` suffix).
- Update FE table auto-creation to create Doris tables using mapped target names while keeping CDC monitoring based on source table names.
- Update CDC client to route stream-load writes and schema-change DDLs to mapped target tables; add regression tests for mapping + multi-source merge.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_table_mapping.groovy | New regression suite covering table name mapping and two-source-to-one-target merge. |
| regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_table_mapping.out | Expected results for the new mapping regression suite. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java | Adds helper to parse all table.<src>.target_table mappings from config. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java | Route schema-change DDLs to the mapped Doris target table. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java | Cache parsed source→target mappings and provide resolveTargetTable(). |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java | Route stream-load writes to mapped target table names. |
| fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java | Generate CREATE TABLE commands keyed by source table name and create Doris tables using mapped target names. |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java | Use the new source→CreateTableCommand mapping and ensure CDC monitors source tables. |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java | Validate per-table config key format and allowlisted suffixes. |
| fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java | Adds per-table key constants (table, exclude_columns, target_table). |
```java
public static Map<String, String> parseAllTargetTableMappings(Map<String, String> config) {
    String prefix = DataSourceConfigKeys.TABLE + ".";
    String suffix = "." + DataSourceConfigKeys.TABLE_TARGET_TABLE_SUFFIX;
    Map<String, String> result = new HashMap<>();
    for (Map.Entry<String, String> entry : config.entrySet()) {
        String key = entry.getKey();
        if (key.startsWith(prefix) && key.endsWith(suffix)) {
            String srcTable = key.substring(prefix.length(), key.length() - suffix.length());
            String dstTable = entry.getValue().trim();
            // ... (rest of the method is not shown in this diff excerpt)
```
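For reference, here is a self-contained sketch of what the complete helper could look like once the imports are in place. It is not the PR's code: the class name `TargetTableMappingParser` is hypothetical, the literals `"table."` and `".target_table"` stand in for `DataSourceConfigKeys.TABLE` and `DataSourceConfigKeys.TABLE_TARGET_TABLE_SUFFIX`, and skipping blank values is an assumption rather than documented behavior.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-alone version of a "parse all target-table mappings" helper. */
final class TargetTableMappingParser {
    // Stand-ins for DataSourceConfigKeys.TABLE + "." and "." + TABLE_TARGET_TABLE_SUFFIX.
    private static final String PREFIX = "table.";
    private static final String SUFFIX = ".target_table";

    private TargetTableMappingParser() {
    }

    /** Returns an unmodifiable source-table -> target-table map; non-matching keys are ignored. */
    static Map<String, String> parseAllTargetTableMappings(Map<String, String> config) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            String key = entry.getKey();
            // Require at least one character between prefix and suffix, so a bare
            // "table.target_table" key is not read as a mapping for an empty name.
            if (key.startsWith(PREFIX) && key.endsWith(SUFFIX)
                    && key.length() > PREFIX.length() + SUFFIX.length()) {
                String srcTable = key.substring(PREFIX.length(), key.length() - SUFFIX.length());
                String dstTable = entry.getValue() == null ? "" : entry.getValue().trim();
                // Assumption: a blank value means "no mapping", so the source name is kept.
                if (!dstTable.isEmpty()) {
                    result.put(srcTable, dstTable);
                }
            }
        }
        return Collections.unmodifiableMap(result);
    }
}
```

Unrelated keys such as `table.create.properties.*` fall through because they do not end with the `.target_table` suffix.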
```java
// Known suffixes for per-table config keys (format: "table.<tableName>.<suffix>")
private static final Set<String> ALLOW_TABLE_LEVEL_SUFFIXES = Sets.newHashSet(
        DataSourceConfigKeys.TABLE_TARGET_TABLE_SUFFIX);
// ...
String suffix = parts[parts.length - 1];
if (!ALLOW_TABLE_LEVEL_SUFFIXES.contains(suffix)) {
    throw new IllegalArgumentException("Unknown per-table config key: '" + key + "'");
}
```
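A minimal, self-contained sketch of the split-based check that the excerpt implies, handy for reproducing the dotted-name limitation that the review flags. The class `PerTableKeyValidator` is hypothetical, it uses plain JDK sets instead of Guava's `Sets.newHashSet`, and the `parts[0]` and empty-name checks are assumptions, since the surrounding validator code is not shown in the diff.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of split-based validation of "table.<tableName>.<suffix>" keys. */
final class PerTableKeyValidator {
    // Mirrors the allowlist in the excerpt: only "target_table" is accepted.
    private static final Set<String> ALLOW_TABLE_LEVEL_SUFFIXES =
            Collections.unmodifiableSet(new HashSet<>(Arrays.asList("target_table")));

    private PerTableKeyValidator() {
    }

    static void validate(String key) {
        // Limit -1 keeps trailing empty strings, so "table.orders." yields an empty suffix.
        String[] parts = key.split("\\.", -1);
        // A dotted table name such as "my.table" yields more than 3 parts and lands here.
        if (parts.length != 3 || !"table".equals(parts[0]) || parts[1].isEmpty()) {
            throw new IllegalArgumentException("Malformed per-table config key: '" + key
                    + "'. Expected format: table.<tableName>.<suffix>");
        }
        String suffix = parts[parts.length - 1];
        if (!ALLOW_TABLE_LEVEL_SUFFIXES.contains(suffix)) {
            throw new IllegalArgumentException("Unknown per-table config key: '" + key + "'");
        }
    }
}
```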
Code Review Summary
This PR adds support for mapping upstream (PostgreSQL) table names to custom downstream (Doris) table names in CDC streaming jobs. The design is generally sound — the mapping is applied consistently across FE table creation, CDC client DML writes, and schema change DDL generation.
Critical Checkpoints
Goal achievement: The code accomplishes the stated goal. Table name mapping is applied in all three necessary places: (1) FE CreateTableCommand uses the mapped target name, (2) PipelineCoordinator routes DML records to the mapped Doris table, (3) PostgresDebeziumJsonDeserializer generates DDLs with the mapped name. Regression tests cover both basic mapping and multi-table merge scenarios.
Modification focus: The change is focused and touches only the necessary files. Good.
Concurrency: targetTableMappingsCache in DebeziumJsonDeserializer is a plain HashMap written once in init() and only read afterwards. This is safe in practice but could be made more robust with Collections.unmodifiableMap(). Low risk, not blocking.
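One way to apply the `Collections.unmodifiableMap()` suggestion, sketched under assumptions: `MappingCacheHolder` is a hypothetical stand-in for `DebeziumJsonDeserializer`, its `init()` receives an already-parsed map, and the `volatile` field is an extra safeguard for cross-thread publication that the review does not ask for.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical holder showing a write-once, read-only mapping cache. */
class MappingCacheHolder {
    // volatile: readers on other threads see the fully built map once init() has run.
    private volatile Map<String, String> targetTableMappingsCache = Collections.emptyMap();

    void init(Map<String, String> parsedMappings) {
        // Copy, then wrap: later changes to the caller's map cannot leak in,
        // and any accidental put() on the cache fails fast.
        targetTableMappingsCache = Collections.unmodifiableMap(new HashMap<>(parsedMappings));
    }

    Map<String, String> getTargetTableMappings() {
        return targetTableMappingsCache;
    }
}
```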
Lifecycle management: No special lifecycle concerns.
Configuration items: New config key format table.<src>.target_table is validated in DataSourceConfigValidator. Dynamic changes not applicable (job creation time only).
Incompatible changes: The return type of generateCreateTableCmds changed from List<CreateTableCommand> to LinkedHashMap<String, CreateTableCommand>. This has only one caller (createTableIfNotExists), so no compatibility concern.
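To make the new return type concrete: keying by source name lets one map serve both consumers, since the keys are what the CDC reader must watch upstream and each value carries the (possibly renamed) Doris table. This is a hypothetical sketch, not the PR's `generateCreateTableCmds`; `CreateTablePlan` stands in for `CreateTableCommand` and models only the target name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: per-source CREATE TABLE plans keyed by source table name. */
final class CreateTablePlanner {
    /** Stand-in for CreateTableCommand: records only the Doris table it would create. */
    static final class CreateTablePlan {
        final String targetTable;

        CreateTablePlan(String targetTable) {
            this.targetTable = targetTable;
        }
    }

    private CreateTablePlanner() {
    }

    static LinkedHashMap<String, CreateTablePlan> plan(List<String> sourceTables,
            Map<String, String> targetMappings) {
        // LinkedHashMap keeps the upstream table order stable for creation and logging.
        LinkedHashMap<String, CreateTablePlan> plans = new LinkedHashMap<>();
        for (String src : sourceTables) {
            // Unmapped sources keep their own name as the Doris table name.
            plans.put(src, new CreateTablePlan(targetMappings.getOrDefault(src, src)));
        }
        return plans;
    }

    /** The keys (source names, never target names) are what CDC must monitor upstream. */
    static List<String> tablesToMonitor(LinkedHashMap<String, CreateTablePlan> plans) {
        return new ArrayList<>(plans.keySet());
    }
}
```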
Parallel code paths: MySqlDebeziumJsonDeserializer exists as a parallel path. Its DML write routing goes through the same PipelineCoordinator code (correctly mapped). Its handleSchemaChangeEvent() is a TODO stub that returns empty, so no mapping needed yet. When implemented, it will need to use resolveTargetTable() — the infrastructure is already in the base class. Acceptable.
Test coverage: Good. Two regression test cases cover basic mapping (INSERT/UPDATE/DELETE) and multi-table merge (two PG tables → one Doris table). Tests use ORDER BY for deterministic output. Tables are dropped before use, not after. .out file appears auto-generated.
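In the merge case, two source tables simply carry mappings that name the same Doris table. A small, hypothetical helper that inverts the mapping shows which upstream tables feed each target; the table names in the check are made up, and the merged sources are assumed to have compatible schemas.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: which upstream tables feed each Doris table. */
final class MergeView {
    private MergeView() {
    }

    static Map<String, List<String>> sourcesByTarget(List<String> sourceTables,
            Map<String, String> targetMappings) {
        // TreeMap only to make the iteration order deterministic.
        Map<String, List<String>> result = new TreeMap<>();
        for (String src : sourceTables) {
            String dst = targetMappings.getOrDefault(src, src);
            result.computeIfAbsent(dst, k -> new ArrayList<>()).add(src);
        }
        return result;
    }
}
```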
Observability: No new critical paths requiring additional logging. Existing log messages correctly use source table identifiers.
Persistence/transactions: Not applicable — no EditLog or transaction modifications.
Issues Found
- [CRITICAL] Compilation error in `ConfigUtil.java`: missing `import java.util.HashMap` and `import org.apache.doris.job.cdc.DataSourceConfigKeys`. The new `parseAllTargetTableMappings` method uses `HashMap` and `DataSourceConfigKeys`, but neither is imported, so this file will not compile.
- [Minor] Dead code: `TABLE_EXCLUDE_COLUMNS_SUFFIX` is declared in `DataSourceConfigKeys` but never referenced anywhere in the codebase (not in the validator's `ALLOW_TABLE_LEVEL_SUFFIXES`, not in any consumer). If this is a placeholder for a future feature, it should be removed from this PR and added when actually needed, to avoid confusion.
- [Minor] Validator rejects table names containing dots: `DataSourceConfigValidator` splits the key on `.` and requires exactly 3 parts, so a table name containing a dot (e.g. `my.table`) produces more parts and is rejected. PostgreSQL allows dots in quoted identifiers. Consider documenting this limitation or using `indexOf`/`lastIndexOf` instead of `split`.
```java
for (Map.Entry<String, String> entry : config.entrySet()) {
    String key = entry.getKey();
    if (key.startsWith(prefix) && key.endsWith(suffix)) {
        String srcTable = key.substring(prefix.length(), key.length() - suffix.length());
```
[CRITICAL] Compilation error: HashMap is used here but java.util.HashMap is not imported. Similarly, DataSourceConfigKeys (used on the two lines above) is not imported either. This file will fail to compile.
Add the following imports:
```java
import org.apache.doris.job.cdc.DataSourceConfigKeys;

import java.util.HashMap;
```

```java
public static final String TABLE_TARGET_TABLE_SUFFIX = "target_table";

// target properties
public static final String TABLE_PROPS_PREFIX = "table.create.properties.";
```
[Minor] Dead code: TABLE_EXCLUDE_COLUMNS_SUFFIX is declared here but is never used anywhere in the codebase — not in DataSourceConfigValidator.ALLOW_TABLE_LEVEL_SUFFIXES, not in any consumer code. If this is a placeholder for a future feature, consider removing it from this PR to avoid confusion and adding it when the feature is actually implemented.
```java
    throw new IllegalArgumentException("Malformed per-table config key: '" + key
            + "'. Expected format: table.<tableName>.<suffix>");
}
String suffix = parts[parts.length - 1];
```
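[Minor] Table names with dots will be rejected: the key is split with `split("\\.", -1)` and rejected when `parts.length != 3`, so a source table name containing a dot (e.g. `table.my.dotted.table.target_table`) produces more than 3 parts and fails validation. PostgreSQL allows dots in quoted identifiers.
Consider splitting on the last dot with `lastIndexOf` instead:
```java
// TABLE_LEVEL_PREFIX is assumed to be "table.". The source table name is everything
// between the prefix and the last dot, so it may itself contain dots.
int lastDot = key.lastIndexOf('.');
if (lastDot <= TABLE_LEVEL_PREFIX.length() || lastDot == key.length() - 1) {
    // No dot after the prefix, an empty table name, or an empty suffix.
    throw new IllegalArgumentException("Malformed per-table config key: '" + key
            + "'. Expected format: table.<tableName>.<suffix>");
}
String srcTable = key.substring(TABLE_LEVEL_PREFIX.length(), lastDot);
String suffix = key.substring(lastDot + 1);
// ... then check suffix against ALLOW_TABLE_LEVEL_SUFFIXES as before
```
Or, at minimum, document this limitation (no dots in source table names).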
run buildall
TPC-H: Total hot run time: 26847 ms
TPC-DS: Total hot run time: 169055 ms
What problem does this PR solve?
Summary
Add support for mapping upstream (PostgreSQL) table names to custom downstream (Doris) table names
in CDC streaming jobs. Without this feature, the Doris target table must have the same name as the
upstream source table.
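With the new per-table key, an unmapped table keeps its old behavior (target name equals source name), so the rule reads as a direct key lookup with a fallback. This is a hypothetical sketch (`TargetTableResolver` is not a Doris class); treating a blank value as "no mapping" is an assumption.

```java
import java.util.Map;

/** Hypothetical sketch of the "fall back to the source name" rule. */
final class TargetTableResolver {
    private TargetTableResolver() {
    }

    static String resolveTargetTable(Map<String, String> config, String srcTable) {
        String value = config.get("table." + srcTable + ".target_table");
        // Missing (or, by assumption, blank) entries keep the source name downstream.
        return (value == null || value.trim().isEmpty()) ? srcTable : value.trim();
    }
}
```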
New configuration
Key format: `"table.<srcTable>.target_table" = "<dstTable>"` in the `FROM` clause properties.
When not configured, behavior is unchanged (the target table name equals the source table name).
Key design decisions
Distinguish source names (for CDC monitoring) from target names (for DDL). This fixes a bug
where the CDC split assigner would look up the Doris target table name in PostgreSQL.
Test plan
created with target name, not source name)
incremental)
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)