Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1717,11 +1717,10 @@ async def _main_loop(self) -> None:
await self.update_data_structure()

if has_updated:
if not self.is_reloaded:
if not self.is_reloaded and self.is_stalled:
# (A reload cannot un-stall workflow by itself)
if self.is_stalled:
self.is_stalled = False
self.update_data_store()
self.is_stalled = False
self.update_data_store()
self.is_reloaded = False

# Reset workflow and task updated flags.
Expand Down
42 changes: 25 additions & 17 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -850,23 +850,31 @@ def spawn_to_rh_limit(
if self.runahead_limit_point is None:
return

is_xtrig_sequential = False
while point is not None and (point <= self.runahead_limit_point):
if tdef.is_parentless(point, cutoff=self.config.start_point):
ntask, is_in_pool, is_xtrig_sequential = (
self.get_or_spawn_task(point, tdef, flow_nums)
)
if ntask is not None:
if not is_in_pool:
self.add_to_pool(ntask)
self.rh_release_and_queue(ntask)
if is_xtrig_sequential:
break
point = tdef.next_point(point)

# Once more for the runahead-limited task (don't release it).
if not is_xtrig_sequential:
self.spawn_if_parentless(tdef, point, flow_nums)
# Spawn this instance
if tdef.is_parentless(point, cutoff=self.config.start_point):
ntask, is_in_pool, is_xtrig_sequential = (
self.get_or_spawn_task(point, tdef, flow_nums)
)
if ntask is not None:
if not is_in_pool:
self.add_to_pool(ntask)
self.rh_release_and_queue(ntask)
if is_xtrig_sequential:
return # TODO?

# spawn next parentless instance, wherever it is.
point = tdef.next_point_parentless(point)
if point is None:
return
ntask, is_in_pool, _ = (
self.get_or_spawn_task(point, tdef, flow_nums)
)
if ntask is not None and not is_in_pool:
self.add_to_pool(ntask)
# if still below the limit, call again.
if point <= self.runahead_limit_point:
self.rh_release_and_queue(ntask)
self.spawn_to_rh_limit(tdef, point, flow_nums)

def spawn_if_parentless(self, tdef, point, flow_nums):
"""Spawn a task if parentless, regardless of runahead limit."""
Expand Down
16 changes: 16 additions & 0 deletions cylc/flow/taskdef.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,22 @@ def next_point(self, point):
p_next = min(adjusted)
return p_next

def next_point_parentless(self, point):
"""Return the next cycle point >= point."""
p_next = None
adjusted = []
for seq in self.sequences:
if seq in self.graph_parents:
# has parents here
continue
nxt = seq.get_next_point(point)
if nxt:
# may be None if beyond the sequence bounds
adjusted.append(nxt)
if adjusted:
p_next = min(adjusted)
return p_next

def is_parentless(self, point: 'PointBase', cutoff: 'PointBase') -> bool:
"""Return True if task has no parents at the given point.

Expand Down
Loading