Skip to content

Commit 2225400

Browse files
author
linjc13
committed
[FLINK-36794] [cdc-composer/cli] pipeline cdc connector support multiple data sources
1 parent 8edc345 commit 2225400

16 files changed

Lines changed: 963 additions & 109 deletions

File tree

docs/content.zh/docs/connectors/pipeline-connectors/mysql.md

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ MySQL CDC Pipeline 连接器允许从 MySQL 数据库读取快照数据和增量
5050
</table>
5151
</div>
5252

53-
## 示例
53+
## 单数据源示例
5454

55-
MySQL 读取数据同步到 Doris 的 Pipeline 可以定义如下:
55+
单数据源,从单个 MySQL 读取数据同步到 Doris 的 Pipeline 可以定义如下:
5656

5757
```yaml
5858
source:
@@ -77,6 +77,47 @@ pipeline:
7777
parallelism: 4
7878
```
7979
80+
## 多数据源示例
81+
82+
多数据源,从多个 MySQL 数据源读取数据同步到 Doris 的 Pipeline 可以定义如下:
83+
84+
```yaml
85+
sources:
86+
- type: mysql
87+
name: MySQL multiple Source1
88+
hostname: 127.0.0.1
89+
port: 3306
90+
username: admin
91+
password: pass
92+
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
93+
server-id: 5400-5404
94+
server-time-zone: Asia/Shanghai
95+
source.unique.id: 1
96+
97+
- type: mysql
98+
name: MySQL multiple Source2
99+
hostname: 127.0.0.2
100+
port: 3307
101+
username: admin
102+
password: pass
103+
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
104+
server-id: 5405-5409
105+
server-time-zone: Asia/Shanghai
106+
source.unique.id: 2
107+
108+
sink:
109+
type: doris
110+
name: Doris Sink
111+
fenodes: 127.0.0.1:8030
112+
username: root
113+
password: pass
114+
115+
pipeline:
116+
name: MySQL to Doris Pipeline
117+
operator.uid.prefix: multiple-source
118+
parallelism: 4
119+
```
120+
80121
## 连接器配置项
81122
82123
<div class="highlight">

docs/content/docs/connectors/pipeline-connectors/mysql.md

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,9 @@ You may need to configure the following dependencies manually, and pass it with
5151
</table>
5252
</div>
5353

54-
## Example
54+
## Single Data Source Example
5555

56-
An example of the pipeline for reading data from MySQL and sink to Doris can be defined as follows:
56+
An example of a pipeline that reads data from a single MySQL data source and sinks it to Doris can be defined as follows:
5757

5858
```yaml
5959
source:
@@ -78,6 +78,47 @@ pipeline:
7878
parallelism: 4
7979
```
8080
81+
## Multiple Data Sources Example
82+
83+
An example of a pipeline that reads data from multiple MySQL data sources and sinks it to Doris can be defined as follows:
84+
85+
```yaml
86+
sources:
87+
- type: mysql
88+
name: MySQL multiple Source1
89+
hostname: 127.0.0.1
90+
port: 3306
91+
username: admin
92+
password: pass
93+
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
94+
server-id: 5400-5404
95+
server-time-zone: Asia/Shanghai
96+
source.unique.id: 1
97+
98+
- type: mysql
99+
name: MySQL multiple Source2
100+
hostname: 127.0.0.2
101+
port: 3307
102+
username: admin
103+
password: pass
104+
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
105+
server-id: 5405-5409
106+
server-time-zone: Asia/Shanghai
107+
source.unique.id: 2
108+
109+
sink:
110+
type: doris
111+
name: Doris Sink
112+
fenodes: 127.0.0.1:8030
113+
username: root
114+
password: pass
115+
116+
pipeline:
117+
name: MySQL to Doris Pipeline
118+
operator.uid.prefix: multiple-source
119+
parallelism: 4
120+
```
121+
81122
## Connector Options
82123
83124
<div class="highlight">

flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
6262

6363
// Parent node keys
6464
private static final String SOURCE_KEY = "source";
65+
private static final String MULTIPLE_SOURCE_KEY = "sources";
6566
private static final String SINK_KEY = "sink";
6667
private static final String ROUTE_KEY = "route";
6768
private static final String TRANSFORM_KEY = "transform";
@@ -70,6 +71,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
7071

7172
// Source / sink keys
7273
private static final String TYPE_KEY = "type";
74+
private static final String SOURCES = "sources";
7375
private static final String NAME_KEY = "name";
7476
private static final String INCLUDE_SCHEMA_EVOLUTION_TYPES = "include.schema.changes";
7577
private static final String EXCLUDE_SCHEMA_EVOLUTION_TYPES = "exclude.schema.changes";
@@ -126,7 +128,7 @@ private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipe
126128
validateJsonNodeKeys(
127129
TOP_LEVEL_NAME,
128130
pipelineDefJsonNode,
129-
Arrays.asList(SOURCE_KEY, SINK_KEY),
131+
Arrays.asList(SOURCE_KEY, SINK_KEY, MULTIPLE_SOURCE_KEY),
130132
Arrays.asList(ROUTE_KEY, TRANSFORM_KEY, PIPELINE_KEY));
131133

132134
// UDFs are optional. We parse UDF first and remove it from the pipelineDefJsonNode since
@@ -151,13 +153,20 @@ private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipe
151153
SchemaChangeBehavior schemaChangeBehavior =
152154
userPipelineConfig.get(PIPELINE_SCHEMA_CHANGE_BEHAVIOR);
153155

154-
// Source is required
155-
SourceDef sourceDef =
156-
toSourceDef(
157-
checkNotNull(
158-
pipelineDefJsonNode.get(SOURCE_KEY),
159-
"Missing required field \"%s\" in pipeline definition",
160-
SOURCE_KEY));
156+
JsonNode multipleSourceNode = pipelineDefJsonNode.get(MULTIPLE_SOURCE_KEY);
157+
List<SourceDef> sourceDefs = new ArrayList<>();
158+
SourceDef sourceDef = null;
159+
if (multipleSourceNode != null) {
160+
Iterator<JsonNode> it = multipleSourceNode.elements();
161+
while (it.hasNext()) {
162+
JsonNode sourceNode = it.next();
163+
getSourceDefs(sourceNode, sourceDefs);
164+
}
165+
} else {
166+
JsonNode sourceNode = pipelineDefJsonNode.get(SOURCE_KEY);
167+
// Source is required
168+
sourceDef = getSourceDefs(sourceNode, sourceDefs);
169+
}
161170

162171
// Sink is required
163172
SinkDef sinkDef =
@@ -189,7 +198,25 @@ private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipe
189198
pipelineConfig.addAll(userPipelineConfig);
190199

191200
return new PipelineDef(
192-
sourceDef, sinkDef, routeDefs, transformDefs, udfDefs, modelDefs, pipelineConfig);
201+
sourceDefs,
202+
sourceDef,
203+
sinkDef,
204+
routeDefs,
205+
transformDefs,
206+
udfDefs,
207+
modelDefs,
208+
pipelineConfig);
209+
}
210+
211+
private SourceDef getSourceDefs(JsonNode root, List<SourceDef> sourceDefs) {
212+
SourceDef sourceDef =
213+
toSourceDef(
214+
checkNotNull(
215+
root,
216+
"Missing required field \"%s\" in pipeline definition",
217+
SOURCE_KEY));
218+
sourceDefs.add(sourceDef);
219+
return sourceDef;
193220
}
194221

195222
private SourceDef toSourceDef(JsonNode sourceNode) {
@@ -451,6 +478,9 @@ private void validateJsonNodeKeys(
451478

452479
for (String key : requiredKeys) {
453480
if (!presentedKeys.contains(key)) {
481+
if (key.equals(SOURCE_KEY) && presentedKeys.contains(MULTIPLE_SOURCE_KEY)) {
482+
continue;
483+
}
454484
throw new IllegalArgumentException(
455485
String.format(
456486
"Missing required field \"%s\" in %s configuration",

0 commit comments

Comments
 (0)