Skip to content

Commit 42bc582

Browse files
Merge pull request #7256 from cylc/8.6.x-sync
🤖 Merge 8.6.x-sync into master
2 parents b171201 + 3b0c945 commit 42bc582

File tree

3 files changed

+70
-17
lines changed

3 files changed

+70
-17
lines changed

changes.d/7056.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fixes the interaction between retrying tasks and the expired state.

cylc/flow/task_events_mgr.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -930,8 +930,14 @@ def _process_message_check(
930930
)
931931
return False
932932

933+
# Ignore non-expire messages if task is waiting with a retry lined up.
934+
# Waiting tasks normally advance to a new state due to any message, but
935+
# the retry implies late arrival after task failure (e.g. a delayed
936+
# poll result). Task expire messages are internal, so excluded from
937+
# this.
933938
if (
934939
itask.state(TASK_STATUS_WAITING)
940+
and message != TASK_OUTPUT_EXPIRED
935941
# Polling in live mode only:
936942
and itask.run_mode == RunMode.LIVE
937943
and (

tests/integration/test_task_pool.py

Lines changed: 63 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
from cylc.flow.exceptions import WorkflowConfigError
4343
from cylc.flow.flow_mgr import FLOW_NONE
4444
from cylc.flow.id import TaskTokens, Tokens
45+
from cylc.flow.run_modes import RunMode
4546
from cylc.flow.task_events_mgr import TaskEventsManager
4647
from cylc.flow.task_outputs import (
4748
TASK_OUTPUT_FAILED,
@@ -2289,7 +2290,9 @@ async def list_data_store():
22892290
}
22902291

22912292

2292-
@pytest.mark.parametrize('expire_type', ['clock-expire', 'manual'])
2293+
@pytest.mark.parametrize(
2294+
'expire_type', ['clock-expire', 'manual-expire', 'expire-trigger']
2295+
)
22932296
async def test_expire_dequeue_with_retries(
22942297
flow, scheduler, start, expire_type
22952298
):
@@ -2298,48 +2301,91 @@ async def test_expire_dequeue_with_retries(
22982301
See https://github.com/cylc/cylc-flow/issues/6284
22992302
"""
23002303
conf = {
2304+
'scheduler': {
2305+
'experimental': {
2306+
'expire triggers': True,
2307+
},
2308+
},
23012309
'scheduling': {
23022310
'initial cycle point': '2000',
2303-
23042311
'graph': {
23052312
'R1': 'foo'
23062313
},
23072314
},
23082315
'runtime': {
23092316
'foo': {
2310-
'execution retry delays': 'PT0S'
2311-
}
2312-
}
2317+
'execution retry delays': 'PT0S',
2318+
'outputs': {
2319+
'x': 'xxx',
2320+
},
2321+
},
2322+
},
23132323
}
23142324

23152325
if expire_type == 'clock-expire':
2326+
# configure foo to clock-expire
23162327
conf['scheduling']['special tasks'] = {'clock-expire': 'foo(PT0S)'}
2317-
method = lambda schd: schd.pool.clock_expire_tasks()
2328+
2329+
# run the clock-expire logic
2330+
def method(schd):
2331+
schd.pool.clock_expire_tasks()
2332+
2333+
elif expire_type == 'manual-expire':
2334+
# run the "cylc set" command to expire "foo"
2335+
def method(schd):
2336+
schd.pool.set_prereqs_and_outputs(
2337+
{TaskTokens('20000101T0000Z', 'foo')},
2338+
prereqs=[],
2339+
outputs=['expired'],
2340+
flow=['1'],
2341+
)
2342+
23182343
else:
2319-
method = lambda schd: schd.pool.set_prereqs_and_outputs(
2320-
{TaskTokens('20000101T0000Z', 'foo')},
2321-
prereqs=[],
2322-
outputs=['expired'],
2323-
flow=['1'],
2324-
)
2344+
# configure the task to be expire-triggered once "bar" succeeds
2345+
conf['scheduling']['graph']['R1'] = '''
2346+
foo:x => bar
2347+
foo:x & bar => !foo
2348+
'''
2349+
2350+
# run the "cylc set" command to succeed "bar"
2351+
def method(schd):
2352+
schd.pool.set_prereqs_and_outputs(
2353+
{TaskTokens('20000101T0000Z', 'bar')},
2354+
prereqs=[],
2355+
outputs=['succeeded'],
2356+
flow=['1'],
2357+
)
23252358

23262359
id_ = flow(conf)
2327-
schd = scheduler(id_)
2360+
schd = scheduler(id_, run_mode='live')
23282361
schd: Scheduler
23292362
async with start(schd):
2330-
itask = schd.pool.get_tasks()[0]
2363+
foo = schd.pool._get_task_by_id('20000101T0000Z/foo')
2364+
assert foo
2365+
2366+
# fake a real submission failure
2367+
# NOTE: yes, all of these things are needed for a valid test!
2368+
# Try removing the "force=True" added in this commit and ensure the
2369+
# "clock-expire" test fails before changing anything here!
2370+
foo.submit_num += 1
2371+
foo.run_mode = RunMode.LIVE
2372+
schd.task_job_mgr._set_retry_timers(foo, foo.tdef.rtconfig)
2373+
schd.task_events_mgr.process_message(foo, 0, 'started')
2374+
schd.task_events_mgr.process_message(foo, 0, 'xxx')
2375+
schd.task_events_mgr.process_message(foo, 0, 'failed')
2376+
schd.task_events_mgr._retry_task(foo, 0)
23312377

23322378
# the task should start as "waiting(queued)"
2333-
assert itask.state(TASK_STATUS_WAITING, is_queued=True)
2379+
assert foo.state(TASK_STATUS_WAITING, is_queued=True)
23342380

23352381
# expire the task via whichever method we are testing
23362382
method(schd)
23372383

23382384
# the task should enter the "expired" state
2339-
assert itask.state(TASK_STATUS_EXPIRED, is_queued=False)
2385+
assert foo.state(TASK_STATUS_EXPIRED, is_queued=False)
23402386

23412387
# the task should also have been removed from the queue
2342-
assert not schd.pool.task_queue_mgr.remove_task(itask)
2388+
assert not schd.pool.task_queue_mgr.remove_task(foo)
23432389

23442390

23452391
async def test_clock_expire_with_sequential_xtriggers(

0 commit comments

Comments
 (0)