diff --git a/docs/en/concept/config.md b/docs/en/concept/config.md index a3f4094622b..7b35435c6ef 100644 --- a/docs/en/concept/config.md +++ b/docs/en/concept/config.md @@ -335,6 +335,91 @@ sink { - For dynamic parameters, you can use the following format: `-i date=$(date +"%Y%m%d")`. - Cannot use specified system reserved characters; they will not be replaced by `-i`, such as: `${database_name}`, `${schema_name}`, `${table_name}`, `${schema_full_name}`, `${table_full_name}`, `${primary_key}`, `${unique_key}`, `${field_names}`. For details, please refer to [Sink Parameter Placeholders](sink-options-placeholders.md). + +## Configuration References + +In the configuration file, we can define some common configuration information and directly reference it in the Job configuration, thereby reusing the same configuration. This is commonly applied to the reuse of connection information and the independence of variable configuration files for development and production environments, greatly improving the maintainability of the configuration. + +### reference examples + +ref.conf +```hocon +# 定义 MySQL 连接配置 +mysql_prod { + url = "jdbc:mysql://192.168.1.19:3306/test" + driver = "com.mysql.cj.jdbc.Driver" + connection_check_timeout_sec = 100 + user = "root" + password = "111111" + fetch_size = 10000 + query="select * from not_exist" +} + +# 定义 Kafka 连接配置 +kafka_test { + bootstrap.servers = "kafka-test:9092" + topic = "test_topic" +} +``` + +mysql_to_console_by_ref.conf +```hocon +env { + parallelism = 1 + job.mode = "BATCH" + __st_config_ref_path__ = "D:/MyWorld/05Projects/java/contribute/seatunnel/seatunnel-examples/seatunnel-engine-examples/target/classes/examples/ref.conf" +} + +source { + Jdbc { + __st_config_ref_key__ = "mysql_prod" + query="select * from department" + } +} + +transform { +} + +sink { + console { + } +} +``` +If a parameter is defined in both `plugin` configuration and the `ref` configuration, the priority is:`plugin > ref`。 + +Therefore, the `query` parameter in the plugin will be merged to `query="select * from department"`。 + +And then the final submitted configuration is: +```hocon +env { + parallelism = 1 + job.mode = "BATCH" + __st_config_ref_path__ = "examples/ref.conf" +} + +source { + Jdbc { + url = "jdbc:mysql://192.168.1.19:3306/test" + driver = "com.mysql.cj.jdbc.Driver" + connection_check_timeout_sec = 100 + user = "root" + password = "111111" + fetch_size = 10000 + query="select * from department" + } +} + +transform { +} + +sink { + console { + } +} +``` + + + ## What's More - Start write your own config file now, choose the [connector](../connector-v2/source) you want to use, and configure the parameters according to the connector's documentation. diff --git a/docs/zh/concept/JobEnvConfig.md b/docs/zh/concept/JobEnvConfig.md index 4dddb6e94a0..f6968b767f1 100644 --- a/docs/zh/concept/JobEnvConfig.md +++ b/docs/zh/concept/JobEnvConfig.md @@ -56,6 +56,12 @@ 当值为`CLIENT`时,SaveMode操作在作业提交的过程中执行,使用shell脚本提交作业时,该过程在提交作业的shell进程中执行。使用rest api提交作业时,该过程在http请求的处理线程中执行。 请尽量使用`CLUSTER`模式,因为当`CLUSTER`模式没有问题时,我们将删除`CLIENT`模式。 +### __st_config_ref_path__ + +此参数用于复用参数配置,通过Ref指定通用的配置文件。 +当设置该参数后,就可以在job config中的source、transform、sink中使用`__st_config_ref_key__`指定复用的配置信息,并合并至Job配置信息。 + + ## Flink 引擎参数 这里列出了一些与 Flink 中名称相对应的 SeaTunnel 参数名称,并非全部,更多内容请参考官方 [Flink Documentation](https://flink.apache.org/) for more. diff --git a/docs/zh/concept/config.md b/docs/zh/concept/config.md index a81142bcc77..bd51e9bc910 100644 --- a/docs/zh/concept/config.md +++ b/docs/zh/concept/config.md @@ -322,7 +322,88 @@ sink { - 值不能包含空格`' '`。例如, `-i jobName='this is a job name'`将被替换为`job.name = "this"`。 你可以使用环境变量传递带有空格的值。 - 如果要使用动态参数,可以使用以下格式: `-i date=$(date +"%Y%m%d")`。 - 不能使用指定系统保留字符,它将不会被`-i`替换,如:`${database_name}`、`${schema_name}`、`${table_name}`、`${schema_full_name}`、`${table_full_name}`、`${primary_key}`、`${unique_key}`、`${field_names}`。具体可参考[Sink参数占位符](sink-options-placeholders.md) + +## 配置引用 + +在配置文件中,我们可以定义一些通用的配置信息,在Job配置中直接引用,从而复用相同的配置。 +常见应用于连接信息复用,以及开发和生产环境的变量配置文件独立,极大提高配置维护性。 + +具体样例: +ref.conf +```hocon +# 定义 MySQL 连接配置 +mysql_prod { + url = "jdbc:mysql://192.168.1.19:3306/test" + driver = "com.mysql.cj.jdbc.Driver" + connection_check_timeout_sec = 100 + user = "root" + password = "111111" + fetch_size = 10000 + query="select * from not_exist" +} + +# 定义 Kafka 连接配置 +kafka_test { + bootstrap.servers = "kafka-test:9092" + topic = "test_topic" +} +``` + +mysql_to_console_by_ref.conf +```hocon +env { + parallelism = 1 + job.mode = "BATCH" + __st_config_ref_path__ = "D:/MyWorld/05Projects/java/contribute/seatunnel/seatunnel-examples/seatunnel-engine-examples/target/classes/examples/ref.conf" +} + +source { + Jdbc { + __st_config_ref_key__ = "mysql_prod" + query="select * from department" + } +} + +transform { +} + +sink { + console { + } +} +``` +如果在`plugin`配置和`ref`配置中均定义了同名的配置项,优先级为:`plugin配置 > ref配置`。 + +故在配置合并时,将使用plugin中的query配置`query="select * from department"`。 +然后最终提交的配置是: +```hocon +env { + parallelism = 1 + job.mode = "BATCH" + __st_config_ref_path__ = "examples/ref.conf" +} + +source { + Jdbc { + url = "jdbc:mysql://192.168.1.19:3306/test" + driver = "com.mysql.cj.jdbc.Driver" + connection_check_timeout_sec = 100 + user = "root" + password = "111111" + fetch_size = 10000 + query="select * from department" + } +} + +transform { +} + +sink { + console { + } +} +``` + ## 此外 如果你想了解更多关于格式配置的详细信息,请查看 [HOCON](https://github.com/lightbend/config/blob/main/HOCON.md)。 - diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/EnvCommonOptions.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/EnvCommonOptions.java index 663727caf82..fb6626b222f 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/EnvCommonOptions.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/EnvCommonOptions.java @@ -107,4 +107,10 @@ public class EnvCommonOptions { .mapType() .noDefaultValue() .withDescription("Define the worker where the job runs by tag"); + + public static Option REF_PATH = + Options.key("__st_config_ref_path__") + .stringType() + .defaultValue(null) + .withDescription("Define the ref config file path"); } diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigShadeUtils.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigShadeUtils.java index 27a6e753c99..3b6e6f46a1e 100644 --- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigShadeUtils.java +++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigShadeUtils.java @@ -173,29 +173,50 @@ private static Config processConfig( String jsonString = config.root().render(ConfigRenderOptions.concise()); ObjectNode jsonNodes = JsonUtils.parseObject(jsonString); Map configMap = JsonUtils.toMap(jsonNodes); - List> sources = - (ArrayList>) configMap.get(Constants.SOURCE); - List> sinks = - (ArrayList>) configMap.get(Constants.SINK); - Preconditions.checkArgument( - !sources.isEmpty(), "Miss config! Please check the config file."); - Preconditions.checkArgument( - !sinks.isEmpty(), "Miss config! Please check the config file."); - sources.forEach( - source -> { - for (String sensitiveOption : sensitiveOptions) { - source.computeIfPresent(sensitiveOption, processFunction); - } - }); - sinks.forEach( - sink -> { - for (String sensitiveOption : sensitiveOptions) { - sink.computeIfPresent(sensitiveOption, processFunction); - } - }); - configMap.put(Constants.SOURCE, sources); - configMap.put(Constants.SINK, sinks); - return ConfigFactory.parseMap(configMap); + if (configMap.containsKey(Constants.SOURCE)) { + + List> sources = + (ArrayList>) configMap.get(Constants.SOURCE); + List> sinks = + (ArrayList>) configMap.get(Constants.SINK); + Preconditions.checkArgument( + !sources.isEmpty(), "Miss config! Please check the config file."); + Preconditions.checkArgument( + !sinks.isEmpty(), "Miss config! Please check the config file."); + sources.forEach( + source -> { + for (String sensitiveOption : sensitiveOptions) { + source.computeIfPresent(sensitiveOption, processFunction); + } + }); + sinks.forEach( + sink -> { + for (String sensitiveOption : sensitiveOptions) { + sink.computeIfPresent(sensitiveOption, processFunction); + } + }); + configMap.put(Constants.SOURCE, sources); + configMap.put(Constants.SINK, sinks); + return ConfigFactory.parseMap(configMap); + } else { + Map> refMap = new HashMap<>(); + // get map element in ref + for (String key : configMap.keySet()) { + Object ref_config = configMap.get(key); + if (ref_config instanceof Map) { + Map refDict = (Map) ref_config; + refMap.put(key, refDict); + } + } + refMap.forEach( + (refId, RefConfig) -> { + Map ref_dit = new HashMap<>(RefConfig.size()); + for (String sensitiveOption : sensitiveOptions) { + ref_dit.computeIfPresent(sensitiveOption, processFunction); + } + }); + return ConfigFactory.parseMap(refMap); + } } public static Set getSensitiveOptions(Config config) { diff --git a/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/ConfigShadeTest.java b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/ConfigShadeTest.java index 917bc54f242..266920309a2 100644 --- a/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/ConfigShadeTest.java +++ b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/ConfigShadeTest.java @@ -82,6 +82,28 @@ public void testParseConfig() throws URISyntaxException { config.getConfigList("source").get(0).getString("secret_key"), SECRET_KEY); } + @Test + public void testParseRefConfig() throws URISyntaxException { + URL resource = ConfigShadeTest.class.getResource("/ref.conf"); + Assertions.assertNotNull(resource); + Config refConfigs = ConfigBuilder.of(Paths.get(resource.toURI())); + // Assertions.assertEquals(config.); + Assertions.assertEquals( + refConfigs.getConfig("mysql_prod").getString("query"), "select * from not_exist"); + Assertions.assertEquals(refConfigs.getConfig("mysql_prod").getString("password"), "111111"); + + URL jobConfigUrl = ConfigShadeTest.class.getResource("/mysql_to_console_by_ref.conf"); + Config jobConfig = ConfigBuilder.of(Paths.get(jobConfigUrl.toURI())); + + Config mysqlPluginConfig = jobConfig.getConfigList("source").get(0); + String refId = mysqlPluginConfig.getString("__st_config_ref_key__ "); + Config refConfig = refConfigs.getConfig(refId); + Config renderConfig = + mysqlPluginConfig.withoutPath("__st_config_ref_key__").withFallback(refConfig); + Assertions.assertEquals(renderConfig.getString("query"), "select * from department"); + Assertions.assertEquals(renderConfig.getString("password"), "111111"); + } + @Test public void testUsePrivacyHandlerHocon() throws URISyntaxException { URL resource = ConfigShadeTest.class.getResource("/config.shade.conf"); diff --git a/seatunnel-core/seatunnel-core-starter/src/test/resources/mysql_to_console_by_ref.conf b/seatunnel-core/seatunnel-core-starter/src/test/resources/mysql_to_console_by_ref.conf new file mode 100644 index 00000000000..ff973bc2ded --- /dev/null +++ b/seatunnel-core/seatunnel-core-starter/src/test/resources/mysql_to_console_by_ref.conf @@ -0,0 +1,37 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + __st_config_ref_path__ = "./common/ref.conf" +} + +source { + Jdbc { + __st_config_ref_key__ = "mysql_prod" + query="select * from department" + } +} + +transform { +} + +sink { + console { + } +} \ No newline at end of file diff --git a/seatunnel-core/seatunnel-core-starter/src/test/resources/ref.conf b/seatunnel-core/seatunnel-core-starter/src/test/resources/ref.conf new file mode 100644 index 00000000000..81a8814a480 --- /dev/null +++ b/seatunnel-core/seatunnel-core-starter/src/test/resources/ref.conf @@ -0,0 +1,39 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# 定义 MySQL 连接配置 +mysql_prod { + url = "jdbc:mysql://192.168.1.19:3306/test" + driver = "com.mysql.cj.jdbc.Driver" + connection_check_timeout_sec = 100 + user = "root" + password = "111111" + fetch_size = 10000 + query="select * from not_exist" +} + +# 定义 Kafka 连接配置 +kafka_test { + bootstrap.servers = "kafka-test:9092" + topic = "test_topic" +} + +sqlite_test { + url = "jdbc:sqlite:D:/MyWorld/05Projects/java/Seatunnel/SeaTunnel/db/test.db" + driver = "org.sqlite.JDBC" +} + diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java index f2e5036d549..fb1fbc6ebd0 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java +++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java @@ -72,6 +72,27 @@ public void testSimpleJobParse() { Assertions.assertEquals(3, actions.get(0).getParallelism()); } + @Test + public void testRefJobParse() { + Common.setDeployMode(DeployMode.CLIENT); + String filePath = TestUtils.getResource("/fake_to_console_by_ref.conf"); + JobConfig jobConfig = new JobConfig(); + jobConfig.setJobContext(new JobContext()); + MultipleTableJobConfigParser jobConfigParser = + new MultipleTableJobConfigParser(filePath, new IdGenerator(), jobConfig); + ImmutablePair, Set> parse = jobConfigParser.parse(null); + List actions = parse.getLeft(); + Assertions.assertEquals(1, actions.size()); + Assertions.assertEquals("Sink[0]-console-MultiTableSink", actions.get(0).getName()); + Assertions.assertEquals(1, actions.get(0).getUpstream().size()); + Assertions.assertEquals( + "Source[0]-FakeSource", actions.get(0).getUpstream().get(0).getName()); + + Assertions.assertFalse(jobConfig.getJobContext().isEnableCheckpoint()); + Assertions.assertEquals(100, actions.get(0).getUpstream().get(0).getParallelism()); + Assertions.assertEquals(100, actions.get(0).getParallelism()); + } + @Test public void testComplexJobParse() { Common.setDeployMode(DeployMode.CLIENT); diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/fake_to_console_by_ref.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/fake_to_console_by_ref.conf new file mode 100644 index 00000000000..22681995b0f --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/fake_to_console_by_ref.conf @@ -0,0 +1,38 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + __st_config_ref_path__ = "./ref.conf" +} + +source { + FakeSource { + __st_config_ref_key__ = "fake_conn" + parallelism = 100 + bytes.length = 5 + } +} + +transform { +} + +sink { + console { + } +} \ No newline at end of file diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/ref.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/ref.conf new file mode 100644 index 00000000000..d66c5610c80 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/ref.conf @@ -0,0 +1,44 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# 定义 MySQL 连接配置 +mysql_prod { + url = "jdbc:mysql://192.168.1.19:3306/test" + driver = "com.mysql.cj.jdbc.Driver" + connection_check_timeout_sec = 100 + user = "root" + password = "111111" + fetch_size = 10000 + query="select * from not_exist" +} + +# 定义 Kafka 连接配置 +kafka_test { + bootstrap.servers = "kafka-test:9092" + topic = "test_topic" +} + + +fake_conn { + parallelism = 1 + schema = { + fields { + name = "string" + age = "int" + } + } +} diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java index e3414b8dbdb..5417fcbf90e 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java @@ -107,6 +107,7 @@ public class MultipleTableJobConfigParser { private final List commonPluginJars; private final Config seaTunnelJobConfig; + private final Config refMaps; private final ReadonlyConfig envOptions; @@ -159,6 +160,24 @@ public MultipleTableJobConfigParser( this.isStartWithSavePoint = isStartWithSavePoint; this.seaTunnelJobConfig = ConfigBuilder.of(Paths.get(jobDefineFilePath), variables); this.envOptions = ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env")); + + String RefPath = envOptions.getOptional(EnvCommonOptions.REF_PATH).orElse(null); + if (null != RefPath) { + if (!Paths.get(RefPath).isAbsolute()) { + RefPath = + Paths.get( + Paths.get(jobDefineFilePath) + .toAbsolutePath() + .getParent() + .toString(), + RefPath) + .toAbsolutePath() + .toString(); + } + this.refMaps = ConfigBuilder.of(Paths.get(RefPath), variables); + } else { + this.refMaps = null; + } this.pipelineCheckpoints = pipelineCheckpoints; } @@ -175,21 +194,54 @@ public MultipleTableJobConfigParser( this.isStartWithSavePoint = isStartWithSavePoint; this.seaTunnelJobConfig = seaTunnelJobConfig; this.envOptions = ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env")); + + String RefPath = envOptions.getOptional(EnvCommonOptions.REF_PATH).orElse(null); + if (null != RefPath) { + this.refMaps = ConfigBuilder.of(Paths.get(RefPath), null); + } else { + this.refMaps = null; + } + this.pipelineCheckpoints = pipelineCheckpoints; } + private Config mergeWithRefConfig(Config pluginConfig, Config refConfigs) { + if (pluginConfig.hasPath("__st_config_ref_key__")) { + String refId = pluginConfig.getString("__st_config_ref_key__"); + if (null != refConfigs) { + Config refConfig = refConfigs.getConfig(refId); + if (refConfig == null) { + throw new IllegalArgumentException("ref ID not found: " + refId); + } + return pluginConfig.withoutPath("__st_config_ref_key__").withFallback(refConfig); + } else { + return pluginConfig; + } + } else { + return pluginConfig; + } + } + public ImmutablePair, Set> parse(ClassLoaderService classLoaderService) { this.fillJobConfigAndCommonJars(); - List sourceConfigs = + ArrayList sourceConfigs = new ArrayList<>(); + for (Config source : TypesafeConfigUtils.getConfigList( - seaTunnelJobConfig, "source", Collections.emptyList()); - List transformConfigs = + seaTunnelJobConfig, "source", Collections.emptyList())) { + sourceConfigs.add(this.mergeWithRefConfig(source, this.refMaps)); + } + ArrayList transformConfigs = new ArrayList<>(); + for (Config transform : TypesafeConfigUtils.getConfigList( - seaTunnelJobConfig, "transform", Collections.emptyList()); - List sinkConfigs = + seaTunnelJobConfig, "transform", Collections.emptyList())) { + transformConfigs.add(this.mergeWithRefConfig(transform, this.refMaps)); + } + ArrayList sinkConfigs = new ArrayList<>(); + for (Config sink : TypesafeConfigUtils.getConfigList( - seaTunnelJobConfig, "sink", Collections.emptyList()); - + seaTunnelJobConfig, "sink", Collections.emptyList())) { + sinkConfigs.add(this.mergeWithRefConfig(sink, this.refMaps)); + } List sourceConnectorJars = getConnectorJarList(sourceConfigs, PluginType.SOURCE); List transformConnectorJars = getConnectorJarList(transformConfigs, PluginType.TRANSFORM);