From 876cd067cc30aa758b23b5610476d9450e584cdb Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Wed, 18 Mar 2026 12:45:19 +1300 Subject: [PATCH] Fix parentless spawning. --- cylc/flow/scheduler.py | 7 +++---- cylc/flow/task_pool.py | 42 +++++++++++++++++++++++++----------------- cylc/flow/taskdef.py | 16 ++++++++++++++++ 3 files changed, 44 insertions(+), 21 deletions(-) diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 1223c23f08..158a70337d 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -1717,11 +1717,10 @@ async def _main_loop(self) -> None: await self.update_data_structure() if has_updated: - if not self.is_reloaded: + if not self.is_reloaded and self.is_stalled: # (A reload cannot un-stall workflow by itself) - if self.is_stalled: - self.is_stalled = False - self.update_data_store() + self.is_stalled = False + self.update_data_store() self.is_reloaded = False # Reset workflow and task updated flags. diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 94835df506..67f55d16e5 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -850,23 +850,31 @@ def spawn_to_rh_limit( if self.runahead_limit_point is None: return - is_xtrig_sequential = False - while point is not None and (point <= self.runahead_limit_point): - if tdef.is_parentless(point, cutoff=self.config.start_point): - ntask, is_in_pool, is_xtrig_sequential = ( - self.get_or_spawn_task(point, tdef, flow_nums) - ) - if ntask is not None: - if not is_in_pool: - self.add_to_pool(ntask) - self.rh_release_and_queue(ntask) - if is_xtrig_sequential: - break - point = tdef.next_point(point) - - # Once more for the runahead-limited task (don't release it). - if not is_xtrig_sequential: - self.spawn_if_parentless(tdef, point, flow_nums) + # Spawn this instance + if tdef.is_parentless(point, cutoff=self.config.start_point): + ntask, is_in_pool, is_xtrig_sequential = ( + self.get_or_spawn_task(point, tdef, flow_nums) + ) + if ntask is not None: + if not is_in_pool: + self.add_to_pool(ntask) + self.rh_release_and_queue(ntask) + if is_xtrig_sequential: + return # TODO? + + # spawn next parentless instance, wherever it is. + point = tdef.next_point_parentless(point) + if point is None: + return + ntask, is_in_pool, _ = ( + self.get_or_spawn_task(point, tdef, flow_nums) + ) + if ntask is not None and not is_in_pool: + self.add_to_pool(ntask) + # if still below the limit, call again. + if point <= self.runahead_limit_point: + self.rh_release_and_queue(ntask) + self.spawn_to_rh_limit(tdef, point, flow_nums) def spawn_if_parentless(self, tdef, point, flow_nums): """Spawn a task if parentless, regardless of runahead limit.""" diff --git a/cylc/flow/taskdef.py b/cylc/flow/taskdef.py index 359a5d686c..05516ad9b9 100644 --- a/cylc/flow/taskdef.py +++ b/cylc/flow/taskdef.py @@ -419,6 +419,22 @@ def next_point(self, point): p_next = min(adjusted) return p_next + def next_point_parentless(self, point): + """Return the next cycle point >= point.""" + p_next = None + adjusted = [] + for seq in self.sequences: + if seq in self.graph_parents: + # has parents here + continue + nxt = seq.get_next_point(point) + if nxt: + # may be None if beyond the sequence bounds + adjusted.append(nxt) + if adjusted: + p_next = min(adjusted) + return p_next + def is_parentless(self, point: 'PointBase', cutoff: 'PointBase') -> bool: """Return True if task has no parents at the given point.