Skip to content

Commit 8204fed

Browse files
authored
Merge pull request #149 from conductor-sdk/orkes-conudctor-957-terminate-API-update-triggerFailureWorkflow
orkes-conductor-957: update Terminate API
2 parents d41a2aa + a7e09a9 commit 8204fed

4 files changed

Lines changed: 33 additions & 5 deletions

File tree

src/conductor/client/http/api/workflow_bulk_resource_api.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,7 @@ def terminate(self, body, **kwargs): # noqa: E501
423423
:param async_req bool
424424
:param list[str] body: (required)
425425
:param str reason:
426+
:param bool trigger_failure_workflow:
426427
:return: BulkResponse
427428
If the method is called asynchronously,
428429
returns the request thread.
@@ -445,12 +446,13 @@ def terminate_with_http_info(self, body, **kwargs): # noqa: E501
445446
:param async_req bool
446447
:param list[str] body: (required)
447448
:param str reason:
449+
:param bool trigger_failure_workflow:
448450
:return: BulkResponse
449451
If the method is called asynchronously,
450452
returns the request thread.
451453
"""
452454

453-
all_params = ['body', 'reason'] # noqa: E501
455+
all_params = ['body', 'reason', 'triggerFailureWorkflow'] # noqa: E501
454456
all_params.append('async_req')
455457
all_params.append('_return_http_data_only')
456458
all_params.append('_preload_content')
@@ -478,6 +480,9 @@ def terminate_with_http_info(self, body, **kwargs): # noqa: E501
478480
if 'reason' in params:
479481
query_params.append(('reason', params['reason'])) # noqa: E501
480482

483+
if 'triggerFailureWorkflow' in params:
484+
query_params.append(('triggerFailureWorkflow', params['triggerFailureWorkflow'])) # noqa: E501
485+
481486
header_params = {}
482487

483488
form_params = []

src/conductor/client/http/api/workflow_resource_api.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2390,6 +2390,7 @@ def terminate1(self, workflow_id, **kwargs): # noqa: E501
23902390
:param async_req bool
23912391
:param str workflow_id: (required)
23922392
:param str reason:
2393+
:param bool trigger_failure_workflow:
23932394
:return: None
23942395
If the method is called asynchronously,
23952396
returns the request thread.
@@ -2412,12 +2413,13 @@ def terminate1_with_http_info(self, workflow_id, **kwargs): # noqa: E501
24122413
:param async_req bool
24132414
:param str workflow_id: (required)
24142415
:param str reason:
2416+
:param bool trigger_failure_workflow:
24152417
:return: None
24162418
If the method is called asynchronously,
24172419
returns the request thread.
24182420
"""
24192421

2420-
all_params = ['workflow_id', 'reason'] # noqa: E501
2422+
all_params = ['workflow_id', 'reason', 'triggerFailureWorkflow'] # noqa: E501
24212423
all_params.append('async_req')
24222424
all_params.append('_return_http_data_only')
24232425
all_params.append('_preload_content')
@@ -2447,6 +2449,9 @@ def terminate1_with_http_info(self, workflow_id, **kwargs): # noqa: E501
24472449
if 'reason' in params:
24482450
query_params.append(('reason', params['reason'])) # noqa: E501
24492451

2452+
if 'triggerFailureWorkflow' in params:
2453+
query_params.append(('triggerFailureWorkflow', params['triggerFailureWorkflow'])) # noqa: E501
2454+
24502455
header_params = {}
24512456

24522457
form_params = []

src/conductor/client/workflow/executor/workflow_executor.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,11 +152,13 @@ def resume(self, workflow_id: str) -> None:
152152
workflow_id=workflow_id
153153
)
154154

155-
def terminate(self, workflow_id: str, reason: str = None) -> None:
155+
def terminate(self, workflow_id: str, reason: str = None, trigger_failure_workflow: bool = None) -> None:
156156
"""Terminate workflow execution"""
157157
kwargs = {}
158158
if reason is not None:
159159
kwargs['reason'] = reason
160+
if trigger_failure_workflow is not None:
161+
kwargs['triggerFailureWorkflow'] = trigger_failure_workflow
160162
return self.workflow_client.terminate1(
161163
workflow_id=workflow_id,
162164
**kwargs

tests/integration/workflow/test_workflow_execution.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ def test_workflow_methods(
124124
version=1234,
125125
).add(
126126
task
127-
)
127+
).failure_workflow(workflow_name)
128128
workflow_executor.register_workflow(
129129
workflow.to_workflow_def(),
130130
overwrite=True,
@@ -142,11 +142,15 @@ def test_workflow_methods(
142142
_restart_workflow(workflow_executor, workflow_id)
143143
_terminate_workflow(workflow_executor, workflow_id)
144144
_retry_workflow(workflow_executor, workflow_id)
145-
_terminate_workflow(workflow_executor, workflow_id)
145+
failure_wf_id = _terminate_workflow_with_failure(workflow_executor, workflow_id, True)
146+
_terminate_workflow(workflow_executor, failure_wf_id)
146147
_rerun_workflow(workflow_executor, workflow_id)
147148
workflow_executor.remove_workflow(
148149
workflow_id, archive_workflow=False
149150
)
151+
workflow_executor.remove_workflow(
152+
failure_wf_id, archive_workflow=False
153+
)
150154

151155

152156
def test_workflow_registration(workflow_executor: WorkflowExecutor):
@@ -266,6 +270,18 @@ def _terminate_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -
266270
f'workflow expected to be TERMINATED, but received {workflow_status.status}, workflow_id: {workflow_id}'
267271
)
268272

273+
def _terminate_workflow_with_failure(workflow_executor: WorkflowExecutor, workflow_id: str, trigger_failure_workflow: bool) -> str:
274+
workflow_executor.terminate(workflow_id, 'test', trigger_failure_workflow)
275+
workflow_status = workflow_executor.get_workflow_status(
276+
workflow_id,
277+
include_output=True,
278+
include_variables=False,
279+
)
280+
if workflow_status.status != 'TERMINATED':
281+
raise Exception(
282+
f'workflow expected to be TERMINATED, but received {workflow_status.status}, workflow_id: {workflow_id}'
283+
)
284+
return workflow_status.output.get('conductor.failure_workflow')
269285

270286
def _restart_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
271287
workflow_executor.restart(workflow_id)

0 commit comments

Comments
 (0)