[Feature][Config] Support nested references to external files in configuration files #8984

Open: wants to merge 14 commits into base: dev (showing changes from 4 commits)

6 changes: 6 additions & 0 deletions docs/zh/concept/JobEnvConfig.md
@@ -56,6 +56,12 @@
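When the value is `CLIENT`, the SaveMode operation runs during job submission. When the job is submitted with the shell script, it runs in the shell process that submits the job. When the job is submitted through the REST API, it runs in the thread that handles the HTTP request.
Prefer `CLUSTER` mode where possible, because `CLIENT` mode will be removed once `CLUSTER` mode has proven to work without problems.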

### __st_config_ref_path__

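This parameter enables configuration reuse: it specifies the path of a shared reference (ref) configuration file.
Once it is set, the `source`, `transform`, and `sink` blocks of the job config can use `__st_config_ref_key__` to name the shared configuration entry to reuse, which is then merged into the job configuration.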


## Flink Engine Parameters

Listed here are some SeaTunnel parameter names that correspond to names in Flink. The list is not exhaustive; for more, see the official [Flink Documentation](https://flink.apache.org/).
80 changes: 79 additions & 1 deletion docs/zh/concept/config.md
@@ -322,7 +322,85 @@ sink {
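- Values cannot contain spaces `' '`. For example, `-i jobName='this is a job name'` will be replaced with `job.name = "this"`. You can use environment variables to pass values that contain spaces.
- To use dynamic parameters, use the following format: `-i date=$(date +"%Y%m%d")`.
- System-reserved placeholders cannot be used as variables; they will not be replaced by `-i`. These include `${database_name}`, `${schema_name}`, `${table_name}`, `${schema_full_name}`, `${table_full_name}`, `${primary_key}`, `${unique_key}`, and `${field_names}`. For details, see [Sink Options Placeholders](sink-options-placeholders.md).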
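
The reserved-placeholder rule in the last bullet can be illustrated with a small sketch. This is not SeaTunnel's implementation: the class `VariableSubstitutionSketch`, the method `substitute`, and the choice to leave unknown placeholders untouched are assumptions made for illustration. Only the reserved names are taken from the list above.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class VariableSubstitutionSketch {
    // Reserved names from the documentation; these are never replaced by -i variables.
    private static final Set<String> RESERVED = new HashSet<>(Arrays.asList(
            "database_name", "schema_name", "table_name", "schema_full_name",
            "table_full_name", "primary_key", "unique_key", "field_names"));

    // Matches ${name} where name consists of letters, digits and underscores.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([A-Za-z0-9_]+)\\}");

    // Hypothetical helper: replaces ${name} with the user-supplied value unless the
    // name is reserved or no value was supplied (assumption: such placeholders stay as-is).
    public static String substitute(String text, Map<String, String> variables) {
        Matcher matcher = PLACEHOLDER.matcher(text);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String name = matcher.group(1);
            boolean replaceable = !RESERVED.contains(name) && variables.containsKey(name);
            String replacement = replaceable ? variables.get(name) : matcher.group(0);
            // quoteReplacement keeps '$' and '\' in the replacement literal.
            matcher.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> variables = new HashMap<>();
        variables.put("date", "20240101");
        variables.put("table_name", "ignored"); // reserved, so it is not applied
        System.out.println(substitute("path = \"/data/${date}/${table_name}\"", variables));
    }
}
```

In this sketch `${date}` is replaced, while `${table_name}` is left in place for the sink to resolve later.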

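## Configuration References

Common configuration can be defined in a separate configuration file and referenced directly from the job configuration, so that the same settings are reused.
Typical uses are sharing connection details and keeping separate variable files for development and production environments, which makes the configuration much easier to maintain.

Example:
ref.conf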
```hocon
# Define the MySQL connection config
mysql_prod {
url = "jdbc:mysql://192.168.1.19:3306/test"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 100
user = "root"
password = "111111"
fetch_size = 10000
}

# Define the Kafka connection config
kafka_test {
bootstrap.servers = "kafka-test:9092"
topic = "test_topic"
}
```

mysql_to_console_by_ref.conf
```hocon
env {
parallelism = 1
job.mode = "BATCH"
__st_config_ref_path__ = "D:/MyWorld/05Projects/java/contribute/seatunnel/seatunnel-examples/seatunnel-engine-examples/target/classes/examples/ref.conf"
}

source {
Jdbc {
__st_config_ref_key__ = "mysql_prod"
query="select * from department"
Member

From the code, it looks like if the mysql_prod ref config also contains a query parameter, it will overwrite the existing query parameter.

pluginConfig.withoutPath("__st_config_ref_key__").withFallback(refConfig);

Please document the replacement priority for the case where parameters are duplicated.

Author

got it

Member

got it

We can just describe it in the documentation so that users are aware of it. The replace behavior is fine with me.

Author

The configuration priority has been added.

}
}

transform {
}

sink {
console {
}
}
```

The configuration that is finally submitted is then:
```hocon
env {
parallelism = 1
job.mode = "BATCH"
__st_config_ref_path__ = "examples/ref.conf"
}

source {
Jdbc {
url = "jdbc:mysql://192.168.1.19:3306/test"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 100
user = "root"
password = "111111"
fetch_size = 10000
query="select * from department"
}
}

transform {
}

sink {
console {
}
}
```
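
The merge expression quoted in the review, `pluginConfig.withoutPath("__st_config_ref_key__").withFallback(refConfig)`, can be emulated with plain Java maps to make the precedence visible. This is a sketch under assumptions, not the PR's code: `RefMergeSketch` and `mergeWithRef` are hypothetical names, and flat `Map<String, Object>` values stand in for Typesafe `Config` objects. It relies on the general `withFallback` semantics, in which keys of the receiver win and the fallback only fills in keys that are missing.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RefMergeSketch {
    static final String REF_KEY = "__st_config_ref_key__";

    // Hypothetical helper: merges the referenced block into a plugin block.
    // Keys set directly in the plugin block take priority; the referenced
    // block only supplies keys the plugin block does not define.
    public static Map<String, Object> mergeWithRef(
            Map<String, Object> pluginConfig, Map<String, Map<String, Object>> refConfigs) {
        Object refId = pluginConfig.get(REF_KEY);
        if (refId == null || refConfigs == null) {
            // Nothing to merge: return the plugin block unchanged (as a copy).
            return new LinkedHashMap<>(pluginConfig);
        }
        Map<String, Object> refConfig = refConfigs.get(refId.toString());
        if (refConfig == null) {
            throw new IllegalArgumentException("ref ID not found: " + refId);
        }
        Map<String, Object> merged = new LinkedHashMap<>(refConfig); // fallback values first
        merged.putAll(pluginConfig);                                 // plugin values win
        merged.remove(REF_KEY);                                      // drop the marker key
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> mysqlProd = new LinkedHashMap<>();
        mysqlProd.put("url", "jdbc:mysql://192.168.1.19:3306/test");
        mysqlProd.put("query", "select 1"); // duplicate key, only for demonstration

        Map<String, Map<String, Object>> refs = new LinkedHashMap<>();
        refs.put("mysql_prod", mysqlProd);

        Map<String, Object> jdbc = new LinkedHashMap<>();
        jdbc.put(REF_KEY, "mysql_prod");
        jdbc.put("query", "select * from department");

        Map<String, Object> merged = mergeWithRef(jdbc, refs);
        System.out.println(merged.get("query")); // prints: select * from department
    }
}
```

Under these semantics a `query` defined in the job's `Jdbc` block is kept even when `mysql_prod` also defines one, and `url` is inherited from the referenced block.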

## In Addition

For more details about the configuration format, see [HOCON](https://github.com/lightbend/config/blob/main/HOCON.md).

@@ -107,4 +107,10 @@ public class EnvCommonOptions {
.mapType()
.noDefaultValue()
.withDescription("Define the worker where the job runs by tag");

public static Option<String> REF_PATH =
Options.key("__st_config_ref_path__")
Contributor

__st_config_ref_path__ looks weird; how about naming it config_ref_path?

Member

That is not surprising; it must not conflict with normal attributes.

.stringType()
.defaultValue(null)
.withDescription("Define the ref config file path");
}
@@ -173,29 +173,51 @@ private static Config processConfig(
String jsonString = config.root().render(ConfigRenderOptions.concise());
ObjectNode jsonNodes = JsonUtils.parseObject(jsonString);
Map<String, Object> configMap = JsonUtils.toMap(jsonNodes);
List<Map<String, Object>> sources =
(ArrayList<Map<String, Object>>) configMap.get(Constants.SOURCE);
List<Map<String, Object>> sinks =
(ArrayList<Map<String, Object>>) configMap.get(Constants.SINK);
Preconditions.checkArgument(
!sources.isEmpty(), "Miss <Source> config! Please check the config file.");
Preconditions.checkArgument(
!sinks.isEmpty(), "Miss <Sink> config! Please check the config file.");
sources.forEach(
source -> {
for (String sensitiveOption : sensitiveOptions) {
source.computeIfPresent(sensitiveOption, processFunction);
}
});
sinks.forEach(
sink -> {
for (String sensitiveOption : sensitiveOptions) {
sink.computeIfPresent(sensitiveOption, processFunction);
}
});
configMap.put(Constants.SOURCE, sources);
configMap.put(Constants.SINK, sinks);
return ConfigFactory.parseMap(configMap);
if (configMap.containsKey(Constants.SOURCE)) {

List<Map<String, Object>> sources =
(ArrayList<Map<String, Object>>) configMap.get(Constants.SOURCE);
List<Map<String, Object>> sinks =
(ArrayList<Map<String, Object>>) configMap.get(Constants.SINK);
Preconditions.checkArgument(
!sources.isEmpty(), "Miss <Source> config! Please check the config file.");
Preconditions.checkArgument(
!sinks.isEmpty(), "Miss <Sink> config! Please check the config file.");
sources.forEach(
source -> {
for (String sensitiveOption : sensitiveOptions) {
source.computeIfPresent(sensitiveOption, processFunction);
}
});
sinks.forEach(
sink -> {
for (String sensitiveOption : sensitiveOptions) {
sink.computeIfPresent(sensitiveOption, processFunction);
}
});
configMap.put(Constants.SOURCE, sources);
configMap.put(Constants.SINK, sinks);
return ConfigFactory.parseMap(configMap);
} else {
Map<String, Map<String, Object>> refMap = new HashMap<>();
// get map element in ref
for (String key : configMap.keySet()) {
Object ref_config = configMap.get(key);
Contributor

We should name the variables in camelCase style.

if (ref_config instanceof Map) {
// 2. Safely cast to Map (the unchecked-cast warning caused by generic type erasure needs handling)
Map<String, Object> refDict = (Map<String, Object>) ref_config;
refMap.put(key, refDict);
}
}
refMap.forEach(
(refId, RefConfig) -> {
// Apply the processing function to the ref config itself;
// calling computeIfPresent on a fresh empty map would be a no-op.
for (String sensitiveOption : sensitiveOptions) {
RefConfig.computeIfPresent(sensitiveOption, processFunction);
}
});
return ConfigFactory.parseMap(refMap);
}
}

public static Set<String> getSensitiveOptions(Config config) {
@@ -107,6 +107,7 @@ public class MultipleTableJobConfigParser {

private final List<URL> commonPluginJars;
private final Config seaTunnelJobConfig;
private final Config refMaps;

private final ReadonlyConfig envOptions;

@@ -159,6 +160,20 @@ public MultipleTableJobConfigParser(
this.isStartWithSavePoint = isStartWithSavePoint;
this.seaTunnelJobConfig = ConfigBuilder.of(Paths.get(jobDefineFilePath), variables);
this.envOptions = ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
if (null != envOptions.getOptional(EnvCommonOptions.REF_PATH).orElse(null)) {
this.refMaps =
ConfigBuilder.of(
Paths.get(
envOptions
.getOptional(EnvCommonOptions.REF_PATH)
.orElseThrow(
() ->
new IllegalStateException(
"path of ref config is unset"))),
variables);
} else {
this.refMaps = null;
}
this.pipelineCheckpoints = pipelineCheckpoints;
}

@@ -175,21 +190,60 @@ public MultipleTableJobConfigParser(
this.isStartWithSavePoint = isStartWithSavePoint;
this.seaTunnelJobConfig = seaTunnelJobConfig;
this.envOptions = ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
if (null != envOptions.getOptional(EnvCommonOptions.REF_PATH).orElse(null)) {
this.refMaps =
ConfigBuilder.of(
Paths.get(
envOptions
.getOptional(EnvCommonOptions.REF_PATH)
.orElseThrow(
() ->
new IllegalStateException(
"path of ref config is unset"))),
null);
} else {
this.refMaps = null;
}
this.pipelineCheckpoints = pipelineCheckpoints;
}

private Config mergeWithRefConfig(Config pluginConfig, Config refConfigs) {
if (pluginConfig.hasPath("__st_config_ref_key__")) {
String refId = pluginConfig.getString("__st_config_ref_key__");
if (null != refConfigs) {
Config refConfig = refConfigs.getConfig(refId);
if (refConfig == null) {
throw new IllegalArgumentException("ref ID not found: " + refId);
}
return pluginConfig.withoutPath("__st_config_ref_key__").withFallback(refConfig);
} else {
return pluginConfig;
}
} else {
return pluginConfig;
}
}

public ImmutablePair<List<Action>, Set<URL>> parse(ClassLoaderService classLoaderService) {
this.fillJobConfigAndCommonJars();
List<? extends Config> sourceConfigs =
ArrayList<Config> sourceConfigs = new ArrayList<>();
for (Config source :
TypesafeConfigUtils.getConfigList(
seaTunnelJobConfig, "source", Collections.emptyList());
List<? extends Config> transformConfigs =
seaTunnelJobConfig, "source", Collections.emptyList())) {
sourceConfigs.add(this.mergeWithRefConfig(source, this.refMaps));
}
ArrayList<Config> transformConfigs = new ArrayList<>();
for (Config transform :
TypesafeConfigUtils.getConfigList(
seaTunnelJobConfig, "transform", Collections.emptyList());
List<? extends Config> sinkConfigs =
seaTunnelJobConfig, "transform", Collections.emptyList())) {
transformConfigs.add(this.mergeWithRefConfig(transform, this.refMaps));
}
ArrayList<Config> sinkConfigs = new ArrayList<>();
for (Config sink :
TypesafeConfigUtils.getConfigList(
seaTunnelJobConfig, "sink", Collections.emptyList());

seaTunnelJobConfig, "sink", Collections.emptyList())) {
sinkConfigs.add(this.mergeWithRefConfig(sink, this.refMaps));
}
List<URL> sourceConnectorJars = getConnectorJarList(sourceConfigs, PluginType.SOURCE);
List<URL> transformConnectorJars =
getConnectorJarList(transformConfigs, PluginType.TRANSFORM);