Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/6903.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Workflows that have hit the `stop after cycle point` will no longer shut down immediately when restarted; instead, they wait for intervention according to the `restart timeout` configuration.
30 changes: 28 additions & 2 deletions cylc/flow/cfgspec/globalcfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,14 +370,40 @@
''',
'restart timeout': '''
How long to wait for intervention on restarting a completed workflow.
The timer stops if any task is triggered.

When a workflow reaches the end of the :term:`graph`, it will
:term:`shut down <shutdown>` automatically. We call such workflows
:ref:`completed <workflow completion>` as there are no more tasks for
Cylc to run.

Workflow completion can be caused by:

* Cylc reaching the end of the :term:`graph`.
* The workflow reaching the
:cylc:conf:`flow.cylc[scheduling]final cycle point`.
* The workflow reaching the
:cylc:conf:`flow.cylc[scheduling]stop after cycle point`.
* Tasks being manually removed (see :ref:`interventions.remove_tasks`).

When you restart a completed workflow, it will detect that there are no
more tasks to run, and shut itself down again. The ``restart timeout``
delays this shutdown for a configured period, allowing you to trigger
more task(s) to run.

.. seealso::

:ref:`user_guide.scheduler.workflow_events`
* :ref:`user_guide.scheduler.workflow_events`
* :ref:`workflow completion`
* :ref:`examples.extending-workflow`

.. versionadded:: 8.2.0

.. versionchanged:: 8.5.2

The ``restart timeout`` is now also activated for workflows that
have hit the
:cylc:conf:`flow.cylc[scheduling]stop after cycle point`.

'''
}

Expand Down
18 changes: 16 additions & 2 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -555,8 +555,22 @@ async def configure(self, params):
timer.reset()
self.timers[event] = timer

if self.is_restart and not self.pool.get_tasks():
# This workflow completed before restart; wait for intervention.
if self.is_restart and (
# workflow has completed
not self.pool.get_tasks()
# workflow has hit the "stop after cycle point"
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest that this should be documented in globalcfg.py since this setting now applies to stopped as well as completed workflows.

Copy link
Copy Markdown
Member Author

@oliver-sanders oliver-sanders Aug 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 8e3afc7

or (
self.config.stop_point
and all(
cycle > self.config.stop_point
for cycle in {
itask.point for itask in self.pool.get_tasks()
}
)
)
):
# This workflow will shut down immediately once restarted
# => Give the user a grace period to intervene first
with suppress(KeyError):
self.timers[self.EVENT_RESTART_TIMEOUT].reset()
self.is_restart_timeout_wait = True
Expand Down
3 changes: 3 additions & 0 deletions tests/functional/restart/08-stop-after-cycle-point/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ description = """
[scheduler]
UTC mode = True
cycle point format = %Y
[[events]]
# prevent workflow hanging if restarted with nothing more to do
restart timeout = PT0S

[scheduling]
runahead limit = P0
Expand Down
78 changes: 63 additions & 15 deletions tests/integration/test_workflow_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,23 @@ async def test_scheduler(flow, scheduler, capcall):
def get_events():
return {e[0][1] for e in events}

def _schd(config=None, **opts):
id_ = flow({
'scheduler': {
'events': {
'mail events': ', '.join(EVENTS),
**(config or {}),
def _schd(event_config=None, config=None, **opts):
assert not (event_config and config)
if not config:
config = {
'scheduler': {
'events': {
'mail events': ', '.join(EVENTS),
**(event_config or {}),
},
},
},
'scheduling': {
'graph': {
'R1': 'a'
}
},
})
'scheduling': {
'graph': {
'R1': 'a'
}
},
}
id_ = flow(config)
schd = scheduler(id_, **opts)
schd.get_events = get_events
return schd
Expand Down Expand Up @@ -168,8 +171,13 @@ async def test_stall(test_scheduler, start):
assert schd.get_events() == {'shutdown', 'stall'}


async def test_restart_timeout(test_scheduler, scheduler, run, complete):
"""Test restart timeout.
async def test_restart_timeout_workflow_completion(
test_scheduler,
scheduler,
run,
complete,
):
"""Test restart timeout for completed workflows.

This should fire when a completed workflow is restarted.
"""
Expand All @@ -188,6 +196,46 @@ async def test_restart_timeout(test_scheduler, scheduler, run, complete):
assert schd2.get_events() == {'startup', 'restart timeout', 'shutdown'}


async def test_restart_timeout_workflow_stop_after_cycle_point(
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy of above test, shimmed to test stop after cycle point.

test_scheduler,
scheduler,
run,
complete,
):
"""Test restart timeout with the "stop after cycle point" config.

This should fire when a workflow that has hit the "stop after cycle point" is restarted.
"""
schd = test_scheduler(
config={
'scheduler': {
'cycle point format': 'CCYY',
'events': {'restart timeout': 'PT0S'},
},
'scheduling': {
'initial cycle point': '2000',
'stop after cycle point': '2000',
'graph': {
'P1Y': 'foo[-P1Y] => foo',
},
},
},
paused_start=False,
)

# run to completion
async with run(schd):
await complete(schd)
assert schd.get_events() == {'startup', 'shutdown'}

# restart
schd2 = scheduler(schd.workflow)
schd2.get_events = schd.get_events
async with run(schd2):
await asyncio.sleep(0.1)
assert schd2.get_events() == {'startup', 'restart timeout', 'shutdown'}


async def test_shutdown_handler_timeout_kill(
test_scheduler, run, monkeypatch, mock_glbl_cfg, caplog
):
Expand Down
Loading