Skip to content
Merged
3 changes: 3 additions & 0 deletions changes.d/6835.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Reformulate suicide triggers to expire tasks.
Fixes a rare bug that could allow tasks to run after suicide triggering.
The "expired" output is now completed on suicide triggered tasks.
34 changes: 34 additions & 0 deletions cylc/flow/cfgspec/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,40 @@ def get_script_common_text(this: str, example: Optional[str] = None):
The default time zone is now ``Z`` instead of the local time of
the first workflow start.
''')
with Conf('experimental', desc='''
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think we can have this as a global option too? That way we don't have to alter all our workflows twice (once to test, and once to undo when it goes live)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Experimental features are for canary testing. I'm not sure it's a good idea to turn on experimental features by default, that way users don't know what features they are opting into so aren't going to be attentive to any issues caused by them.

I did think we should add a CLI flag though to avoid the need to modify the workflows at all.

Copy link
Copy Markdown
Member

@dwsutherland dwsutherland Sep 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

global configs can be user specific ~/.cylc/flow/8/global.cylc (as we do with out oper/test role users)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair point - a user with a lot of workflows might well want to opt in for all of them at once.

We could just recommend against doing it centrally in site config.

Copy link
Copy Markdown
Member

@dwsutherland dwsutherland Sep 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also wouldn't recommend site wide.

Even if I implement it local to my user, I would have to stop and start (or reload?) my workflows for them to pick it up (I believe).
It would save me CLI or workflow mods to do it this way..

Follow on PRs I suppose

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cylc reload -g, --global # also reload global configuration

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Since 8.5.0 I think)

Activate experimental features.

These are preview features which will become the default in future
releases.

.. versionadded:: 8.6.0
'''):
Conf('all', VDR.V_BOOLEAN, False, desc='''
Activate all experimental features.

Encouraged for canary testing.

.. versionadded:: 8.6.0
''')
Conf('expire triggers', VDR.V_BOOLEAN, False, desc='''
This reimplements "suicide triggers" as "expire triggers".

* When the condition is met, the task will generate the
``expired`` output rather than just being removed.
* The triggered task's
`flow.cylc[runtime][<namespace>]completion condition`
will be automatically modified so that expiry completes the
task's outputs.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: I think this PR currently has a buggy interaction with the completion condition configuration as the logic added to make expiry optional wont apply if a completion condition is specified.

I've just tried this out with the following example:

[scheduler]
  [[experimental]]
    all = True

[scheduling]
  [[graph]]
    R1 = """
      a? => b
      a:fail? => !a
    """

[runtime]
  [[a]]
    completion = succeeded or failed
  [[b]]

The result is:

WorkflowConfigError: a:expired is permitted in the graph but is not referenced in the completion expression (so is not permitted by it).
Try: completion = "succeeded or failed or expired"

Which is reasonable, the discrepancy comes out as a validation error rather than a runtime stall which is good.

I think this is ok, although, we should probably clarify the WorkflowConfigError message for this case to make it clear that the "expire trigger" is the cause of this (as the expired output is unlikely to appear in the graph).

Also this bullet point will need to be updated (sorry made an invalid assumption here):

Suggested change
* The triggered task's
`flow.cylc[runtime][<namespace>]completion condition`
will be automatically modified so that expiry completes the
task's outputs.
* The ``expired`` output will be marked as
:term:`optional` for the triggered task.

* This should be functionally equivalent to "suicide triggers"
in that the triggered task will not run.
* However, the triggered task will now be left in the
``expired`` state making it clearer in the GUI/logs that
the task has been triggered in this way.
* It is possible to trigger other tasks off of this ``expired``
output for more advanced failure recovery.

.. versionadded:: 8.6.0
''')

with Conf( # noqa: SIM117 (keep same format)
'main loop',
Expand Down
13 changes: 12 additions & 1 deletion cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import re
from textwrap import wrap
import traceback
from types import SimpleNamespace
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -461,6 +462,7 @@ def __init__(
self.mem_log("config.py: after get(sparse=False)")

# These 2 must be called before call to init_cyclers(self.cfg):
self.set_experimental_features()
self.process_utc_mode()
self.process_cycle_point_tz()

Expand Down Expand Up @@ -614,6 +616,14 @@ def __init__(

skip_mode_validate(self.taskdefs)

def set_experimental_features(self):
all_ = self.cfg['scheduler']['experimental']['all']
self.experimental = SimpleNamespace(**{
key.replace(' ', '_'): value or all_
for key, value in self.cfg['scheduler']['experimental'].items()
if key != 'all'
})

@staticmethod
def _warn_if_queues_have_implicit_tasks(
config, taskdefs, max_warning_lines
Expand Down Expand Up @@ -2334,7 +2344,8 @@ def load_graph(self):
parser = GraphParser(
family_map,
self.parameters,
task_output_opt=task_output_opt
task_output_opt=task_output_opt,
expire_triggers=self.experimental.expire_triggers,
)
parser.parse_graph(graph)
task_output_opt.update(parser.task_output_opt)
Expand Down
12 changes: 9 additions & 3 deletions cylc/flow/graph_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
import contextlib

from typing import (
Set,
Dict,
List,
Tuple,
Optional,
Set,
Tuple,
Union
)

Expand Down Expand Up @@ -264,7 +264,8 @@ def __init__(
family_map: Optional[Dict[str, List[str]]] = None,
parameters: Optional[Dict] = None,
task_output_opt:
Optional[Dict[Tuple[str, str], Tuple[bool, bool, bool]]] = None
Optional[Dict[Tuple[str, str], Tuple[bool, bool, bool]]] = None,
expire_triggers: bool = False,
) -> None:
"""Initialize the graph string parser.

Expand All @@ -283,6 +284,7 @@ def __init__(
self.triggers: Dict = {}
self.original: Dict = {}
self.workflow_state_polling_tasks: Dict = {}
self.expire_triggers = expire_triggers

# Record task outputs as optional or required:
# {(name, output): (is_optional, is_member)}
Expand Down Expand Up @@ -744,6 +746,10 @@ def _set_triggers(
self.original.setdefault(name, {})
self.original[name][expr] = orig_expr

if suicide and self.expire_triggers:
# Make expiry optional for suicide triggered tasks.
self._set_output_opt(name, TASK_OUTPUT_EXPIRED, True, False, False)

def _set_output_opt(
self,
name: str,
Expand Down
10 changes: 8 additions & 2 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1539,7 +1539,13 @@ def spawn_on_output(self, itask: TaskProxy, output: str) -> None:
suicide.append(t)

for c_task in suicide:
self.remove(c_task, self.__class__.SUICIDE_MSG)
if self.config.experimental.expire_triggers:
self.task_queue_mgr.remove_task(c_task)
self.task_events_mgr.process_message(
c_task, logging.WARNING, TASK_OUTPUT_EXPIRED
)
else:
self.remove(c_task, self.__class__.SUICIDE_MSG)

if suicide:
# Update DB now in case of very quick respawn attempt.
Expand Down Expand Up @@ -1825,7 +1831,7 @@ def spawn_task(
# revive as incomplete.
msg = "incomplete"

if cylc.flow.flags.verbosity >= 1:
if LOG.level <= logging.DEBUG:
# avoid unnecessary compute when we are not in debug mode
id_ = itask.tokens.duplicate(
task_sel=prev_status
Expand Down
64 changes: 64 additions & 0 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -2017,6 +2017,70 @@ async def test_remove_active_task(
)


async def test_remove_by_expire_trigger(
flow,
scheduler,
start,
log_filter
):
"""Test task removal by suicide trigger.

* Suicide triggers should remove tasks from the pool.
* It should be possible to bring them back by manually triggering them.
* Removing a task manually (cylc remove) should work the same.
"""
id_ = flow({
'scheduler': {
'experimental': {
'expire triggers': 'True',
}
},
'scheduling': {
'graph': {
'R1': '''
a? => b
a:failed? => !b
'''
},
}
})
schd: 'Scheduler' = scheduler(id_, paused_start=False)

async with start(schd, level=logging.DEBUG) as log:
# it should start up with 1/a
assert schd.pool.get_task_ids() == {"1/a"}
a = schd.pool.get_task(IntegerPoint("1"), "a")

# mark 1/a as failed and check that 1/b expires
schd.pool.spawn_on_output(a, TASK_OUTPUT_FAILED)
assert log_filter(regex="1/b.*=> expired")
assert schd.pool.get_task_ids() == {"1/a"}

# 1/b should not be resurrected if it becomes ready
schd.pool.set_prereqs_and_outputs(['1/b'], [], ["1/a"], [1],)
assert log_filter(regex="1/b:expired.* already finished and completed")

# but we can still resurrect 1/b by triggering it
log.clear()
await commands.run_cmd(
commands.force_trigger_tasks(schd, ['1/b'], ['1']))
assert log_filter(regex='1/b.*added to the n=0 window')

# remove 1/b with "cylc remove""
await commands.run_cmd(
commands.remove_tasks(schd, ['1/b'], [])
)
assert log_filter(
regex='1/b.*removed from the n=0 window: request',
)

# and bring 1/b back again by triggering it again
log.clear()
await commands.run_cmd(
commands.force_trigger_tasks(schd, ['1/b'], ['1']))
assert log_filter(regex='1/b.*added to the n=0 window',)


async def test_remove_by_suicide(
flow,
scheduler,
Expand Down
Loading