Skip to content

Commit fbc1b2b

Browse files
author
linjc13
committed
[FLINK-36794] [cdc-composer/cli] pipeline cdc connector support multiple data sources
1 parent cc561c0 commit fbc1b2b

14 files changed

Lines changed: 905 additions & 103 deletions

File tree

docs/content.zh/docs/connectors/pipeline-connectors/mysql.md

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ MySQL CDC Pipeline 连接器允许从 MySQL 数据库读取快照数据和增量
5050
</table>
5151
</div>
5252

53-
## 示例
53+
## 单数据源示例
5454

55-
MySQL 读取数据同步到 Doris 的 Pipeline 可以定义如下:
55+
单数据源,从单个 MySQL 读取数据同步到 Doris 的 Pipeline 可以定义如下:
5656

5757
```yaml
5858
source:
@@ -77,6 +77,44 @@ pipeline:
7777
parallelism: 4
7878
```
7979
80+
## 多数据源示例
81+
82+
多数据源,从多个 MySQL 数据源读取数据同步到 Doris 的 Pipeline 可以定义如下:
83+
84+
```yaml
85+
sources:
86+
- type: mysql
87+
name: MySQL multiple Source1
88+
hostname: 127.0.0.1
89+
port: 3306
90+
username: admin
91+
password: pass
92+
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
93+
server-id: 5400-5404
94+
server-time-zone: Asia/Shanghai
95+
96+
- type: mysql
97+
name: MySQL multiple Source2
98+
hostname: 127.0.0.2
99+
port: 3307
100+
username: admin
101+
password: pass
102+
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
103+
server-id: 5405-5409
104+
server-time-zone: Asia/Shanghai
105+
106+
sink:
107+
type: doris
108+
name: Doris Sink
109+
fenodes: 127.0.0.1:8030
110+
username: root
111+
password: pass
112+
113+
pipeline:
114+
name: MySQL to Doris Pipeline
115+
parallelism: 4
116+
```
117+
80118
## 连接器配置项
81119
82120
<div class="highlight">

docs/content/docs/connectors/pipeline-connectors/mysql.md

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,9 @@ You may need to configure the following dependencies manually, and pass it with
5151
</table>
5252
</div>
5353

54-
## Example
54+
## Single Data Source Example
5555

56-
An example of the pipeline for reading data from MySQL and sink to Doris can be defined as follows:
56+
An example of a pipeline that reads data from a single MySQL data source and sinks it to Doris can be defined as follows:
5757

5858
```yaml
5959
source:
@@ -78,6 +78,44 @@ pipeline:
7878
parallelism: 4
7979
```
8080
81+
## Multiple Data Sources Example
82+
83+
An example of a pipeline that reads data from multiple MySQL data sources and sinks it to Doris can be defined as follows:
84+
85+
```yaml
86+
sources:
87+
- type: mysql
88+
name: MySQL multiple Source1
89+
hostname: 127.0.0.1
90+
port: 3306
91+
username: admin
92+
password: pass
93+
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
94+
server-id: 5400-5404
95+
server-time-zone: Asia/Shanghai
96+
97+
- type: mysql
98+
name: MySQL multiple Source2
99+
hostname: 127.0.0.2
100+
port: 3307
101+
username: admin
102+
password: pass
103+
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
104+
server-id: 5405-5409
105+
server-time-zone: Asia/Shanghai
106+
107+
sink:
108+
type: doris
109+
name: Doris Sink
110+
fenodes: 127.0.0.1:8030
111+
username: root
112+
password: pass
113+
114+
pipeline:
115+
name: MySQL to Doris Pipeline
116+
parallelism: 4
117+
```
118+
81119
## Connector Options
82120
83121
<div class="highlight">

flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242

4343
import java.util.ArrayList;
4444
import java.util.Arrays;
45+
import java.util.Iterator;
4546
import java.util.List;
4647
import java.util.Map;
4748
import java.util.Optional;
@@ -57,6 +58,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
5758

5859
// Parent node keys
5960
private static final String SOURCE_KEY = "source";
61+
private static final String MULTIPLE_SOURCE_KEY = "sources";
6062
private static final String SINK_KEY = "sink";
6163
private static final String ROUTE_KEY = "route";
6264
private static final String TRANSFORM_KEY = "transform";
@@ -65,6 +67,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
6567

6668
// Source / sink keys
6769
private static final String TYPE_KEY = "type";
70+
private static final String SOURCES = "sources";
6871
private static final String NAME_KEY = "name";
6972
private static final String INCLUDE_SCHEMA_EVOLUTION_TYPES = "include.schema.changes";
7073
private static final String EXCLUDE_SCHEMA_EVOLUTION_TYPES = "exclude.schema.changes";
@@ -139,13 +142,20 @@ private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipe
139142
SchemaChangeBehavior schemaChangeBehavior =
140143
userPipelineConfig.get(PIPELINE_SCHEMA_CHANGE_BEHAVIOR);
141144

142-
// Source is required
143-
SourceDef sourceDef =
144-
toSourceDef(
145-
checkNotNull(
146-
pipelineDefJsonNode.get(SOURCE_KEY),
147-
"Missing required field \"%s\" in pipeline definition",
148-
SOURCE_KEY));
145+
JsonNode multipleSourceNode = pipelineDefJsonNode.get(MULTIPLE_SOURCE_KEY);
146+
List<SourceDef> sourceDefs = new ArrayList<>();
147+
SourceDef sourceDef = null;
148+
if (multipleSourceNode != null) {
149+
Iterator<JsonNode> it = multipleSourceNode.elements();
150+
while (it.hasNext()) {
151+
JsonNode sourceNode = it.next();
152+
getSourceDefs(sourceNode, sourceDefs);
153+
}
154+
} else {
155+
JsonNode sourceNode = pipelineDefJsonNode.get(SOURCE_KEY);
156+
// Source is required
157+
sourceDef = getSourceDefs(sourceNode, sourceDefs);
158+
}
149159

150160
// Sink is required
151161
SinkDef sinkDef =
@@ -175,7 +185,25 @@ private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipe
175185
pipelineConfig.addAll(userPipelineConfig);
176186

177187
return new PipelineDef(
178-
sourceDef, sinkDef, routeDefs, transformDefs, udfDefs, modelDefs, pipelineConfig);
188+
sourceDefs,
189+
sourceDef,
190+
sinkDef,
191+
routeDefs,
192+
transformDefs,
193+
udfDefs,
194+
modelDefs,
195+
pipelineConfig);
196+
}
197+
198+
private SourceDef getSourceDefs(JsonNode root, List<SourceDef> sourceDefs) {
199+
SourceDef sourceDef =
200+
toSourceDef(
201+
checkNotNull(
202+
root,
203+
"Missing required field \"%s\" in pipeline definition",
204+
SOURCE_KEY));
205+
sourceDefs.add(sourceDef);
206+
return sourceDef;
179207
}
180208

181209
private SourceDef toSourceDef(JsonNode sourceNode) {

flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java

Lines changed: 86 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,11 @@
3838
import java.net.URL;
3939
import java.time.Duration;
4040
import java.time.ZoneId;
41+
import java.util.ArrayList;
4142
import java.util.Arrays;
4243
import java.util.Collections;
4344
import java.util.LinkedHashMap;
45+
import java.util.List;
4446
import java.util.Set;
4547

4648
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN;
@@ -290,25 +292,36 @@ private void testSchemaEvolutionTypesParsing(
290292
.build())));
291293
}
292294

295+
@Test
296+
void testMultipleSourceDefinition() throws Exception {
297+
URL resource = Resources.getResource("definitions/multiple_source_mtd.yaml");
298+
YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
299+
PipelineDef pipelineDef = parser.parse(new Path(resource.toURI()), new Configuration());
300+
assertThat(pipelineDef).isInstanceOf(PipelineDef.class);
301+
}
302+
303+
SourceDef sourceDef =
304+
new SourceDef(
305+
"mysql",
306+
"source-database",
307+
Configuration.fromMap(
308+
ImmutableMap.<String, String>builder()
309+
.put("host", "localhost")
310+
.put("port", "3306")
311+
.put("username", "admin")
312+
.put("password", "pass")
313+
.put(
314+
"tables",
315+
"adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*")
316+
.put("chunk-column", "app_order_.*:id,web_order:product_id")
317+
.put("capture-new-tables", "true")
318+
.build()));
319+
List<SourceDef> sourceDefs = new ArrayList<>(Arrays.asList(new SourceDef[] {sourceDef}));
320+
293321
private final PipelineDef fullDef =
294322
new PipelineDef(
295-
new SourceDef(
296-
"mysql",
297-
"source-database",
298-
Configuration.fromMap(
299-
ImmutableMap.<String, String>builder()
300-
.put("host", "localhost")
301-
.put("port", "3306")
302-
.put("username", "admin")
303-
.put("password", "pass")
304-
.put(
305-
"tables",
306-
"adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*")
307-
.put(
308-
"chunk-column",
309-
"app_order_.*:id,web_order:product_id")
310-
.put("capture-new-tables", "true")
311-
.build())),
323+
null,
324+
sourceDef,
312325
new SinkDef(
313326
"kafka",
314327
"sink-queue",
@@ -428,25 +441,27 @@ void testParsingFullDefinitionFromString() throws Exception {
428441
assertThat(pipelineDef).isEqualTo(fullDef);
429442
}
430443

444+
SourceDef fullsourceDef =
445+
new SourceDef(
446+
"mysql",
447+
"source-database",
448+
Configuration.fromMap(
449+
ImmutableMap.<String, String>builder()
450+
.put("host", "localhost")
451+
.put("port", "3306")
452+
.put("username", "admin")
453+
.put("password", "pass")
454+
.put(
455+
"tables",
456+
"adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*")
457+
.put("chunk-column", "app_order_.*:id,web_order:product_id")
458+
.put("capture-new-tables", "true")
459+
.build()));
460+
431461
private final PipelineDef fullDefWithGlobalConf =
432462
new PipelineDef(
433-
new SourceDef(
434-
"mysql",
435-
"source-database",
436-
Configuration.fromMap(
437-
ImmutableMap.<String, String>builder()
438-
.put("host", "localhost")
439-
.put("port", "3306")
440-
.put("username", "admin")
441-
.put("password", "pass")
442-
.put(
443-
"tables",
444-
"adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*")
445-
.put(
446-
"chunk-column",
447-
"app_order_.*:id,web_order:product_id")
448-
.put("capture-new-tables", "true")
449-
.build())),
463+
null,
464+
fullsourceDef,
450465
new SinkDef(
451466
"kafka",
452467
"sink-queue",
@@ -507,21 +522,24 @@ void testParsingFullDefinitionFromString() throws Exception {
507522
.put("execution.runtime-mode", "STREAMING")
508523
.build()));
509524

525+
SourceDef defSourceDef =
526+
new SourceDef(
527+
"mysql",
528+
null,
529+
Configuration.fromMap(
530+
ImmutableMap.<String, String>builder()
531+
.put("host", "localhost")
532+
.put("port", "3306")
533+
.put("username", "admin")
534+
.put("password", "pass")
535+
.put(
536+
"tables",
537+
"adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*")
538+
.build()));
539+
510540
private final PipelineDef defWithOptional =
511541
new PipelineDef(
512-
new SourceDef(
513-
"mysql",
514-
null,
515-
Configuration.fromMap(
516-
ImmutableMap.<String, String>builder()
517-
.put("host", "localhost")
518-
.put("port", "3306")
519-
.put("username", "admin")
520-
.put("password", "pass")
521-
.put(
522-
"tables",
523-
"adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*")
524-
.build())),
542+
defSourceDef,
525543
new SinkDef(
526544
"kafka",
527545
null,
@@ -548,9 +566,11 @@ void testParsingFullDefinitionFromString() throws Exception {
548566
.put("parallelism", "4")
549567
.build()));
550568

569+
SourceDef mysqlSourceDef = new SourceDef("mysql", null, new Configuration());
570+
551571
private final PipelineDef minimizedDef =
552572
new PipelineDef(
553-
new SourceDef("mysql", null, new Configuration()),
573+
mysqlSourceDef,
554574
new SinkDef(
555575
"kafka",
556576
null,
@@ -568,25 +588,26 @@ void testParsingFullDefinitionFromString() throws Exception {
568588
Collections.singletonMap(
569589
"local-time-zone", ZoneId.systemDefault().toString())));
570590

591+
SourceDef routeRepSymDef =
592+
new SourceDef(
593+
"mysql",
594+
"source-database",
595+
Configuration.fromMap(
596+
ImmutableMap.<String, String>builder()
597+
.put("host", "localhost")
598+
.put("port", "3306")
599+
.put("username", "admin")
600+
.put("password", "pass")
601+
.put(
602+
"tables",
603+
"adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*")
604+
.put("chunk-column", "app_order_.*:id,web_order:product_id")
605+
.put("capture-new-tables", "true")
606+
.build()));
607+
571608
private final PipelineDef fullDefWithRouteRepSym =
572609
new PipelineDef(
573-
new SourceDef(
574-
"mysql",
575-
"source-database",
576-
Configuration.fromMap(
577-
ImmutableMap.<String, String>builder()
578-
.put("host", "localhost")
579-
.put("port", "3306")
580-
.put("username", "admin")
581-
.put("password", "pass")
582-
.put(
583-
"tables",
584-
"adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*")
585-
.put(
586-
"chunk-column",
587-
"app_order_.*:id,web_order:product_id")
588-
.put("capture-new-tables", "true")
589-
.build())),
610+
routeRepSymDef,
590611
new SinkDef(
591612
"kafka",
592613
"sink-queue",

0 commit comments

Comments
 (0)