Skip to content

Commit 9d94b6a

Browse files
authored
Merge branch 'dev' into Improvement-17025
2 parents 1fb7f69 + d850692 commit 9d94b6a

File tree

7 files changed

+190
-24
lines changed

7 files changed

+190
-24
lines changed

.github/workflows/stale.yml

+2-2
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ jobs:
3232
# Stale Issues
3333
days-before-issue-stale: 30
3434
days-before-issue-close: 7
35-
# We do not stale Issues with label Waiting for reply,DSIP,security,roadmap,help wanted,good first issue,community
36-
exempt-issue-labels: 'Waiting for reply,DSIP,security,roadmap,help wanted,good first issue,community'
35+
# We do not mark issues carrying any of the labels below as stale
36+
exempt-issue-labels: 'Waiting for reply,DSIP,security,roadmap,help wanted,good first issue,community,priority:low,priority:middle,priority:high'
3737
stale-issue-message: >
3838
This issue has been automatically marked as stale because it has not had recent activity
3939
for 30 days. It will be closed in next 7 days if no further activity occurs.

dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_dml.sql

+2-1
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,11 @@ END IF;
3030
END;
3131
d//
3232

33+
delimiter ;
34+
3335
-- If the admin account is not associated with a tenant, the admin's tenant will be set to the default tenant.
3436
UPDATE `t_ds_user` SET `tenant_id` = '-1' WHERE (`user_name` = 'admin') AND (`tenant_id` = '0');
3537

36-
delimiter ;
3738
CALL dolphin_t_ds_tenant_insert_default();
3839
DROP PROCEDURE dolphin_t_ds_tenant_insert_default;
3940

dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java

+8-6
Original file line numberDiff line numberDiff line change
@@ -216,13 +216,15 @@ public void cancelApplication() throws InterruptedException {
216216
return;
217217
}
218218

219-
// soft kill
220-
log.info("Begin to kill process process, pid is : {}", taskRequest.getProcessId());
221-
process.destroy();
222-
if (!process.waitFor(5, TimeUnit.SECONDS)) {
223-
process.destroyForcibly();
219+
// Try to kill process tree
220+
boolean killed = ProcessUtils.kill(taskRequest);
221+
if (killed) {
222+
log.info("Process tree for task: {} is killed or already finished, pid: {}",
223+
taskRequest.getTaskAppId(), taskRequest.getProcessId());
224+
} else {
225+
log.error("Failed to kill process tree for task: {}, pid: {}",
226+
taskRequest.getTaskAppId(), taskRequest.getProcessId());
224227
}
225-
log.info("Success kill task: {}, pid: {}", taskRequest.getTaskAppId(), taskRequest.getProcessId());
226228
}
227229

228230
private void collectPodLogIfNeeded() {

dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseLinuxShellInterceptorBuilder.java

+1
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ private List<String> bootstrapCommandInResourceLimitMode() {
171171
}
172172

173173
bootstrapCommand.add(String.format("--uid=%s", runUser));
174+
bootstrapCommand.add(shellAbsolutePath().toString());
174175
return bootstrapCommand;
175176
}
176177
}

dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java

+57-11
Original file line numberDiff line numberDiff line change
@@ -85,31 +85,77 @@ private ProcessUtils() {
8585
private static final Pattern LINUXPATTERN = Pattern.compile("\\((\\d+)\\)");
8686

8787
/**
88-
* kill tasks according to different task types.
88+
* Terminate the task process tree, escalating from SIGINT to SIGTERM to SIGKILL when sending a signal fails
89+
* @param request Task execution context
90+
* @return true if there was nothing to kill (processId 0 or no PIDs found) or a kill signal was sent without error; false otherwise
8991
*/
90-
@Deprecated
9192
public static boolean kill(@NonNull TaskExecutionContext request) {
9293
try {
93-
log.info("Begin kill task instance, processId: {}", request.getProcessId());
94+
log.info("Begin killing task instance, processId: {}", request.getProcessId());
9495
int processId = request.getProcessId();
9596
if (processId == 0) {
96-
log.error("Task instance kill failed, processId is not exist");
97-
return false;
97+
log.info("Task instance has already finished, no need to kill");
98+
return true;
9899
}
99100

100-
String cmd = String.format("kill -9 %s", getPidsStr(processId));
101-
cmd = OSUtils.getSudoCmd(request.getTenantCode(), cmd);
102-
log.info("process id:{}, cmd:{}", processId, cmd);
101+
// Get all child processes
102+
String pids = getPidsStr(processId);
103+
String[] pidArray = pids.split("\\s+");
104+
if (pidArray.length == 0) {
105+
log.warn("No valid PIDs found for process: {}", processId);
106+
return true;
107+
}
108+
109+
// 1. Try to terminate gracefully (SIGINT)
110+
boolean gracefulKillSuccess = sendKillSignal("SIGINT", pids, request.getTenantCode());
111+
if (gracefulKillSuccess) {
112+
log.info("Successfully killed process tree using SIGINT, processId: {}", processId);
113+
return true;
114+
}
115+
116+
// 2. Escalate to the standard termination request (SIGTERM)
117+
boolean termKillSuccess = sendKillSignal("SIGTERM", pids, request.getTenantCode());
118+
if (termKillSuccess) {
119+
log.info("Successfully killed process tree using SIGTERM, processId: {}", processId);
120+
return true;
121+
}
122+
123+
// 3. As a last resort, use `kill -9`
124+
log.warn("SIGINT & SIGTERM failed, using SIGKILL as a last resort for processId: {}", processId);
125+
boolean forceKillSuccess = sendKillSignal("SIGKILL", pids, request.getTenantCode());
126+
if (forceKillSuccess) {
127+
log.info("Successfully sent SIGKILL signal to process tree, processId: {}", processId);
128+
} else {
129+
log.error("Error sending SIGKILL signal to process tree, processId: {}", processId);
130+
}
131+
return forceKillSuccess;
103132

104-
OSUtils.exeCmd(cmd);
105-
log.info("Success kill task instance, processId: {}", request.getProcessId());
106-
return true;
107133
} catch (Exception e) {
108134
log.error("Kill task instance error, processId: {}", request.getProcessId(), e);
109135
return false;
110136
}
111137
}
112138

139+
/**
140+
* Send a kill signal to every process in the given PID list; returns true when the kill command runs without throwing, false otherwise
141+
* @param signal Signal type (SIGINT, SIGTERM, SIGKILL)
142+
* @param pids Process ID list
143+
* @param tenantCode Tenant code
144+
*/
145+
private static boolean sendKillSignal(String signal, String pids, String tenantCode) {
146+
try {
147+
String killCmd = String.format("kill -s %s %s", signal, pids);
148+
killCmd = OSUtils.getSudoCmd(tenantCode, killCmd);
149+
log.info("Sending {} to process group: {}, command: {}", signal, pids, killCmd);
150+
OSUtils.exeCmd(killCmd);
151+
152+
return true;
153+
} catch (Exception e) {
154+
log.error("Error sending {} to process: {}", signal, pids, e);
155+
return false;
156+
}
157+
}
158+
113159
/**
114160
* get pids str.
115161
*
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.plugin.task.api.shell;
19+
20+
import static org.junit.jupiter.api.Assertions.assertEquals;
21+
import static org.mockito.Mockito.mockStatic;
22+
23+
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
24+
import org.apache.dolphinscheduler.plugin.task.api.shell.bash.BashShellInterceptorBuilder;
25+
import org.apache.dolphinscheduler.plugin.task.api.utils.AbstractCommandExecutorConstants;
26+
27+
import java.util.List;
28+
29+
import org.junit.jupiter.api.Test;
30+
import org.mockito.MockedStatic;
31+
32+
class BaseLinuxShellInterceptorBuilderTest {
33+
34+
@Test
35+
void generateBootstrapCommandTest() {
36+
BashShellInterceptorBuilder builder = new BashShellInterceptorBuilder()
37+
.shellDirectory("/tmp")
38+
.shellName("test")
39+
.runUser("root")
40+
.cpuQuota(10)
41+
.memoryQuota(1024)
42+
.sudoMode(false);
43+
try (MockedStatic<PropertyUtils> mockStatic = mockStatic(PropertyUtils.class)) {
44+
// default
45+
List<String> defaultCommands = builder.generateBootstrapCommand();
46+
assertEquals("bash /tmp/test.sh", String.join(" ", defaultCommands));
47+
48+
// sudo mode
49+
builder.sudoMode(true);
50+
List<String> sudoCommands = builder.generateBootstrapCommand();
51+
assertEquals("sudo -u root -i /tmp/test.sh", String.join(" ", sudoCommands));
52+
53+
// resource limit mode
54+
mockStatic.when(
55+
() -> PropertyUtils.getBoolean(AbstractCommandExecutorConstants.TASK_RESOURCE_LIMIT_STATE, false))
56+
.thenReturn(true);
57+
List<String> limitModeCommands = builder.generateBootstrapCommand();
58+
assertEquals("sudo systemd-run -q --scope -p CPUQuota=10% -p MemoryLimit=1024M --uid=root /tmp/test.sh",
59+
String.join(" ", limitModeCommands));
60+
}
61+
}
62+
}

dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtilsTest.java

+58-4
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,39 @@
1919

2020
import org.apache.dolphinscheduler.common.utils.OSUtils;
2121
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
22+
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
2223

2324
import org.apache.commons.lang3.SystemUtils;
2425

26+
import org.junit.jupiter.api.AfterEach;
2527
import org.junit.jupiter.api.Assertions;
28+
import org.junit.jupiter.api.BeforeEach;
2629
import org.junit.jupiter.api.Test;
2730
import org.mockito.MockedStatic;
2831
import org.mockito.Mockito;
2932

3033
public class ProcessUtilsTest {
3134

35+
private MockedStatic<OSUtils> mockedOSUtils;
36+
37+
@BeforeEach
38+
void setUp() {
39+
mockedOSUtils = Mockito.mockStatic(OSUtils.class);
40+
}
41+
42+
@AfterEach
43+
void tearDown() {
44+
if (mockedOSUtils != null) {
45+
mockedOSUtils.close();
46+
}
47+
}
3248
@Test
3349
public void testGetPidsStr() throws Exception {
3450
// first
3551
String pids = "sudo(6279)---558_1497.sh(6282)---sleep(6354)";
3652
int processId = 6279;
3753
String exceptPidsStr = "6279 6282 6354";
3854
String command;
39-
MockedStatic<OSUtils> osUtilsMockedStatic = Mockito.mockStatic(OSUtils.class);
4055
if (SystemUtils.IS_OS_MAC) {
4156
pids = "-+= 6279 sudo -+- 6282 558_1497.sh --- 6354 sleep";
4257
command = String.format("%s -sp %d", TaskConstants.PSTREE, processId);
@@ -45,7 +60,7 @@ public void testGetPidsStr() throws Exception {
4560
} else {
4661
command = String.format("%s -p %d", TaskConstants.PSTREE, processId);
4762
}
48-
osUtilsMockedStatic.when(() -> OSUtils.exeCmd(command)).thenReturn(pids);
63+
mockedOSUtils.when(() -> OSUtils.exeCmd(command)).thenReturn(pids);
4964
String actualPidsStr = ProcessUtils.getPidsStr(processId);
5065
Assertions.assertEquals(exceptPidsStr, actualPidsStr);
5166

@@ -62,7 +77,7 @@ public void testGetPidsStr() throws Exception {
6277
} else {
6378
command2 = String.format("%s -p %d", TaskConstants.PSTREE, processId2);
6479
}
65-
osUtilsMockedStatic.when(() -> OSUtils.exeCmd(command2)).thenReturn(pids2);
80+
mockedOSUtils.when(() -> OSUtils.exeCmd(command2)).thenReturn(pids2);
6681
String actualPidsStr2 = ProcessUtils.getPidsStr(processId2);
6782
Assertions.assertEquals(exceptPidsStr2, actualPidsStr2);
6883

@@ -79,7 +94,7 @@ public void testGetPidsStr() throws Exception {
7994
} else {
8095
command3 = String.format("%s -p %d", TaskConstants.PSTREE, processId3);
8196
}
82-
osUtilsMockedStatic.when(() -> OSUtils.exeCmd(command3)).thenReturn(pids3);
97+
mockedOSUtils.when(() -> OSUtils.exeCmd(command3)).thenReturn(pids3);
8398
String actualPidsStr3 = ProcessUtils.getPidsStr(processId3);
8499
Assertions.assertEquals(exceptPidsStr3, actualPidsStr3);
85100
}
@@ -95,4 +110,43 @@ public void tetRemoveK8sClientCache() {
95110
});
96111
}
97112

113+
@Test
114+
void testKillProcessSuccessWithSigInt() throws Exception {
115+
// Arrange
116+
TaskExecutionContext taskRequest = Mockito.mock(TaskExecutionContext.class);
117+
Mockito.when(taskRequest.getProcessId()).thenReturn(12345);
118+
Mockito.when(taskRequest.getTenantCode()).thenReturn("testTenant");
119+
120+
// Mock getPidsStr
121+
mockedOSUtils.when(() -> OSUtils.exeCmd(Mockito.matches(".*pstree.*12345"))).thenReturn("1234 12345");
122+
123+
// Mock SIGINT command
124+
mockedOSUtils.when(() -> OSUtils.getSudoCmd(Mockito.eq("testTenant"), Mockito.matches("kill -s SIGINT.*")))
125+
.thenReturn("kill -s SIGINT 12345");
126+
mockedOSUtils.when(() -> OSUtils.exeCmd("kill -s SIGINT 12345")).thenReturn("");
127+
128+
// Stub a `ps -p` liveness probe (NOTE(review): ProcessUtils.kill as shown in this change never issues `ps -p` - confirm this stub is needed)
129+
mockedOSUtils.when(() -> OSUtils.exeCmd("ps -p 12345")).thenReturn(null);
130+
131+
// Act
132+
boolean result = ProcessUtils.kill(taskRequest);
133+
134+
// Assert
135+
Assertions.assertTrue(result);
136+
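// Verify the legacy `kill -9` command was never issued (NOTE(review): kill() now sends `kill -s SIGKILL ...`, which this check does not cover)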
137+
mockedOSUtils.verify(() -> OSUtils.exeCmd("kill -9 12345"), Mockito.never());
138+
}
139+
140+
@Test
141+
void testKillNonExistentProcess() {
142+
// Arrange
143+
TaskExecutionContext taskRequest = Mockito.mock(TaskExecutionContext.class);
144+
Mockito.when(taskRequest.getProcessId()).thenReturn(0);
145+
146+
// Act
147+
boolean result = ProcessUtils.kill(taskRequest);
148+
149+
// Assert
150+
Assertions.assertTrue(result);
151+
}
98152
}

0 commit comments

Comments
 (0)