1 | 1 | """Handle execution backends."""
|
2 | 2 | import asyncio
|
| 3 | +import pickle |
3 | 4 | from uuid import uuid4
|
4 | 5 | from .workers import WORKERS
|
5 | 6 | from .core import is_workflow
|
@@ -167,9 +168,14 @@ async def expand_workflow(self, wf, rerun=False):
                     # don't block the event loop!
                     await asyncio.sleep(1)
                     if ii > 60:
+                        blocked = _list_blocked_tasks(graph_copy)
+                        get_runnable_tasks(graph_copy)
                         raise Exception(
                             "graph is not empty, but not able to get more tasks "
-                            "- something is wrong (e.g. with the filesystem)"
+                            "- something may have gone wrong when retrieving the results "
+                            "of predecessor tasks caused by a file-system error or a bug "
+                            "in the internal workflow logic.\n\nBlocked tasks\n-------------\n"
+                            + "\n".join(blocked)
                         )
             for task in tasks:
                 # grab inputs if needed
@@ -281,3 +287,33 @@ async def prepare_runnable_with_state(runnable):
     runnable.state.prepare_inputs()
     logger.debug(f"Expanding {runnable} into {len(runnable.state.states_val)} states")
     return runnable.pickle_task()
+
+
+def _list_blocked_tasks(graph):
+    """Generate a list of tasks that can't be run and the predecessors that are
+    blocking them, to help with debugging broken workflows."""
+    blocked = []
+    for tsk in graph.sorted_nodes:
+        blocking = []
+        for pred in graph.predecessors[tsk.name]:
+            if not pred.done:
+                matching_name = []
+                for cache_loc in tsk.cache_locations:
+                    for tsk_work_dir in cache_loc.iterdir():
+                        if (tsk_work_dir / "_task.pklz").exists():
+                            with open(tsk_work_dir / "_task.pklz", "rb") as f:
+                                saved_tsk = pickle.load(f)
+                            if saved_tsk.name == pred.name:
+                                matching_name.append(
+                                    f"{saved_tsk.name} ({tsk_work_dir.name})"
+                                )
+                blocking.append((pred, ", ".join(matching_name)))
+        if blocking:
+            blocked.append(
+                f"\n{tsk.name} ({tsk.checksum}) is blocked by "
+                + "; ".join(
+                    f"{pred.name} ({pred.checksum}), which matches names of [{matching}]"
+                    for pred, matching in blocking
+                )
+            )
+    return blocked
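
For reference, a rough sketch (not part of the change) of how the assembled error text reads once _list_blocked_tasks reports a blocked task; the task names, checksums, and working-directory name below are invented for illustration:

    # Hypothetical example: mimics the message built in the expand_workflow branch above,
    # with a hand-written "blocked" entry (real entries come from _list_blocked_tasks).
    blocked = [
        "\ntaskB (Checksum_bbb) is blocked by taskA (Checksum_aaa), "
        "which matches names of [taskA (FunctionTask_aaa_cachedir)]"
    ]
    message = (
        "graph is not empty, but not able to get more tasks "
        "- something may have gone wrong when retrieving the results "
        "of predecessor tasks caused by a file-system error or a bug "
        "in the internal workflow logic.\n\nBlocked tasks\n-------------\n"
        + "\n".join(blocked)
    )
    print(message)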