Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 2 additions & 18 deletions lib/galaxy/managers/workflow_completion.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,38 +52,25 @@ def check_and_record_completion(self, invocation_id: int) -> Optional[WorkflowIn
The WorkflowInvocationCompletion record if newly completed,
None if already completed or not yet complete.
"""
log.debug("Checking completion for invocation %d", invocation_id)
session = self.sa_session

# Use select to get fresh data from database
stmt = select(WorkflowInvocation).where(WorkflowInvocation.id == invocation_id)
invocation = session.execute(stmt).scalar_one_or_none()
if not invocation:
log.debug("Invocation %d not found", invocation_id)
return None

log.debug(
"Invocation %d state=%s, has_completion=%s",
invocation_id,
invocation.state,
invocation.completion is not None,
)

# Already completed - don't create duplicate record
if invocation.completion is not None:
log.debug("Invocation %d already has completion record", invocation_id)
return None

# Check if complete
complete = invocation.is_complete
log.debug("Invocation %d is_complete=%s", invocation_id, complete)
if not complete:
return None

# Record completion
job_summary = invocation.compute_recursive_job_state_summary()
log.debug("Invocation %d job_summary=%s", invocation_id, job_summary)

completion = WorkflowInvocationCompletion(
workflow_invocation_id=invocation.id,
job_state_summary=job_summary,
Expand All @@ -95,7 +82,7 @@ def check_and_record_completion(self, invocation_id: int) -> Optional[WorkflowIn

session.add(completion)
session.commit()
log.info("Recorded completion for invocation %d", invocation_id)
log.info("Recorded completion for invocation %d, job_summary=%s", invocation_id, job_summary)

return completion

Expand Down Expand Up @@ -126,10 +113,7 @@ def poll_pending_completions(self, limit: int = 100, handler: Optional[str] = No
stmt = stmt.where(WorkflowInvocation.handler == handler)
stmt = stmt.limit(limit)
session = self.sa_session
result = list(session.execute(stmt).scalars())
if result:
log.debug("Found %d pending completions: %s", len(result), result)
return result
return list(session.execute(stmt).scalars())

def get_completion(self, invocation_id: int) -> Optional[WorkflowInvocationCompletion]:
"""
Expand Down
6 changes: 0 additions & 6 deletions lib/galaxy/workflow/completion_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,6 @@ def _monitor_step(self) -> None:
handler = self.app.config.server_name
pending_ids = self.completion_manager.poll_pending_completions(handler=handler)

if pending_ids:
log.debug(
"Workflow completion monitor checking %d invocations",
len(pending_ids),
)

for invocation_id in pending_ids:
if not self.monitor_running:
return
Expand Down
4 changes: 2 additions & 2 deletions lib/galaxy_test/api/test_wes.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,11 +365,11 @@ def test_wes_cancel_run_completed_workflow(self):
history_id=history_id,
)

self.workflow_populator.wait_for_invocation_and_jobs(history_id, None, invocation_id, assert_ok=True)
self.workflow_populator.wait_for_invocation_and_completion(invocation_id, assert_ok=True)

# Verify workflow is complete
invocation = self.workflow_populator.get_invocation(invocation_id)
assert invocation["state"] == "scheduled"
assert invocation["state"] == "completed"

# Try to cancel (should succeed but workflow already complete)
response = self._wes_post(f"ga4gh/wes/v1/runs/{invocation_id}/cancel")
Expand Down
Loading