Skip to content

Commit 32cf375

Browse files
andrewfayresAyres, Andrew
authored andcommitted
small fixes for workflows. Improving waiting on invocation and getting steps.
1 parent a24ab48 commit 32cf375

1 file changed

Lines changed: 28 additions & 8 deletions

File tree

src/nova/galaxy/workflow.py

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
if TYPE_CHECKING:
77
from .data_store import Datastore
88

9+
from bioblend import TimeoutException
910
from nova.common.job import WorkState
1011

1112
from .dataset import AbstractData, Dataset, DatasetCollection
@@ -169,16 +170,30 @@ def submit(self, params: Optional[WorkflowParameters]) -> None:
169170
self.status.details = f"Failed to prepare or submit workflow invocation: {str(e)}"
170171
self.invocation_id = None
171172

172-
def wait_for_results(self) -> None:
173+
def wait_for_results(self, max_tries=120) -> None:
173174
"""Waits for the workflow invocation to complete."""
174175
if not self.invocation_id:
175176
raise Exception("Cannot wait for results, invocation ID is not set.")
176177

178+
179+
# galaxy doesn't always return when a job fails. Periodically checking the jobs to see if we should return.
180+
attempt_counter = 0
181+
while True:
182+
try:
183+
if attempt_counter < max_tries:
184+
self.galaxy_instance.invocations.wait_for_invocation(self.invocation_id, maxwait = 5)
185+
except TimeoutException as e:
186+
# check if any steps failed. If they have we return. Otherwise we just wait some more.
187+
attempt_counter += 1
188+
for step in self.get_step_jobs(running_only=False):
189+
if step._job is not None:
190+
if step.get_status() is WorkState.ERROR:
191+
return
192+
177193
# galaxy returns once all steps are scheduled instead of complete. Need to wait for each job to complete
178-
self.galaxy_instance.invocations.wait_for_invocation(self.invocation_id)
179194
for step in self.get_step_jobs():
180195
if step._job is not None:
181-
step._job.wait_for_results()
196+
step._job.wait_for_results(running_only=False)
182197
if step.get_status() is not WorkState.FINISHED:
183198
return
184199

@@ -271,16 +286,15 @@ def get_invocation_id(self) -> Optional[str]:
271286
"""Returns the Galaxy invocation ID."""
272287
return self.invocation_id
273288

274-
def get_step_jobs(self) -> List[Tool]:
289+
def get_step_jobs(self, running_only: bool = True) -> List[Tool]:
275290
"""Returns nova-galaxy Job instances for each step in the workflow invocation."""
276291
if not self.invocation_id:
277292
return []
278293

279294
try:
280295
jobs_summary = self.galaxy_instance.invocations.get_invocation_step_jobs_summary(self.invocation_id)
281296
step_jobs = []
282-
283-
tools = self.store.recover_tools(filter_running=True)
297+
tools = self.store.recover_tools(filter_running=running_only)
284298

285299
for job_info in jobs_summary:
286300
if job_info.get("id"):
@@ -446,12 +460,18 @@ def get_invocation_id(self) -> Optional[str]:
446460
return self._invocation.get_invocation_id()
447461
return None
448462

449-
def get_step_jobs(self) -> List[Tool]:
463+
def get_step_jobs(self, running_only: bool = True) -> List[Tool]:
450464
"""Gets nova-galaxy Job instances for each step in the workflow.
451465
452466
Returns the individual jobs that make up the workflow steps,
453467
allowing access to step-level status, outputs, and console logs.
454468
469+
Parameters
470+
----------
471+
running_only : Optional[bool]
472+
A boolean that determines whether or not to return only jobs which
473+
are currently running.
474+
455475
Returns
456476
-------
457477
List[Job]
@@ -470,7 +490,7 @@ def get_step_jobs(self) -> List[Tool]:
470490
... print(console.get('stdout', ''))
471491
"""
472492
if self._invocation:
473-
return self._invocation.get_step_jobs()
493+
return self._invocation.get_step_jobs(running_only)
474494
return []
475495

476496
def get_step_name(self, step_number: int) -> str:

0 commit comments

Comments
 (0)