Skip to content

Commit c382516

Browse files
authored
Merge pull request #154 from conductor-sdk/integration-tests-idempotence
Updated TaskRunner to retry sending taskResult
2 parents bc436a6 + 0292d55 commit c382516

2 files changed

Lines changed: 106 additions & 41 deletions

File tree

src/conductor/client/automator/task_runner.py

Lines changed: 27 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -168,33 +168,35 @@ def __update_task(self, task_result: TaskResult):
168168
task_definition_name=task_definition_name
169169
)
170170
)
171-
try:
172-
response = self.task_client.update_task(
173-
body=task_result
174-
)
175-
except Exception as e:
176-
if self.metrics_collector is not None:
177-
self.metrics_collector.increment_task_update_error(
178-
task_definition_name, type(e)
171+
for attempt in range(4):
172+
if attempt > 0:
173+
# Wait for [10s, 20s, 30s] before next attempt
174+
time.sleep(attempt * 10)
175+
try:
176+
response = self.task_client.update_task(body=task_result)
177+
logger.debug(
178+
'Updated task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}, response: {response}'.format(
179+
task_id=task_result.task_id,
180+
workflow_instance_id=task_result.workflow_instance_id,
181+
task_definition_name=task_definition_name,
182+
response=response
183+
)
179184
)
180-
logger.info(
181-
'Failed to update task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}, reason: {reason}'.format(
182-
task_id=task_result.task_id,
183-
workflow_instance_id=task_result.workflow_instance_id,
184-
task_definition_name=task_definition_name,
185-
reason=traceback.format_exc()
185+
return response
186+
except Exception as e:
187+
if self.metrics_collector is not None:
188+
self.metrics_collector.increment_task_update_error(
189+
task_definition_name, type(e)
190+
)
191+
logger.debug(
192+
'Failed to update task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}, reason: {reason}'.format(
193+
task_id=task_result.task_id,
194+
workflow_instance_id=task_result.workflow_instance_id,
195+
task_definition_name=task_definition_name,
196+
reason=traceback.format_exc()
197+
)
186198
)
187-
)
188-
return None
189-
logger.debug(
190-
'Updated task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}, response: {response}'.format(
191-
task_id=task_result.task_id,
192-
workflow_instance_id=task_result.workflow_instance_id,
193-
task_definition_name=task_definition_name,
194-
response=response
195-
)
196-
)
197-
return response
199+
return None
198200

199201
def __wait_for_polling_interval(self) -> None:
200202
polling_interval = self.worker.get_polling_interval_in_seconds()

tests/integration/workflow/test_workflow_execution.py

Lines changed: 79 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -136,15 +136,16 @@ def test_workflow_methods(
136136
*start_workflow_requests
137137
)
138138
for workflow_id in workflow_ids:
139-
_pause_workflow(workflow_executor, workflow_id)
140-
_resume_workflow(workflow_executor, workflow_id)
141-
_terminate_workflow(workflow_executor, workflow_id)
142-
_restart_workflow(workflow_executor, workflow_id)
143-
_terminate_workflow(workflow_executor, workflow_id)
144-
_retry_workflow(workflow_executor, workflow_id)
145-
failure_wf_id = _terminate_workflow_with_failure(workflow_executor, workflow_id, True)
146-
_terminate_workflow(workflow_executor, failure_wf_id)
147-
_rerun_workflow(workflow_executor, workflow_id)
139+
__pause_workflow(workflow_executor, workflow_id)
140+
__resume_workflow(workflow_executor, workflow_id)
141+
__terminate_workflow(workflow_executor, workflow_id)
142+
__restart_workflow(workflow_executor, workflow_id)
143+
__terminate_workflow(workflow_executor, workflow_id)
144+
__retry_workflow(workflow_executor, workflow_id)
145+
failure_wf_id = __terminate_workflow_with_failure(
146+
workflow_executor, workflow_id, True)
147+
__terminate_workflow(workflow_executor, failure_wf_id)
148+
__rerun_workflow(workflow_executor, workflow_id)
148149
workflow_executor.remove_workflow(
149150
workflow_id, archive_workflow=False
150151
)
@@ -232,7 +233,17 @@ def generate_worker(execute_function: ExecuteTaskFunction) -> Worker:
232233
)
233234

234235

235-
def _pause_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
236+
def __pause_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
237+
_run_with_retry_attempt(
238+
__validate_pause_workflow,
239+
{
240+
"workflow_executor": workflow_executor,
241+
"workflow_id": workflow_id,
242+
}
243+
)
244+
245+
246+
def __validate_pause_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
236247
workflow_executor.pause(workflow_id)
237248
workflow_status = workflow_executor.get_workflow_status(
238249
workflow_id,
@@ -245,7 +256,17 @@ def _pause_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> No
245256
)
246257

247258

248-
def _resume_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
259+
def __resume_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
260+
_run_with_retry_attempt(
261+
__validate_resume_workflow,
262+
{
263+
"workflow_executor": workflow_executor,
264+
"workflow_id": workflow_id,
265+
}
266+
)
267+
268+
269+
def __validate_resume_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
249270
workflow_executor.resume(workflow_id)
250271
workflow_status = workflow_executor.get_workflow_status(
251272
workflow_id,
@@ -258,7 +279,17 @@ def _resume_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> N
258279
)
259280

260281

261-
def _terminate_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
282+
def __terminate_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
283+
_run_with_retry_attempt(
284+
__validate_terminate_workflow,
285+
{
286+
"workflow_executor": workflow_executor,
287+
"workflow_id": workflow_id,
288+
}
289+
)
290+
291+
292+
def __validate_terminate_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
262293
workflow_executor.terminate(workflow_id)
263294
workflow_status = workflow_executor.get_workflow_status(
264295
workflow_id,
@@ -270,7 +301,8 @@ def _terminate_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -
270301
f'workflow expected to be TERMINATED, but received {workflow_status.status}, workflow_id: {workflow_id}'
271302
)
272303

273-
def _terminate_workflow_with_failure(workflow_executor: WorkflowExecutor, workflow_id: str, trigger_failure_workflow: bool) -> str:
304+
305+
def __terminate_workflow_with_failure(workflow_executor: WorkflowExecutor, workflow_id: str, trigger_failure_workflow: bool) -> str:
274306
workflow_executor.terminate(workflow_id, 'test', trigger_failure_workflow)
275307
workflow_status = workflow_executor.get_workflow_status(
276308
workflow_id,
@@ -283,7 +315,18 @@ def _terminate_workflow_with_failure(workflow_executor: WorkflowExecutor, workfl
283315
)
284316
return workflow_status.output.get('conductor.failure_workflow')
285317

286-
def _restart_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
318+
319+
def __restart_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
320+
_run_with_retry_attempt(
321+
__validate_restart_workflow,
322+
{
323+
"workflow_executor": workflow_executor,
324+
"workflow_id": workflow_id,
325+
}
326+
)
327+
328+
329+
def __validate_restart_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
287330
workflow_executor.restart(workflow_id)
288331
workflow_status = workflow_executor.get_workflow_status(
289332
workflow_id,
@@ -296,7 +339,17 @@ def _restart_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) ->
296339
)
297340

298341

299-
def _retry_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
342+
def __retry_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
343+
_run_with_retry_attempt(
344+
__validate_retry_workflow,
345+
{
346+
"workflow_executor": workflow_executor,
347+
"workflow_id": workflow_id,
348+
}
349+
)
350+
351+
352+
def __validate_retry_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
300353
workflow_executor.retry(workflow_id)
301354
workflow_status = workflow_executor.get_workflow_status(
302355
workflow_id,
@@ -309,7 +362,17 @@ def _retry_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> No
309362
)
310363

311364

312-
def _rerun_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
365+
def __rerun_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
366+
_run_with_retry_attempt(
367+
__validate_rerun_workflow,
368+
{
369+
"workflow_executor": workflow_executor,
370+
"workflow_id": workflow_id,
371+
}
372+
)
373+
374+
375+
def __validate_rerun_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
313376
workflow_executor.rerun(RerunWorkflowRequest(), workflow_id)
314377
workflow_status = workflow_executor.get_workflow_status(
315378
workflow_id,

0 commit comments

Comments
 (0)