Skip to content

Commit ef90774

Browse files
[3.1.8] [fix-#14537] the branch that needs to be executed overlaps with another branch, it may not be able to complete the normal execution #14563 (#14689)
Co-authored-by: fuchanghai <[email protected]>
1 parent 2975701 commit ef90774

File tree

2 files changed

+91
-14
lines changed
  • dolphinscheduler-service/src

2 files changed

+91
-14
lines changed

Diff for: dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java

+51-5
Original file line numberDiff line numberDiff line change
@@ -413,28 +413,74 @@ public static List<String> parseSwitchTask(String nodeCode,
413413
return conditionTaskList;
414414
}
415415

416-
private static List<String> skipTaskNode4Switch(TaskNode taskNode, Map<String, TaskNode> skipTaskNodeList,
417-
Map<String, TaskInstance> completeTaskList,
418-
DAG<String, TaskNode, TaskNodeRelation> dag) {
416+
417+
public static List<String> skipTaskNode4Switch(TaskNode taskNode,
418+
Map<String, TaskNode> skipTaskNodeList,
419+
Map<String, TaskInstance> completeTaskList,
420+
DAG<String, TaskNode, TaskNodeRelation> dag) {
419421

420422
SwitchParameters switchParameters =
421423
completeTaskList.get(Long.toString(taskNode.getCode())).getSwitchDependency();
422424
int resultConditionLocation = switchParameters.getResultConditionLocation();
423425
List<SwitchResultVo> conditionResultVoList = switchParameters.getDependTaskList();
426+
424427
List<String> switchTaskList = conditionResultVoList.get(resultConditionLocation).getNextNode();
428+
Set<String> switchNeedWorkCodes = new HashSet<>();
425429
if (CollectionUtils.isEmpty(switchTaskList)) {
426-
switchTaskList = new ArrayList<>();
430+
return new ArrayList<>();
431+
}
432+
// get all downstream nodes of the branch that the switch node needs to execute
433+
for (String switchTaskCode : switchTaskList) {
434+
getSwitchNeedWorkCodes(switchTaskCode, dag, switchNeedWorkCodes);
427435
}
428436
conditionResultVoList.remove(resultConditionLocation);
429437
for (SwitchResultVo info : conditionResultVoList) {
430438
if (CollectionUtils.isEmpty(info.getNextNode())) {
431439
continue;
432440
}
433-
setTaskNodeSkip(info.getNextNode().get(0), dag, completeTaskList, skipTaskNodeList);
441+
for (String nextNode : info.getNextNode()) {
442+
setSwitchTaskNodeSkip(nextNode, dag, completeTaskList, skipTaskNodeList,
443+
switchNeedWorkCodes);
444+
}
434445
}
435446
return switchTaskList;
436447
}
437448

449+
/**
450+
* get all downstream nodes of the branch that the switch node needs to execute
451+
* @param taskCode
452+
* @param dag
453+
* @param switchNeedWorkCodes
454+
*/
455+
public static void getSwitchNeedWorkCodes(String taskCode, DAG<String, TaskNode, TaskNodeRelation> dag,
456+
Set<String> switchNeedWorkCodes) {
457+
switchNeedWorkCodes.add(taskCode);
458+
Set<String> subsequentNodes = dag.getSubsequentNodes(taskCode);
459+
if (org.apache.commons.collections.CollectionUtils.isNotEmpty(subsequentNodes)) {
460+
for (String subCode : subsequentNodes) {
461+
getSwitchNeedWorkCodes(subCode, dag, switchNeedWorkCodes);
462+
}
463+
}
464+
}
465+
466+
private static void setSwitchTaskNodeSkip(String skipNodeCode,
467+
DAG<String, TaskNode, TaskNodeRelation> dag,
468+
Map<String, TaskInstance> completeTaskList,
469+
Map<String, TaskNode> skipTaskNodeList,
470+
Set<String> switchNeedWorkCodes) {
471+
// ignore when the node that needs to be skipped exists on the branch that the switch type node needs to execute
472+
if (!dag.containsNode(skipNodeCode) || switchNeedWorkCodes.contains(skipNodeCode)) {
473+
return;
474+
}
475+
skipTaskNodeList.putIfAbsent(skipNodeCode, dag.getNode(skipNodeCode));
476+
Collection<String> postNodeList = dag.getSubsequentNodes(skipNodeCode);
477+
for (String post : postNodeList) {
478+
TaskNode postNode = dag.getNode(post);
479+
if (isTaskNodeNeedSkip(postNode, skipTaskNodeList)) {
480+
setTaskNodeSkip(post, dag, completeTaskList, skipTaskNodeList);
481+
}
482+
}
483+
}
438484
/**
439485
* set task node and the post nodes skip flag
440486
*/

Diff for: dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java

+40-9
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,24 @@ public void testConditionPostNode() throws IOException {
330330
Assert.assertEquals(1, postNodes.size());
331331
}
332332

333+
@Test
334+
public void testSwitchPostNode() throws IOException {
335+
DAG<String, TaskNode, TaskNodeRelation> dag = generateDag2();
336+
Map<String, TaskNode> skipTaskNodeList = new HashMap<>();
337+
Map<String, TaskInstance> completeTaskList = new HashMap<>();
338+
completeTaskList.put("0", new TaskInstance());
339+
TaskInstance taskInstance = new TaskInstance();
340+
taskInstance.setState(TaskExecutionStatus.SUCCESS);
341+
taskInstance.setTaskCode(1);
342+
Map<String, Object> taskParamsMap = new HashMap<>();
343+
taskParamsMap.put(Constants.SWITCH_RESULT, "");
344+
taskInstance.setTaskParams(JSONUtils.toJsonString(taskParamsMap));
345+
taskInstance.setSwitchDependency(getSwitchNode());
346+
completeTaskList.put("1", taskInstance);
347+
DagHelper.skipTaskNode4Switch(dag.getNode("1"), skipTaskNodeList, completeTaskList, dag);
348+
Assert.assertNotNull(skipTaskNodeList.get("2"));
349+
Assert.assertEquals(1, skipTaskNodeList.size());
350+
}
333351
/**
334352
* process:
335353
* 1->2->3->5->7
@@ -436,11 +454,13 @@ private DAG<String, TaskNode, TaskNodeRelation> generateDag() throws IOException
436454

437455
/**
438456
* DAG graph:
439-
* 2
440-
* ↑
441-
* 0->1(switch)
442-
* ↓
443-
* 4
457+
* -> 2->
458+
* / \
459+
* / \
460+
* 0->1(switch)->5 6
461+
* \ /
462+
* \ /
463+
* -> 4->
444464
*
445465
* @return dag
446466
* @throws JsonProcessingException if error throws JsonProcessingException
@@ -484,15 +504,26 @@ private DAG<String, TaskNode, TaskNodeRelation> generateDag2() throws IOExceptio
484504
taskNodeList.add(node4);
485505

486506
TaskNode node5 = new TaskNode();
487-
node5.setId("4");
488-
node5.setName("4");
489-
node5.setCode(4);
507+
node5.setId("5");
508+
node5.setName("5");
509+
node5.setCode(5);
490510
node5.setType("SHELL");
491511
List<String> dep5 = new ArrayList<>();
492512
dep5.add("1");
493513
node5.setPreTasks(JSONUtils.toJsonString(dep5));
494514
taskNodeList.add(node5);
495515

516+
TaskNode node6 = new TaskNode();
517+
node5.setId("6");
518+
node5.setName("6");
519+
node5.setCode(6);
520+
node5.setType("SHELL");
521+
List<String> dep6 = new ArrayList<>();
522+
dep5.add("2");
523+
dep5.add("4");
524+
node5.setPreTasks(JSONUtils.toJsonString(dep6));
525+
taskNodeList.add(node6);
526+
496527
List<String> startNodes = new ArrayList<>();
497528
List<String> recoveryNodes = new ArrayList<>();
498529
List<TaskNode> destTaskNodeList = DagHelper.generateFlowNodeListByStartNode(taskNodeList,
@@ -518,7 +549,7 @@ private SwitchParameters getSwitchNode() {
518549
conditionsParameters.setDependTaskList(list);
519550
conditionsParameters.setNextNode("5");
520551
conditionsParameters.setRelation("AND");
521-
552+
conditionsParameters.setResultConditionLocation(1);
522553
// in: AND(AND(1 is SUCCESS))
523554
return conditionsParameters;
524555
}

0 commit comments

Comments
 (0)