|
24 | 24 | import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
|
25 | 25 | import org.apache.dolphinscheduler.dao.entity.TaskInstance;
|
26 | 26 | import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
|
| 27 | +import org.apache.dolphinscheduler.extract.base.client.Clients; |
| 28 | +import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient; |
| 29 | +import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopRequest; |
| 30 | +import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopResponse; |
27 | 31 | import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
|
| 32 | +import org.apache.dolphinscheduler.registry.api.utils.RegistryUtils; |
28 | 33 | import org.apache.dolphinscheduler.server.master.AbstractMasterIntegrationTestCase;
|
29 | 34 | import org.apache.dolphinscheduler.server.master.engine.system.SystemEventBus;
|
30 | 35 | import org.apache.dolphinscheduler.server.master.engine.system.event.GlobalMasterFailoverEvent;
|
@@ -555,4 +560,59 @@ public void testGlobalFailover_readyStopWorkflow_withKilledTasks() {
|
555 | 560 | masterContainer.assertAllResourceReleased();
|
556 | 561 | }
|
557 | 562 |
|
| 563 | + @Test |
| 564 | + public void testGlobalFailover_runningWorkflow_fromAnotherMaster() { |
| 565 | + final String yaml = "/it/failover/running_workflowInstance_from_another_master.yaml"; |
| 566 | + final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml); |
| 567 | + final WorkflowDefinition workflow = context.getOneWorkflow(); |
| 568 | + |
| 569 | + systemEventBus.publish(GlobalMasterFailoverEvent.of(new Date())); |
| 570 | + |
| 571 | + final String masterFailoverNodePath = RegistryUtils.getFailoveredNodePathWhichStartupTimeIsUnknown( |
| 572 | + "127.0.0.1:15678"); |
| 573 | + // wait failover process |
| 574 | + await() |
| 575 | + .atMost(Duration.ofMinutes(3)) |
| 576 | + .untilAsserted(() -> { |
| 577 | + assertThat(registryClient.exists(masterFailoverNodePath)).isTrue(); |
| 578 | + }); |
| 579 | + |
| 580 | + // check workflow's status and can stop it |
| 581 | + await() |
| 582 | + .atMost(Duration.ofMinutes(1)) |
| 583 | + .untilAsserted(() -> { |
| 584 | + assertThat(repository.queryWorkflowInstance(workflow)) |
| 585 | + .hasSize(1) |
| 586 | + .anySatisfy(workflowInstance -> { |
| 587 | + assertThat(workflowInstance.getState()) |
| 588 | + .isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION); |
| 589 | + assertThat(workflowInstance.getName()) |
| 590 | + .isEqualTo("workflow_with_one_fake_task_running-20250322201900000"); |
| 591 | + |
| 592 | + final WorkflowInstanceStopResponse stopResponse = Clients |
| 593 | + .withService(IWorkflowControlClient.class) |
| 594 | + .withHost(workflowInstance.getHost()) |
| 595 | + .stopWorkflowInstance( |
| 596 | + new WorkflowInstanceStopRequest(workflowInstance.getId())); |
| 597 | + |
| 598 | + assertThat((stopResponse != null && stopResponse.isSuccess())).isTrue(); |
| 599 | + }); |
| 600 | + }); |
| 601 | + |
| 602 | + await() |
| 603 | + .atMost(Duration.ofMinutes(1)) |
| 604 | + .untilAsserted(() -> { |
| 605 | + assertThat(repository.queryWorkflowInstance(workflow)) |
| 606 | + .hasSize(1) |
| 607 | + .anySatisfy(workflowInstance -> { |
| 608 | + assertThat(workflowInstance.getState()) |
| 609 | + .isEqualTo(WorkflowExecutionStatus.STOP); |
| 610 | + assertThat(workflowInstance.getName()) |
| 611 | + .isEqualTo("workflow_with_one_fake_task_running-20250322201900000"); |
| 612 | + }); |
| 613 | + }); |
| 614 | + |
| 615 | + masterContainer.assertAllResourceReleased(); |
| 616 | + |
| 617 | + } |
558 | 618 | }
|
0 commit comments