Skip to content
Merged
5 changes: 5 additions & 0 deletions changes.d/6835.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
A new experimental feature that can be switched on in workflow config:
Suicide triggers expire tasks rather than just remove them. This fixes
a bug that could allow tasks to run after suicide triggering. The
"expired" output will automatically be marked as optional for the
task, but custom completion conditions must be adapted accordingly.
34 changes: 34 additions & 0 deletions cylc/flow/cfgspec/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,40 @@ def get_script_common_text(this: str, example: Optional[str] = None):
The default time zone is now ``Z`` instead of the local time of
the first workflow start.
''')
with Conf('experimental', desc='''
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think we can have this as a global option too? That way we don't have to alter all our workflows twice (once to test, and once to undo when it goes live)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Experimental features are for canary testing. I'm not sure it's a good idea to turn on experimental features by default, that way users don't know what features they are opting into so aren't going to be attentive to any issues caused by them.

I did think we should add a CLI flag though to avoid the need to modify the workflows at all.

Copy link
Copy Markdown
Member

@dwsutherland dwsutherland Sep 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

global configs can be user specific ~/.cylc/flow/8/global.cylc (as we do with out oper/test role users)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair point - a user with a lot of workflows might well want to opt in for all of them at once.

We could just recommend against doing it centrally in site config.

Copy link
Copy Markdown
Member

@dwsutherland dwsutherland Sep 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also wouldn't recommend site wide.

Even if I implement it local to my user, I would have to stop and start (or reload?) my workflows for them to pick it up (I believe).
It would save me CLI or workflow mods to do it this way..

Follow on PRs I suppose

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cylc reload -g, --global # also reload global configuration

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Since 8.5.0 I think)

Activate experimental features.

These are preview features which will become the default in future
releases.

.. versionadded:: 8.6.0
'''):
Conf('all', VDR.V_BOOLEAN, False, desc='''
Activate all experimental features.

Encouraged for canary testing.

.. versionadded:: 8.6.0
''')
Conf('expire triggers', VDR.V_BOOLEAN, False, desc='''
This reimplements "suicide triggers" as "expire triggers".

* When the condition is met, the task will generate the
``expired`` output rather than just being removed.
* The ``expired`` output will be marked as :term:`optional`
for the triggered task, but a custom
`flow.cylc[runtime][<namespace>]completion condition
will need to be modified accordingly.
* This should be functionally equivalent to "suicide triggers"
in that the triggered task will not run.
* However, the triggered task will now be left in the
``expired`` state making it clearer in the GUI/logs that
the task has been triggered in this way.
* It is possible to trigger other tasks off of this ``expired``
output for more advanced failure recovery.

.. versionadded:: 8.6.0
''')

with Conf( # noqa: SIM117 (keep same format)
'main loop',
Expand Down
43 changes: 36 additions & 7 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import re
from textwrap import wrap
import traceback
from types import SimpleNamespace
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -461,6 +462,7 @@ def __init__(
self.mem_log("config.py: after get(sparse=False)")

# These 2 must be called before call to init_cyclers(self.cfg):
self.set_experimental_features()
self.process_utc_mode()
self.process_cycle_point_tz()

Expand Down Expand Up @@ -614,6 +616,14 @@ def __init__(

skip_mode_validate(self.taskdefs)

def set_experimental_features(self):
all_ = self.cfg['scheduler']['experimental']['all']
self.experimental = SimpleNamespace(**{
key.replace(' ', '_'): value or all_
for key, value in self.cfg['scheduler']['experimental'].items()
if key != 'all'
})

@staticmethod
def _warn_if_queues_have_implicit_tasks(
config, taskdefs, max_warning_lines
Expand Down Expand Up @@ -1062,8 +1072,13 @@ def _set_completion_expressions(self):
for name, taskdef in self.taskdefs.items():
expr = taskdef.rtconfig['completion']
if expr:
any_suicide = any(
dep.suicide
for d in taskdef.dependencies.values()
for dep in d
)
# check the user-defined expression
self._check_completion_expression(name, expr)
self._check_completion_expression(name, expr, any_suicide)
else:
# derive a completion expression for this taskdef
expr = get_completion_expression(taskdef)
Expand All @@ -1086,14 +1101,18 @@ def _set_completion_expressions(self):
# on after the TaskDef has been created
taskdef.rtconfig['completion'] = expr

def _check_completion_expression(self, task_name: str, expr: str) -> None:
def _check_completion_expression(
self, task_name: str, expr: str, any_suicide: bool
) -> None:
"""Checks a user-defined completion expression.

Args:
task_name:
The name of the task we are checking.
expr:
The completion expression as defined in the config.
any_suicide:
Does this task have any suicide triggers

"""
# check completion expressions are not being used in compat mode
Expand Down Expand Up @@ -1242,12 +1261,21 @@ def _check_completion_expression(self, task_name: str, expr: str) -> None:
and expr_opt is None
and compvar in {'submit_failed', 'expired'}
):
raise WorkflowConfigError(
msg = (
f'{task_name}:{trigger} is permitted in the graph'
' but is not referenced in the completion'
' expression (so is not permitted by it).'
f'\nTry: completion = "{expr} or {compvar}"'
' but is not referenced in the completion.'
)
if (
any_suicide
and trigger == "expired"
and self.experimental.expire_triggers
):
msg += (
"\nThis may be due to use of an expire "
"(formerly suicide) trigger."
)
msg += f'\nTry: completion = "{expr} or {compvar}"'
raise WorkflowConfigError(msg)

if (
graph_opt is False
Expand Down Expand Up @@ -2334,7 +2362,8 @@ def load_graph(self):
parser = GraphParser(
family_map,
self.parameters,
task_output_opt=task_output_opt
task_output_opt=task_output_opt,
expire_triggers=self.experimental.expire_triggers,
)
parser.parse_graph(graph)
task_output_opt.update(parser.task_output_opt)
Expand Down
12 changes: 9 additions & 3 deletions cylc/flow/graph_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
import contextlib

from typing import (
Set,
Dict,
List,
Tuple,
Optional,
Set,
Tuple,
Union
)

Expand Down Expand Up @@ -264,7 +264,8 @@ def __init__(
family_map: Optional[Dict[str, List[str]]] = None,
parameters: Optional[Dict] = None,
task_output_opt:
Optional[Dict[Tuple[str, str], Tuple[bool, bool, bool]]] = None
Optional[Dict[Tuple[str, str], Tuple[bool, bool, bool]]] = None,
expire_triggers: bool = False,
) -> None:
"""Initialize the graph string parser.

Expand All @@ -283,6 +284,7 @@ def __init__(
self.triggers: Dict = {}
self.original: Dict = {}
self.workflow_state_polling_tasks: Dict = {}
self.expire_triggers = expire_triggers

# Record task outputs as optional or required:
# {(name, output): (is_optional, is_member)}
Expand Down Expand Up @@ -744,6 +746,10 @@ def _set_triggers(
self.original.setdefault(name, {})
self.original[name][expr] = orig_expr

if suicide and self.expire_triggers:
# Make expiry optional for suicide triggered tasks.
self._set_output_opt(name, TASK_OUTPUT_EXPIRED, True, False, False)

def _set_output_opt(
self,
name: str,
Expand Down
10 changes: 8 additions & 2 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1539,7 +1539,13 @@ def spawn_on_output(self, itask: TaskProxy, output: str) -> None:
suicide.append(t)

for c_task in suicide:
self.remove(c_task, self.__class__.SUICIDE_MSG)
if self.config.experimental.expire_triggers:
self.task_queue_mgr.remove_task(c_task)
self.task_events_mgr.process_message(
c_task, logging.WARNING, TASK_OUTPUT_EXPIRED
)
else:
self.remove(c_task, self.__class__.SUICIDE_MSG)

if suicide:
# Update DB now in case of very quick respawn attempt.
Expand Down Expand Up @@ -1825,7 +1831,7 @@ def spawn_task(
# revive as incomplete.
msg = "incomplete"

if cylc.flow.flags.verbosity >= 1:
if LOG.level <= logging.DEBUG:
# avoid unnecessary compute when we are not in debug mode
id_ = itask.tokens.duplicate(
task_sel=prev_status
Expand Down
83 changes: 83 additions & 0 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import pytest
from pytest import param
import re

from cylc.flow import (
CYLC_LOG,
Expand All @@ -37,6 +38,7 @@
from cylc.flow.cycling.iso8601 import ISO8601Point
from cylc.flow.data_messages_pb2 import PbPrerequisite
from cylc.flow.data_store_mgr import TASK_PROXIES
from cylc.flow.exceptions import WorkflowConfigError
from cylc.flow.flow_mgr import FLOW_NONE
from cylc.flow.task_events_mgr import TaskEventsManager
from cylc.flow.task_outputs import (
Expand Down Expand Up @@ -2017,6 +2019,87 @@ async def test_remove_active_task(
)


async def test_remove_by_expire_trigger(
flow,
validate,
scheduler,
start,
log_filter
):
"""Test task removal by suicide trigger.

* Suicide triggers should remove tasks from the pool.
* It should be possible to bring them back by manually triggering them.
* Removing a task manually (cylc remove) should work the same.
"""
def _get_id(b_completion: str = "succeeded"):
return flow({
'scheduler': {
'experimental': {
'expire triggers': 'True',
}
},
'scheduling': {
'graph': {
'R1': '''
a? => b
a:failed? => !b
'''
},
},
'runtime': {
'b': {
'completion': b_completion
}
}
})
with pytest.raises(
WorkflowConfigError,
match=re.escape(
"This may be due to use of an expire (formerly suicide) trigger"
)
):
validate(_get_id())

id_ = _get_id("succeeded or expired")
validate(id_)
schd: 'Scheduler' = scheduler(id_, paused_start=False)

async with start(schd, level=logging.DEBUG) as log:
# it should start up with 1/a
assert schd.pool.get_task_ids() == {"1/a"}
a = schd.pool.get_task(IntegerPoint("1"), "a")

# mark 1/a as failed and check that 1/b expires
schd.pool.spawn_on_output(a, TASK_OUTPUT_FAILED)
assert log_filter(regex="1/b.*=> expired")
assert schd.pool.get_task_ids() == {"1/a"}

# 1/b should not be resurrected if it becomes ready
schd.pool.set_prereqs_and_outputs(['1/b'], [], ["1/a"], [1],)
assert log_filter(regex="1/b:expired.* already finished and completed")

# but we can still resurrect 1/b by triggering it
log.clear()
await commands.run_cmd(
commands.force_trigger_tasks(schd, ['1/b'], ['1']))
assert log_filter(regex='1/b.*added to the n=0 window')

# remove 1/b with "cylc remove""
await commands.run_cmd(
commands.remove_tasks(schd, ['1/b'], [])
)
assert log_filter(
regex='1/b.*removed from the n=0 window: request',
)

# and bring 1/b back again by triggering it again
log.clear()
await commands.run_cmd(
commands.force_trigger_tasks(schd, ['1/b'], ['1']))
assert log_filter(regex='1/b.*added to the n=0 window',)


async def test_remove_by_suicide(
flow,
scheduler,
Expand Down