From 8294c05ea92c92e8fa90228b1402634184a49538 Mon Sep 17 00:00:00 2001 From: thirsd Date: Sun, 16 Mar 2025 10:40:15 +0800 Subject: [PATCH 1/8] support ref_config. in env option, support __st_config_ref_path__ to set path of ref_config, then you can use __st_config_ref_key__ in source\transform\sink to merge ref config. --- .../api/options/EnvCommonOptions.java | 6 ++ .../core/starter/utils/ConfigShadeUtils.java | 62 +++++++++++++------ .../parse/MultipleTableJobConfigParser.java | 58 ++++++++++++++--- 3 files changed, 99 insertions(+), 27 deletions(-) diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/EnvCommonOptions.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/EnvCommonOptions.java index 663727caf82..fb6626b222f 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/EnvCommonOptions.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/EnvCommonOptions.java @@ -107,4 +107,10 @@ public class EnvCommonOptions { .mapType() .noDefaultValue() .withDescription("Define the worker where the job runs by tag"); + + public static Option REF_PATH = + Options.key("__st_config_ref_path__") + .stringType() + .defaultValue(null) + .withDescription("Define the ref config file path"); } diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigShadeUtils.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigShadeUtils.java index 27a6e753c99..f637510c9bf 100644 --- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigShadeUtils.java +++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigShadeUtils.java @@ -173,29 +173,51 @@ private static Config processConfig( String jsonString = config.root().render(ConfigRenderOptions.concise()); ObjectNode jsonNodes = JsonUtils.parseObject(jsonString); Map configMap = JsonUtils.toMap(jsonNodes); - List> sources = - (ArrayList>) configMap.get(Constants.SOURCE); - List> sinks = - (ArrayList>) configMap.get(Constants.SINK); - Preconditions.checkArgument( - !sources.isEmpty(), "Miss config! Please check the config file."); - Preconditions.checkArgument( - !sinks.isEmpty(), "Miss config! Please check the config file."); - sources.forEach( - source -> { - for (String sensitiveOption : sensitiveOptions) { - source.computeIfPresent(sensitiveOption, processFunction); - } - }); - sinks.forEach( - sink -> { + if (configMap.containsKey(Constants.SOURCE)) { + + List> sources = + (ArrayList>) configMap.get(Constants.SOURCE); + List> sinks = + (ArrayList>) configMap.get(Constants.SINK); + Preconditions.checkArgument( + !sources.isEmpty(), "Miss config! Please check the config file."); + Preconditions.checkArgument( + !sinks.isEmpty(), "Miss config! Please check the config file."); + sources.forEach( + source -> { + for (String sensitiveOption : sensitiveOptions) { + source.computeIfPresent(sensitiveOption, processFunction); + } + }); + sinks.forEach( + sink -> { + for (String sensitiveOption : sensitiveOptions) { + sink.computeIfPresent(sensitiveOption, processFunction); + } + }); + configMap.put(Constants.SOURCE, sources); + configMap.put(Constants.SINK, sinks); + return ConfigFactory.parseMap(configMap); + }else{ + Map> refMap = new HashMap<>(); + // get map element in ref + for (String key : configMap.keySet()) { + Object ref_config = configMap.get(key); + if (ref_config instanceof Map) { + // 2. 安全转换为 Map(泛型擦除后需处理警告) + Map refDict = (Map) ref_config; + refMap.put(key, refDict); + } + } + refMap.forEach( + (refId, RefConfig) -> { + Map ref_dit = new HashMap<>(RefConfig.size()); for (String sensitiveOption : sensitiveOptions) { - sink.computeIfPresent(sensitiveOption, processFunction); + ref_dit.computeIfPresent(sensitiveOption, processFunction); } }); - configMap.put(Constants.SOURCE, sources); - configMap.put(Constants.SINK, sinks); - return ConfigFactory.parseMap(configMap); + return ConfigFactory.parseMap(refMap); + } } public static Set getSensitiveOptions(Config config) { diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java index e3414b8dbdb..dc01806cd36 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java @@ -107,6 +107,7 @@ public class MultipleTableJobConfigParser { private final List commonPluginJars; private final Config seaTunnelJobConfig; + private final Config refMaps; private final ReadonlyConfig envOptions; @@ -159,6 +160,15 @@ public MultipleTableJobConfigParser( this.isStartWithSavePoint = isStartWithSavePoint; this.seaTunnelJobConfig = ConfigBuilder.of(Paths.get(jobDefineFilePath), variables); this.envOptions = ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env")); + if (null != envOptions.getOptional(EnvCommonOptions.REF_PATH).orElse(null)) { + this.refMaps = + ConfigBuilder.of(Paths.get( + envOptions.getOptional(EnvCommonOptions.REF_PATH) + .orElseThrow(() -> new IllegalStateException("path of ref config is unset")) + ), variables); + } else { + this.refMaps = null; + } this.pipelineCheckpoints = pipelineCheckpoints; } @@ -175,21 +185,55 @@ public MultipleTableJobConfigParser( this.isStartWithSavePoint = isStartWithSavePoint; this.seaTunnelJobConfig = seaTunnelJobConfig; this.envOptions = ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env")); + if (null != envOptions.getOptional(EnvCommonOptions.REF_PATH).orElse(null)) { + this.refMaps = + ConfigBuilder.of(Paths.get( + envOptions.getOptional(EnvCommonOptions.REF_PATH) + .orElseThrow(() -> new IllegalStateException("path of ref config is unset")) + ), null); + } else { + this.refMaps = null; + } this.pipelineCheckpoints = pipelineCheckpoints; } + private Config mergeWithRefConfig(Config pluginConfig, Config refConfigs) { + if (pluginConfig.hasPath("__st_config_ref_key__ ")) { + String refId = pluginConfig.getString("__st_config_ref_key__ "); + if (null != refConfigs) { + Config refConfig = refConfigs.getConfig(refId); + if (refConfig == null) { + throw new IllegalArgumentException("ref ID not found: " + refId); + } + return pluginConfig.withoutPath("__st_config_ref_key__").withFallback(refConfig); + } else { + return pluginConfig; + } + } else { + return pluginConfig; + } + } + public ImmutablePair, Set> parse(ClassLoaderService classLoaderService) { this.fillJobConfigAndCommonJars(); - List sourceConfigs = + ArrayList sourceConfigs = new ArrayList<>(); + for (Config source : TypesafeConfigUtils.getConfigList( - seaTunnelJobConfig, "source", Collections.emptyList()); - List transformConfigs = + seaTunnelJobConfig, "source", Collections.emptyList())) { + sourceConfigs.add(this.mergeWithRefConfig(source, this.refMaps)); + } + ArrayList transformConfigs = new ArrayList<>(); + for (Config transform : TypesafeConfigUtils.getConfigList( - seaTunnelJobConfig, "transform", Collections.emptyList()); - List sinkConfigs = + seaTunnelJobConfig, "transform", Collections.emptyList())) { + transformConfigs.add(this.mergeWithRefConfig(transform, this.refMaps)); + } + ArrayList sinkConfigs = new ArrayList<>(); + for (Config sink : TypesafeConfigUtils.getConfigList( - seaTunnelJobConfig, "sink", Collections.emptyList()); - + seaTunnelJobConfig, "sink", Collections.emptyList())) { + sinkConfigs.add(this.mergeWithRefConfig(sink, this.refMaps)); + } List sourceConnectorJars = getConnectorJarList(sourceConfigs, PluginType.SOURCE); List transformConnectorJars = getConnectorJarList(transformConfigs, PluginType.TRANSFORM); From ed29abfa47666176db604937851cfdecd82d989b Mon Sep 17 00:00:00 2001 From: thirsd Date: Sun, 16 Mar 2025 10:42:56 +0800 Subject: [PATCH 2/8] Code Style Format --- .../core/starter/utils/ConfigShadeUtils.java | 14 +++++----- .../parse/MultipleTableJobConfigParser.java | 26 +++++++++++++------ 2 files changed, 25 insertions(+), 15 deletions(-) diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigShadeUtils.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigShadeUtils.java index f637510c9bf..b7320cf4f13 100644 --- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigShadeUtils.java +++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigShadeUtils.java @@ -198,7 +198,7 @@ private static Config processConfig( configMap.put(Constants.SOURCE, sources); configMap.put(Constants.SINK, sinks); return ConfigFactory.parseMap(configMap); - }else{ + } else { Map> refMap = new HashMap<>(); // get map element in ref for (String key : configMap.keySet()) { @@ -210,12 +210,12 @@ private static Config processConfig( } } refMap.forEach( - (refId, RefConfig) -> { - Map ref_dit = new HashMap<>(RefConfig.size()); - for (String sensitiveOption : sensitiveOptions) { - ref_dit.computeIfPresent(sensitiveOption, processFunction); - } - }); + (refId, RefConfig) -> { + Map ref_dit = new HashMap<>(RefConfig.size()); + for (String sensitiveOption : sensitiveOptions) { + ref_dit.computeIfPresent(sensitiveOption, processFunction); + } + }); return ConfigFactory.parseMap(refMap); } } diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java index dc01806cd36..cce3b8a895d 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java @@ -162,10 +162,15 @@ public MultipleTableJobConfigParser( this.envOptions = ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env")); if (null != envOptions.getOptional(EnvCommonOptions.REF_PATH).orElse(null)) { this.refMaps = - ConfigBuilder.of(Paths.get( - envOptions.getOptional(EnvCommonOptions.REF_PATH) - .orElseThrow(() -> new IllegalStateException("path of ref config is unset")) - ), variables); + ConfigBuilder.of( + Paths.get( + envOptions + .getOptional(EnvCommonOptions.REF_PATH) + .orElseThrow( + () -> + new IllegalStateException( + "path of ref config is unset"))), + variables); } else { this.refMaps = null; } @@ -187,10 +192,15 @@ public MultipleTableJobConfigParser( this.envOptions = ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env")); if (null != envOptions.getOptional(EnvCommonOptions.REF_PATH).orElse(null)) { this.refMaps = - ConfigBuilder.of(Paths.get( - envOptions.getOptional(EnvCommonOptions.REF_PATH) - .orElseThrow(() -> new IllegalStateException("path of ref config is unset")) - ), null); + ConfigBuilder.of( + Paths.get( + envOptions + .getOptional(EnvCommonOptions.REF_PATH) + .orElseThrow( + () -> + new IllegalStateException( + "path of ref config is unset"))), + null); } else { this.refMaps = null; } From 8aefcf6c3f7694b26f5b50002e41bb33ff4f6b47 Mon Sep 17 00:00:00 2001 From: thirsd Date: Sun, 16 Mar 2025 11:01:40 +0800 Subject: [PATCH 3/8] add doc --- docs/zh/concept/JobEnvConfig.md | 6 +++ docs/zh/concept/config.md | 80 ++++++++++++++++++++++++++++++++- 2 files changed, 85 insertions(+), 1 deletion(-) diff --git a/docs/zh/concept/JobEnvConfig.md b/docs/zh/concept/JobEnvConfig.md index 4dddb6e94a0..f6968b767f1 100644 --- a/docs/zh/concept/JobEnvConfig.md +++ b/docs/zh/concept/JobEnvConfig.md @@ -56,6 +56,12 @@ 当值为`CLIENT`时,SaveMode操作在作业提交的过程中执行,使用shell脚本提交作业时,该过程在提交作业的shell进程中执行。使用rest api提交作业时,该过程在http请求的处理线程中执行。 请尽量使用`CLUSTER`模式,因为当`CLUSTER`模式没有问题时,我们将删除`CLIENT`模式。 +### __st_config_ref_path__ + +此参数用于复用参数配置,通过Ref指定通用的配置文件。 +当设置该参数后,就可以在job config中的source、transform、sink中使用`__st_config_ref_key__`指定复用的配置信息,并合并至Job配置信息。 + + ## Flink 引擎参数 这里列出了一些与 Flink 中名称相对应的 SeaTunnel 参数名称,并非全部,更多内容请参考官方 [Flink Documentation](https://flink.apache.org/) for more. diff --git a/docs/zh/concept/config.md b/docs/zh/concept/config.md index a81142bcc77..64964368910 100644 --- a/docs/zh/concept/config.md +++ b/docs/zh/concept/config.md @@ -322,7 +322,85 @@ sink { - 值不能包含空格`' '`。例如, `-i jobName='this is a job name'`将被替换为`job.name = "this"`。 你可以使用环境变量传递带有空格的值。 - 如果要使用动态参数,可以使用以下格式: `-i date=$(date +"%Y%m%d")`。 - 不能使用指定系统保留字符,它将不会被`-i`替换,如:`${database_name}`、`${schema_name}`、`${table_name}`、`${schema_full_name}`、`${table_full_name}`、`${primary_key}`、`${unique_key}`、`${field_names}`。具体可参考[Sink参数占位符](sink-options-placeholders.md) + +## 配置引用 + +在配置文件中,我们可以定义一些通用的配置信息,在Job配置中直接引用,从而复用相同的配置。 +常见应用于连接信息复用,以及开发和生产环境的变量配置文件独立,极大提高配置维护性。 + +具体样例: +ref.conf +```hocon +# 定义 MySQL 连接配置 +mysql_prod { + url = "jdbc:mysql://192.168.1.19:3306/test" + driver = "com.mysql.cj.jdbc.Driver" + connection_check_timeout_sec = 100 + user = "root" + password = "111111" + fetch_size = 10000 +} + +# 定义 Kafka 连接配置 +kafka_test { + bootstrap.servers = "kafka-test:9092" + topic = "test_topic" +} +``` + +mysql_to_console_by_ref.conf +```hocon +env { + parallelism = 1 + job.mode = "BATCH" + __st_config_ref_path__ = "D:/MyWorld/05Projects/java/contribute/seatunnel/seatunnel-examples/seatunnel-engine-examples/target/classes/examples/ref.conf" +} + +source { + Jdbc { + __st_config_ref_key__ = "mysql_prod" + query="select * from department" + } +} + +transform { +} + +sink { + console { + } +} +``` + +然后最终提交的配置是: +```hocon +env { + parallelism = 1 + job.mode = "BATCH" + __st_config_ref_path__ = "examples/ref.conf" +} + +source { + Jdbc { + url = "jdbc:mysql://192.168.1.19:3306/test" + driver = "com.mysql.cj.jdbc.Driver" + connection_check_timeout_sec = 100 + user = "root" + password = "111111" + fetch_size = 10000 + query="select * from department" + } +} + +transform { +} + +sink { + console { + } +} +``` + ## 此外 如果你想了解更多关于格式配置的详细信息,请查看 [HOCON](https://github.com/lightbend/config/blob/main/HOCON.md)。 - From 9c0fd04f8796df1e3c91fb465d8a51370aa1def8 Mon Sep 17 00:00:00 2001 From: thirsd Date: Tue, 18 Mar 2025 22:50:49 +0800 Subject: [PATCH 4/8] =?UTF-8?q?update=20config.md=EF=BC=8C=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0ref=E9=85=8D=E7=BD=AE=E5=92=8Cplugin=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E4=BC=98=E5=85=88=E7=BA=A7=E7=9A=84=E6=8F=8F=E8=BF=B0=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/zh/concept/config.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/zh/concept/config.md b/docs/zh/concept/config.md index 64964368910..bd51e9bc910 100644 --- a/docs/zh/concept/config.md +++ b/docs/zh/concept/config.md @@ -339,6 +339,7 @@ mysql_prod { user = "root" password = "111111" fetch_size = 10000 + query="select * from not_exist" } # 定义 Kafka 连接配置 @@ -371,7 +372,9 @@ sink { } } ``` +如果在`plugin`配置和`ref`配置中均定义了同名的配置项,优先级为:`plugin配置 > ref配置`。 +故在配置合并时,将使用plugin中的query配置`query="select * from department"`。 然后最终提交的配置是: ```hocon env { From 73b4e0aefd1e91d9ebc003cf0bb23048e4a60d77 Mon Sep 17 00:00:00 2001 From: thirsd Date: Wed, 19 Mar 2025 21:23:57 +0800 Subject: [PATCH 5/8] =?UTF-8?q?update=20docs/en/concept/config.md=EF=BC=8C?= =?UTF-8?q?Add=20examples=20of=20configuration=20references=20and=20add=20?= =?UTF-8?q?descriptions=20of=20ref=20configuration=20and=20plugin=20config?= =?UTF-8?q?uration=20priorities.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/en/concept/config.md | 85 +++++++++++++++++++ .../core/starter/utils/ConfigShadeUtils.java | 1 - 2 files changed, 85 insertions(+), 1 deletion(-) diff --git a/docs/en/concept/config.md b/docs/en/concept/config.md index a3f4094622b..7b35435c6ef 100644 --- a/docs/en/concept/config.md +++ b/docs/en/concept/config.md @@ -335,6 +335,91 @@ sink { - For dynamic parameters, you can use the following format: `-i date=$(date +"%Y%m%d")`. - Cannot use specified system reserved characters; they will not be replaced by `-i`, such as: `${database_name}`, `${schema_name}`, `${table_name}`, `${schema_full_name}`, `${table_full_name}`, `${primary_key}`, `${unique_key}`, `${field_names}`. For details, please refer to [Sink Parameter Placeholders](sink-options-placeholders.md). + +## Configuration References + +In the configuration file, we can define some common configuration information and directly reference it in the Job configuration, thereby reusing the same configuration. This is commonly applied to the reuse of connection information and the independence of variable configuration files for development and production environments, greatly improving the maintainability of the configuration. + +### reference examples + +ref.conf +```hocon +# 定义 MySQL 连接配置 +mysql_prod { + url = "jdbc:mysql://192.168.1.19:3306/test" + driver = "com.mysql.cj.jdbc.Driver" + connection_check_timeout_sec = 100 + user = "root" + password = "111111" + fetch_size = 10000 + query="select * from not_exist" +} + +# 定义 Kafka 连接配置 +kafka_test { + bootstrap.servers = "kafka-test:9092" + topic = "test_topic" +} +``` + +mysql_to_console_by_ref.conf +```hocon +env { + parallelism = 1 + job.mode = "BATCH" + __st_config_ref_path__ = "D:/MyWorld/05Projects/java/contribute/seatunnel/seatunnel-examples/seatunnel-engine-examples/target/classes/examples/ref.conf" +} + +source { + Jdbc { + __st_config_ref_key__ = "mysql_prod" + query="select * from department" + } +} + +transform { +} + +sink { + console { + } +} +``` +If a parameter is defined in both `plugin` configuration and the `ref` configuration, the priority is:`plugin > ref`。 + +Therefore, the `query` parameter in the plugin will be merged to `query="select * from department"`。 + +And then the final submitted configuration is: +```hocon +env { + parallelism = 1 + job.mode = "BATCH" + __st_config_ref_path__ = "examples/ref.conf" +} + +source { + Jdbc { + url = "jdbc:mysql://192.168.1.19:3306/test" + driver = "com.mysql.cj.jdbc.Driver" + connection_check_timeout_sec = 100 + user = "root" + password = "111111" + fetch_size = 10000 + query="select * from department" + } +} + +transform { +} + +sink { + console { + } +} +``` + + + ## What's More - Start write your own config file now, choose the [connector](../connector-v2/source) you want to use, and configure the parameters according to the connector's documentation. diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigShadeUtils.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigShadeUtils.java index b7320cf4f13..3b6e6f46a1e 100644 --- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigShadeUtils.java +++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigShadeUtils.java @@ -204,7 +204,6 @@ private static Config processConfig( for (String key : configMap.keySet()) { Object ref_config = configMap.get(key); if (ref_config instanceof Map) { - // 2. 安全转换为 Map(泛型擦除后需处理警告) Map refDict = (Map) ref_config; refMap.put(key, refDict); } From 580ecd7a6d32ef3e01164342a714736dd8c52fd7 Mon Sep 17 00:00:00 2001 From: thirsd Date: Sun, 23 Mar 2025 00:47:12 +0800 Subject: [PATCH 6/8] add test examples --- .../core/starter/utils/ConfigShadeTest.java | 22 ++++++++++ .../resources/mysql_to_console_by_ref.conf | 20 +++++++++ .../src/test/resources/ref.conf | 23 ++++++++++ .../MultipleTableJobConfigParserTest.java | 21 ++++++++++ .../resources/fake_to_console_by_ref.conf | 21 ++++++++++ .../src/test/resources/ref.conf | 28 +++++++++++++ .../parse/MultipleTableJobConfigParser.java | 42 +++++++++---------- 7 files changed, 155 insertions(+), 22 deletions(-) create mode 100644 seatunnel-core/seatunnel-core-starter/src/test/resources/mysql_to_console_by_ref.conf create mode 100644 seatunnel-core/seatunnel-core-starter/src/test/resources/ref.conf create mode 100644 seatunnel-engine/seatunnel-engine-client/src/test/resources/fake_to_console_by_ref.conf create mode 100644 seatunnel-engine/seatunnel-engine-client/src/test/resources/ref.conf diff --git a/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/ConfigShadeTest.java b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/ConfigShadeTest.java index 917bc54f242..266920309a2 100644 --- a/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/ConfigShadeTest.java +++ b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/ConfigShadeTest.java @@ -82,6 +82,28 @@ public void testParseConfig() throws URISyntaxException { config.getConfigList("source").get(0).getString("secret_key"), SECRET_KEY); } + @Test + public void testParseRefConfig() throws URISyntaxException { + URL resource = ConfigShadeTest.class.getResource("/ref.conf"); + Assertions.assertNotNull(resource); + Config refConfigs = ConfigBuilder.of(Paths.get(resource.toURI())); + // Assertions.assertEquals(config.); + Assertions.assertEquals( + refConfigs.getConfig("mysql_prod").getString("query"), "select * from not_exist"); + Assertions.assertEquals(refConfigs.getConfig("mysql_prod").getString("password"), "111111"); + + URL jobConfigUrl = ConfigShadeTest.class.getResource("/mysql_to_console_by_ref.conf"); + Config jobConfig = ConfigBuilder.of(Paths.get(jobConfigUrl.toURI())); + + Config mysqlPluginConfig = jobConfig.getConfigList("source").get(0); + String refId = mysqlPluginConfig.getString("__st_config_ref_key__ "); + Config refConfig = refConfigs.getConfig(refId); + Config renderConfig = + mysqlPluginConfig.withoutPath("__st_config_ref_key__").withFallback(refConfig); + Assertions.assertEquals(renderConfig.getString("query"), "select * from department"); + Assertions.assertEquals(renderConfig.getString("password"), "111111"); + } + @Test public void testUsePrivacyHandlerHocon() throws URISyntaxException { URL resource = ConfigShadeTest.class.getResource("/config.shade.conf"); diff --git a/seatunnel-core/seatunnel-core-starter/src/test/resources/mysql_to_console_by_ref.conf b/seatunnel-core/seatunnel-core-starter/src/test/resources/mysql_to_console_by_ref.conf new file mode 100644 index 00000000000..32f3cba9db0 --- /dev/null +++ b/seatunnel-core/seatunnel-core-starter/src/test/resources/mysql_to_console_by_ref.conf @@ -0,0 +1,20 @@ +env { + parallelism = 1 + job.mode = "BATCH" + __st_config_ref_path__ = "./common/ref.conf" +} + +source { + Jdbc { + __st_config_ref_key__ = "mysql_prod" + query="select * from department" + } +} + +transform { +} + +sink { + console { + } +} \ No newline at end of file diff --git a/seatunnel-core/seatunnel-core-starter/src/test/resources/ref.conf b/seatunnel-core/seatunnel-core-starter/src/test/resources/ref.conf new file mode 100644 index 00000000000..834ed643735 --- /dev/null +++ b/seatunnel-core/seatunnel-core-starter/src/test/resources/ref.conf @@ -0,0 +1,23 @@ + +# 定义 MySQL 连接配置 +mysql_prod { + url = "jdbc:mysql://192.168.1.19:3306/test" + driver = "com.mysql.cj.jdbc.Driver" + connection_check_timeout_sec = 100 + user = "root" + password = "111111" + fetch_size = 10000 + query="select * from not_exist" +} + +# 定义 Kafka 连接配置 +kafka_test { + bootstrap.servers = "kafka-test:9092" + topic = "test_topic" +} + +sqlite_test { + url = "jdbc:sqlite:D:/MyWorld/05Projects/java/Seatunnel/SeaTunnel/db/test.db" + driver = "org.sqlite.JDBC" +} + diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java index f2e5036d549..fb1fbc6ebd0 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java +++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java @@ -72,6 +72,27 @@ public void testSimpleJobParse() { Assertions.assertEquals(3, actions.get(0).getParallelism()); } + @Test + public void testRefJobParse() { + Common.setDeployMode(DeployMode.CLIENT); + String filePath = TestUtils.getResource("/fake_to_console_by_ref.conf"); + JobConfig jobConfig = new JobConfig(); + jobConfig.setJobContext(new JobContext()); + MultipleTableJobConfigParser jobConfigParser = + new MultipleTableJobConfigParser(filePath, new IdGenerator(), jobConfig); + ImmutablePair, Set> parse = jobConfigParser.parse(null); + List actions = parse.getLeft(); + Assertions.assertEquals(1, actions.size()); + Assertions.assertEquals("Sink[0]-console-MultiTableSink", actions.get(0).getName()); + Assertions.assertEquals(1, actions.get(0).getUpstream().size()); + Assertions.assertEquals( + "Source[0]-FakeSource", actions.get(0).getUpstream().get(0).getName()); + + Assertions.assertFalse(jobConfig.getJobContext().isEnableCheckpoint()); + Assertions.assertEquals(100, actions.get(0).getUpstream().get(0).getParallelism()); + Assertions.assertEquals(100, actions.get(0).getParallelism()); + } + @Test public void testComplexJobParse() { Common.setDeployMode(DeployMode.CLIENT); diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/fake_to_console_by_ref.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/fake_to_console_by_ref.conf new file mode 100644 index 00000000000..7953b66bb7d --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/fake_to_console_by_ref.conf @@ -0,0 +1,21 @@ +env { + parallelism = 1 + job.mode = "BATCH" + __st_config_ref_path__ = "./ref.conf" +} + +source { + FakeSource { + __st_config_ref_key__ = "fake_conn" + parallelism = 100 + bytes.length = 5 + } +} + +transform { +} + +sink { + console { + } +} \ No newline at end of file diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/ref.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/ref.conf new file mode 100644 index 00000000000..9251ecf4ee3 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/ref.conf @@ -0,0 +1,28 @@ + +# 定义 MySQL 连接配置 +mysql_prod { + url = "jdbc:mysql://192.168.1.19:3306/test" + driver = "com.mysql.cj.jdbc.Driver" + connection_check_timeout_sec = 100 + user = "root" + password = "111111" + fetch_size = 10000 + query="select * from not_exist" +} + +# 定义 Kafka 连接配置 +kafka_test { + bootstrap.servers = "kafka-test:9092" + topic = "test_topic" +} + + +fake_conn { + parallelism = 1 + schema = { + fields { + name = "string" + age = "int" + } + } +} diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java index cce3b8a895d..1ebf03ef95b 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java @@ -160,17 +160,21 @@ public MultipleTableJobConfigParser( this.isStartWithSavePoint = isStartWithSavePoint; this.seaTunnelJobConfig = ConfigBuilder.of(Paths.get(jobDefineFilePath), variables); this.envOptions = ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env")); - if (null != envOptions.getOptional(EnvCommonOptions.REF_PATH).orElse(null)) { - this.refMaps = - ConfigBuilder.of( - Paths.get( - envOptions - .getOptional(EnvCommonOptions.REF_PATH) - .orElseThrow( - () -> - new IllegalStateException( - "path of ref config is unset"))), - variables); + + String RefPath = envOptions.getOptional(EnvCommonOptions.REF_PATH).orElse(null); + if (null != RefPath) { + if (!Paths.get(RefPath).isAbsolute()) { + RefPath = + Paths.get( + Paths.get(jobDefineFilePath) + .toAbsolutePath() + .getParent() + .toString(), + RefPath) + .toAbsolutePath() + .toString(); + } + this.refMaps = ConfigBuilder.of(Paths.get(RefPath), variables); } else { this.refMaps = null; } @@ -190,20 +194,14 @@ public MultipleTableJobConfigParser( this.isStartWithSavePoint = isStartWithSavePoint; this.seaTunnelJobConfig = seaTunnelJobConfig; this.envOptions = ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env")); - if (null != envOptions.getOptional(EnvCommonOptions.REF_PATH).orElse(null)) { - this.refMaps = - ConfigBuilder.of( - Paths.get( - envOptions - .getOptional(EnvCommonOptions.REF_PATH) - .orElseThrow( - () -> - new IllegalStateException( - "path of ref config is unset"))), - null); + + String RefPath = envOptions.getOptional(EnvCommonOptions.REF_PATH).orElse(null); + if (null != RefPath) { + this.refMaps = ConfigBuilder.of(Paths.get(RefPath), null); } else { this.refMaps = null; } + this.pipelineCheckpoints = pipelineCheckpoints; } From 7ab6ec750fe724f5eaf175a07d8a51252c420fcc Mon Sep 17 00:00:00 2001 From: thirsd Date: Sun, 23 Mar 2025 01:26:43 +0800 Subject: [PATCH 7/8] add license header --- .../test/resources/mysql_to_console_by_ref.conf | 17 +++++++++++++++++ .../src/test/resources/ref.conf | 16 ++++++++++++++++ .../test/resources/fake_to_console_by_ref.conf | 17 +++++++++++++++++ .../src/test/resources/ref.conf | 16 ++++++++++++++++ 4 files changed, 66 insertions(+) diff --git a/seatunnel-core/seatunnel-core-starter/src/test/resources/mysql_to_console_by_ref.conf b/seatunnel-core/seatunnel-core-starter/src/test/resources/mysql_to_console_by_ref.conf index 32f3cba9db0..ff973bc2ded 100644 --- a/seatunnel-core/seatunnel-core-starter/src/test/resources/mysql_to_console_by_ref.conf +++ b/seatunnel-core/seatunnel-core-starter/src/test/resources/mysql_to_console_by_ref.conf @@ -1,3 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + env { parallelism = 1 job.mode = "BATCH" diff --git a/seatunnel-core/seatunnel-core-starter/src/test/resources/ref.conf b/seatunnel-core/seatunnel-core-starter/src/test/resources/ref.conf index 834ed643735..81a8814a480 100644 --- a/seatunnel-core/seatunnel-core-starter/src/test/resources/ref.conf +++ b/seatunnel-core/seatunnel-core-starter/src/test/resources/ref.conf @@ -1,3 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# # 定义 MySQL 连接配置 mysql_prod { diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/fake_to_console_by_ref.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/fake_to_console_by_ref.conf index 7953b66bb7d..22681995b0f 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/fake_to_console_by_ref.conf +++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/fake_to_console_by_ref.conf @@ -1,3 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + env { parallelism = 1 job.mode = "BATCH" diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/ref.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/ref.conf index 9251ecf4ee3..d66c5610c80 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/ref.conf +++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/ref.conf @@ -1,3 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# # 定义 MySQL 连接配置 mysql_prod { From 2973521712204bd35eceb921715c279805bde841 Mon Sep 17 00:00:00 2001 From: thirsd Date: Mon, 31 Mar 2025 21:19:53 +0800 Subject: [PATCH 8/8] Update seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../engine/core/parse/MultipleTableJobConfigParser.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java index 1ebf03ef95b..5417fcbf90e 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java @@ -206,8 +206,8 @@ public MultipleTableJobConfigParser( } private Config mergeWithRefConfig(Config pluginConfig, Config refConfigs) { - if (pluginConfig.hasPath("__st_config_ref_key__ ")) { - String refId = pluginConfig.getString("__st_config_ref_key__ "); + if (pluginConfig.hasPath("__st_config_ref_key__")) { + String refId = pluginConfig.getString("__st_config_ref_key__"); if (null != refConfigs) { Config refConfig = refConfigs.getConfig(refId); if (refConfig == null) {