Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 43 additions & 2 deletions docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ MySQL CDC Pipeline 连接器允许从 MySQL 数据库读取快照数据和增量
</table>
</div>

## 示例
## 单数据源示例

MySQL 读取数据同步到 Doris 的 Pipeline 可以定义如下:
单数据源,从单个 MySQL 读取数据同步到 Doris 的 Pipeline 可以定义如下:

```yaml
source:
Expand All @@ -77,6 +77,47 @@ pipeline:
parallelism: 4
```

## 多数据源示例

多数据源,从多个mysql数据源读取数据同步到 Doris 的 Pipeline 可以定义如下:

```yaml
sources:
- type: mysql
name: MySQL multiple Source1
hostname: 127.0.0.1
port: 3306
username: admin
password: pass
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5400-5404
server-time-zone: Asia/Shanghai
source.unique.id: 1

- type: mysql
name: MySQL multiple Source2
hostname: 127.0.0.2
Copy link
Copy Markdown
Contributor

@lvyanquan lvyanquan Dec 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is essential to verify state compatibility.
Would adding or removing a Source from an existing job lead to state compatibility issues, or could reordering the Sources result in state inconsistency? Such incompatible operations must be exposed as errors to avoid silent handling.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is essential to verify state compatibility. Would adding or removing a Source from an existing job lead to state compatibility issues, or could reordering the Sources result in state inconsistency? Such incompatible operations must be exposed as errors to avoid silent handling.
@lvyanquan Thanks for the review!I have already made the modifications and tested them. Please refer to the following document
https://www.yuque.com/oniucium/lfk1te/yruqcmf9vbz4rxzv?singleDoc# 《multiple source从状态恢复验证》

port: 3307
username: admin
password: pass
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5405-5409
server-time-zone: Asia/Shanghai
source.unique.id: 2

sink:
type: doris
name: Doris Sink
fenodes: 127.0.0.1:8030
username: root
password: pass

pipeline:
name: MySQL to Doris Pipeline
operator.uid.prefix: multiple-source
parallelism: 4
```

## 连接器配置项

<div class="highlight">
Expand Down
45 changes: 43 additions & 2 deletions docs/content/docs/connectors/pipeline-connectors/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ You may need to configure the following dependencies manually, and pass it with
</table>
</div>

## Example
## single data source Example

An example of the pipeline for reading data from MySQL and sink to Doris can be defined as follows:
An example of the pipeline for reading data from single MySQL and sink to Doris can be defined as follows:

```yaml
source:
Expand All @@ -78,6 +78,47 @@ pipeline:
parallelism: 4
```

## multiple data source Example

An example of the pipeline for reading data from multiple MySQL datasource and sink to Doris can be defined as follows:

```yaml
sources:
- type: mysql
name: MySQL multiple Source1
hostname: 127.0.0.1
port: 3306
username: admin
password: pass
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5400-5404
server-time-zone: Asia/Shanghai
source.unique.id: 1

- type: mysql
name: MySQL multiple Source2
hostname: 127.0.0.2
port: 3307
username: admin
password: pass
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5405-5409
server-time-zone: Asia/Shanghai
source.unique.id: 2

sink:
type: doris
name: Doris Sink
fenodes: 127.0.0.1:8030
username: root
password: pass

pipeline:
name: MySQL to Doris Pipeline
operator.uid.prefix: multiple-source
parallelism: 4
```

## Connector Options

<div class="highlight">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {

// Parent node keys
private static final String SOURCE_KEY = "source";
private static final String MULTIPLE_SOURCE_KEY = "sources";
private static final String SINK_KEY = "sink";
private static final String ROUTE_KEY = "route";
private static final String TRANSFORM_KEY = "transform";
Expand All @@ -70,6 +71,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {

// Source / sink keys
private static final String TYPE_KEY = "type";
private static final String SOURCES = "sources";
private static final String NAME_KEY = "name";
private static final String INCLUDE_SCHEMA_EVOLUTION_TYPES = "include.schema.changes";
private static final String EXCLUDE_SCHEMA_EVOLUTION_TYPES = "exclude.schema.changes";
Expand Down Expand Up @@ -126,7 +128,7 @@ private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipe
validateJsonNodeKeys(
TOP_LEVEL_NAME,
pipelineDefJsonNode,
Arrays.asList(SOURCE_KEY, SINK_KEY),
Arrays.asList(SOURCE_KEY, SINK_KEY, MULTIPLE_SOURCE_KEY),
Arrays.asList(ROUTE_KEY, TRANSFORM_KEY, PIPELINE_KEY));

// UDFs are optional. We parse UDF first and remove it from the pipelineDefJsonNode since
Expand All @@ -151,13 +153,20 @@ private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipe
SchemaChangeBehavior schemaChangeBehavior =
userPipelineConfig.get(PIPELINE_SCHEMA_CHANGE_BEHAVIOR);

// Source is required
SourceDef sourceDef =
toSourceDef(
checkNotNull(
pipelineDefJsonNode.get(SOURCE_KEY),
"Missing required field \"%s\" in pipeline definition",
SOURCE_KEY));
JsonNode multipleSourceNode = pipelineDefJsonNode.get(MULTIPLE_SOURCE_KEY);
List<SourceDef> sourceDefs = new ArrayList<>();
SourceDef sourceDef = null;
if (multipleSourceNode != null) {
Iterator<JsonNode> it = multipleSourceNode.elements();
while (it.hasNext()) {
JsonNode sourceNode = it.next();
getSourceDefs(sourceNode, sourceDefs);
}
} else {
JsonNode sourceNode = pipelineDefJsonNode.get(SOURCE_KEY);
// Source is required
sourceDef = getSourceDefs(sourceNode, sourceDefs);
}

// Sink is required
SinkDef sinkDef =
Expand Down Expand Up @@ -189,7 +198,25 @@ private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipe
pipelineConfig.addAll(userPipelineConfig);

return new PipelineDef(
sourceDef, sinkDef, routeDefs, transformDefs, udfDefs, modelDefs, pipelineConfig);
sourceDefs,
sourceDef,
sinkDef,
routeDefs,
transformDefs,
udfDefs,
modelDefs,
pipelineConfig);
}

private SourceDef getSourceDefs(JsonNode root, List<SourceDef> sourceDefs) {
SourceDef sourceDef =
toSourceDef(
checkNotNull(
root,
"Missing required field \"%s\" in pipeline definition",
SOURCE_KEY));
sourceDefs.add(sourceDef);
return sourceDef;
}

private SourceDef toSourceDef(JsonNode sourceNode) {
Expand Down Expand Up @@ -451,6 +478,9 @@ private void validateJsonNodeKeys(

for (String key : requiredKeys) {
if (!presentedKeys.contains(key)) {
if (key.equals(SOURCE_KEY) && presentedKeys.contains(MULTIPLE_SOURCE_KEY)) {
continue;
}
throw new IllegalArgumentException(
String.format(
"Missing required field \"%s\" in %s configuration",
Expand Down
Loading
Loading