Skip to content

Commit 10d6199

Browse files
brave-leeJinyLeeChina
andauthored
[Fix-12966] Failed to export and then import the project (#13455)
* fix 13347 and 12966 * fix switch Co-authored-by: JinyLeeChina <[email protected]>
1 parent d95cf93 commit 10d6199

File tree

10 files changed

+130
-63
lines changed

10 files changed

+130
-63
lines changed

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,16 @@
4343
import org.apache.dolphinscheduler.common.graph.DAG;
4444
import org.apache.dolphinscheduler.common.model.TaskNode;
4545
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
46+
import org.apache.dolphinscheduler.common.task.AbstractParameters;
47+
import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters;
48+
import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters;
49+
import org.apache.dolphinscheduler.common.task.switchtask.SwitchResultVo;
4650
import org.apache.dolphinscheduler.common.thread.Stopper;
4751
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
4852
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
4953
import org.apache.dolphinscheduler.common.utils.DateUtils;
5054
import org.apache.dolphinscheduler.common.utils.JSONUtils;
55+
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
5156
import org.apache.dolphinscheduler.dao.entity.DagData;
5257
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
5358
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
@@ -107,6 +112,7 @@
107112

108113
import com.baomidou.mybatisplus.core.metadata.IPage;
109114
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
115+
import com.fasterxml.jackson.core.type.TypeReference;
110116
import com.fasterxml.jackson.databind.JsonNode;
111117
import com.fasterxml.jackson.databind.node.ArrayNode;
112118
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -962,6 +968,7 @@ private boolean checkAndImport(User loginUser, long projectCode, Map<String, Obj
962968
taskDefinitionLog.setUpdateTime(now);
963969
taskDefinitionLog.setOperator(loginUser.getId());
964970
taskDefinitionLog.setOperateTime(now);
971+
taskDefinitionLog.setTaskParams(taskDefinition.getTaskParams());
965972
try {
966973
long code = CodeGenerateUtils.getInstance().genCode();
967974
taskCodeMap.put(taskDefinitionLog.getCode(), code);
@@ -973,6 +980,7 @@ private boolean checkAndImport(User loginUser, long projectCode, Map<String, Obj
973980
}
974981
taskDefinitionLogList.add(taskDefinitionLog);
975982
}
983+
taskDefinitionLogList.forEach(v -> v.setTaskParams(resetImportTaskParams(taskCodeMap, v)));
976984
int insert = taskDefinitionMapper.batchInsert(taskDefinitionLogList);
977985
int logInsert = taskDefinitionLogMapper.batchInsert(taskDefinitionLogList);
978986
if ((logInsert & insert) == 0) {
@@ -1035,6 +1043,64 @@ private boolean checkAndImport(User loginUser, long projectCode, Map<String, Obj
10351043
return true;
10361044
}
10371045

1046+
private String resetImportTaskParams(Map<Long, Long> taskCodeMap, TaskDefinitionLog taskDefinition) {
1047+
String taskType = taskDefinition.getTaskType();
1048+
if (!TaskType.CONDITIONS.getDesc().equals(taskType) && !TaskType.SWITCH.getDesc().equals(taskType)) {
1049+
return taskDefinition.getTaskParams();
1050+
}
1051+
1052+
Map<String, Object> taskParamsMap = JSONUtils.parseObject(taskDefinition.getTaskParams(), new TypeReference<Map<String, Object>>() {});
1053+
if (taskParamsMap == null) {
1054+
taskParamsMap = new HashMap<>();
1055+
}
1056+
AbstractParameters switchParameters = TaskParametersUtils.getParameters(TaskType.SWITCH.getDesc(), JSONUtils.toJsonString(taskParamsMap.get(Constants.SWITCH_RESULT)));
1057+
if (switchParameters != null) {
1058+
taskParamsMap.put(Constants.SWITCH_RESULT, resetImportSwitchTaskParams(taskCodeMap, switchParameters));
1059+
}
1060+
AbstractParameters conditionParameters = TaskParametersUtils.getParameters(TaskType.CONDITIONS.getDesc(), JSONUtils.toJsonString(taskParamsMap.get(Constants.CONDITION_RESULT)));
1061+
if (conditionParameters != null) {
1062+
taskParamsMap.put(Constants.CONDITION_RESULT, resetImportConditionTaskParams(taskCodeMap, conditionParameters));
1063+
}
1064+
return JSONUtils.toJsonString(taskParamsMap);
1065+
}
1066+
1067+
private AbstractParameters resetImportSwitchTaskParams(Map<Long, Long> taskCodeMap, AbstractParameters parameter) {
1068+
SwitchParameters switchParameters = (SwitchParameters) parameter;
1069+
List<SwitchResultVo> dependTaskList = switchParameters.getDependTaskList();
1070+
if (CollectionUtils.isEmpty(dependTaskList)) {
1071+
return switchParameters;
1072+
}
1073+
for (SwitchResultVo resultVo : dependTaskList) {
1074+
Long nextNode = resultVo.getNextNode();
1075+
resultVo.setNextNode(taskCodeMap.get(nextNode));
1076+
}
1077+
Long nextNode = switchParameters.getNextNode();
1078+
switchParameters.setNextNode(taskCodeMap.get(nextNode));
1079+
return switchParameters;
1080+
}
1081+
1082+
private AbstractParameters resetImportConditionTaskParams(Map<Long, Long> taskCodeMap, AbstractParameters parameter) {
1083+
ConditionsParameters conditionsParameters = (ConditionsParameters) parameter;
1084+
List<Long> originalSuccessNode = conditionsParameters.getSuccessNode();
1085+
List<Long> originalFailedNode = conditionsParameters.getFailedNode();
1086+
if (CollectionUtils.isEmpty(originalSuccessNode) || CollectionUtils.isEmpty(originalFailedNode)) {
1087+
return conditionsParameters;
1088+
}
1089+
List<Long> resultSuccessNode = new ArrayList<>();
1090+
List<Long> resultFailedNode = new ArrayList<>();
1091+
1092+
if (CollectionUtils.isNotEmpty(originalSuccessNode)) {
1093+
originalSuccessNode.forEach(v -> resultSuccessNode.add(taskCodeMap.get(v)));
1094+
}
1095+
if (CollectionUtils.isNotEmpty(originalFailedNode)) {
1096+
originalFailedNode.forEach(v -> resultFailedNode.add(taskCodeMap.get(v)));
1097+
}
1098+
1099+
conditionsParameters.setSuccessNode(resultSuccessNode);
1100+
conditionsParameters.setFailedNode(resultFailedNode);
1101+
return conditionsParameters;
1102+
}
1103+
10381104
/**
10391105
* check importance params
10401106
*/

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/conditions/ConditionsParameters.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,10 @@ public class ConditionsParameters extends AbstractParameters {
3232
private DependentRelation dependRelation;
3333

3434
// node list to run when success
35-
private List<String> successNode;
35+
private List<Long> successNode;
3636

3737
// node list to run when failed
38-
private List<String> failedNode;
38+
private List<Long> failedNode;
3939

4040
@Override
4141
public boolean checkParameters() {
@@ -63,19 +63,19 @@ public void setDependRelation(DependentRelation dependRelation) {
6363
this.dependRelation = dependRelation;
6464
}
6565

66-
public List<String> getSuccessNode() {
66+
public List<Long> getSuccessNode() {
6767
return successNode;
6868
}
6969

70-
public void setSuccessNode(List<String> successNode) {
70+
public void setSuccessNode(List<Long> successNode) {
7171
this.successNode = successNode;
7272
}
7373

74-
public List<String> getFailedNode() {
74+
public List<Long> getFailedNode() {
7575
return failedNode;
7676
}
7777

78-
public void setFailedNode(List<String> failedNode) {
78+
public void setFailedNode(List<Long> failedNode) {
7979
this.failedNode = failedNode;
8080
}
8181

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchParameters.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,16 @@
2121
import org.apache.dolphinscheduler.common.process.ResourceInfo;
2222
import org.apache.dolphinscheduler.common.task.AbstractParameters;
2323

24+
import org.apache.commons.collections.CollectionUtils;
25+
2426
import java.util.ArrayList;
2527
import java.util.List;
2628

2729
public class SwitchParameters extends AbstractParameters {
2830

2931
private DependentRelation dependRelation;
3032
private String relation;
31-
private List<String> nextNode;
33+
private Long nextNode;
3234
private int resultConditionLocation;
3335
private List<SwitchResultVo> dependTaskList;
3436

@@ -74,21 +76,18 @@ public void setDependTaskList(List<SwitchResultVo> dependTaskList) {
7476
this.dependTaskList = dependTaskList;
7577
}
7678

77-
public List<String> getNextNode() {
79+
public Long getNextNode() {
7880
return nextNode;
7981
}
8082

8183
public void setNextNode(Object nextNode) {
82-
if (nextNode instanceof String) {
83-
List<String> nextNodeList = new ArrayList<>();
84-
nextNodeList.add(String.valueOf(nextNode));
85-
this.nextNode = nextNodeList;
86-
} else if (nextNode instanceof Number) {
87-
List<String> nextNodeList = new ArrayList<>();
88-
nextNodeList.add(nextNode.toString());
89-
this.nextNode = nextNodeList;
84+
if (nextNode instanceof Long) {
85+
this.nextNode = (Long) nextNode;
9086
} else {
91-
this.nextNode = (ArrayList) nextNode;
87+
List<String> nextNodes = (ArrayList) nextNode;
88+
if (CollectionUtils.isNotEmpty(nextNodes)) {
89+
this.nextNode = Long.parseLong(nextNodes.get(0));
90+
}
9291
}
9392
}
9493
}

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchResultVo.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717

1818
package org.apache.dolphinscheduler.common.task.switchtask;
1919

20+
import org.apache.commons.collections.CollectionUtils;
21+
2022
import java.util.ArrayList;
2123
import java.util.List;
2224

2325
public class SwitchResultVo {
2426

2527
private String condition;
26-
private List<String> nextNode;
28+
private Long nextNode;
2729

2830
public String getCondition() {
2931
return condition;
@@ -33,21 +35,18 @@ public void setCondition(String condition) {
3335
this.condition = condition;
3436
}
3537

36-
public List<String> getNextNode() {
38+
public Long getNextNode() {
3739
return nextNode;
3840
}
3941

4042
public void setNextNode(Object nextNode) {
41-
if (nextNode instanceof String) {
42-
List<String> nextNodeList = new ArrayList<>();
43-
nextNodeList.add(String.valueOf(nextNode));
44-
this.nextNode = nextNodeList;
45-
} else if (nextNode instanceof Number) {
46-
List<String> nextNodeList = new ArrayList<>();
47-
nextNodeList.add(nextNode.toString());
48-
this.nextNode = nextNodeList;
43+
if (nextNode instanceof Long) {
44+
this.nextNode = (Long) nextNode;
4945
} else {
50-
this.nextNode = (ArrayList) nextNode;
46+
List<String> nextNodes = (ArrayList) nextNode;
47+
if (CollectionUtils.isNotEmpty(nextNodes)) {
48+
this.nextNode = Long.parseLong(nextNodes.get(0));
49+
}
5150
}
5251
}
5352
}

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ public void setTaskParamMap(Map<String, String> taskParamMap) {
294294
public Map<String, String> getTaskParamMap() {
295295
if (taskParamMap == null && StringUtils.isNotEmpty(taskParams)) {
296296
JsonNode localParams = JSONUtils.parseObject(taskParams).findValue("localParams");
297-
if (localParams != null) {
297+
if (localParams != null && localParams.size() > 0) {
298298
List<Property> propList = JSONUtils.toList(localParams.toString(), Property.class);
299299
taskParamMap = propList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
300300
}

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.List;
3939
import java.util.Map;
4040
import java.util.Set;
41+
import java.util.stream.Collectors;
4142

4243
import org.slf4j.Logger;
4344
import org.slf4j.LoggerFactory;
@@ -363,13 +364,16 @@ public static List<String> parseConditionTask(String nodeCode,
363364
TaskInstance taskInstance = completeTaskList.get(nodeCode);
364365
ConditionsParameters conditionsParameters =
365366
JSONUtils.parseObject(taskNode.getConditionResult(), ConditionsParameters.class);
367+
if (conditionsParameters == null) {
368+
return conditionTaskList;
369+
}
366370
List<String> skipNodeList = new ArrayList<>();
367371
if (taskInstance.getState().typeIsSuccess()) {
368-
conditionTaskList = conditionsParameters.getSuccessNode();
369-
skipNodeList = conditionsParameters.getFailedNode();
372+
conditionTaskList = conditionsParameters.getSuccessNode().stream().map(String::valueOf).collect(Collectors.toList());
373+
skipNodeList = conditionsParameters.getFailedNode().stream().map(String::valueOf).collect(Collectors.toList());
370374
} else if (taskInstance.getState().typeIsFailure()) {
371-
conditionTaskList = conditionsParameters.getFailedNode();
372-
skipNodeList = conditionsParameters.getSuccessNode();
375+
conditionTaskList = conditionsParameters.getFailedNode().stream().map(String::valueOf).collect(Collectors.toList());
376+
skipNodeList = conditionsParameters.getSuccessNode().stream().map(String::valueOf).collect(Collectors.toList());
373377
} else {
374378
conditionTaskList.add(nodeCode);
375379
}
@@ -395,27 +399,28 @@ public static List<String> parseSwitchTask(String nodeCode,
395399
if (!completeTaskList.containsKey(nodeCode)) {
396400
return conditionTaskList;
397401
}
398-
conditionTaskList = skipTaskNode4Switch(taskNode, skipTaskNodeList, completeTaskList, dag);
402+
conditionTaskList.add(String.valueOf(skipTaskNode4Switch(taskNode, skipTaskNodeList, completeTaskList, dag)));
399403
return conditionTaskList;
400404
}
401405

402-
private static List<String> skipTaskNode4Switch(TaskNode taskNode, Map<String, TaskNode> skipTaskNodeList,
403-
Map<String, TaskInstance> completeTaskList,
404-
DAG<String, TaskNode, TaskNodeRelation> dag) {
406+
private static Long skipTaskNode4Switch(TaskNode taskNode, Map<String, TaskNode> skipTaskNodeList,
407+
Map<String, TaskInstance> completeTaskList,
408+
DAG<String, TaskNode, TaskNodeRelation> dag) {
405409

406410
SwitchParameters switchParameters = completeTaskList.get(Long.toString(taskNode.getCode())).getSwitchDependency();
407411
int resultConditionLocation = switchParameters.getResultConditionLocation();
408412
List<SwitchResultVo> conditionResultVoList = switchParameters.getDependTaskList();
409-
List<String> switchTaskList = conditionResultVoList.get(resultConditionLocation).getNextNode();
410-
if (CollectionUtils.isEmpty(switchTaskList)) {
411-
switchTaskList = new ArrayList<>();
413+
Long switchTaskList = conditionResultVoList.get(resultConditionLocation).getNextNode();
414+
if (switchTaskList == null) {
415+
switchTaskList = 0L;
412416
}
413417
conditionResultVoList.remove(resultConditionLocation);
414418
for (SwitchResultVo info : conditionResultVoList) {
415-
if (CollectionUtils.isEmpty(info.getNextNode())) {
419+
Long nextNode = info.getNextNode();
420+
if (nextNode == null || nextNode == 0L) {
416421
continue;
417422
}
418-
setTaskNodeSkip(info.getNextNode().get(0), dag, completeTaskList, skipTaskNodeList);
423+
setTaskNodeSkip(String.valueOf(nextNode), dag, completeTaskList, skipTaskNodeList);
419424
}
420425
return switchTaskList;
421426
}

dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -446,15 +446,15 @@ private SwitchParameters getSwitchNode() {
446446
SwitchParameters conditionsParameters = new SwitchParameters();
447447
SwitchResultVo switchResultVo1 = new SwitchResultVo();
448448
switchResultVo1.setCondition(" 2 == 1");
449-
switchResultVo1.setNextNode("2");
449+
switchResultVo1.setNextNode(2L);
450450
SwitchResultVo switchResultVo2 = new SwitchResultVo();
451451
switchResultVo2.setCondition(" 2 == 2");
452-
switchResultVo2.setNextNode("4");
452+
switchResultVo2.setNextNode(4L);
453453
List<SwitchResultVo> list = new ArrayList<>();
454454
list.add(switchResultVo1);
455455
list.add(switchResultVo2);
456456
conditionsParameters.setDependTaskList(list);
457-
conditionsParameters.setNextNode("5");
457+
conditionsParameters.setNextNode(5L);
458458
conditionsParameters.setRelation("AND");
459459

460460
// in: AND(AND(1 is SUCCESS))

dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.dolphinscheduler.server.master.runner.task;
1919

20+
import org.apache.dolphinscheduler.common.Constants;
2021
import org.apache.dolphinscheduler.common.enums.DependResult;
2122
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
2223
import org.apache.dolphinscheduler.common.enums.TaskType;
@@ -30,7 +31,6 @@
3031
import org.apache.dolphinscheduler.server.utils.LogUtils;
3132
import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils;
3233

33-
import org.apache.commons.collections4.CollectionUtils;
3434
import org.apache.commons.lang.StringUtils;
3535

3636
import java.util.Date;
@@ -225,9 +225,15 @@ public String setTaskParams(String content, String rgex) {
225225
}
226226
while (m.find()) {
227227
String paramName = m.group(1);
228-
Property property = globalParams.get(paramName);
228+
Property property = globalParams.get(Constants.START_UP_PARAMS_PREFIX + paramName);
229229
if (property == null) {
230-
return "";
230+
property = globalParams.get(paramName);
231+
if (property == null) {
232+
property = globalParams.get(Constants.GLOBAL_PARAMS_PREFIX + paramName);
233+
if (property == null) {
234+
return "";
235+
}
236+
}
231237
}
232238
String value = property.getValue();
233239
if (!org.apache.commons.lang.math.NumberUtils.isNumber(value)) {
@@ -243,15 +249,7 @@ public String setTaskParams(String content, String rgex) {
243249
* check whether switch result is valid
244250
*/
245251
private boolean isValidSwitchResult(SwitchResultVo switchResult) {
246-
if (CollectionUtils.isEmpty(switchResult.getNextNode())) {
247-
return false;
248-
}
249-
for (String nextNode : switchResult.getNextNode()) {
250-
if (StringUtils.isEmpty(nextNode)) {
251-
return false;
252-
}
253-
}
254-
return true;
252+
return switchResult.getNextNode() != null && switchResult.getNextNode() != 0L;
255253
}
256254

257255
}

dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,8 +155,8 @@ private TaskNode getTaskNode() {
155155
taskNode.setDependence(JSONUtils.toJsonString(dependentParameters));
156156

157157
ConditionsParameters conditionsParameters = new ConditionsParameters();
158-
conditionsParameters.setSuccessNode(Stream.of("2").collect(Collectors.toList()));
159-
conditionsParameters.setFailedNode(Stream.of("3").collect(Collectors.toList()));
158+
conditionsParameters.setSuccessNode(Stream.of(2L).collect(Collectors.toList()));
159+
conditionsParameters.setFailedNode(Stream.of(3L).collect(Collectors.toList()));
160160

161161
// out: SUCCESS => 2, FAILED => 3
162162
taskNode.setConditionResult(JSONUtils.toJsonString(conditionsParameters));

0 commit comments

Comments
 (0)