Skip to content

Commit fcf54ea

Browse files
expire: fix interaction with automatic retries
* Closes #6717
1 parent 113a724 commit fcf54ea

File tree

5 files changed

+71
-22
lines changed

5 files changed

+71
-22
lines changed

changes.d/7056.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fixes the interaction between retrying tasks and the expired state.

cylc/flow/etc/examples/expiry/.validate

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ test_one () {
3030
./one
3131

3232
# the start task should have expired
33-
grep 'start.*(internal)expired' "$HOME/cylc-run/$ID/log/scheduler/log"
33+
grep 'start.*expired' "$HOME/cylc-run/$ID/log/scheduler/log"
3434

3535
# the following task(s) should not have run
3636
grep 'a.*running' "$HOME/cylc-run/$ID/log/scheduler/log" && exit 1
@@ -60,7 +60,7 @@ test_two () {
6060
grep 'start.*running' "$HOME/cylc-run/$ID/log/scheduler/log"
6161

6262
# some other task in the chain should expire
63-
grep '(internal)expired' "$HOME/cylc-run/$ID/log/scheduler/log"
63+
grep 'expired' "$HOME/cylc-run/$ID/log/scheduler/log"
6464

6565
# the housekeep task at the end of the cycle should not run
6666
grep 'housekeep.*running' "$HOME/cylc-run/$ID/log/scheduler/log" && exit 1
@@ -86,7 +86,7 @@ test_three () {
8686
./three
8787

8888
# the start task should expire
89-
grep 'start.*(internal)expired' "$HOME/cylc-run/$ID/log/scheduler/log"
89+
grep 'start.*expired' "$HOME/cylc-run/$ID/log/scheduler/log"
9090
# shellcheck disable=SC2125 # could only ever be one matching file
9191
local job_file="$HOME/cylc-run/$ID/log/job/"*"/a/NN/job"
9292
[[ ! -f "$job_file" ]]

cylc/flow/task_events_mgr.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -718,7 +718,8 @@ def process_message(
718718
The submit number of the task relevant for the message.
719719
If not specified, use latest submit number.
720720
forced:
721-
If this message is due to manual completion or not (cylc set)
721+
True, if this message is due to manual (as opposed to natural)
722+
output completion.
722723
723724
Return:
724725
False: in normal circumstances.

cylc/flow/task_pool.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1581,7 +1581,7 @@ def spawn_on_output(self, itask: TaskProxy, output: str) -> None:
15811581
if self.config.experimental.expire_triggers:
15821582
self.task_queue_mgr.remove_task(c_task)
15831583
self.task_events_mgr.process_message(
1584-
c_task, logging.WARNING, TASK_OUTPUT_EXPIRED
1584+
c_task, logging.WARNING, TASK_OUTPUT_EXPIRED, forced=True
15851585
)
15861586
else:
15871587
self.remove(c_task, self.__class__.SUICIDE_MSG)
@@ -2476,6 +2476,7 @@ def clock_expire_tasks(self):
24762476
itask,
24772477
logging.WARNING,
24782478
TASK_OUTPUT_EXPIRED,
2479+
forced=True,
24792480
)
24802481

24812482
def task_succeeded(self, id_):

tests/integration/test_task_pool.py

Lines changed: 63 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
from cylc.flow.exceptions import WorkflowConfigError
4343
from cylc.flow.flow_mgr import FLOW_NONE
4444
from cylc.flow.id import TaskTokens, Tokens
45+
from cylc.flow.run_modes import RunMode
4546
from cylc.flow.task_events_mgr import TaskEventsManager
4647
from cylc.flow.task_outputs import (
4748
TASK_OUTPUT_FAILED,
@@ -2289,7 +2290,9 @@ async def list_data_store():
22892290
}
22902291

22912292

2292-
@pytest.mark.parametrize('expire_type', ['clock-expire', 'manual'])
2293+
@pytest.mark.parametrize(
2294+
'expire_type', ['clock-expire', 'manual-expire', 'expire-trigger']
2295+
)
22932296
async def test_expire_dequeue_with_retries(
22942297
flow, scheduler, start, expire_type
22952298
):
@@ -2298,48 +2301,91 @@ async def test_expire_dequeue_with_retries(
22982301
See https://github.com/cylc/cylc-flow/issues/6284
22992302
"""
23002303
conf = {
2304+
'scheduler': {
2305+
'experimental': {
2306+
'expire triggers': True,
2307+
},
2308+
},
23012309
'scheduling': {
23022310
'initial cycle point': '2000',
2303-
23042311
'graph': {
23052312
'R1': 'foo'
23062313
},
23072314
},
23082315
'runtime': {
23092316
'foo': {
2310-
'execution retry delays': 'PT0S'
2311-
}
2312-
}
2317+
'execution retry delays': 'PT0S',
2318+
'outputs': {
2319+
'x': 'xxx',
2320+
},
2321+
},
2322+
},
23132323
}
23142324

23152325
if expire_type == 'clock-expire':
2326+
# configure foo to clock-expire
23162327
conf['scheduling']['special tasks'] = {'clock-expire': 'foo(PT0S)'}
2317-
method = lambda schd: schd.pool.clock_expire_tasks()
2328+
2329+
# run the clock-expire logic
2330+
def method(schd):
2331+
schd.pool.clock_expire_tasks()
2332+
2333+
elif expire_type == 'manual-expire':
2334+
# run the "cylc set" command to expire "foo"
2335+
def method(schd):
2336+
schd.pool.set_prereqs_and_outputs(
2337+
{TaskTokens('20000101T0000Z', 'foo')},
2338+
prereqs=[],
2339+
outputs=['expired'],
2340+
flow=['1'],
2341+
)
2342+
23182343
else:
2319-
method = lambda schd: schd.pool.set_prereqs_and_outputs(
2320-
{TaskTokens('20000101T0000Z', 'foo')},
2321-
prereqs=[],
2322-
outputs=['expired'],
2323-
flow=['1'],
2324-
)
2344+
# configure the task to be expire-triggered once "bar" succeeds
2345+
conf['scheduling']['graph']['R1'] = '''
2346+
foo:x => bar
2347+
foo:x & bar => !foo
2348+
'''
2349+
2350+
# run the "cylc set" command to succeed "bar"
2351+
def method(schd):
2352+
schd.pool.set_prereqs_and_outputs(
2353+
{TaskTokens('20000101T0000Z', 'bar')},
2354+
prereqs=[],
2355+
outputs=['succeeded'],
2356+
flow=['1'],
2357+
)
23252358

23262359
id_ = flow(conf)
2327-
schd = scheduler(id_)
2360+
schd = scheduler(id_, run_mode='live')
23282361
schd: Scheduler
23292362
async with start(schd):
2330-
itask = schd.pool.get_tasks()[0]
2363+
foo = schd.pool._get_task_by_id('20000101T0000Z/foo')
2364+
assert foo
2365+
2366+
# fake a real submission failure
2367+
# NOTE: yes, all of these things are needed for a valid test!
2368+
# Try removing the "forced=True" added in this commit and ensure the
2369+
# "clock-expire" test fails before changing anything here!
2370+
foo.submit_num += 1
2371+
foo.run_mode = RunMode.LIVE
2372+
schd.task_job_mgr._set_retry_timers(foo, foo.tdef.rtconfig)
2373+
schd.task_events_mgr.process_message(foo, 0, 'started')
2374+
schd.task_events_mgr.process_message(foo, 0, 'xxx')
2375+
schd.task_events_mgr.process_message(foo, 0, 'failed')
2376+
schd.task_events_mgr._retry_task(foo, 0)
23312377

23322378
# the task should start as "waiting(queued)"
2333-
assert itask.state(TASK_STATUS_WAITING, is_queued=True)
2379+
assert foo.state(TASK_STATUS_WAITING, is_queued=True)
23342380

23352381
# expire the task via whichever method we are testing
23362382
method(schd)
23372383

23382384
# the task should enter the "expired" state
2339-
assert itask.state(TASK_STATUS_EXPIRED, is_queued=False)
2385+
assert foo.state(TASK_STATUS_EXPIRED, is_queued=False)
23402386

23412387
# the task should also have been removed from the queue
2342-
assert not schd.pool.task_queue_mgr.remove_task(itask)
2388+
assert not schd.pool.task_queue_mgr.remove_task(foo)
23432389

23442390

23452391
async def test_clock_expire_with_sequential_xtriggers(

0 commit comments

Comments
 (0)