Skip to content

Commit 3b54837

Browse files
[Fix][Core] Cancel pending job (#8993)
1 parent 8a48cee commit 3b54837

File tree

4 files changed

+277
-4
lines changed

4 files changed

+277
-4
lines changed

Diff for: seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java

+15-3
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ public class CoordinatorService {
156156
* key: job id; <br>
157157
* value: job master;
158158
*/
159-
private final Map<Long, Tuple2<PendingSourceState, JobMaster>> pendingJobMasterMap =
159+
protected final Map<Long, Tuple2<PendingSourceState, JobMaster>> pendingJobMasterMap =
160160
new ConcurrentHashMap<>();
161161

162162
/**
@@ -248,12 +248,19 @@ private void pendingJobSchedule() throws InterruptedException {
248248
Thread.sleep(3000);
249249
return;
250250
}
251+
252+
Long jobId = jobMaster.getJobId();
253+
254+
if (!pendingJobMasterMap.containsKey(jobId)) {
255+
logger.fine(String.format("Job ID : %s already cancelled", jobId));
256+
queueRemove(jobMaster);
257+
return;
258+
}
259+
251260
logger.fine(
252261
String.format(
253262
"Start pending job schedule, pendingJob Size : %s", pendingJob.size()));
254263

255-
Long jobId = jobMaster.getJobId();
256-
257264
logger.fine(
258265
String.format(
259266
"Start calculating whether pending task resources are enough: %s", jobId));
@@ -744,6 +751,11 @@ public PassiveCompletableFuture<Void> cancelJob(long jobId) {
744751
future.complete(null);
745752
return new PassiveCompletableFuture<>(future);
746753
} else {
754+
// Cancel pending tasks
755+
if (pendingJobMasterMap.containsKey(jobId)) {
756+
pendingJobMasterMap.remove(jobId);
757+
logger.fine(String.format("Cancel pending tasks : %s", jobId));
758+
}
747759
return new PassiveCompletableFuture<>(
748760
CompletableFuture.supplyAsync(
749761
() -> {

Diff for: seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,13 @@ public void cancelJob() {
199199
return;
200200
}
201201

202-
updateJobState(JobStatus.CANCELING);
202+
if (runningJobStateIMap.get(jobId) == JobStatus.PENDING) {
203+
// The pending task needs to be directly set to 'cancelled' status because it has not
204+
// started running yet
205+
updateJobState(JobStatus.CANCELED);
206+
} else {
207+
updateJobState(JobStatus.CANCELING);
208+
}
203209
}
204210

205211
public void savepointJob() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.engine.server;
19+
20+
import org.apache.seatunnel.engine.common.config.EngineConfig;
21+
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
22+
import org.apache.seatunnel.engine.common.config.server.ScheduleStrategy;
23+
import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
24+
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
25+
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
26+
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
27+
import org.apache.seatunnel.engine.core.job.JobInfo;
28+
import org.apache.seatunnel.engine.core.job.JobStatus;
29+
import org.apache.seatunnel.engine.core.job.PipelineStatus;
30+
import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
31+
import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
32+
import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
33+
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
34+
import org.apache.seatunnel.engine.server.master.JobMaster;
35+
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
36+
37+
import org.junit.jupiter.api.Assertions;
38+
import org.junit.jupiter.api.BeforeAll;
39+
import org.junit.jupiter.api.Test;
40+
import org.junit.jupiter.api.TestInstance;
41+
import org.junit.jupiter.api.condition.DisabledOnOs;
42+
import org.junit.jupiter.api.condition.OS;
43+
44+
import com.hazelcast.config.Config;
45+
import com.hazelcast.internal.serialization.Data;
46+
import com.hazelcast.map.IMap;
47+
48+
import java.util.Collections;
49+
import java.util.Map;
50+
import java.util.concurrent.TimeUnit;
51+
52+
import static org.awaitility.Awaitility.await;
53+
54+
/** JobMaster Tester. */
55+
@DisabledOnOs(OS.WINDOWS)
56+
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
57+
public class CoordinatorServiceWithCancelPendingJobTest extends AbstractSeaTunnelServerTest {
58+
/**
59+
* IMap key is jobId and value is a Tuple2 Tuple2 key is JobMaster init timestamp and value is
60+
* the jobImmutableInformation which is sent by client when submit job
61+
*
62+
* <p>This IMap is used to recovery runningJobInfoIMap in JobMaster when a new master node
63+
* active
64+
*/
65+
private IMap<Long, JobInfo> runningJobInfoIMap;
66+
67+
/**
68+
* IMap key is one of jobId {@link PipelineLocation} and {@link TaskGroupLocation}
69+
*
70+
* <p>The value of IMap is one of {@link JobStatus} {@link PipelineStatus} {@link
71+
* org.apache.seatunnel.engine.server.execution.ExecutionState}
72+
*
73+
* <p>This IMap is used to recovery runningJobStateIMap in JobMaster when a new master node
74+
* active
75+
*/
76+
IMap<Object, Object> runningJobStateIMap;
77+
78+
/**
79+
* IMap key is one of jobId {@link PipelineLocation} and {@link TaskGroupLocation}
80+
*
81+
* <p>The value of IMap is one of {@link
82+
* org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan} stateTimestamps {@link SubPlan}
83+
* stateTimestamps {@link PhysicalVertex} stateTimestamps
84+
*
85+
* <p>This IMap is used to recovery runningJobStateTimestampsIMap in JobMaster when a new master
86+
* node active
87+
*/
88+
IMap<Object, Long[]> runningJobStateTimestampsIMap;
89+
90+
/**
91+
* IMap key is {@link PipelineLocation}
92+
*
93+
* <p>The value of IMap is map of {@link TaskGroupLocation} and the {@link SlotProfile} it used.
94+
*
95+
* <p>This IMap is used to recovery ownedSlotProfilesIMap in JobMaster when a new master node
96+
* active
97+
*/
98+
private IMap<PipelineLocation, Map<TaskGroupLocation, SlotProfile>> ownedSlotProfilesIMap;
99+
100+
@BeforeAll
101+
public void before() {
102+
String name = this.getClass().getName();
103+
Config hazelcastConfig = Config.loadFromString(getHazelcastConfig());
104+
hazelcastConfig.setClusterName(
105+
TestUtils.getClusterName("AbstractSeaTunnelServerTest_" + name));
106+
SeaTunnelConfig seaTunnelConfig = loadSeaTunnelConfig();
107+
seaTunnelConfig.setHazelcastConfig(hazelcastConfig);
108+
EngineConfig engineConfig = seaTunnelConfig.getEngineConfig();
109+
engineConfig.setMode(ExecutionMode.LOCAL);
110+
engineConfig.setScheduleStrategy(ScheduleStrategy.WAIT);
111+
engineConfig.getSlotServiceConfig().setDynamicSlot(false);
112+
engineConfig.getSlotServiceConfig().setSlotNum(1);
113+
instance = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
114+
nodeEngine = instance.node.nodeEngine;
115+
server = nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
116+
LOGGER = nodeEngine.getLogger(AbstractSeaTunnelServerTest.class);
117+
}
118+
119+
@Test
120+
public void testCancelPendingJob() throws InterruptedException {
121+
122+
long jobId = instance.getFlakeIdGenerator("testCancelPendingJob").newId();
123+
JobMaster jobMaster = newJobInstanceWithRunningState(jobId);
124+
125+
// Verify that the task is pending
126+
Assertions.assertTrue(
127+
server.getCoordinatorService().pendingJobMasterMap.containsKey(jobId));
128+
129+
// Cancel Task
130+
PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
131+
server.getCoordinatorService().cancelJob(jobId);
132+
voidPassiveCompletableFuture.join();
133+
134+
// Verify if the task has been deleted in pending
135+
Assertions.assertFalse(
136+
server.getCoordinatorService().pendingJobMasterMap.containsKey(jobId));
137+
138+
// Verify if the final status of the task is cancelled
139+
await().atMost(120, TimeUnit.SECONDS)
140+
.untilAsserted(
141+
() ->
142+
Assertions.assertEquals(
143+
JobStatus.CANCELED, jobMaster.getJobStatus()));
144+
}
145+
146+
private JobMaster newJobInstanceWithRunningState(long jobId) throws InterruptedException {
147+
return newJobInstanceWithRunningState(jobId, false);
148+
}
149+
150+
private JobMaster newJobInstanceWithRunningState(long jobId, boolean restore)
151+
throws InterruptedException {
152+
LogicalDag testLogicalDag =
153+
TestUtils.createTestLogicalPlan(
154+
"cancel_pending_job.conf", "cancel_pending_job", jobId);
155+
156+
JobImmutableInformation jobImmutableInformation =
157+
new JobImmutableInformation(
158+
jobId,
159+
"Test",
160+
restore,
161+
nodeEngine.getSerializationService(),
162+
testLogicalDag,
163+
Collections.emptyList(),
164+
Collections.emptyList());
165+
166+
Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation);
167+
168+
PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
169+
server.getCoordinatorService()
170+
.submitJob(jobId, data, jobImmutableInformation.isStartWithSavePoint());
171+
voidPassiveCompletableFuture.join();
172+
173+
JobMaster jobMaster = server.getCoordinatorService().getJobMaster(jobId);
174+
175+
// waiting for job status turn to running
176+
await().atMost(120, TimeUnit.SECONDS)
177+
.untilAsserted(
178+
() -> Assertions.assertEquals(JobStatus.PENDING, jobMaster.getJobStatus()));
179+
180+
// Because handleCheckpointTimeout is an async method, so we need sleep 5s to waiting job
181+
// status become running again
182+
Thread.sleep(5000);
183+
return jobMaster;
184+
}
185+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
######
18+
###### This config file is a demonstration of streaming processing in seatunnel config
19+
######
20+
21+
env {
22+
parallelism = 1
23+
job.mode = "BATCH"
24+
}
25+
26+
source {
27+
# This is a example source plugin **only for test and demonstrate the feature source plugin**
28+
FakeSource {
29+
plugin_output = "fake"
30+
schema = {
31+
fields {
32+
name = "string"
33+
age = "int"
34+
}
35+
}
36+
parallelism = 1
37+
}
38+
39+
FakeSource {
40+
plugin_output = "fake2"
41+
schema = {
42+
fields {
43+
name = "string"
44+
age = "int"
45+
}
46+
}
47+
parallelism = 1
48+
}
49+
}
50+
51+
transform {
52+
}
53+
54+
sink {
55+
LocalFile {
56+
path="/tmp/hive/warehouse/test2"
57+
field_delimiter="\t"
58+
row_delimiter="\n"
59+
partition_by=["age"]
60+
partition_dir_expression="${k0}=${v0}"
61+
is_partition_field_write_in_file=true
62+
file_name_expression="${transactionId}_${now}"
63+
file_format_type="text"
64+
sink_columns=["name","age"]
65+
filename_time_format="yyyy.MM.dd"
66+
is_enable_transaction=true
67+
save_mode="error",
68+
plugin_input="fake,fake2"
69+
}
70+
}

0 commit comments

Comments
 (0)