diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java index cadd447efc7..e3a93cfa366 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java @@ -29,6 +29,8 @@ public final class Constants { public static final String SINK = "sink"; + public static final String CONNECTION = "connection"; + public static final String SOURCE_SERIALIZATION = "source.serialization"; public static final String SINK_SERIALIZATION = "sink.serialization"; diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java index 5d620c96eea..54266974133 100644 --- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java +++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java @@ -38,6 +38,12 @@ public abstract class AbstractCommandArgs extends CommandArgs { description = "Config file") protected String configFile; + /** connect file path */ + @Parameter( + names = {"-conn", "--connect"}, + description = "Connect Config file") + protected String connectFile; + /** user-defined parameters */ @Parameter( names = {"-i", "--variable"}, diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigShadeUtils.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigShadeUtils.java index 27a6e753c99..45639fe2c61 100644 --- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigShadeUtils.java +++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigShadeUtils.java @@ -173,28 +173,42 @@ private static Config processConfig( String jsonString = config.root().render(ConfigRenderOptions.concise()); ObjectNode jsonNodes = JsonUtils.parseObject(jsonString); Map configMap = JsonUtils.toMap(jsonNodes); - List> sources = - (ArrayList>) configMap.get(Constants.SOURCE); - List> sinks = - (ArrayList>) configMap.get(Constants.SINK); - Preconditions.checkArgument( - !sources.isEmpty(), "Miss config! Please check the config file."); - Preconditions.checkArgument( - !sinks.isEmpty(), "Miss config! Please check the config file."); - sources.forEach( - source -> { - for (String sensitiveOption : sensitiveOptions) { - source.computeIfPresent(sensitiveOption, processFunction); - } - }); - sinks.forEach( - sink -> { - for (String sensitiveOption : sensitiveOptions) { - sink.computeIfPresent(sensitiveOption, processFunction); - } - }); - configMap.put(Constants.SOURCE, sources); - configMap.put(Constants.SINK, sinks); + if (configMap.containsKey(Constants.CONNECTION)) { + Map> connects = + (Map>) configMap.get(Constants.CONNECTION); + connects.forEach( + (conn_id, connect) -> { + Map conn_dit = new HashMap<>(connect.size()); + for (String sensitiveOption : sensitiveOptions) { + conn_dit.computeIfPresent(sensitiveOption, processFunction); + } + connect = conn_dit; + }); + configMap.put(Constants.CONNECTION, connects); + } else { + List> sources = + (ArrayList>) configMap.get(Constants.SOURCE); + List> sinks = + (ArrayList>) configMap.get(Constants.SINK); + Preconditions.checkArgument( + !sources.isEmpty(), "Miss config! Please check the config file."); + Preconditions.checkArgument( + !sinks.isEmpty(), "Miss config! Please check the config file."); + sources.forEach( + source -> { + for (String sensitiveOption : sensitiveOptions) { + source.computeIfPresent(sensitiveOption, processFunction); + } + }); + sinks.forEach( + sink -> { + for (String sensitiveOption : sensitiveOptions) { + sink.computeIfPresent(sensitiveOption, processFunction); + } + }); + configMap.put(Constants.SOURCE, sources); + configMap.put(Constants.SINK, sinks); + } return ConfigFactory.parseMap(configMap); } diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java index bb7fe67a874..67d72aec03a 100644 --- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java +++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java @@ -52,6 +52,7 @@ import lombok.extern.slf4j.Slf4j; import java.nio.file.Path; +import java.nio.file.Paths; import java.time.Duration; import java.time.LocalDateTime; import java.util.Collections; @@ -136,6 +137,14 @@ public void execute() throws CommandExecuteException { } else { Path configFile = FileUtils.getConfigPath(clientCommandArgs); checkConfigExist(configFile); + + String str_conn_path = null; + if (null != clientCommandArgs.getConnectFile()) { + Path connFile = Paths.get(clientCommandArgs.getConnectFile()); + checkConfigExist(connFile); + str_conn_path = connFile.toAbsolutePath().toString(); + } + JobConfig jobConfig = new JobConfig(); ClientJobExecutionEnvironment jobExecutionEnv; jobConfig.setName(clientCommandArgs.getJobName()); @@ -146,7 +155,8 @@ public void execute() throws CommandExecuteException { clientCommandArgs.getVariables(), jobConfig, seaTunnelConfig, - Long.parseLong(clientCommandArgs.getRestoreJobId())); + Long.parseLong(clientCommandArgs.getRestoreJobId()), + str_conn_path); } else { jobExecutionEnv = engineClient.createExecutionContext( @@ -156,7 +166,8 @@ public void execute() throws CommandExecuteException { seaTunnelConfig, clientCommandArgs.getCustomJobId() != null ? Long.parseLong(clientCommandArgs.getCustomJobId()) - : null); + : null, + str_conn_path); } // get job start time diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java index c38d4adc927..c92265453c2 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java +++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java @@ -78,6 +78,25 @@ public ClientJobExecutionEnvironment createExecutionContext( jobConfig, filePath, variables, hazelcastClient, seaTunnelConfig, jobId); } + @Override + public ClientJobExecutionEnvironment createExecutionContext( + @NonNull String filePath, + List variables, + @NonNull JobConfig jobConfig, + @NonNull SeaTunnelConfig seaTunnelConfig, + Long jobId, + String connPath) { + return new ClientJobExecutionEnvironment( + jobConfig, + filePath, + variables, + hazelcastClient, + seaTunnelConfig, + false, + jobId, + connPath); + } + @Override public ClientJobExecutionEnvironment restoreExecutionContext( @NonNull String filePath, @@ -98,6 +117,25 @@ public ClientJobExecutionEnvironment restoreExecutionContext( jobConfig, filePath, variables, hazelcastClient, seaTunnelConfig, true, jobId); } + @Override + public ClientJobExecutionEnvironment restoreExecutionContext( + @NonNull String filePath, + List variables, + @NonNull JobConfig jobConfig, + @NonNull SeaTunnelConfig seaTunnelConfig, + @NonNull Long jobId, + String connPath) { + return new ClientJobExecutionEnvironment( + jobConfig, + filePath, + variables, + hazelcastClient, + seaTunnelConfig, + true, + jobId, + connPath); + } + @Override public JobClient createJobClient() { return new JobClient(hazelcastClient); diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java index a275f3cab77..a5b352ba1a4 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java +++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java @@ -46,6 +46,14 @@ ClientJobExecutionEnvironment createExecutionContext( @NonNull SeaTunnelConfig seaTunnelConfig, Long jobId); + ClientJobExecutionEnvironment createExecutionContext( + @NonNull String filePath, + List variables, + @NonNull JobConfig config, + @NonNull SeaTunnelConfig seaTunnelConfig, + Long jobId, + String connPath); + ClientJobExecutionEnvironment restoreExecutionContext( @NonNull String filePath, @NonNull JobConfig config, @@ -59,6 +67,14 @@ ClientJobExecutionEnvironment restoreExecutionContext( @NonNull SeaTunnelConfig seaTunnelConfig, @NonNull Long jobId); + ClientJobExecutionEnvironment restoreExecutionContext( + @NonNull String filePath, + List variables, + @NonNull JobConfig config, + @NonNull SeaTunnelConfig seaTunnelConfig, + @NonNull Long jobId, + String connPath); + JobClient createJobClient(); void close(); diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java index 1f526a9a43a..ad9955d4e0b 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java +++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java @@ -45,6 +45,8 @@ public class ClientJobExecutionEnvironment extends AbstractJobEnvironment { private final String jobFilePath; + private final String connFilePath; + private final List variables; private final SeaTunnelHazelcastClient seaTunnelHazelcastClient; @@ -63,9 +65,11 @@ public ClientJobExecutionEnvironment( SeaTunnelHazelcastClient seaTunnelHazelcastClient, SeaTunnelConfig seaTunnelConfig, boolean isStartWithSavePoint, - Long jobId) { + Long jobId, + String connFilePath) { super(jobConfig, isStartWithSavePoint); this.jobFilePath = jobFilePath; + this.connFilePath = connFilePath; this.variables = variables; this.seaTunnelHazelcastClient = seaTunnelHazelcastClient; this.jobClient = new JobClient(seaTunnelHazelcastClient); @@ -80,6 +84,25 @@ public ClientJobExecutionEnvironment( this.connectorPackageClient = new ConnectorPackageClient(seaTunnelHazelcastClient); } + public ClientJobExecutionEnvironment( + JobConfig jobConfig, + String jobFilePath, + List variables, + SeaTunnelHazelcastClient seaTunnelHazelcastClient, + SeaTunnelConfig seaTunnelConfig, + boolean isStartWithSavePoint, + Long jobId) { + this( + jobConfig, + jobFilePath, + variables, + seaTunnelHazelcastClient, + seaTunnelConfig, + isStartWithSavePoint, + jobId, + null); + } + public ClientJobExecutionEnvironment( JobConfig jobConfig, String jobFilePath, @@ -94,7 +117,8 @@ public ClientJobExecutionEnvironment( seaTunnelHazelcastClient, seaTunnelConfig, false, - jobId); + jobId, + null); } /** Search all jars in SEATUNNEL_HOME/plugins */ @@ -114,7 +138,8 @@ protected MultipleTableJobConfigParser getJobConfigParser() { jobConfig, commonPluginJars, isStartWithSavePoint, - pipelineCheckpoints); + pipelineCheckpoints, + connFilePath); } @VisibleForTesting diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java index e3414b8dbdb..420295bcc5f 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java @@ -107,6 +107,7 @@ public class MultipleTableJobConfigParser { private final List commonPluginJars; private final Config seaTunnelJobConfig; + private final Config seaTunnelConnConfig; private final ReadonlyConfig envOptions; @@ -142,7 +143,8 @@ public MultipleTableJobConfigParser( jobConfig, commonPluginJars, isStartWithSavePoint, - Collections.emptyList()); + Collections.emptyList(), + null); } public MultipleTableJobConfigParser( @@ -152,12 +154,18 @@ public MultipleTableJobConfigParser( JobConfig jobConfig, List commonPluginJars, boolean isStartWithSavePoint, - List pipelineCheckpoints) { + List pipelineCheckpoints, + String connFilePath) { this.idGenerator = idGenerator; this.jobConfig = jobConfig; this.commonPluginJars = commonPluginJars; this.isStartWithSavePoint = isStartWithSavePoint; this.seaTunnelJobConfig = ConfigBuilder.of(Paths.get(jobDefineFilePath), variables); + if (null != connFilePath) { + this.seaTunnelConnConfig = ConfigBuilder.of(Paths.get(connFilePath), variables); + } else { + this.seaTunnelConnConfig = null; + } this.envOptions = ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env")); this.pipelineCheckpoints = pipelineCheckpoints; } @@ -176,19 +184,46 @@ public MultipleTableJobConfigParser( this.seaTunnelJobConfig = seaTunnelJobConfig; this.envOptions = ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env")); this.pipelineCheckpoints = pipelineCheckpoints; + this.seaTunnelConnConfig = null; + } + + private Config mergeWithConnectionConfig(Config pluginConfig, Config connConfig) { + if (pluginConfig.hasPath("conn_id")) { + String connId = pluginConfig.getString("conn_id"); + if (null != connConfig) { + Config connectionConfig = connConfig.getConfig("connection").getConfig(connId); + + if (connectionConfig == null) { + throw new IllegalArgumentException("Connection ID not found: " + connId); + } + // support conn_id replace to real connect configuration + return pluginConfig.withoutPath("conn_id").withFallback(connectionConfig); + } else { + return pluginConfig; + } + } else { + return pluginConfig; + } } public ImmutablePair, Set> parse(ClassLoaderService classLoaderService) { this.fillJobConfigAndCommonJars(); - List sourceConfigs = + ArrayList sourceConfigs = new ArrayList<>(); + for (Config sink : TypesafeConfigUtils.getConfigList( - seaTunnelJobConfig, "source", Collections.emptyList()); + seaTunnelJobConfig, "source", Collections.emptyList())) { + sourceConfigs.add(this.mergeWithConnectionConfig(sink, this.seaTunnelConnConfig)); + } + List transformConfigs = TypesafeConfigUtils.getConfigList( seaTunnelJobConfig, "transform", Collections.emptyList()); - List sinkConfigs = + ArrayList sinkConfigs = new ArrayList<>(); + for (Config sink : TypesafeConfigUtils.getConfigList( - seaTunnelJobConfig, "sink", Collections.emptyList()); + seaTunnelJobConfig, "sink", Collections.emptyList())) { + sinkConfigs.add(this.mergeWithConnectionConfig(sink, this.seaTunnelConnConfig)); + } List sourceConnectorJars = getConnectorJarList(sourceConfigs, PluginType.SOURCE); List transformConnectorJars =