Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/7101.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed manual group triggering of tasks before the start cycle point not obeying prerequisites within the group.
15 changes: 10 additions & 5 deletions cylc/flow/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ def _remove_matched_tasks(
schd: 'Scheduler',
ids: Set[TaskTokens],
flow_nums: 'FlowNums',
warn_unremovable: bool = True,
):
"""Remove matched tasks."""
# Mapping of *relative* task IDs to removed flow numbers:
Expand Down Expand Up @@ -267,15 +268,14 @@ def _remove_matched_tasks(
)
LOG.info(f"Removed tasks: {', '.join(sorted(tasks_str_list))}")

if not_removed:
if warn_unremovable and not_removed:
fnums_str = (
repr_flow_nums(flow_nums, full=True) if flow_nums else ''
)
tasks_str = ', '.join(
sorted(tokens.relative_id for tokens in not_removed)
)
# This often does not indicate an error - e.g. for group trigger.
LOG.debug(f"Task(s) not removable: {tasks_str} {fnums_str}")
LOG.warning(f"Task(s) not removable: {tasks_str} {fnums_str}")

if removed and schd.pool.compute_runahead():
schd.pool.release_runahead_tasks()
Expand Down Expand Up @@ -806,7 +806,12 @@ def _force_trigger_tasks(
# Remove all inactive and selected active group members.
if flow != [FLOW_NONE]:
# (No need to remove tasks if triggering with no-flow).
_remove_matched_tasks(schd, {*active_to_remove, *inactive}, flow_nums)
_remove_matched_tasks(
schd,
{*active_to_remove, *inactive},
flow_nums,
warn_unremovable=False,
)

# trigger should override the held state, however, in-group tasks may
# have previously been held and active in-group tasks will become
Expand All @@ -827,7 +832,7 @@ def _force_trigger_tasks(
icycle = get_point(id_['cycle'])
in_flow_prereqs = False
jtask: Optional[TaskProxy] = None
if tdef.is_parentless(icycle):
if tdef.is_parentless(icycle, cutoff=schd.config.initial_point):
# Parentless: set all prereqs to spawn the task.
jtask = schd.pool._set_prereqs_tdef(
icycle, tdef,
Expand Down
16 changes: 16 additions & 0 deletions cylc/flow/cycling/integer.py
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,22 @@ def __lt__(self, other: 'IntegerSequence') -> bool:
return True
return False

def __repr__(self) -> str:
"""
>>> IntegerSequence('R6/3/P1', '2')
<IntegerSequence start=3, stop=8, step=P1, self.p_context_start=2,
i_offset=P0>
"""
ret = f"start={self.p_start}, stop={self.p_stop}, step={self.i_step}"
if self.p_context_start not in {self.p_start, None}:
ret += f", {self.p_context_start=}"
if self.p_context_stop not in {self.p_stop, None}:
ret += f", {self.p_context_stop=}"
for attr in ('i_offset', 'exclusions'):
if (value := getattr(self, attr)) is not None:
ret += f", {attr}={value}"
return f"<{type(self).__name__} {ret}>"


def init_from_cfg(_):
"""Placeholder function required by all cycling modules."""
Expand Down
16 changes: 11 additions & 5 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -836,9 +836,11 @@ def spawn_to_rh_limit(
longer parentless, and/or hit the runahead limit.

"""
if not flow_nums or point is None:
# Force-triggered no-flow task.
# Or called with an invalid next_point.
if (
not flow_nums # Force-triggered no-flow task
or point is None # Reached end of sequence?
or point < self.config.start_point # Warm start
):
return
if self.runahead_limit_point is None:
self.compute_runahead()
Expand All @@ -847,7 +849,7 @@ def spawn_to_rh_limit(

is_xtrig_sequential = False
while point is not None and (point <= self.runahead_limit_point):
if tdef.is_parentless(point):
if tdef.is_parentless(point, cutoff=self.config.start_point):
ntask, is_in_pool, is_xtrig_sequential = (
self.get_or_spawn_task(point, tdef, flow_nums)
)
Expand All @@ -865,7 +867,11 @@ def spawn_to_rh_limit(

def spawn_if_parentless(self, tdef, point, flow_nums):
"""Spawn a task if parentless, regardless of runahead limit."""
if flow_nums and point is not None and tdef.is_parentless(point):
if (
flow_nums
and point is not None
and tdef.is_parentless(point, cutoff=self.config.start_point)
):
ntask, is_in_pool, _ = self.get_or_spawn_task(
point, tdef, flow_nums
)
Expand Down
10 changes: 6 additions & 4 deletions cylc/flow/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,8 @@ def __init__(
self,
scheduler_tokens: 'Tokens',
tdef: 'TaskDef',
start_point: 'PointBase',
flow_nums: Optional['FlowNums'] = None,
point: 'PointBase',
flow_nums: 'FlowNums | None' = None,
status: str = TASK_STATUS_WAITING,
is_held: bool = False,
submit_num: int | None = 0,
Expand All @@ -242,7 +242,7 @@ def __init__(
# (don't share flow_nums ref with parent task)
self.flow_nums = flow_nums.copy()
self.flow_wait = flow_wait
self.point = start_point
self.point = point
self.tokens: TaskTokens = scheduler_tokens.duplicate(
cycle=str(self.point),
task=self.tdef.name,
Expand Down Expand Up @@ -294,7 +294,9 @@ def __init__(
# Set xtrigger checking type, which effects parentless spawning.
self.is_xtrigger_sequential = bool(
sequential_xtrigger_labels
and self.tdef.is_parentless(start_point)
and self.tdef.is_parentless(
self.point, cutoff=self.tdef.initial_point
)
and sequential_xtrigger_labels.intersection(self.state.xtriggers)
)

Expand Down
43 changes: 26 additions & 17 deletions cylc/flow/task_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,11 @@ def get_prerequisite(

# Loop over TaskTrigger instances.
for task_trigger in self.task_triggers:
key = (
task_trigger.get_point(point),
task_trigger.task_name,
task_trigger.output,
)
if task_trigger.cycle_point_offset is not None:
# Compute trigger cycle point from offset.
if task_trigger.offset_is_from_icp:
Expand All @@ -247,30 +252,26 @@ def get_prerequisite(
else:
prereq_offset_point = get_point_relative(
task_trigger.cycle_point_offset, point)
if prereq_offset_point < tdef.initial_point:
# Pre-initial dependency - treat as satisfied
cpre[key] = True
continue
if prereq_offset_point > point:
# Update tdef.max_future_prereq_offset.
prereq_offset = prereq_offset_point - point
if (tdef.max_future_prereq_offset is None or
(prereq_offset >
tdef.max_future_prereq_offset)):
tdef.max_future_prereq_offset = (
prereq_offset)
cpre[(
task_trigger.get_point(point),
task_trigger.task_name,
task_trigger.output
)] = (
(prereq_offset_point < tdef.start_point) &
(point >= tdef.start_point)
if (
tdef.max_future_prereq_offset is None
or prereq_offset > tdef.max_future_prereq_offset
):
tdef.max_future_prereq_offset = prereq_offset
cpre[key] = (
prereq_offset_point < tdef.start_point
and point >= tdef.start_point
)
else:
# Trigger is within the same cycle point.
# Register task message with Prerequisite object.
cpre[(
task_trigger.get_point(point),
task_trigger.task_name,
task_trigger.output,
)] = False
cpre[key] = False
cpre.set_conditional_expr(self.get_expression(point))
return cpre

Expand Down Expand Up @@ -299,6 +300,14 @@ def __str__(self):
ret.append('( %s )' % str(item))
return ' '.join(ret)

def __repr__(self) -> str:
"""
>>> from unittest.mock import Mock
>>> Dependency(exp=[Mock()], task_triggers=[Mock()], suicide=False)
<Dependency ...>
"""
return f"<{type(self).__name__} {self}>"

@classmethod
def _stringify_list(cls, nested_expr, point):
"""Stringify a nested list of TaskTrigger objects."""
Expand Down
14 changes: 10 additions & 4 deletions cylc/flow/taskdef.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,18 +419,24 @@ def next_point(self, point):
p_next = min(adjusted)
return p_next

def is_parentless(self, point):
"""Return True if task has no parents at point.
def is_parentless(self, point: 'PointBase', cutoff: 'PointBase') -> bool:
"""Return True if task has no parents at the given point.

Tasks are considered parentless if they have:
- no parents at all
- all parents < initial cycle point
- all parents < cutoff cycle point
- only absolute triggers

Absolute-triggered tasks are auto-spawned like true parentless tasks,
(once the trigger is satisfied they are effectively parentless) but
with a prerequisite that gets satisfied when the absolute output is
completed at runtime.

Args:
point: The cycle point to check.
cutoff: This should be the start cycle point for the startup
spawning, or the intial cycle point for manually triggered
tasks.
"""
if not self.graph_parents:
# No parents at any point
Expand All @@ -441,7 +447,7 @@ def is_parentless(self, point):
parent_points = self.get_parent_points(point)
return (
not parent_points
or all(x < self.start_point for x in parent_points)
or all(x < cutoff for x in parent_points)
or self.has_only_abs_triggers(point)
)

Expand Down
37 changes: 37 additions & 0 deletions tests/integration/test_force_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -742,3 +742,40 @@ def _force_trigger_tasks(_schd, ids, *_, **__):
await run_cmd(force_trigger_tasks(schd, ['*:failed'], []))
assert trigger_calls == [{'1/d'}]
trigger_calls.clear()


async def test_pre_warm_start_group_trigger(flow, scheduler, run, complete):
"""Group-triggered tasks that are before the start point (in a warm-started
workflow) should run in order.

https://github.com/cylc/cylc-flow/pull/7101
"""
schd: Scheduler = scheduler(
flow({
'scheduling': {
'cycling mode': 'integer',
'runahead limit': 'P2',
'graph': {
'R1': 'c1 => c2 => c3 => foo',
'P1': 'foo[-P1] => foo',
},
},
'runtime': {
'COLD': {},
**{f'c{n}': {'inherit': 'COLD'} for n in (1, 2, 3)},
},
}),
paused_start=False,
startcp='5'
)
async with run(schd):
schd.pool.set_hold_point(IntegerPoint('4'))

await run_cmd(force_trigger_tasks(schd, ['1/COLD'], []))
assert schd.pool.get_task_ids() == {'1/c1', '5/foo'}

await complete(schd, '1/c1', timeout=10)
assert schd.pool.get_task_ids() == {'1/c2', '5/foo'}
assert schd.pool._get_task_by_id('1/c2').state(TASK_STATUS_WAITING)

await complete(schd, '1/c2', '1/c3', timeout=10)
12 changes: 5 additions & 7 deletions tests/integration/test_remove.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import logging

import pytest

from cylc.flow import CYLC_LOG
from cylc.flow.commands import (
force_trigger_tasks,
reload_workflow,
Expand Down Expand Up @@ -261,15 +261,14 @@ async def test_logging(
# Invalid tasks:
'2005/a', '2000/doh',
]
async with start(schd) as log:
log.set_level(logging.DEBUG, CYLC_LOG)
async with start(schd):
await run_cmd(remove_tasks(schd, tasks_to_remove, []))

assert log_filter(
logging.INFO, "Removed tasks: 2000/a (flows=1), 2000/b (flows=1)"
)

assert log_filter(logging.DEBUG, "Task(s) not removable: 2001/a, 2001/b")
assert log_filter(logging.WARNING, "Task(s) not removable: 2001/a, 2001/b")
assert log_filter(logging.WARNING, "Invalid cycle point for task: a, 2005")
assert log_filter(
logging.WARNING, "No tasks match the IDs:\n* 2000/doh\n* 2005/a"
Expand All @@ -283,13 +282,12 @@ async def test_logging_flow_nums(
):
"""Test logging of task removals involving flow numbers."""
schd: Scheduler = scheduler(example_workflow)
async with start(schd) as log:
log.set_level(logging.DEBUG, CYLC_LOG)
async with start(schd):
await run_cmd(force_trigger_tasks(schd, ['1/a1'], ['1', '2']))
# Removing from flow that doesn't exist doesn't work:
await run_cmd(remove_tasks(schd, ['1/a1'], ['3']))
assert log_filter(
logging.DEBUG, "Task(s) not removable: 1/a1 (flows=3)"
logging.WARNING, "Task(s) not removable: 1/a1 (flows=3)"
)

# But if a valid flow is included, it will be removed from that flow:
Expand Down
15 changes: 15 additions & 0 deletions tests/unit/parsec/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
Loading