Skip to content

Commit 809870a

Browse files
authored
[Improve][RestAPI] return finished job info when job is finished (#6576)
1 parent ddca95f commit 809870a

File tree

3 files changed

+104
-6
lines changed

3 files changed

+104
-6
lines changed

Diff for: docs/en/seatunnel-engine/rest-api.md

+12-6
Original file line numberDiff line numberDiff line change
@@ -92,25 +92,31 @@ network:
9292
"jobId": "",
9393
"jobName": "",
9494
"jobStatus": "",
95-
"envOptions": {
96-
},
9795
"createTime": "",
9896
"jobDag": {
9997
"vertices": [
10098
],
10199
"edges": [
102100
]
103101
},
104-
"pluginJarsUrls": [
105-
],
106-
"isStartWithSavePoint": false,
107102
"metrics": {
108103
"sourceReceivedCount": "",
109104
"sinkWriteCount": ""
110-
}
105+
},
106+
"finishedTime": "",
107+
"errorMsg": null,
108+
"envOptions": {
109+
},
110+
"pluginJarsUrls": [
111+
],
112+
"isStartWithSavePoint": false
111113
}
112114
```
113115

116+
`jobId`, `jobName`, `jobStatus`, `createTime`, `jobDag`, `metrics` always be returned.
117+
`envOptions`, `pluginJarsUrls`, `isStartWithSavePoint` will return when job is running.
118+
`finishedTime`, `errorMsg` will return when job is finished.
119+
114120
When we can't get the job info, the response will be:
115121

116122
```json

Diff for: seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java

+36
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ public class RestApiIT {
5252

5353
private static ClientJobProxy clientJobProxy;
5454

55+
private static ClientJobProxy batchJobProxy;
56+
5557
private static HazelcastInstanceImpl node1;
5658

5759
private static HazelcastInstanceImpl node2;
@@ -85,6 +87,19 @@ void beforeClass() throws Exception {
8587
() ->
8688
Assertions.assertEquals(
8789
JobStatus.RUNNING, clientJobProxy.getJobStatus()));
90+
91+
String batchFilePath = TestUtils.getResource("fakesource_to_console.conf");
92+
JobConfig batchConf = new JobConfig();
93+
batchConf.setName("fake_to_console");
94+
ClientJobExecutionEnvironment batchJobExecutionEnv =
95+
engineClient.createExecutionContext(batchFilePath, batchConf, seaTunnelConfig);
96+
batchJobProxy = batchJobExecutionEnv.execute();
97+
Awaitility.await()
98+
.atMost(5, TimeUnit.MINUTES)
99+
.untilAsserted(
100+
() ->
101+
Assertions.assertEquals(
102+
JobStatus.FINISHED, batchJobProxy.getJobStatus()));
88103
}
89104

90105
@Test
@@ -108,6 +123,27 @@ public void testGetRunningJobById() {
108123
});
109124
}
110125

126+
@Test
127+
public void testGetJobById() {
128+
Arrays.asList(node2, node1)
129+
.forEach(
130+
instance -> {
131+
given().get(
132+
HOST
133+
+ instance.getCluster()
134+
.getLocalMember()
135+
.getAddress()
136+
.getPort()
137+
+ RestConstant.RUNNING_JOB_URL
138+
+ "/"
139+
+ batchJobProxy.getJobId())
140+
.then()
141+
.statusCode(200)
142+
.body("jobName", equalTo("fake_to_console"))
143+
.body("jobStatus", equalTo("FINISHED"));
144+
});
145+
}
146+
111147
@Test
112148
public void testGetAnNotExistJobById() {
113149
Arrays.asList(node2, node1)

Diff for: seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java

+56
Original file line numberDiff line numberDiff line change
@@ -272,8 +272,36 @@ private void handleJobInfoById(HttpGetCommand command, String uri) {
272272
.getMap(Constant.IMAP_RUNNING_JOB_INFO)
273273
.get(Long.valueOf(jobId));
274274

275+
JobState finishedJobState =
276+
(JobState)
277+
this.textCommandService
278+
.getNode()
279+
.getNodeEngine()
280+
.getHazelcastInstance()
281+
.getMap(Constant.IMAP_FINISHED_JOB_STATE)
282+
.get(Long.valueOf(jobId));
275283
if (!jobId.isEmpty() && jobInfo != null) {
276284
this.prepareResponse(command, convertToJson(jobInfo, Long.parseLong(jobId)));
285+
} else if (!jobId.isEmpty() && finishedJobState != null) {
286+
JobMetrics finishedJobMetrics =
287+
(JobMetrics)
288+
this.textCommandService
289+
.getNode()
290+
.getNodeEngine()
291+
.getHazelcastInstance()
292+
.getMap(Constant.IMAP_FINISHED_JOB_METRICS)
293+
.get(Long.valueOf(jobId));
294+
JobDAGInfo finishedJobDAGInfo =
295+
(JobDAGInfo)
296+
this.textCommandService
297+
.getNode()
298+
.getNodeEngine()
299+
.getHazelcastInstance()
300+
.getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO)
301+
.get(Long.valueOf(jobId));
302+
this.prepareResponse(
303+
command,
304+
convertToJson(finishedJobState, finishedJobMetrics, finishedJobDAGInfo));
277305
} else {
278306
this.prepareResponse(command, new JsonObject().add(RestConstant.JOB_ID, jobId));
279307
}
@@ -411,6 +439,34 @@ private JsonObject convertToJson(JobInfo jobInfo, long jobId) {
411439
return jobInfoJson;
412440
}
413441

442+
private JsonObject convertToJson(
443+
JobState finishedJobState,
444+
JobMetrics finishedJobMetrics,
445+
JobDAGInfo finishedJobDAGInfo) {
446+
JsonObject jobInfoJson = new JsonObject();
447+
jobInfoJson
448+
.add(RestConstant.JOB_ID, String.valueOf(finishedJobState.getJobId()))
449+
.add(RestConstant.JOB_NAME, finishedJobState.getJobName())
450+
.add(RestConstant.JOB_STATUS, finishedJobState.getJobStatus().toString())
451+
.add(RestConstant.ERROR_MSG, finishedJobState.getErrorMessage())
452+
.add(
453+
RestConstant.CREATE_TIME,
454+
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
455+
.format(new Date(finishedJobState.getSubmitTime())))
456+
.add(
457+
RestConstant.FINISH_TIME,
458+
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
459+
.format(new Date(finishedJobState.getFinishTime())))
460+
.add(
461+
RestConstant.JOB_DAG,
462+
Json.parse(JsonUtils.toJsonString(finishedJobDAGInfo)).asObject())
463+
.add(RestConstant.PLUGIN_JARS_URLS, new JsonArray())
464+
.add(
465+
RestConstant.METRICS,
466+
JsonUtil.toJsonObject(getJobMetrics(finishedJobMetrics.toJsonString())));
467+
return jobInfoJson;
468+
}
469+
414470
private JsonObject convertToJson(
415471
JobState jobState, String jobMetrics, JsonObject jobDAGInfo, long jobId) {
416472
JsonObject jobInfoJson = new JsonObject();

0 commit comments

Comments
 (0)