1+ import logging
2+ from dataclasses import field
13from functools import cached_property
24from typing import ClassVar , Optional
35
1113from orca .services .nextflowtower .models import LaunchInfo , Workflow , WorkflowStatus
1214from orca .services .nextflowtower .utils import increment_suffix
1315
16+ logger = logging .getLogger (__name__ )
17+
1418
1519@dataclass (kw_only = False )
1620class NextflowTowerOps (BaseOps ):
@@ -23,7 +27,7 @@ class NextflowTowerOps(BaseOps):
2327 client_factory_class: The class for constructing clients.
2428 """
2529
26- config : NextflowTowerConfig
30+ config : NextflowTowerConfig = field ( default_factory = NextflowTowerConfig )
2731
2832 client_factory_class = NextflowTowerClientFactory
2933
@@ -130,13 +134,19 @@ def launch_workflow(
130134 if not ignore_previous_runs :
131135 latest_run = self .get_latest_previous_workflow (launch_info )
132136 if latest_run :
137+ status = latest_run .status .value
138+ run_repr = f"{ latest_run .run_name } (id='{ latest_run .id } ', { status = } )"
133139 # Return ID for latest run if ongoing, succeeded, or cancelled
134- skip_statuses = {"SUCCEEDED" , "CANCELLED" }
135- if not latest_run .is_done or latest_run .status .value in skip_statuses :
140+ if not latest_run .is_done : # pragma: no cover
141+ logger .info (f"Found an ongoing previous run: { run_repr } " )
142+ return latest_run .id
143+ if status in {"SUCCEEDED" , "UNKNOWN" }:
144+ logger .info (f"Found a previous run: { run_repr } " )
136145 return latest_run .id
137146 launch_info .fill_in ("resume" , True )
138147 launch_info .fill_in ("session_id" , latest_run .session_id )
139- launch_info .run_name = increment_suffix (launch_info .run_name )
148+ launch_info .run_name = increment_suffix (latest_run .run_name )
149+ logger .info (f"Relaunching from a previous run: { run_repr } " )
140150
141151 # Get relevant compute environment and its resource tags
142152 compute_env_id = self .get_latest_compute_env (compute_env_filter )
@@ -154,7 +164,21 @@ def launch_workflow(
154164 launch_info .fill_in ("pre_run_script" , compute_env .pre_run_script )
155165 launch_info .add_in ("label_ids" , label_ids )
156166
157- return self .client .launch_workflow (launch_info , self .workspace_id )
167+ workflow_id = self .client .launch_workflow (launch_info , self .workspace_id )
168+ workflow_repr = f"{ launch_info .run_name } ({ workflow_id } )"
169+ logger .info (f"Launched a new workflow run: { workflow_repr } " )
170+ return workflow_id
171+
172+ def get_workflow (self , workflow_id : str ) -> Workflow :
173+ """Retrieve details about a workflow run.
174+
175+ Args:
176+ workflow_id: Workflow run ID.
177+
178+ Returns:
179+ Workflow instance.
180+ """
181+ return self .client .get_workflow (workflow_id , self .workspace_id )
158182
159183 # TODO: Consider switching return value to a namedtuple
160184 def get_workflow_status (self , workflow_id : str ) -> tuple [WorkflowStatus , bool ]:
@@ -166,8 +190,8 @@ def get_workflow_status(self, workflow_id: str) -> tuple[WorkflowStatus, bool]:
166190 Returns:
167191 Workflow status and whether the workflow is done.
168192 """
169- workflow = self .client . get_workflow (workflow_id , self . workspace_id )
170- is_done = workflow .status . value in WorkflowStatus .terminal_states . value
193+ workflow = self .get_workflow (workflow_id )
194+ is_done = workflow .status in WorkflowStatus .terminal_states
171195 return workflow .status , is_done
172196
173197 def list_workflows (self , search_filter : str = "" ) -> list [Workflow ]:
0 commit comments