Skip to content

Commit 66c1d87

Browse files
authored
[Fix-17109] Recovered workflow host may incorrect (#17112)
1 parent d5532ed commit 66c1d87

File tree

9 files changed

+511
-0
lines changed

9 files changed

+511
-0
lines changed

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
2626
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
2727
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
28+
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
2829
import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowGraph;
2930
import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowExecutionGraph;
3031
import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowGraphTopologyLogicalVisitor;
@@ -67,6 +68,9 @@ public class RecoverFailureTaskCommandHandler extends AbstractCommandHandler {
6768
@Autowired
6869
private TaskInstanceFactories taskInstanceFactories;
6970

71+
@Autowired
72+
private MasterConfig masterConfig;
73+
7074
/**
7175
* Generate the recover workflow instance.
7276
* <p> Will use the origin workflow instance, but will update the following fields. Need to note we cannot not
@@ -90,6 +94,7 @@ protected void assembleWorkflowInstance(
9094
workflowInstance.setVarPool(null);
9195
workflowInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION, command.getCommandType().name());
9296
workflowInstance.setCommandType(command.getCommandType());
97+
workflowInstance.setHost(masterConfig.getMasterAddress());
9398
workflowInstanceDao.updateById(workflowInstance);
9499

95100
workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance);

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
2525
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
2626
import org.apache.dolphinscheduler.extract.master.command.WorkflowFailoverCommandParam;
27+
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
2728
import org.apache.dolphinscheduler.server.master.engine.ITaskGroupCoordinator;
2829
import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowGraph;
2930
import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowExecutionGraph;
@@ -58,6 +59,9 @@ public class WorkflowFailoverCommandHandler extends AbstractCommandHandler {
5859
@Autowired
5960
private ApplicationContext applicationContext;
6061

62+
@Autowired
63+
private MasterConfig masterConfig;
64+
6165
/**
6266
* Generate the recover workflow instance.
6367
* <p> Will use the origin workflow instance, but will update the following fields. Need to note we cannot not
@@ -86,6 +90,7 @@ protected void assembleWorkflowInstance(
8690
"The WorkflowFailoverCommandParam: " + command.getCommandParam() + " is invalid");
8791
}
8892
workflowInstance.setState(workflowFailoverCommandParam.getWorkflowExecutionStatus());
93+
workflowInstance.setHost(masterConfig.getMasterAddress());
8994
workflowInstanceDao.updateById(workflowInstance);
9095

9196
workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance);

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/AbstractMasterIntegrationTestCase.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package org.apache.dolphinscheduler.server.master;
1919

2020
import org.apache.dolphinscheduler.dao.DaoConfiguration;
21+
import org.apache.dolphinscheduler.registry.api.RegistryClient;
22+
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
2123
import org.apache.dolphinscheduler.server.master.integration.MasterContainer;
2224
import org.apache.dolphinscheduler.server.master.integration.Repository;
2325
import org.apache.dolphinscheduler.server.master.integration.WorkflowOperator;
@@ -52,4 +54,10 @@ public abstract class AbstractMasterIntegrationTestCase {
5254

5355
@Autowired
5456
protected MasterContainer masterContainer;
57+
58+
@Autowired
59+
protected RegistryClient registryClient;
60+
61+
@Autowired
62+
protected MasterConfig masterConfig;
5563
}

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceFailoverTestCase.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,12 @@
2424
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
2525
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
2626
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
27+
import org.apache.dolphinscheduler.extract.base.client.Clients;
28+
import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient;
29+
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopRequest;
30+
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopResponse;
2731
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
32+
import org.apache.dolphinscheduler.registry.api.utils.RegistryUtils;
2833
import org.apache.dolphinscheduler.server.master.AbstractMasterIntegrationTestCase;
2934
import org.apache.dolphinscheduler.server.master.engine.system.SystemEventBus;
3035
import org.apache.dolphinscheduler.server.master.engine.system.event.GlobalMasterFailoverEvent;
@@ -555,4 +560,59 @@ public void testGlobalFailover_readyStopWorkflow_withKilledTasks() {
555560
masterContainer.assertAllResourceReleased();
556561
}
557562

563+
@Test
564+
public void testGlobalFailover_runningWorkflow_fromAnotherMaster() {
565+
final String yaml = "/it/failover/running_workflowInstance_from_another_master.yaml";
566+
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
567+
final WorkflowDefinition workflow = context.getOneWorkflow();
568+
569+
systemEventBus.publish(GlobalMasterFailoverEvent.of(new Date()));
570+
571+
final String masterFailoverNodePath = RegistryUtils.getFailoveredNodePathWhichStartupTimeIsUnknown(
572+
"127.0.0.1:15678");
573+
// wait failover process
574+
await()
575+
.atMost(Duration.ofMinutes(3))
576+
.untilAsserted(() -> {
577+
assertThat(registryClient.exists(masterFailoverNodePath)).isTrue();
578+
});
579+
580+
// check workflow's status and can stop it
581+
await()
582+
.atMost(Duration.ofMinutes(1))
583+
.untilAsserted(() -> {
584+
assertThat(repository.queryWorkflowInstance(workflow))
585+
.hasSize(1)
586+
.anySatisfy(workflowInstance -> {
587+
assertThat(workflowInstance.getState())
588+
.isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION);
589+
assertThat(workflowInstance.getName())
590+
.isEqualTo("workflow_with_one_fake_task_running-20250322201900000");
591+
592+
final WorkflowInstanceStopResponse stopResponse = Clients
593+
.withService(IWorkflowControlClient.class)
594+
.withHost(workflowInstance.getHost())
595+
.stopWorkflowInstance(
596+
new WorkflowInstanceStopRequest(workflowInstance.getId()));
597+
598+
assertThat((stopResponse != null && stopResponse.isSuccess())).isTrue();
599+
});
600+
});
601+
602+
await()
603+
.atMost(Duration.ofMinutes(1))
604+
.untilAsserted(() -> {
605+
assertThat(repository.queryWorkflowInstance(workflow))
606+
.hasSize(1)
607+
.anySatisfy(workflowInstance -> {
608+
assertThat(workflowInstance.getState())
609+
.isEqualTo(WorkflowExecutionStatus.STOP);
610+
assertThat(workflowInstance.getName())
611+
.isEqualTo("workflow_with_one_fake_task_running-20250322201900000");
612+
});
613+
});
614+
615+
masterContainer.assertAllResourceReleased();
616+
617+
}
558618
}

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceRecoverFailureTaskTestCase.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,12 @@
2323
import org.apache.dolphinscheduler.common.enums.Flag;
2424
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
2525
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
26+
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
2627
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
28+
import org.apache.dolphinscheduler.extract.base.client.Clients;
29+
import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient;
30+
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopRequest;
31+
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopResponse;
2732
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
2833
import org.apache.dolphinscheduler.server.master.AbstractMasterIntegrationTestCase;
2934
import org.apache.dolphinscheduler.server.master.integration.WorkflowTestCaseContext;
@@ -89,4 +94,50 @@ public void testRepeatRunningWorkflow_with_taskOnly() {
8994
masterContainer.assertAllResourceReleased();
9095
}
9196

97+
@Test
98+
@DisplayName("Test recover a failure workflow from another master")
99+
public void testRecoverFailureWorkflow_from_another_master() {
100+
final String yaml = "/it/recover_failure_tasks/failure_workflow_from_another_master.yaml";
101+
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
102+
final WorkflowDefinition workflow = context.getOneWorkflow();
103+
104+
final Integer workflowInstanceId = context.getWorkflowInstance().getId();
105+
workflowOperator.recoverFailureTasks(workflowInstanceId);
106+
107+
await()
108+
.atMost(Duration.ofMinutes(1))
109+
.untilAsserted(() -> {
110+
assertThat(repository.queryWorkflowInstance(workflow))
111+
.hasSize(1)
112+
.anySatisfy(workflowInstance -> {
113+
assertThat(workflowInstance.getState())
114+
.isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION);
115+
assertThat(workflowInstance.getName())
116+
.isEqualTo("workflow_with_one_fake_task_killed-20250322201900000");
117+
118+
final WorkflowInstanceStopResponse stopResponse = Clients
119+
.withService(IWorkflowControlClient.class)
120+
.withHost(workflowInstance.getHost())
121+
.stopWorkflowInstance(
122+
new WorkflowInstanceStopRequest(workflowInstance.getId()));
123+
124+
assertThat(stopResponse != null && stopResponse.isSuccess()).isTrue();
125+
});
126+
});
127+
128+
await()
129+
.atMost(Duration.ofMinutes(1))
130+
.untilAsserted(() -> {
131+
assertThat(repository.queryWorkflowInstance(workflow))
132+
.hasSize(1)
133+
.anySatisfy(workflowInstance -> {
134+
assertThat(workflowInstance.getState())
135+
.isEqualTo(WorkflowExecutionStatus.STOP);
136+
assertThat(workflowInstance.getName())
137+
.isEqualTo("workflow_with_one_fake_task_killed-20250322201900000");
138+
});
139+
});
140+
141+
masterContainer.assertAllResourceReleased();
142+
}
92143
}

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceRecoverStopTestCase.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,11 @@
2424
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
2525
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
2626
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
27+
import org.apache.dolphinscheduler.extract.base.client.Clients;
28+
import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient;
2729
import org.apache.dolphinscheduler.extract.master.command.RunWorkflowCommandParam;
30+
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopRequest;
31+
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopResponse;
2832
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
2933
import org.apache.dolphinscheduler.server.master.AbstractMasterIntegrationTestCase;
3034
import org.apache.dolphinscheduler.server.master.integration.WorkflowOperator;
@@ -121,4 +125,50 @@ public void testRecoverStoppedWorkflow_with_subWorkflowTask_success() {
121125
masterContainer.assertAllResourceReleased();
122126
}
123127

128+
@Test
129+
@DisplayName("Test recover a stopped workflow from another master")
130+
public void testRecoverStoppedWorkflow_from_another_master() {
131+
final String yaml = "/it/recover_stopped/stopped_workflow_from_another_master.yaml";
132+
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
133+
final WorkflowDefinition workflow = context.getOneWorkflow();
134+
135+
final Integer workflowInstanceId = context.getWorkflowInstance().getId();
136+
assertThat(workflowOperator.recoverSuspendWorkflowInstance(workflowInstanceId).isSuccess()).isTrue();
137+
138+
await()
139+
.atMost(Duration.ofMinutes(1))
140+
.untilAsserted(() -> {
141+
assertThat(repository.queryWorkflowInstance(workflow))
142+
.hasSize(1)
143+
.anySatisfy(workflowInstance -> {
144+
assertThat(workflowInstance.getState())
145+
.isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION);
146+
assertThat(workflowInstance.getName())
147+
.isEqualTo("workflow_with_one_fake_task_killed-20250322201900000");
148+
149+
final WorkflowInstanceStopResponse stopResponse = Clients
150+
.withService(IWorkflowControlClient.class)
151+
.withHost(workflowInstance.getHost())
152+
.stopWorkflowInstance(
153+
new WorkflowInstanceStopRequest(workflowInstance.getId()));
154+
155+
assertThat(stopResponse != null && stopResponse.isSuccess()).isTrue();
156+
});
157+
});
158+
159+
await()
160+
.atMost(Duration.ofMinutes(1))
161+
.untilAsserted(() -> {
162+
assertThat(repository.queryWorkflowInstance(workflow))
163+
.hasSize(1)
164+
.anySatisfy(workflowInstance -> {
165+
assertThat(workflowInstance.getState())
166+
.isEqualTo(WorkflowExecutionStatus.STOP);
167+
assertThat(workflowInstance.getName())
168+
.isEqualTo("workflow_with_one_fake_task_killed-20250322201900000");
169+
});
170+
});
171+
172+
masterContainer.assertAllResourceReleased();
173+
}
124174
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
project:
19+
name: MasterIntegrationTest
20+
code: 1
21+
description: This is a fake project
22+
userId: 1
23+
userName: admin
24+
createTime: 2025-03-22 00:00:00
25+
updateTime: 2025-03-22 00:00:00
26+
27+
workflows:
28+
- name: workflow_with_one_fake_task_running
29+
code: 1
30+
version: 1
31+
projectCode: 1
32+
description: This is a fake workflow with single task
33+
releaseState: ONLINE
34+
createTime: 2025-03-22 00:00:00
35+
updateTime: 2025-03-22 00:00:00
36+
userId: 1
37+
executionType: PARALLEL
38+
39+
workflowInstance:
40+
id: 1
41+
name: workflow_with_one_fake_task_running-20250322201900000
42+
workflowDefinitionCode: 1
43+
workflowDefinitionVersion: 1
44+
projectCode: 1
45+
state: RUNNING_EXECUTION
46+
recovery: NO
47+
startTime: 2025-03-22 20:19:00
48+
endTime: null
49+
runTimes: 1
50+
host: "127.0.0.1:15678"
51+
commandType: START_PROCESS
52+
commandParam: '{"commandType":"START_PROCESS","startNodes":[],"commandParams":[],"timeZone":"UTC"}'
53+
taskDependType: TASK_POST
54+
commandStartTime: 2025-03-22 20:19:00
55+
isSubWorkflow: NO
56+
executorId: 1
57+
historyCmd: START_PROCESS
58+
workerGroup: default
59+
globalParams: '[]'
60+
varPool: '[]'
61+
dryRun: 0
62+
63+
taskInstances:
64+
- id: 1
65+
name: A
66+
taskType: LogicFakeTask
67+
workflowInstanceId: 1
68+
workflowInstanceName: workflow_with_one_fake_task_running-20250322201900000
69+
projectCode: 1
70+
taskCode: 1
71+
taskDefinitionVersion: 1
72+
state: RUNNING_EXECUTION
73+
firstSubmitTime: 2025-03-22 20:19:00
74+
submitTime: 2025-03-22 20:19:00
75+
startTime: 2025-03-22 20:19:00
76+
retryTimes: 0
77+
host: 127.0.0.1:1234
78+
maxRetryTimes: 0
79+
taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 200"}'
80+
flag: YES
81+
retryInterval: 0
82+
delayTime: 0
83+
workerGroup: default
84+
executorId: 1
85+
varPool: '[]'
86+
taskExecuteType: BATCH
87+
88+
tasks:
89+
- name: A
90+
code: 1
91+
version: 1
92+
projectCode: 1
93+
userId: 1
94+
taskType: LogicFakeTask
95+
taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 200"}'
96+
workerGroup: default
97+
createTime: 2025-03-22 00:00:00
98+
updateTime: 2025-03-22 00:00:00
99+
taskExecuteType: BATCH
100+
101+
taskRelations:
102+
- projectCode: 1
103+
workflowDefinitionCode: 1
104+
workflowDefinitionVersion: 1
105+
preTaskCode: 0
106+
preTaskVersion: 0
107+
postTaskCode: 1
108+
postTaskVersion: 1
109+
createTime: 2025-03-22 00:00:00
110+
updateTime: 2025-03-22 00:00:00

0 commit comments

Comments
 (0)