Skip to content

[Feature] Support connection config, and use conn_id to replace conn info in job config #8945

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public final class Constants {

public static final String SINK = "sink";

public static final String CONNECTION = "connection";

public static final String SOURCE_SERIALIZATION = "source.serialization";

public static final String SINK_SERIALIZATION = "sink.serialization";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ public abstract class AbstractCommandArgs extends CommandArgs {
description = "Config file")
protected String configFile;

/** connect file path */
@Parameter(
names = {"-conn", "--connect"},
description = "Connect Config file")
protected String connectFile;

/** user-defined parameters */
@Parameter(
names = {"-i", "--variable"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,28 +173,42 @@ private static Config processConfig(
String jsonString = config.root().render(ConfigRenderOptions.concise());
ObjectNode jsonNodes = JsonUtils.parseObject(jsonString);
Map<String, Object> configMap = JsonUtils.toMap(jsonNodes);
List<Map<String, Object>> sources =
(ArrayList<Map<String, Object>>) configMap.get(Constants.SOURCE);
List<Map<String, Object>> sinks =
(ArrayList<Map<String, Object>>) configMap.get(Constants.SINK);
Preconditions.checkArgument(
!sources.isEmpty(), "Miss <Source> config! Please check the config file.");
Preconditions.checkArgument(
!sinks.isEmpty(), "Miss <Sink> config! Please check the config file.");
sources.forEach(
source -> {
for (String sensitiveOption : sensitiveOptions) {
source.computeIfPresent(sensitiveOption, processFunction);
}
});
sinks.forEach(
sink -> {
for (String sensitiveOption : sensitiveOptions) {
sink.computeIfPresent(sensitiveOption, processFunction);
}
});
configMap.put(Constants.SOURCE, sources);
configMap.put(Constants.SINK, sinks);
if (configMap.containsKey(Constants.CONNECTION)) {
Map<String, Map<String, Object>> connects =
(Map<String, Map<String, Object>>) configMap.get(Constants.CONNECTION);
connects.forEach(
(conn_id, connect) -> {
Map<String, Object> conn_dit = new HashMap<>(connect.size());
for (String sensitiveOption : sensitiveOptions) {
conn_dit.computeIfPresent(sensitiveOption, processFunction);
}
connect = conn_dit;
Comment on lines +181 to +185
Copy link
Preview

Copilot AI Mar 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The variable name 'conn_dit' appears to be a typo; consider renaming it to 'conn_dict' for clarity.

Suggested change
Map<String, Object> conn_dit = new HashMap<>(connect.size());
for (String sensitiveOption : sensitiveOptions) {
conn_dit.computeIfPresent(sensitiveOption, processFunction);
}
connect = conn_dit;
Map<String, Object> conn_dict = new HashMap<>(connect.size());
for (String sensitiveOption : sensitiveOptions) {
conn_dict.computeIfPresent(sensitiveOption, processFunction);
}
connect = conn_dict;

Copilot uses AI. Check for mistakes.

});
configMap.put(Constants.CONNECTION, connects);
} else {
List<Map<String, Object>> sources =
(ArrayList<Map<String, Object>>) configMap.get(Constants.SOURCE);
List<Map<String, Object>> sinks =
(ArrayList<Map<String, Object>>) configMap.get(Constants.SINK);
Preconditions.checkArgument(
!sources.isEmpty(), "Miss <Source> config! Please check the config file.");
Preconditions.checkArgument(
!sinks.isEmpty(), "Miss <Sink> config! Please check the config file.");
sources.forEach(
source -> {
for (String sensitiveOption : sensitiveOptions) {
source.computeIfPresent(sensitiveOption, processFunction);
}
});
sinks.forEach(
sink -> {
for (String sensitiveOption : sensitiveOptions) {
sink.computeIfPresent(sensitiveOption, processFunction);
}
});
configMap.put(Constants.SOURCE, sources);
configMap.put(Constants.SINK, sinks);
}
return ConfigFactory.parseMap(configMap);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import lombok.extern.slf4j.Slf4j;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Collections;
Expand Down Expand Up @@ -136,6 +137,14 @@ public void execute() throws CommandExecuteException {
} else {
Path configFile = FileUtils.getConfigPath(clientCommandArgs);
checkConfigExist(configFile);

String str_conn_path = null;
if (null != clientCommandArgs.getConnectFile()) {
Path connFile = Paths.get(clientCommandArgs.getConnectFile());
checkConfigExist(connFile);
str_conn_path = connFile.toAbsolutePath().toString();
}

JobConfig jobConfig = new JobConfig();
ClientJobExecutionEnvironment jobExecutionEnv;
jobConfig.setName(clientCommandArgs.getJobName());
Expand All @@ -146,7 +155,8 @@ public void execute() throws CommandExecuteException {
clientCommandArgs.getVariables(),
jobConfig,
seaTunnelConfig,
Long.parseLong(clientCommandArgs.getRestoreJobId()));
Long.parseLong(clientCommandArgs.getRestoreJobId()),
str_conn_path);
} else {
jobExecutionEnv =
engineClient.createExecutionContext(
Expand All @@ -156,7 +166,8 @@ public void execute() throws CommandExecuteException {
seaTunnelConfig,
clientCommandArgs.getCustomJobId() != null
? Long.parseLong(clientCommandArgs.getCustomJobId())
: null);
: null,
str_conn_path);
}

// get job start time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,25 @@ public ClientJobExecutionEnvironment createExecutionContext(
jobConfig, filePath, variables, hazelcastClient, seaTunnelConfig, jobId);
}

@Override
public ClientJobExecutionEnvironment createExecutionContext(
@NonNull String filePath,
List<String> variables,
@NonNull JobConfig jobConfig,
@NonNull SeaTunnelConfig seaTunnelConfig,
Long jobId,
String connPath) {
return new ClientJobExecutionEnvironment(
jobConfig,
filePath,
variables,
hazelcastClient,
seaTunnelConfig,
false,
jobId,
connPath);
}

@Override
public ClientJobExecutionEnvironment restoreExecutionContext(
@NonNull String filePath,
Expand All @@ -98,6 +117,25 @@ public ClientJobExecutionEnvironment restoreExecutionContext(
jobConfig, filePath, variables, hazelcastClient, seaTunnelConfig, true, jobId);
}

@Override
public ClientJobExecutionEnvironment restoreExecutionContext(
@NonNull String filePath,
List<String> variables,
@NonNull JobConfig jobConfig,
@NonNull SeaTunnelConfig seaTunnelConfig,
@NonNull Long jobId,
String connPath) {
return new ClientJobExecutionEnvironment(
jobConfig,
filePath,
variables,
hazelcastClient,
seaTunnelConfig,
true,
jobId,
connPath);
}

@Override
public JobClient createJobClient() {
return new JobClient(hazelcastClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ ClientJobExecutionEnvironment createExecutionContext(
@NonNull SeaTunnelConfig seaTunnelConfig,
Long jobId);

ClientJobExecutionEnvironment createExecutionContext(
@NonNull String filePath,
List<String> variables,
@NonNull JobConfig config,
@NonNull SeaTunnelConfig seaTunnelConfig,
Long jobId,
String connPath);

ClientJobExecutionEnvironment restoreExecutionContext(
@NonNull String filePath,
@NonNull JobConfig config,
Expand All @@ -59,6 +67,14 @@ ClientJobExecutionEnvironment restoreExecutionContext(
@NonNull SeaTunnelConfig seaTunnelConfig,
@NonNull Long jobId);

ClientJobExecutionEnvironment restoreExecutionContext(
@NonNull String filePath,
List<String> variables,
@NonNull JobConfig config,
@NonNull SeaTunnelConfig seaTunnelConfig,
@NonNull Long jobId,
String connPath);

JobClient createJobClient();

void close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public class ClientJobExecutionEnvironment extends AbstractJobEnvironment {

private final String jobFilePath;

private final String connFilePath;

private final List<String> variables;

private final SeaTunnelHazelcastClient seaTunnelHazelcastClient;
Expand All @@ -63,9 +65,11 @@ public ClientJobExecutionEnvironment(
SeaTunnelHazelcastClient seaTunnelHazelcastClient,
SeaTunnelConfig seaTunnelConfig,
boolean isStartWithSavePoint,
Long jobId) {
Long jobId,
String connFilePath) {
super(jobConfig, isStartWithSavePoint);
this.jobFilePath = jobFilePath;
this.connFilePath = connFilePath;
this.variables = variables;
this.seaTunnelHazelcastClient = seaTunnelHazelcastClient;
this.jobClient = new JobClient(seaTunnelHazelcastClient);
Expand All @@ -80,6 +84,25 @@ public ClientJobExecutionEnvironment(
this.connectorPackageClient = new ConnectorPackageClient(seaTunnelHazelcastClient);
}

public ClientJobExecutionEnvironment(
JobConfig jobConfig,
String jobFilePath,
List<String> variables,
SeaTunnelHazelcastClient seaTunnelHazelcastClient,
SeaTunnelConfig seaTunnelConfig,
boolean isStartWithSavePoint,
Long jobId) {
this(
jobConfig,
jobFilePath,
variables,
seaTunnelHazelcastClient,
seaTunnelConfig,
isStartWithSavePoint,
jobId,
null);
}

public ClientJobExecutionEnvironment(
JobConfig jobConfig,
String jobFilePath,
Expand All @@ -94,7 +117,8 @@ public ClientJobExecutionEnvironment(
seaTunnelHazelcastClient,
seaTunnelConfig,
false,
jobId);
jobId,
null);
}

/** Search all jars in SEATUNNEL_HOME/plugins */
Expand All @@ -114,7 +138,8 @@ protected MultipleTableJobConfigParser getJobConfigParser() {
jobConfig,
commonPluginJars,
isStartWithSavePoint,
pipelineCheckpoints);
pipelineCheckpoints,
connFilePath);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public class MultipleTableJobConfigParser {

private final List<URL> commonPluginJars;
private final Config seaTunnelJobConfig;
private final Config seaTunnelConnConfig;

private final ReadonlyConfig envOptions;

Expand Down Expand Up @@ -142,7 +143,8 @@ public MultipleTableJobConfigParser(
jobConfig,
commonPluginJars,
isStartWithSavePoint,
Collections.emptyList());
Collections.emptyList(),
null);
}

public MultipleTableJobConfigParser(
Expand All @@ -152,12 +154,18 @@ public MultipleTableJobConfigParser(
JobConfig jobConfig,
List<URL> commonPluginJars,
boolean isStartWithSavePoint,
List<JobPipelineCheckpointData> pipelineCheckpoints) {
List<JobPipelineCheckpointData> pipelineCheckpoints,
String connFilePath) {
this.idGenerator = idGenerator;
this.jobConfig = jobConfig;
this.commonPluginJars = commonPluginJars;
this.isStartWithSavePoint = isStartWithSavePoint;
this.seaTunnelJobConfig = ConfigBuilder.of(Paths.get(jobDefineFilePath), variables);
if (null != connFilePath) {
this.seaTunnelConnConfig = ConfigBuilder.of(Paths.get(connFilePath), variables);
} else {
this.seaTunnelConnConfig = null;
}
this.envOptions = ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
this.pipelineCheckpoints = pipelineCheckpoints;
}
Expand All @@ -176,19 +184,46 @@ public MultipleTableJobConfigParser(
this.seaTunnelJobConfig = seaTunnelJobConfig;
this.envOptions = ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
this.pipelineCheckpoints = pipelineCheckpoints;
this.seaTunnelConnConfig = null;
}

private Config mergeWithConnectionConfig(Config pluginConfig, Config connConfig) {
if (pluginConfig.hasPath("conn_id")) {
String connId = pluginConfig.getString("conn_id");
if (null != connConfig) {
Config connectionConfig = connConfig.getConfig("connection").getConfig(connId);

if (connectionConfig == null) {
throw new IllegalArgumentException("Connection ID not found: " + connId);
}
// support conn_id replace to real connect configuration
return pluginConfig.withoutPath("conn_id").withFallback(connectionConfig);
} else {
return pluginConfig;
}
} else {
return pluginConfig;
}
}

public ImmutablePair<List<Action>, Set<URL>> parse(ClassLoaderService classLoaderService) {
this.fillJobConfigAndCommonJars();
List<? extends Config> sourceConfigs =
ArrayList<Config> sourceConfigs = new ArrayList<>();
for (Config sink :
TypesafeConfigUtils.getConfigList(
seaTunnelJobConfig, "source", Collections.emptyList());
seaTunnelJobConfig, "source", Collections.emptyList())) {
sourceConfigs.add(this.mergeWithConnectionConfig(sink, this.seaTunnelConnConfig));
}

List<? extends Config> transformConfigs =
TypesafeConfigUtils.getConfigList(
seaTunnelJobConfig, "transform", Collections.emptyList());
List<? extends Config> sinkConfigs =
ArrayList<Config> sinkConfigs = new ArrayList<>();
for (Config sink :
TypesafeConfigUtils.getConfigList(
seaTunnelJobConfig, "sink", Collections.emptyList());
seaTunnelJobConfig, "sink", Collections.emptyList())) {
sinkConfigs.add(this.mergeWithConnectionConfig(sink, this.seaTunnelConnConfig));
}

List<URL> sourceConnectorJars = getConnectorJarList(sourceConfigs, PluginType.SOURCE);
List<URL> transformConnectorJars =
Expand Down
Loading