Skip to content

Commit aedd419

Browse files
restart timeout: fire event for stop after cycle point scenarios
* If a workflow hits the `final cycle point` and is restarted, it is considered complete. * If a `restart timeout` is configured, the scheduler will stay up for the configured period. * However, if a workflow hits the `stop after cycle point` and is restarted, the `restart timeout` event is not fired and the scheduler will instantly shut down again.
1 parent df72a56 commit aedd419

File tree

3 files changed

+80
-17
lines changed

3 files changed

+80
-17
lines changed

changes.d/6903.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Workflows that have hit the `stop after cycle point` will no longer shut down immediately when restarted; they now stay up in accordance with the `restart timeout` configuration.

cylc/flow/scheduler.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -555,8 +555,22 @@ async def configure(self, params):
555555
timer.reset()
556556
self.timers[event] = timer
557557

558-
if self.is_restart and not self.pool.get_tasks():
559-
# This workflow completed before restart; wait for intervention.
558+
if self.is_restart and (
559+
# workflow has completed
560+
not self.pool.get_tasks()
561+
# workflow has hit the "stop after cycle point"
562+
or (
563+
self.config.stop_point
564+
and all(
565+
cycle > self.config.stop_point
566+
for cycle in {
567+
itask.point for itask in self.pool.get_tasks()
568+
}
569+
)
570+
)
571+
):
572+
# This workflow will shut down immediately once restarted
573+
# => Give the user a grace period to intervene first
560574
with suppress(KeyError):
561575
self.timers[self.EVENT_RESTART_TIMEOUT].reset()
562576
self.is_restart_timeout_wait = True

tests/integration/test_workflow_events.py

Lines changed: 63 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -51,20 +51,23 @@ async def test_scheduler(flow, scheduler, capcall):
5151
def get_events():
5252
return {e[0][1] for e in events}
5353

54-
def _schd(config=None, **opts):
55-
id_ = flow({
56-
'scheduler': {
57-
'events': {
58-
'mail events': ', '.join(EVENTS),
59-
**(config or {}),
54+
def _schd(event_config=None, config=None, **opts):
55+
assert not (event_config and config)
56+
if not config:
57+
config = {
58+
'scheduler': {
59+
'events': {
60+
'mail events': ', '.join(EVENTS),
61+
**(event_config or {}),
62+
},
6063
},
61-
},
62-
'scheduling': {
63-
'graph': {
64-
'R1': 'a'
65-
}
66-
},
67-
})
64+
'scheduling': {
65+
'graph': {
66+
'R1': 'a'
67+
}
68+
},
69+
}
70+
id_ = flow(config)
6871
schd = scheduler(id_, **opts)
6972
schd.get_events = get_events
7073
return schd
@@ -168,8 +171,13 @@ async def test_stall(test_scheduler, start):
168171
assert schd.get_events() == {'shutdown', 'stall'}
169172

170173

171-
async def test_restart_timeout(test_scheduler, scheduler, run, complete):
172-
"""Test restart timeout.
174+
async def test_restart_timeout_workflow_completion(
175+
test_scheduler,
176+
scheduler,
177+
run,
178+
complete,
179+
):
180+
"""Test restart timeout for completed workflows.
173181
174182
This should fire when a completed workflow is restarted.
175183
"""
@@ -188,6 +196,46 @@ async def test_restart_timeout(test_scheduler, scheduler, run, complete):
188196
assert schd2.get_events() == {'startup', 'restart timeout', 'shutdown'}
189197

190198

199+
async def test_restart_timeout_workflow_stop_after_cycle_point(
200+
test_scheduler,
201+
scheduler,
202+
run,
203+
complete,
204+
):
205+
"""Test restart timeout with the "stop after cycle point" config.
206+
207+
This should fire when a workflow that has hit the "stop after cycle point" is restarted.
208+
"""
209+
schd = test_scheduler(
210+
config={
211+
'scheduler': {
212+
'cycle point format': 'CCYY',
213+
'events': {'restart timeout': 'PT0S'},
214+
},
215+
'scheduling': {
216+
'initial cycle point': '2000',
217+
'stop after cycle point': '2000',
218+
'graph': {
219+
'P1Y': 'foo[-P1Y] => foo',
220+
},
221+
},
222+
},
223+
paused_start=False,
224+
)
225+
226+
# run to completion
227+
async with run(schd):
228+
await complete(schd)
229+
assert schd.get_events() == {'startup', 'shutdown'}
230+
231+
# restart
232+
schd2 = scheduler(schd.workflow)
233+
schd2.get_events = schd.get_events
234+
async with run(schd2):
235+
await asyncio.sleep(0.1)
236+
assert schd2.get_events() == {'startup', 'restart timeout', 'shutdown'}
237+
238+
191239
async def test_shutdown_handler_timeout_kill(
192240
test_scheduler, run, monkeypatch, mock_glbl_cfg, caplog
193241
):

0 commit comments

Comments
 (0)