Skip to content

Commit bb5851e

Browse files
authored
Merge branch 'apache:dev' into dev
2 parents 60377f4 + 068bbf7 commit bb5851e

File tree

8 files changed

+101
-11
lines changed

8 files changed

+101
-11
lines changed

Diff for: seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java

+5
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@ public class ClientCommandArgs extends AbstractCommandArgs {
7878
description = "Get job metrics by JobId")
7979
private String metricsJobId;
8080

81+
@Parameter(
82+
names = {"--set-job-id"},
83+
description = "Set custom job id for job")
84+
private String customJobId;
85+
8186
@Parameter(
8287
names = {"--get_running_job_metrics"},
8388
description = "Gets metrics for running jobs")

Diff for: seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,10 @@ public void execute() throws CommandExecuteException {
144144
configFile.toString(),
145145
clientCommandArgs.getVariables(),
146146
jobConfig,
147-
seaTunnelConfig);
147+
seaTunnelConfig,
148+
clientCommandArgs.getCustomJobId() != null
149+
? Long.parseLong(clientCommandArgs.getCustomJobId())
150+
: null);
148151
}
149152

150153
// get job start time

Diff for: seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgsTest.java

+17-1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,15 @@ public void testExecuteClientCommandArgsWithPluginName()
4141
Assertions.assertDoesNotThrow(() -> SeaTunnel.run(clientCommandArgs.buildCommand()));
4242
}
4343

44+
@Test
45+
public void testSetJobId() throws FileNotFoundException, URISyntaxException {
46+
String configurePath = "/config/fake_to_inmemory.json";
47+
String configFile = MultiTableSinkTest.getTestConfigFile(configurePath);
48+
long jobId = 999;
49+
ClientCommandArgs clientCommandArgs = buildClientCommandArgs(configFile, jobId);
50+
Assertions.assertDoesNotThrow(() -> SeaTunnel.run(clientCommandArgs.buildCommand()));
51+
}
52+
4453
@Test
4554
public void testExecuteClientCommandArgsWithoutPluginName()
4655
throws FileNotFoundException, URISyntaxException {
@@ -58,12 +67,19 @@ public void testExecuteClientCommandArgsWithoutPluginName()
5867
commandExecuteException.getCause().getMessage());
5968
}
6069

61-
private static ClientCommandArgs buildClientCommandArgs(String configFile) {
70+
private static ClientCommandArgs buildClientCommandArgs(String configFile, Long jobId) {
6271
ClientCommandArgs clientCommandArgs = new ClientCommandArgs();
6372
clientCommandArgs.setVariables(new ArrayList<>());
6473
clientCommandArgs.setConfigFile(configFile);
6574
clientCommandArgs.setMasterType(MasterType.LOCAL);
6675
clientCommandArgs.setCheckConfig(false);
76+
if (jobId != null) {
77+
clientCommandArgs.setCustomJobId(String.valueOf(jobId));
78+
}
6779
return clientCommandArgs;
6880
}
81+
82+
private static ClientCommandArgs buildClientCommandArgs(String configFile) {
83+
return buildClientCommandArgs(configFile, null);
84+
}
6985
}

Diff for: seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,18 @@ public ClientJobExecutionEnvironment createExecutionContext(
6464
@NonNull JobConfig jobConfig,
6565
@NonNull SeaTunnelConfig seaTunnelConfig) {
6666
return new ClientJobExecutionEnvironment(
67-
jobConfig, filePath, variables, hazelcastClient, seaTunnelConfig);
67+
jobConfig, filePath, variables, hazelcastClient, seaTunnelConfig, null);
68+
}
69+
70+
@Override
71+
public ClientJobExecutionEnvironment createExecutionContext(
72+
@NonNull String filePath,
73+
List<String> variables,
74+
@NonNull JobConfig jobConfig,
75+
@NonNull SeaTunnelConfig seaTunnelConfig,
76+
Long jobId) {
77+
return new ClientJobExecutionEnvironment(
78+
jobConfig, filePath, variables, hazelcastClient, seaTunnelConfig, jobId);
6879
}
6980

7081
@Override

Diff for: seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java

+7
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,13 @@ ClientJobExecutionEnvironment createExecutionContext(
3939
@NonNull JobConfig config,
4040
@NonNull SeaTunnelConfig seaTunnelConfig);
4141

42+
ClientJobExecutionEnvironment createExecutionContext(
43+
@NonNull String filePath,
44+
List<String> variables,
45+
@NonNull JobConfig config,
46+
@NonNull SeaTunnelConfig seaTunnelConfig,
47+
Long jobId);
48+
4249
ClientJobExecutionEnvironment restoreExecutionContext(
4350
@NonNull String filePath,
4451
@NonNull JobConfig config,

Diff for: seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,13 @@ public ClientJobExecutionEnvironment(
6767
this.seaTunnelHazelcastClient = seaTunnelHazelcastClient;
6868
this.jobClient = new JobClient(seaTunnelHazelcastClient);
6969
this.seaTunnelConfig = seaTunnelConfig;
70-
this.jobConfig.setJobContext(
71-
new JobContext(isStartWithSavePoint ? jobId : jobClient.getNewJobId()));
70+
Long finalJobId;
71+
if (isStartWithSavePoint || jobId != null) {
72+
finalJobId = jobId;
73+
} else {
74+
finalJobId = jobClient.getNewJobId();
75+
}
76+
this.jobConfig.setJobContext(new JobContext(finalJobId));
7277
this.connectorPackageClient = new ConnectorPackageClient(seaTunnelHazelcastClient);
7378
}
7479

@@ -77,15 +82,16 @@ public ClientJobExecutionEnvironment(
7782
String jobFilePath,
7883
List<String> variables,
7984
SeaTunnelHazelcastClient seaTunnelHazelcastClient,
80-
SeaTunnelConfig seaTunnelConfig) {
85+
SeaTunnelConfig seaTunnelConfig,
86+
Long jobId) {
8187
this(
8288
jobConfig,
8389
jobFilePath,
8490
variables,
8591
seaTunnelHazelcastClient,
8692
seaTunnelConfig,
8793
false,
88-
null);
94+
jobId);
8995
}
9096

9197
/** Search all jars in SEATUNNEL_HOME/plugins */

Diff for: seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java

+39
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import com.hazelcast.instance.impl.HazelcastInstanceFactory;
4848
import lombok.extern.slf4j.Slf4j;
4949

50+
import java.util.ArrayList;
5051
import java.util.concurrent.CompletableFuture;
5152
import java.util.concurrent.ExecutionException;
5253
import java.util.concurrent.TimeUnit;
@@ -312,6 +313,44 @@ public void testCancelJob() throws ExecutionException, InterruptedException {
312313
}
313314
}
314315

316+
@Test
317+
public void testSetJobId() throws ExecutionException, InterruptedException {
318+
Common.setDeployMode(DeployMode.CLIENT);
319+
String filePath = TestUtils.getResource("/streaming_fake_to_console.conf");
320+
JobConfig jobConfig = new JobConfig();
321+
jobConfig.setName("testSetJobId");
322+
long jobId = 12345;
323+
SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
324+
JobClient jobClient = seaTunnelClient.getJobClient();
325+
try {
326+
ClientJobExecutionEnvironment jobExecutionEnv =
327+
seaTunnelClient.createExecutionContext(
328+
filePath, new ArrayList<>(), jobConfig, SEATUNNEL_CONFIG, jobId);
329+
330+
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
331+
332+
Assertions.assertEquals(jobId, clientJobProxy.getJobId());
333+
334+
await().atMost(30000, TimeUnit.MILLISECONDS)
335+
.untilAsserted(
336+
() ->
337+
Assertions.assertEquals(
338+
"RUNNING", jobClient.getJobStatus(jobId)));
339+
340+
jobClient.cancelJob(jobId);
341+
342+
await().atMost(30000, TimeUnit.MILLISECONDS)
343+
.untilAsserted(
344+
() ->
345+
Assertions.assertEquals(
346+
"CANCELED", jobClient.getJobStatus(jobId)));
347+
} catch (Exception e) {
348+
throw new RuntimeException(e);
349+
} finally {
350+
seaTunnelClient.close();
351+
}
352+
}
353+
315354
@Test
316355
public void testGetJobInfo() {
317356
Common.setDeployMode(DeployMode.CLIENT);

Diff for: seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/event/JobEventHttpReportHandler.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import com.squareup.okhttp.Request;
3434
import com.squareup.okhttp.RequestBody;
3535
import com.squareup.okhttp.Response;
36+
import com.squareup.okhttp.ResponseBody;
3637
import lombok.extern.slf4j.Slf4j;
3738

3839
import java.io.IOException;
@@ -130,10 +131,12 @@ synchronized void report() throws IOException {
130131
.post(RequestBody.create(httpMediaType, events));
131132
httpHeaders.forEach(requestBuilder::header);
132133
Response response = httpClient.newCall(requestBuilder.build()).execute();
133-
if (response.isSuccessful()) {
134-
committedEventIndex += resultSet.readCount();
135-
} else {
136-
log.error("Failed to request http server: {}", response);
134+
try (ResponseBody closeable = response.body()) {
135+
if (response.isSuccessful()) {
136+
committedEventIndex += resultSet.readCount();
137+
} else {
138+
log.error("Failed to request http server: {}", response);
139+
}
137140
}
138141
}
139142

0 commit comments

Comments
 (0)