diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
index fc38481fdbc..0a5ef47cf21 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
@@ -50,9 +50,9 @@ MySQL CDC Pipeline 连接器允许从 MySQL 数据库读取快照数据和增量
-## 示例
+## 单数据源示例
-从 MySQL 读取数据同步到 Doris 的 Pipeline 可以定义如下:
+单数据源,从单个 MySQL 读取数据同步到 Doris 的 Pipeline 可以定义如下:
```yaml
source:
@@ -77,6 +77,47 @@ pipeline:
parallelism: 4
```
+## 多数据源示例
+
+多数据源,从多个 MySQL 数据源读取数据同步到 Doris 的 Pipeline 可以定义如下:
+
+```yaml
+sources:
+ - type: mysql
+ name: MySQL multiple Source1
+ hostname: 127.0.0.1
+ port: 3306
+ username: admin
+ password: pass
+ tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
+ server-id: 5400-5404
+ server-time-zone: Asia/Shanghai
+ source.unique.id: 1
+
+ - type: mysql
+ name: MySQL multiple Source2
+ hostname: 127.0.0.2
+ port: 3307
+ username: admin
+ password: pass
+ tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
+ server-id: 5405-5409
+ server-time-zone: Asia/Shanghai
+ source.unique.id: 2
+
+sink:
+ type: doris
+ name: Doris Sink
+ fenodes: 127.0.0.1:8030
+ username: root
+ password: pass
+
+pipeline:
+ name: MySQL to Doris Pipeline
+ operator.uid.prefix: multiple-source
+ parallelism: 4
+```
+
## 连接器配置项
diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md b/docs/content/docs/connectors/pipeline-connectors/mysql.md
index a8964cdaffb..7c49995184f 100644
--- a/docs/content/docs/connectors/pipeline-connectors/mysql.md
+++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md
@@ -51,9 +51,9 @@ You may need to configure the following dependencies manually, and pass it with
-## Example
+## Single Data Source Example
-An example of the pipeline for reading data from MySQL and sink to Doris can be defined as follows:
+An example of a pipeline that reads data from a single MySQL data source and sinks it to Doris can be defined as follows:
```yaml
source:
@@ -78,6 +78,47 @@ pipeline:
parallelism: 4
```
+## Multiple Data Sources Example
+
+An example of a pipeline that reads data from multiple MySQL data sources and sinks it to Doris can be defined as follows:
+
+```yaml
+sources:
+ - type: mysql
+ name: MySQL multiple Source1
+ hostname: 127.0.0.1
+ port: 3306
+ username: admin
+ password: pass
+ tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
+ server-id: 5400-5404
+ server-time-zone: Asia/Shanghai
+ source.unique.id: 1
+
+ - type: mysql
+ name: MySQL multiple Source2
+ hostname: 127.0.0.2
+ port: 3307
+ username: admin
+ password: pass
+ tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
+ server-id: 5405-5409
+ server-time-zone: Asia/Shanghai
+ source.unique.id: 2
+
+sink:
+ type: doris
+ name: Doris Sink
+ fenodes: 127.0.0.1:8030
+ username: root
+ password: pass
+
+pipeline:
+ name: MySQL to Doris Pipeline
+ operator.uid.prefix: multiple-source
+ parallelism: 4
+```
+
## Connector Options
diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
index efb4e79d26b..c850c82358d 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
@@ -62,6 +62,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
// Parent node keys
private static final String SOURCE_KEY = "source";
+ private static final String MULTIPLE_SOURCE_KEY = "sources";
private static final String SINK_KEY = "sink";
private static final String ROUTE_KEY = "route";
private static final String TRANSFORM_KEY = "transform";
@@ -70,6 +71,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
// Source / sink keys
private static final String TYPE_KEY = "type";
+ private static final String SOURCES = "sources";
private static final String NAME_KEY = "name";
private static final String INCLUDE_SCHEMA_EVOLUTION_TYPES = "include.schema.changes";
private static final String EXCLUDE_SCHEMA_EVOLUTION_TYPES = "exclude.schema.changes";
@@ -126,7 +128,7 @@ private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipe
validateJsonNodeKeys(
TOP_LEVEL_NAME,
pipelineDefJsonNode,
- Arrays.asList(SOURCE_KEY, SINK_KEY),
+ Arrays.asList(SOURCE_KEY, SINK_KEY, MULTIPLE_SOURCE_KEY),
Arrays.asList(ROUTE_KEY, TRANSFORM_KEY, PIPELINE_KEY));
// UDFs are optional. We parse UDF first and remove it from the pipelineDefJsonNode since
@@ -151,13 +153,20 @@ private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipe
SchemaChangeBehavior schemaChangeBehavior =
userPipelineConfig.get(PIPELINE_SCHEMA_CHANGE_BEHAVIOR);
- // Source is required
- SourceDef sourceDef =
- toSourceDef(
- checkNotNull(
- pipelineDefJsonNode.get(SOURCE_KEY),
- "Missing required field \"%s\" in pipeline definition",
- SOURCE_KEY));
+ JsonNode multipleSourceNode = pipelineDefJsonNode.get(MULTIPLE_SOURCE_KEY);
+ List
sourceDefs = new ArrayList<>();
+ SourceDef sourceDef = null;
+ if (multipleSourceNode != null) {
+ Iterator it = multipleSourceNode.elements();
+ while (it.hasNext()) {
+ JsonNode sourceNode = it.next();
+ getSourceDefs(sourceNode, sourceDefs);
+ }
+ } else {
+ JsonNode sourceNode = pipelineDefJsonNode.get(SOURCE_KEY);
+ // Source is required
+ sourceDef = getSourceDefs(sourceNode, sourceDefs);
+ }
// Sink is required
SinkDef sinkDef =
@@ -189,7 +198,25 @@ private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipe
pipelineConfig.addAll(userPipelineConfig);
return new PipelineDef(
- sourceDef, sinkDef, routeDefs, transformDefs, udfDefs, modelDefs, pipelineConfig);
+ sourceDefs,
+ sourceDef,
+ sinkDef,
+ routeDefs,
+ transformDefs,
+ udfDefs,
+ modelDefs,
+ pipelineConfig);
+ }
+
+ private SourceDef getSourceDefs(JsonNode root, List sourceDefs) {
+ SourceDef sourceDef =
+ toSourceDef(
+ checkNotNull(
+ root,
+ "Missing required field \"%s\" in pipeline definition",
+ SOURCE_KEY));
+ sourceDefs.add(sourceDef);
+ return sourceDef;
}
private SourceDef toSourceDef(JsonNode sourceNode) {
@@ -451,6 +478,9 @@ private void validateJsonNodeKeys(
for (String key : requiredKeys) {
if (!presentedKeys.contains(key)) {
+ if (key.equals(SOURCE_KEY) && presentedKeys.contains(MULTIPLE_SOURCE_KEY)) {
+ continue;
+ }
throw new IllegalArgumentException(
String.format(
"Missing required field \"%s\" in %s configuration",
diff --git a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
index bf4c377ee2c..ed5d869b81f 100644
--- a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
+++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
@@ -38,9 +38,11 @@
import java.net.URL;
import java.time.Duration;
import java.time.ZoneId;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Set;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN;
@@ -290,25 +292,36 @@ private void testSchemaEvolutionTypesParsing(
.build())));
}
+ @Test
+ void testMultipleSourceDefinition() throws Exception {
+ URL resource = Resources.getResource("definitions/multiple_source_mtd.yaml");
+ YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
+ PipelineDef pipelineDef = parser.parse(new Path(resource.toURI()), new Configuration());
+ assertThat(pipelineDef).isInstanceOf(PipelineDef.class);
+ }
+
+ SourceDef sourceDef =
+ new SourceDef(
+ "mysql",
+ "source-database",
+ Configuration.fromMap(
+ ImmutableMap.builder()
+ .put("host", "localhost")
+ .put("port", "3306")
+ .put("username", "admin")
+ .put("password", "pass")
+ .put(
+ "tables",
+ "adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*")
+ .put("chunk-column", "app_order_.*:id,web_order:product_id")
+ .put("capture-new-tables", "true")
+ .build()));
+ List sourceDefs = new ArrayList<>(Arrays.asList(new SourceDef[] {sourceDef}));
+
private final PipelineDef fullDef =
new PipelineDef(
- new SourceDef(
- "mysql",
- "source-database",
- Configuration.fromMap(
- ImmutableMap.builder()
- .put("host", "localhost")
- .put("port", "3306")
- .put("username", "admin")
- .put("password", "pass")
- .put(
- "tables",
- "adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*")
- .put(
- "chunk-column",
- "app_order_.*:id,web_order:product_id")
- .put("capture-new-tables", "true")
- .build())),
+ null,
+ sourceDef,
new SinkDef(
"kafka",
"sink-queue",
@@ -428,25 +441,27 @@ void testParsingFullDefinitionFromString() throws Exception {
assertThat(pipelineDef).isEqualTo(fullDef);
}
+ SourceDef fullsourceDef =
+ new SourceDef(
+ "mysql",
+ "source-database",
+ Configuration.fromMap(
+ ImmutableMap.builder()
+ .put("host", "localhost")
+ .put("port", "3306")
+ .put("username", "admin")
+ .put("password", "pass")
+ .put(
+ "tables",
+ "adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*")
+ .put("chunk-column", "app_order_.*:id,web_order:product_id")
+ .put("capture-new-tables", "true")
+ .build()));
+
private final PipelineDef fullDefWithGlobalConf =
new PipelineDef(
- new SourceDef(
- "mysql",
- "source-database",
- Configuration.fromMap(
- ImmutableMap.builder()
- .put("host", "localhost")
- .put("port", "3306")
- .put("username", "admin")
- .put("password", "pass")
- .put(
- "tables",
- "adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*")
- .put(
- "chunk-column",
- "app_order_.*:id,web_order:product_id")
- .put("capture-new-tables", "true")
- .build())),
+ null,
+ fullsourceDef,
new SinkDef(
"kafka",
"sink-queue",
@@ -507,21 +522,24 @@ void testParsingFullDefinitionFromString() throws Exception {
.put("execution.runtime-mode", "STREAMING")
.build()));
+ SourceDef defSourceDef =
+ new SourceDef(
+ "mysql",
+ null,
+ Configuration.fromMap(
+ ImmutableMap.builder()
+ .put("host", "localhost")
+ .put("port", "3306")
+ .put("username", "admin")
+ .put("password", "pass")
+ .put(
+ "tables",
+ "adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*")
+ .build()));
+
private final PipelineDef defWithOptional =
new PipelineDef(
- new SourceDef(
- "mysql",
- null,
- Configuration.fromMap(
- ImmutableMap.builder()
- .put("host", "localhost")
- .put("port", "3306")
- .put("username", "admin")
- .put("password", "pass")
- .put(
- "tables",
- "adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*")
- .build())),
+ defSourceDef,
new SinkDef(
"kafka",
null,
@@ -548,9 +566,11 @@ void testParsingFullDefinitionFromString() throws Exception {
.put("parallelism", "4")
.build()));
+ SourceDef mysqlSourceDef = new SourceDef("mysql", null, new Configuration());
+
private final PipelineDef minimizedDef =
new PipelineDef(
- new SourceDef("mysql", null, new Configuration()),
+ mysqlSourceDef,
new SinkDef(
"kafka",
null,
@@ -568,25 +588,26 @@ void testParsingFullDefinitionFromString() throws Exception {
Collections.singletonMap(
"local-time-zone", ZoneId.systemDefault().toString())));
+ SourceDef routeRepSymDef =
+ new SourceDef(
+ "mysql",
+ "source-database",
+ Configuration.fromMap(
+ ImmutableMap.builder()
+ .put("host", "localhost")
+ .put("port", "3306")
+ .put("username", "admin")
+ .put("password", "pass")
+ .put(
+ "tables",
+ "adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*")
+ .put("chunk-column", "app_order_.*:id,web_order:product_id")
+ .put("capture-new-tables", "true")
+ .build()));
+
private final PipelineDef fullDefWithRouteRepSym =
new PipelineDef(
- new SourceDef(
- "mysql",
- "source-database",
- Configuration.fromMap(
- ImmutableMap.builder()
- .put("host", "localhost")
- .put("port", "3306")
- .put("username", "admin")
- .put("password", "pass")
- .put(
- "tables",
- "adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*")
- .put(
- "chunk-column",
- "app_order_.*:id,web_order:product_id")
- .put("capture-new-tables", "true")
- .build())),
+ routeRepSymDef,
new SinkDef(
"kafka",
"sink-queue",
diff --git a/flink-cdc-cli/src/test/resources/definitions/multiple_source_mtd.yaml b/flink-cdc-cli/src/test/resources/definitions/multiple_source_mtd.yaml
new file mode 100644
index 00000000000..ff5b2f5c696
--- /dev/null
+++ b/flink-cdc-cli/src/test/resources/definitions/multiple_source_mtd.yaml
@@ -0,0 +1,56 @@
+################################################################################
+# Copyright 2023 Ververica Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+sources:
+ - type: mysql
+ hostname: 127.0.0.1
+ port: 50001
+ username: datawings
+ password: 123
+ tables: test.table01
+ server-id: 5400-5404
+ server-time-zone: Asia/Shanghai
+ source.unique.id: 1
+
+ - type: mysql
+ hostname: 127.0.0.2
+ port: 50002
+ username: datawings
+ password: 123
+ tables: test.table02
+ server-id: 5404-5408
+ server-time-zone: Asia/Shanghai
+ source.unique.id: 2
+
+
+route:
+ - source-table: test.table01
+ sink-table: test.table01
+ description: sync table to destination table1
+ - source-table: test.table02
+ sink-table: test.table02
+ description: sync table to destination table2
+
+sink:
+ type: doris
+ fenodes: 127.0.0.1:9033
+ username: root
+ password: 123
+ table.create.properties.light_schema_change: false
+ table.create.properties.replication_num: 1
+
+pipeline:
+ name: Sync MySQL Database to doris
+ operator.uid.prefix: multiple-source
+ parallelism: 1
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/FactoryHelper.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/FactoryHelper.java
index 8e4ff02276e..db605412c98 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/FactoryHelper.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/FactoryHelper.java
@@ -41,6 +41,7 @@ public class FactoryHelper {
private final Factory factory;
private final Factory.Context context;
+ private static final String MULTIPLE_SOURCE_UNIQUE_ID = "source.unique.id";
private FactoryHelper(Factory factory, Factory.Context context) {
this.factory = factory;
@@ -94,6 +95,10 @@ public static void validateUnconsumedKeys(
final Set remainingOptionKeys = new HashSet<>(allOptionKeys);
remainingOptionKeys.removeAll(consumedOptionKeys);
if (!remainingOptionKeys.isEmpty()) {
+ if (remainingOptionKeys.size() == 1
+ && remainingOptionKeys.iterator().next().equals(MULTIPLE_SOURCE_UNIQUE_ID)) {
+ return;
+ }
throw new ValidationException(
String.format(
"Unsupported options found for '%s'.\n\n"
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java
index 985613e131d..d746090354a 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java
@@ -24,6 +24,8 @@
import org.apache.flink.cdc.composer.PipelineComposer;
import org.apache.flink.cdc.composer.PipelineExecution;
+import javax.annotation.Nullable;
+
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
@@ -53,7 +55,8 @@
* before being submitted to the computing engine.
*/
public class PipelineDef {
- private final SourceDef source;
+ @Nullable private final List sources;
+ @Nullable private final SourceDef source;
private final SinkDef sink;
private final List routes;
private final List transforms;
@@ -62,6 +65,7 @@ public class PipelineDef {
private final Configuration config;
public PipelineDef(
+ List sources,
SourceDef source,
SinkDef sink,
List routes,
@@ -69,13 +73,24 @@ public PipelineDef(
List udfs,
List models,
Configuration config) {
- this.source = source;
+ this.sources = sources;
this.sink = sink;
this.routes = routes;
this.transforms = transforms;
this.udfs = udfs;
this.models = models;
this.config = evaluatePipelineRuntimeExecutionMode(evaluatePipelineTimeZone(config));
+ this.source = source;
+ }
+
+ public PipelineDef(
+ List sources,
+ SinkDef sink,
+ List routes,
+ List transforms,
+ List udfs,
+ Configuration config) {
+ this(sources, null, sink, routes, transforms, udfs, new ArrayList<>(), config);
}
public PipelineDef(
@@ -85,7 +100,11 @@ public PipelineDef(
List transforms,
List udfs,
Configuration config) {
- this(source, sink, routes, transforms, udfs, new ArrayList<>(), config);
+ this(null, source, sink, routes, transforms, udfs, new ArrayList<>(), config);
+ }
+
+ public List getSources() {
+ return sources;
}
public SourceDef getSource() {
@@ -119,9 +138,11 @@ public Configuration getConfig() {
@Override
public String toString() {
return "PipelineDef{"
- + "source="
+ + "sources="
+ + sources
+ + ",source="
+ source
- + ", sink="
+ + ",sink="
+ sink
+ ", routes="
+ routes
@@ -145,7 +166,8 @@ public boolean equals(Object o) {
return false;
}
PipelineDef that = (PipelineDef) o;
- return Objects.equals(source, that.source)
+ return Objects.equals(sources, that.sources)
+ && Objects.equals(source, that.source)
&& Objects.equals(sink, that.sink)
&& Objects.equals(routes, that.routes)
&& Objects.equals(transforms, that.transforms)
@@ -156,7 +178,7 @@ public boolean equals(Object o) {
@Override
public int hashCode() {
- return Objects.hash(source, sink, routes, transforms, udfs, models, config);
+ return Objects.hash(sources, source, sink, routes, transforms, udfs, models, config);
}
// ------------------------------------------------------------------------
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
index 6de9045af39..cb4c129cd91 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
@@ -19,6 +19,7 @@
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
@@ -29,6 +30,7 @@
import org.apache.flink.cdc.composer.PipelineComposer;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.cdc.composer.definition.PipelineDef;
+import org.apache.flink.cdc.composer.definition.SourceDef;
import org.apache.flink.cdc.composer.flink.coordination.OperatorIDGenerator;
import org.apache.flink.cdc.composer.flink.translator.DataSinkTranslator;
import org.apache.flink.cdc.composer.flink.translator.DataSourceTranslator;
@@ -41,6 +43,7 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.FlinkRuntimeException;
import javax.annotation.Nullable;
@@ -59,6 +62,7 @@
public class FlinkPipelineComposer implements PipelineComposer {
private static final String SCHEMA_OPERATOR_UID_SUFFIX = "schema-operator";
+ private static final String MULTIPLE_SOURCE_UNIQUE_ID = "source.unique.id";
private final StreamExecutionEnvironment env;
private final boolean isBlocking;
@@ -164,21 +168,62 @@ private void translate(StreamExecutionEnvironment env, PipelineDef pipelineDef)
// And required constructors
OperatorIDGenerator schemaOperatorIDGenerator = new OperatorIDGenerator(schemaOperatorUid);
- DataSource dataSource =
- sourceTranslator.createDataSource(pipelineDef.getSource(), pipelineDefConfig, env);
- DataSink dataSink =
- sinkTranslator.createDataSink(pipelineDef.getSink(), pipelineDefConfig, env);
+ List sourceDefs = pipelineDef.getSources();
- boolean isParallelMetadataSource = dataSource.isParallelMetadataSource();
+ // O ---> Source
+ DataStream stream = null;
+ DataSource dataSource = null;
+ boolean isParallelMetadataSource;
// O ---> Source
- DataStream stream =
- sourceTranslator.translate(
- pipelineDef.getSource(),
- dataSource,
- env,
- parallelism,
- operatorUidGenerator);
+ if (sourceDefs != null) {
+ for (SourceDef source : sourceDefs) {
+ String multipleSourceUniqueId =
+ source.getConfig()
+ .get(
+ ConfigOptions.key(MULTIPLE_SOURCE_UNIQUE_ID)
+ .stringType()
+ .noDefaultValue());
+ if (operatorUidPrefix == null || multipleSourceUniqueId == null) {
+ throw new FlinkRuntimeException(
+ "When there are multiple sources, the values of source option 'source.unique.id' and pipeline 'operator.uid.refix' cannot be empty!");
+ }
+ dataSource = sourceTranslator.createDataSource(source, pipelineDefConfig, env);
+ OperatorUidGenerator operatorUidGeneratorForMultipleSource =
+ new OperatorUidGenerator(operatorUidPrefix + multipleSourceUniqueId);
+ DataStream streamBranch =
+ sourceTranslator.translate(
+ source,
+ dataSource,
+ env,
+ parallelism,
+ operatorUidGeneratorForMultipleSource);
+ if (stream == null) {
+ stream = streamBranch;
+ } else {
+ stream = stream.union(streamBranch);
+ }
+ }
+ if (sourceDefs.size() > 1) {
+ isParallelMetadataSource = true;
+ } else {
+ isParallelMetadataSource = dataSource.isParallelMetadataSource();
+ }
+ } else {
+ dataSource =
+ sourceTranslator.createDataSource(
+ pipelineDef.getSource(), pipelineDefConfig, env);
+ stream =
+ sourceTranslator.translate(
+ pipelineDef.getSource(),
+ dataSource,
+ env,
+ parallelism,
+ operatorUidGenerator);
+ isParallelMetadataSource = dataSource.isParallelMetadataSource();
+ }
+ DataSink dataSink =
+ sinkTranslator.createDataSink(pipelineDef.getSink(), pipelineDefConfig, env);
// Source ---> PreTransform
stream =
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkMultipleSourceComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkMultipleSourceComposerITCase.java
new file mode 100644
index 00000000000..40a07b7b6fc
--- /dev/null
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkMultipleSourceComposerITCase.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.composer.flink;
+
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.pipeline.PipelineOptions;
+import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.composer.PipelineExecution;
+import org.apache.flink.cdc.composer.definition.PipelineDef;
+import org.apache.flink.cdc.composer.definition.SinkDef;
+import org.apache.flink.cdc.composer.definition.SourceDef;
+import org.apache.flink.cdc.connectors.values.ValuesDatabase;
+import org.apache.flink.cdc.connectors.values.factory.ValuesDataFactory;
+import org.apache.flink.cdc.connectors.values.sink.ValuesDataSinkOptions;
+import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceOptions;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper.EventSetId;
+import static org.apache.flink.configuration.CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Integration test for {@link FlinkPipelineComposer} composing a pipeline from multiple sources. */
+class FlinkMultipleSourceComposerITCase {
+ private static final String MULTIPLE_SOURCE_UNIQUE_ID = "source.unique.id";
+ private static final int MAX_PARALLELISM = 4;
+ private static final String LINE_SEPARATOR = System.lineSeparator();
+ // Always use parent-first classloader for CDC classes.
+ // The reason is that ValuesDatabase uses static field for holding data, we need to make sure
+ // the class is loaded by AppClassloader so that we can verify data in the test case.
+ private static final org.apache.flink.configuration.Configuration MINI_CLUSTER_CONFIG =
+ new org.apache.flink.configuration.Configuration();
+
+ static {
+ MINI_CLUSTER_CONFIG.set(
+ ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
+ Collections.singletonList("org.apache.flink.cdc"));
+ }
+
+ /**
+ * Use {@link MiniClusterExtension} to reduce the overhead of restarting the MiniCluster for
+ * every test case.
+ */
+ @RegisterExtension
+ static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+ new MiniClusterExtension(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(MAX_PARALLELISM)
+ .setConfiguration(MINI_CLUSTER_CONFIG)
+ .build());
+
+ private final PrintStream standardOut = System.out;
+ private final ByteArrayOutputStream outCaptor = new ByteArrayOutputStream();
+
+ @BeforeEach
+ void init() {
+ // Take over STDOUT as we need to check the output of values sink
+ System.setOut(new PrintStream(outCaptor));
+ // Initialize in-memory database
+ }
+
+ @AfterEach
+ void cleanup() {
+ System.setOut(standardOut);
+ }
+
+ @Test
+ @EnumSource
+ void testSingleSplitMultipleSources() throws Exception {
+ FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+ // Setup value source
+ Configuration sourceConfig1 = new Configuration();
+ sourceConfig1.set(
+ ValuesDataSourceOptions.EVENT_SET_ID,
+ EventSetId.SINGLE_SPLIT_MULTI_SOURCE_TABLE_FIRST);
+
+ sourceConfig1.set(
+ ConfigOptions.key(MULTIPLE_SOURCE_UNIQUE_ID).stringType().noDefaultValue(), "1");
+ Configuration sourceConfig2 = new Configuration();
+ sourceConfig2.set(
+ ValuesDataSourceOptions.EVENT_SET_ID,
+ EventSetId.SINGLE_SPLIT_MULTI_SOURCE_TABLE_SECOND);
+ sourceConfig2.set(
+ ConfigOptions.key(MULTIPLE_SOURCE_UNIQUE_ID).stringType().noDefaultValue(), "2");
+ SourceDef sourceDef1 =
+ new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source1", sourceConfig1);
+ SourceDef sourceDef2 =
+ new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source2", sourceConfig2);
+
+ // Setup value sink
+ Configuration sinkConfig = new Configuration();
+ sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+ SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+ // Setup pipeline
+ Configuration pipelineConfig = new Configuration();
+ pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.LENIENT);
+ pipelineConfig.set(PipelineOptions.PIPELINE_OPERATOR_UID_PREFIX, "multiple-source");
+ List sourceDefs = new ArrayList<>();
+ sourceDefs.add(sourceDef1);
+ sourceDefs.add(sourceDef2);
+ PipelineDef pipelineDef =
+ new PipelineDef(
+ sourceDefs,
+ sinkDef,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ pipelineConfig);
+
+ // Execute the pipeline
+ PipelineExecution execution = composer.compose(pipelineDef);
+ execution.execute();
+ List table1Results = ValuesDatabase.getAllResults();
+
+ assertThat(table1Results)
+ .containsExactly(
+ "default_namespace3.default_schema3.table3:col1=1;col2=1;col3=",
+ "default_namespace3.default_schema3.table3:col1=2;col2=2;col3=",
+ "default_namespace3.default_schema3.table3:col1=3;col2=3;col3=",
+ "default_namespace4.default_schema4.table4:col1=1;col2=1;col4=",
+ "default_namespace4.default_schema4.table4:col1=2;col2=2;col4=",
+ "default_namespace4.default_schema4.table4:col1=3;col2=3;col4=");
+
+ String[] outputEvents = outCaptor.toString().trim().split(LINE_SEPARATOR);
+ assertThat(outputEvents)
+ .contains(
+ "CreateTableEvent{tableId=default_namespace3.default_schema3.table3, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
+ "DataChangeEvent{tableId=default_namespace3.default_schema3.table3, before=[], after=[1, 1, null], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=default_namespace3.default_schema3.table3, before=[], after=[2, 2, null], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=default_namespace3.default_schema3.table3, before=[], after=[3, 3, null], op=INSERT, meta=()}",
+ "AddColumnEvent{tableId=default_namespace3.default_schema3.table3, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}",
+ "CreateTableEvent{tableId=default_namespace4.default_schema4.table4, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
+ "DataChangeEvent{tableId=default_namespace4.default_schema4.table4, before=[], after=[1, 1, null], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=default_namespace4.default_schema4.table4, before=[], after=[2, 2, null], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=default_namespace4.default_schema4.table4, before=[], after=[3, 3, null], op=INSERT, meta=()}",
+ "AddColumnEvent{tableId=default_namespace4.default_schema4.table4, addedColumns=[ColumnWithPosition{column=`col4` STRING, position=LAST, existedColumnName=null}]}");
+ }
+
+ BinaryRecordData generate(Schema schema, Object... fields) {
+ return (new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0])))
+ .generate(
+ Arrays.stream(fields)
+ .map(
+ e ->
+ (e instanceof String)
+ ? BinaryStringData.fromString((String) e)
+ : e)
+ .toArray());
+ }
+}
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
index 13cf54e7f13..c476fb2fc67 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
@@ -1455,6 +1455,8 @@ void testMergingDecimalWithVariousPrecisions(ValuesDataSink.SinkApi sinkApi) thr
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
+ List sourceDefs = new ArrayList<>();
+ sourceDefs.add(sourceDef);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java
index f6bb4f4295e..068a1f9ca68 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java
@@ -136,7 +136,6 @@ void testSingleSplitSingleTable(ValuesDataSink.SinkApi sinkApi) throws Exception
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
-
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -194,7 +193,6 @@ void testSingleSplitSingleTableWithDefaultValue(ValuesDataSink.SinkApi sinkApi)
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
-
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -253,7 +251,6 @@ void testSingleSplitMultipleTables(ValuesDataSink.SinkApi sinkApi) throws Except
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
-
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -376,7 +373,6 @@ void testTransform(ValuesDataSink.SinkApi sinkApi) throws Exception {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
-
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -437,7 +433,6 @@ void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
-
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -507,7 +502,6 @@ void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
-
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -562,7 +556,6 @@ void testOneToOneRouting() throws Exception {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
-
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -635,7 +628,6 @@ void testIdenticalOneToOneRouting() throws Exception {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
-
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -832,7 +824,6 @@ void testMergingWithRoute() throws Exception {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
-
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -1040,7 +1031,6 @@ void testTransformMergingWithRoute() throws Exception {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
-
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -1104,7 +1094,6 @@ void testRouteWithReplaceSymbol(ValuesDataSink.SinkApi sinkApi) throws Exception
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
-
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
index 10ea31e5315..055f47605f8 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -2123,6 +2123,7 @@ void testTransformWithCommentsAndDefaultExpr() throws Exception {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java
index 9f512bcc40f..1adffd9f62a 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java
@@ -877,6 +877,7 @@ void testTransformWithModel(ValuesDataSink.SinkApi sinkApi, String language) thr
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
+ null,
sourceDef,
sinkDef,
Collections.emptyList(),
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMultipleSourceITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMultipleSourceITCase.java
new file mode 100644
index 00000000000..900752c203d
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMultipleSourceITCase.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.pipeline.PipelineOptions;
+import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
+import org.apache.flink.cdc.composer.PipelineExecution;
+import org.apache.flink.cdc.composer.definition.PipelineDef;
+import org.apache.flink.cdc.composer.definition.SinkDef;
+import org.apache.flink.cdc.composer.definition.SourceDef;
+import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer;
+import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.connectors.values.ValuesDatabase;
+import org.apache.flink.cdc.connectors.values.factory.ValuesDataFactory;
+import org.apache.flink.cdc.connectors.values.sink.ValuesDataSinkOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD;
+import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * IT tests for a pipeline that reads from several {@link
+ * org.apache.flink.cdc.connectors.mysql.source.MySqlDataSource} instances at once.
+ */
+public class MySqlMultipleSourceITCase extends MySqlSourceTestBase {
+
+    private static final String MULTIPLE_SOURCE_UNIQUE_ID = "source.unique.id";
+
+    /** Pause, in milliseconds, applied before and after the schema changes are issued. */
+    private static final long PIPELINE_WAIT_MILLIS = 60000;
+
+    protected static final MySqlContainer MYSQL8_CONTAINER =
+            createMySqlContainer(MySqlVersion.V8_0);
+
+    private final UniqueDatabase inventoryDatabase =
+            new UniqueDatabase(MYSQL8_CONTAINER, "inventory", TEST_USER, TEST_PASSWORD);
+    private final UniqueDatabase fullTypesMySql57Database =
+            new UniqueDatabase(
+                    MYSQL_CONTAINER,
+                    "column_type_test",
+                    MySqSourceTestUtils.TEST_USER,
+                    MySqSourceTestUtils.TEST_PASSWORD);
+
+    private static final StreamExecutionEnvironment env =
+            StreamExecutionEnvironment.getExecutionEnvironment();
+    private static final PrintStream standardOut = System.out;
+    private static final ByteArrayOutputStream outCaptor = new ByteArrayOutputStream();
+
+    /**
+     * Starts both MySQL containers, configures the shared environment and redirects {@code
+     * System.out} into {@link #outCaptor} so the test can inspect what was printed.
+     */
+    @BeforeAll
+    public static void startContainers() {
+        LOG.info("Starting containers...");
+        Startables.deepStart(Stream.of(MYSQL8_CONTAINER)).join();
+        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+        LOG.info("Containers are started.");
+        TestValuesTableFactory.clearAllData();
+        env.setParallelism(4);
+        env.enableCheckpointing(2000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        System.setOut(new PrintStream(outCaptor));
+    }
+
+    /** Stops both MySQL containers and restores the original {@code System.out}. */
+    @AfterAll
+    public static void stopContainers() {
+        LOG.info("Stopping containers...");
+        MYSQL8_CONTAINER.stop();
+        MYSQL_CONTAINER.stop();
+        LOG.info("Containers are stopped.");
+        System.setOut(standardOut);
+    }
+
+    /**
+     * Runs one pipeline with two MySQL sources and a values sink, adds a column to one table of
+     * each source while the pipeline is running, and verifies both the materialized sink content
+     * and the events captured from standard output.
+     *
+     * @throws Exception if the databases cannot be prepared, a schema change fails or the test
+     *     thread is interrupted while waiting
+     */
+    @Test
+    public void testSingleSplitMultipleSources() throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+        inventoryDatabase.createAndInitialize();
+        fullTypesMySql57Database.createAndInitialize();
+
+        // Setup the two MySQL sources; they only differ in container, table filter and unique id.
+        SourceDef sourceDef1 =
+                new SourceDef(
+                        MySqlDataSourceFactory.IDENTIFIER,
+                        "mysql Source1",
+                        createSourceConfig(
+                                MYSQL_CONTAINER, "column_type_test_\\.*.precision_types", "1"));
+        SourceDef sourceDef2 =
+                new SourceDef(
+                        MySqlDataSourceFactory.IDENTIFIER,
+                        "mysql Source2",
+                        createSourceConfig(MYSQL8_CONTAINER, "inventory_\\.*.products", "2"));
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.LENIENT);
+        pipelineConfig.set(PipelineOptions.PIPELINE_OPERATOR_UID_PREFIX, "multiple-source");
+        List<SourceDef> sourceDefs = new ArrayList<>();
+        sourceDefs.add(sourceDef1);
+        sourceDefs.add(sourceDef2);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDefs,
+                        sinkDef,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        // Execute the pipeline on a separate thread; this thread drives the schema changes.
+        PipelineExecution execution = composer.compose(pipelineDef);
+        Thread thread =
+                new Thread(
+                        () -> {
+                            try {
+                                execution.execute();
+                            } catch (Exception e) {
+                                LOG.error("Multiple-source pipeline execution failed.", e);
+                            }
+                        });
+        thread.start();
+        try {
+            Thread.sleep(PIPELINE_WAIT_MILLIS);
+            // A failing ALTER must fail the test immediately: the expectations below rely on
+            // the new column being present in both tables.
+            addBitColumn(inventoryDatabase, "products");
+            addBitColumn(fullTypesMySql57Database, "precision_types");
+            Thread.sleep(PIPELINE_WAIT_MILLIS);
+
+            String precisionDb = fullTypesMySql57Database.getDatabaseName();
+            String inventoryDb = inventoryDatabase.getDatabaseName();
+
+            List<String> table1Results = ValuesDatabase.getAllResults();
+            Collections.sort(table1Results);
+            // System.out was redirected to outCaptor in startContainers().
+            List<String> outputEvents = Arrays.asList(outCaptor.toString().trim().split("\n"));
+            outputEvents =
+                    outputEvents.stream()
+                            .map(s -> s.replaceAll("\r", ""))
+                            .collect(Collectors.toList());
+            Collections.sort(outputEvents);
+
+            assertThat(table1Results)
+                    .containsExactly(
+                            precisionDb
+                                    + ".precision_types:id=1;decimal_c0=123.40;decimal_c1=1234.5000;decimal_c2=1234.5600;time_c=18:00;time_3_c=18:00:22.100;time_6_c=18:00:22.100;datetime_c=2020-07-17T18:00;datetime3_c=2020-07-17T18:00:22;datetime6_c=2020-07-17T18:00:22;timestamp_c=2020-07-17T18:00;timestamp3_c=2020-07-17T18:00:22;timestamp6_c=2020-07-17T18:00:22;float_c0=2.0;float_c1=3.0;float_c2=5.0;real_c0=7.0;real_c1=11.0;real_c2=13.0;double_c0=17.0;double_c1=19.0;double_c2=23.0;double_precision_c0=29.0;double_precision_c1=31.0;double_precision_c2=37.0;cols5=",
+                            inventoryDb
+                                    + ".products:id=101;name=scooter;description=Small 2-wheel scooter;weight=3.14;cols5=",
+                            inventoryDb
+                                    + ".products:id=102;name=car battery;description=12V car battery;weight=8.1;cols5=",
+                            inventoryDb
+                                    + ".products:id=103;name=12-pack drill bits;description=12-pack of drill bits with sizes ranging from #40 to #3;weight=0.8;cols5=",
+                            inventoryDb
+                                    + ".products:id=104;name=hammer;description=12oz carpenter's hammer;weight=0.75;cols5=",
+                            inventoryDb
+                                    + ".products:id=105;name=hammer;description=14oz carpenter's hammer;weight=0.875;cols5=",
+                            inventoryDb
+                                    + ".products:id=106;name=hammer;description=16oz carpenter's hammer;weight=1.0;cols5=",
+                            inventoryDb
+                                    + ".products:id=107;name=rocks;description=box of assorted rocks;weight=5.3;cols5=",
+                            inventoryDb
+                                    + ".products:id=108;name=jacket;description=water resistent black wind breaker;weight=0.1;cols5=",
+                            inventoryDb
+                                    + ".products:id=109;name=spare tire;description=24 inch spare tire;weight=22.2;cols5=");
+
+            assertThat(outputEvents)
+                    .containsExactly(
+                            "AddColumnEvent{tableId="
+                                    + precisionDb
+                                    + ".precision_types, addedColumns=[ColumnWithPosition{column=`cols5` BOOLEAN, position=LAST, existedColumnName=null}]}",
+                            "AddColumnEvent{tableId="
+                                    + inventoryDb
+                                    + ".products, addedColumns=[ColumnWithPosition{column=`cols5` BOOLEAN, position=LAST, existedColumnName=null}]}",
+                            "CreateTableEvent{tableId="
+                                    + precisionDb
+                                    + ".precision_types, schema=columns={`id` DECIMAL(20, 0) NOT NULL,`decimal_c0` DECIMAL(6, 2),`decimal_c1` DECIMAL(9, 4),`decimal_c2` DECIMAL(20, 4),`time_c` TIME(0),`time_3_c` TIME(3),`time_6_c` TIME(6),`datetime_c` TIMESTAMP(0),`datetime3_c` TIMESTAMP(3),`datetime6_c` TIMESTAMP(6),`timestamp_c` TIMESTAMP_LTZ(0),`timestamp3_c` TIMESTAMP_LTZ(3),`timestamp6_c` TIMESTAMP_LTZ(6),`float_c0` DOUBLE,`float_c1` DOUBLE,`float_c2` DOUBLE,`real_c0` DOUBLE,`real_c1` DOUBLE,`real_c2` DOUBLE,`double_c0` DOUBLE,`double_c1` DOUBLE,`double_c2` DOUBLE,`double_precision_c0` DOUBLE,`double_precision_c1` DOUBLE,`double_precision_c2` DOUBLE}, primaryKeys=id, options=()}",
+                            "CreateTableEvent{tableId="
+                                    + inventoryDb
+                                    + ".products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` VARCHAR(512),`weight` FLOAT}, primaryKeys=id, options=()}",
+                            "DataChangeEvent{tableId="
+                                    + precisionDb
+                                    + ".precision_types, before=[], after=[1, 123.40, 1234.5000, 1234.5600, 18:00, 18:00:22.100, 18:00:22.100, 2020-07-17T18:00, 2020-07-17T18:00:22, 2020-07-17T18:00:22, 2020-07-17T18:00, 2020-07-17T18:00:22, 2020-07-17T18:00:22, 2.0, 3.0, 5.0, 7.0, 11.0, 13.0, 17.0, 19.0, 23.0, 29.0, 31.0, 37.0], op=INSERT, meta=()}",
+                            "DataChangeEvent{tableId="
+                                    + inventoryDb
+                                    + ".products, before=[], after=[101, scooter, Small 2-wheel scooter, 3.14], op=INSERT, meta=()}",
+                            "DataChangeEvent{tableId="
+                                    + inventoryDb
+                                    + ".products, before=[], after=[102, car battery, 12V car battery, 8.1], op=INSERT, meta=()}",
+                            "DataChangeEvent{tableId="
+                                    + inventoryDb
+                                    + ".products, before=[], after=[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8], op=INSERT, meta=()}",
+                            "DataChangeEvent{tableId="
+                                    + inventoryDb
+                                    + ".products, before=[], after=[104, hammer, 12oz carpenter's hammer, 0.75], op=INSERT, meta=()}",
+                            "DataChangeEvent{tableId="
+                                    + inventoryDb
+                                    + ".products, before=[], after=[105, hammer, 14oz carpenter's hammer, 0.875], op=INSERT, meta=()}",
+                            "DataChangeEvent{tableId="
+                                    + inventoryDb
+                                    + ".products, before=[], after=[106, hammer, 16oz carpenter's hammer, 1.0], op=INSERT, meta=()}",
+                            "DataChangeEvent{tableId="
+                                    + inventoryDb
+                                    + ".products, before=[], after=[107, rocks, box of assorted rocks, 5.3], op=INSERT, meta=()}",
+                            "DataChangeEvent{tableId="
+                                    + inventoryDb
+                                    + ".products, before=[], after=[108, jacket, water resistent black wind breaker, 0.1], op=INSERT, meta=()}",
+                            "DataChangeEvent{tableId="
+                                    + inventoryDb
+                                    + ".products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2], op=INSERT, meta=()}");
+        } finally {
+            // Always signal the pipeline thread, even when a schema change or assertion fails.
+            thread.interrupt();
+        }
+    }
+
+    /**
+     * Creates the configuration of one MySQL source of the multiple-source pipeline.
+     *
+     * @param container MySQL container supplying host, port and credentials
+     * @param tables value for the {@code tables} option
+     * @param uniqueId value stored under the {@code source.unique.id} key
+     * @return a new source configuration
+     */
+    private static Configuration createSourceConfig(
+            MySqlContainer container, String tables, String uniqueId) {
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(MySqlDataSourceOptions.PORT, container.getDatabasePort());
+        sourceConfig.set(MySqlDataSourceOptions.HOSTNAME, container.getHost());
+        sourceConfig.set(MySqlDataSourceOptions.TABLES, tables);
+        sourceConfig.set(MySqlDataSourceOptions.SERVER_ID, "52300");
+        sourceConfig.set(MySqlDataSourceOptions.USERNAME, container.getUsername());
+        sourceConfig.set(MySqlDataSourceOptions.PASSWORD, container.getPassword());
+        sourceConfig.set(MySqlDataSourceOptions.SCHEMA_CHANGE_ENABLED, true);
+        sourceConfig.set(MySqlDataSourceOptions.SERVER_TIME_ZONE, "UTC");
+        sourceConfig.set(MySqlDataSourceOptions.SCAN_STARTUP_MODE, "initial");
+        sourceConfig.set(
+                ConfigOptions.key(MULTIPLE_SOURCE_UNIQUE_ID).stringType().noDefaultValue(),
+                uniqueId);
+        return sourceConfig;
+    }
+
+    /**
+     * Adds the nullable {@code cols5 BIT} column to {@code table} in {@code database}.
+     *
+     * @param database database that owns the table
+     * @param table unqualified table name
+     * @throws SQLException if the connection cannot be opened or the statement fails
+     */
+    private static void addBitColumn(UniqueDatabase database, String table) throws SQLException {
+        try (Connection connection = database.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    String.format(
+                            "ALTER TABLE `%s`.`%s` ADD COLUMN `cols5` BIT NULL;",
+                            database.getDatabaseName(), table));
+        }
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSourceHelper.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSourceHelper.java
index bcb55094a42..61af4d1d2ac 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSourceHelper.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSourceHelper.java
@@ -53,6 +53,8 @@ public enum EventSetId {
SINGLE_SPLIT_SINGLE_TABLE,
SINGLE_SPLIT_SINGLE_TABLE_WITH_DEFAULT_VALUE,
SINGLE_SPLIT_MULTI_TABLES,
+ SINGLE_SPLIT_MULTI_SOURCE_TABLE_FIRST,
+ SINGLE_SPLIT_MULTI_SOURCE_TABLE_SECOND,
MULTI_SPLITS_SINGLE_TABLE,
CUSTOM_SOURCE_EVENTS,
TRANSFORM_TABLE,
@@ -83,11 +85,17 @@ public boolean isBatchEvent() {
public static final TableId TABLE_2 =
TableId.tableId("default_namespace", "default_schema", "table2");
+ public static final TableId TABLE_3 =
+ TableId.tableId("default_namespace3", "default_schema3", "table3");
+
+ public static final TableId TABLE_4 =
+ TableId.tableId("default_namespace4", "default_schema4", "table4");
+
/**
* create events of {@link DataChangeEvent} and {@link SchemaChangeEvent} for {@link
* ValuesDataSource}.
*/
- private static List> sourceEvents;
+ private static List> sourceEvents = new ArrayList<>();
public static List> getSourceEvents() {
if (sourceEvents == null) {
@@ -126,6 +134,16 @@ public static void setSourceEvents(EventSetId eventType) {
sourceEvents = singleSplitMultiTables();
break;
}
+ case SINGLE_SPLIT_MULTI_SOURCE_TABLE_FIRST:
+ {
+ sourceEvents.addAll(singleSplitMultiSourceTableFirst());
+ break;
+ }
+ case SINGLE_SPLIT_MULTI_SOURCE_TABLE_SECOND:
+ {
+ sourceEvents.addAll(singleSplitMultiSourceTableSecond());
+ break;
+ }
case MULTI_SPLITS_SINGLE_TABLE:
{
sourceEvents = multiSplitsSingleTable();
@@ -264,6 +282,122 @@ public static List> singleSplitSingleTable() {
return eventOfSplits;
}
+    /**
+     * Events emitted by the first source of a multiple-source pipeline: a single split that
+     * creates {@link #TABLE_3}, inserts three rows, then adds and drops column {@code col3}.
+     *
+     * @return a list holding exactly one split of events
+     */
+    public static List<List<Event>> singleSplitMultiSourceTableFirst() {
+        return singleSplitMultiSourceTable(TABLE_3, "col3");
+    }
+
+    /**
+     * Events emitted by the second source of a multiple-source pipeline: a single split that
+     * creates {@link #TABLE_4}, inserts three rows, then adds and drops column {@code col4}.
+     *
+     * @return a list holding exactly one split of events
+     */
+    public static List<List<Event>> singleSplitMultiSourceTableSecond() {
+        return singleSplitMultiSourceTable(TABLE_4, "col4");
+    }
+
+    /**
+     * Builds the single-split event sequence shared by the multiple-source event sets.
+     *
+     * @param tableId table that every event of the split refers to
+     * @param extraColumn name of the STRING column that is added and then dropped again
+     * @return a list holding exactly one split of events
+     */
+    private static List<List<Event>> singleSplitMultiSourceTable(
+            TableId tableId, String extraColumn) {
+        List<Event> split = new ArrayList<>();
+
+        // create table
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("col1", DataTypes.STRING())
+                        .physicalColumn("col2", DataTypes.STRING())
+                        .primaryKey("col1")
+                        .build();
+        split.add(new CreateTableEvent(tableId, schema));
+
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), DataTypes.STRING()));
+
+        // insert rows ("1", "1"), ("2", "2") and ("3", "3") into the table
+        for (String value : new String[] {"1", "2", "3"}) {
+            split.add(
+                    DataChangeEvent.insertEvent(
+                            tableId,
+                            generator.generate(
+                                    new Object[] {
+                                        BinaryStringData.fromString(value),
+                                        BinaryStringData.fromString(value)
+                                    })));
+        }
+
+        // add the extra column, then drop it again
+        AddColumnEvent.ColumnWithPosition columnWithPosition =
+                new AddColumnEvent.ColumnWithPosition(
+                        Column.physicalColumn(extraColumn, DataTypes.STRING()));
+        split.add(new AddColumnEvent(tableId, Collections.singletonList(columnWithPosition)));
+        split.add(new DropColumnEvent(tableId, Collections.singletonList(extraColumn)));
+
+        List<List<Event>> eventOfSplits = new ArrayList<>();
+        eventOfSplits.add(split);
+        return eventOfSplits;
+    }
+
public static List> singleSplitSingleTableWithDefaultValue() {
List> eventOfSplits = singleSplitSingleTable();
// add column with default value
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java
index 53344dc87ff..8ba3f34ec80 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java
@@ -473,6 +473,10 @@ private boolean shouldIgnoreException(Throwable throwable) {
*/
private void consumeUnusedSchemaDerivationBytes(DataInputStream in) throws IOException {
TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE;
+ // This scenario is due to the reduction of sources
+ if (in.available() < 4) {
+ return;
+ }
int derivationMappingSize = in.readInt();
Map> derivationMapping = new HashMap<>(derivationMappingSize);
for (int i = 0; i < derivationMappingSize; i++) {