Skip to content

Commit ea1a993

Browse files
authored
Merge pull request #29 from Sage-Bionetworks-Workflows/bwmac/IBCDPE-691/workflow-tasks-logs
[IBCDPE-691] Adds Ability to Retrieve Workflow Task Logs from Tower
2 parents 7f63c0e + 552e08c commit ea1a993

File tree

7 files changed

+3311
-2231
lines changed

7 files changed

+3311
-2231
lines changed

Pipfile.lock

Lines changed: 2909 additions & 2228 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/orca/services/nextflowtower/client.py

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ def request_paged(self, method: str, path: str, **kwargs) -> dict[str, Any]:
113113
while num_items < total_size:
114114
kwargs["params"]["offset"] = num_items
115115
json = self.request_json(method, path, **kwargs)
116-
total_size = json.pop("totalSize")
116+
total_size = json.pop("totalSize", None) or json.pop("total", 0)
117117
key_name, items = json.popitem()
118118
num_items += len(items)
119119
all_items.extend(items)
@@ -134,7 +134,7 @@ def get(self, path: str, **kwargs) -> dict[str, Any]:
134134
A dictionary from deserializing the JSON response.
135135
"""
136136
json = self.request_json("GET", path, **kwargs)
137-
if "totalSize" in json:
137+
if "totalSize" in json or "total" in json:
138138
json = self.request_paged("GET", path, **kwargs)
139139
return json
140140

@@ -369,3 +369,47 @@ def list_workflows(
369369
json = self.get(path=path, params=params)
370370
items = self.unwrap(json, "workflows")
371371
return [models.Workflow.from_json(item["workflow"]) for item in items]
372+
373+
def get_workflow_tasks(
    self,
    workflow_id: str,
    workspace_id: Optional[int] = None,
) -> list[models.WorkflowTask]:
    """List the tasks that belong to a workflow run.

    Args:
        workflow_id: Identifier of the workflow run whose tasks
            should be listed.
        workspace_id: Identifier of the workspace that contains the
            workflow run. Defaults to None.

    Returns:
        One WorkflowTask per entry found under the "tasks" key.
    """
    response = self.get(
        path=f"/workflow/{workflow_id}/tasks",
        params=self.generate_params(workspace_id),
    )
    # Every entry nests the actual task details under its own "task" key.
    return [
        models.WorkflowTask.from_json(wrapper["task"])
        for wrapper in self.unwrap(response, "tasks")
    ]
394+
395+
def get_task_logs(
    self, workflow_id: str, task_id: int, workspace_id: Optional[int] = None
) -> str:
    """Retrieve the logs for a given workflow task.

    Args:
        workflow_id: The ID number for the workflow run the
            task belongs to.
        task_id: The task_id for the task to get logs from.
        workspace_id: The ID number of the workspace the workflow
            exists within. Defaults to None.

    Returns:
        The task's execution log entries joined into a single
        newline-separated string.
    """
    path = f"/workflow/{workflow_id}/log/{task_id}"
    params = self.generate_params(workspace_id)
    json = self.get(path=path, params=params)
    # The log lines live under the "entries" key of the "log" object.
    log = self.unwrap(json, "log")
    return "\n".join(log["entries"])

src/orca/services/nextflowtower/models.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,3 +336,45 @@ def __repr__(self) -> str:
336336
def status(self) -> WorkflowStatus:
337337
"""Workflow run status."""
338338
return WorkflowStatus(self.state)
339+
340+
341+
@dataclass(kw_only=False)
class WorkflowTask(BaseTowerModel):
    """Nextflow Tower workflow task details.

    Instances are built from the "task" object of each entry in a
    workflow run's task listing. Fields are positional
    (``kw_only=False``), so their declaration order is part of the
    constructor signature; append new fields rather than inserting them.
    """

    id: int
    task_id: int
    status: str
    name: str
    module: list[str]
    queue: str
    memory: Optional[int]
    script: str
    tag: Optional[str]
    executor: str
    duration: int
    container: str
    process: str
    attempt: int
    scratch: Optional[str]
    work_dir: str
    disk: Optional[int]
    price_model: str
    cost: float
    error_action: Optional[str]
    native_id: str
    env: Optional[str]
    exit_status: int
    cpus: Optional[int]
    machine_type: str
    hash: str

    # Attribute names (left) paired with the differently spelled key
    # names (right); attributes not listed here have no entry.
    # NOTE(review): presumably read by BaseTowerModel.from_json to
    # translate payload keys into attribute names -- confirm in the
    # base class.
    _key_mapping = {
        "task_id": "taskId",
        "work_dir": "workdir",
        "price_model": "priceModel",
        "error_action": "errorAction",
        "native_id": "nativeId",
        "exit_status": "exitStatus",
        "machine_type": "machineType",
    }

src/orca/services/nextflowtower/ops.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,12 @@
1111
from orca.services.nextflowtower.client import NextflowTowerClient
1212
from orca.services.nextflowtower.client_factory import NextflowTowerClientFactory
1313
from orca.services.nextflowtower.config import NextflowTowerConfig
14-
from orca.services.nextflowtower.models import LaunchInfo, Workflow, WorkflowStatus
14+
from orca.services.nextflowtower.models import (
15+
LaunchInfo,
16+
Workflow,
17+
WorkflowStatus,
18+
WorkflowTask,
19+
)
1520
from orca.services.nextflowtower.utils import increment_suffix
1621

1722
logger = logging.getLogger(__name__)
@@ -272,3 +277,26 @@ async def monitor_workflow(
272277

273278
logger.info(f"{workflow} is now done!")
274279
return workflow.status
280+
281+
def get_workflow_tasks(self, workflow_id: str) -> list[WorkflowTask]:
    """List the tasks of a workflow run.

    Delegates to the underlying client, passing along this object's
    workspace ID.

    Args:
        workflow_id: Workflow run ID.

    Returns:
        List of task details.
    """
    fetch_tasks = self.client.get_workflow_tasks
    return fetch_tasks(workflow_id, self.workspace_id)
291+
292+
def get_task_logs(self, workflow_id: str, task_id: int) -> str:
    """Fetch the execution logs of a single workflow task.

    Delegates to the underlying client, passing along this object's
    workspace ID.

    Args:
        workflow_id: Workflow run ID.
        task_id: Task ID.

    Returns:
        Task logs.
    """
    fetch_logs = self.client.get_task_logs
    return fetch_logs(workflow_id, task_id, self.workspace_id)

0 commit comments

Comments
 (0)