Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/7056.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixes the interaction between retrying tasks and the expired state.
6 changes: 6 additions & 0 deletions cylc/flow/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -926,8 +926,14 @@ def _process_message_check(
)
return False

# Ignore non-expire messages if task is waiting with a retry lined up.
# Waiting tasks normally advance to a new state due to any message, but
# the retry implies late arrival after task failure (e.g. a delayed
# poll result). Task expire messages are internal, so excluded from
# this.
if (
itask.state(TASK_STATUS_WAITING)
and message != TASK_OUTPUT_EXPIRED
# Polling in live mode only:
and itask.run_mode == RunMode.LIVE
and (
Expand Down
80 changes: 63 additions & 17 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from cylc.flow.exceptions import WorkflowConfigError
from cylc.flow.flow_mgr import FLOW_NONE
from cylc.flow.id import TaskTokens, Tokens
from cylc.flow.run_modes import RunMode
from cylc.flow.task_events_mgr import TaskEventsManager
from cylc.flow.task_outputs import (
TASK_OUTPUT_FAILED,
Expand Down Expand Up @@ -2289,7 +2290,9 @@ async def list_data_store():
}


@pytest.mark.parametrize('expire_type', ['clock-expire', 'manual'])
@pytest.mark.parametrize(
'expire_type', ['clock-expire', 'manual-expire', 'expire-trigger']
)
async def test_expire_dequeue_with_retries(
flow, scheduler, start, expire_type
):
Expand All @@ -2298,48 +2301,91 @@ async def test_expire_dequeue_with_retries(
See https://github.com/cylc/cylc-flow/issues/6284
"""
conf = {
'scheduler': {
'experimental': {
'expire triggers': True,
},
},
'scheduling': {
'initial cycle point': '2000',

'graph': {
'R1': 'foo'
},
},
'runtime': {
'foo': {
'execution retry delays': 'PT0S'
}
}
'execution retry delays': 'PT0S',
'outputs': {
'x': 'xxx',
},
},
},
}

if expire_type == 'clock-expire':
# configure foo to clock-expire
conf['scheduling']['special tasks'] = {'clock-expire': 'foo(PT0S)'}
method = lambda schd: schd.pool.clock_expire_tasks()

# run the clock-expire logic
def method(schd):
schd.pool.clock_expire_tasks()

elif expire_type == 'manual-expire':
# run the "cylc set" command to expire "foo"
def method(schd):
schd.pool.set_prereqs_and_outputs(
{TaskTokens('20000101T0000Z', 'foo')},
prereqs=[],
outputs=['expired'],
flow=['1'],
)

else:
method = lambda schd: schd.pool.set_prereqs_and_outputs(
{TaskTokens('20000101T0000Z', 'foo')},
prereqs=[],
outputs=['expired'],
flow=['1'],
)
# configure the task to be expire-triggered once "bar" succeeds
conf['scheduling']['graph']['R1'] = '''
foo:x => bar
foo:x & bar => !foo
'''

# run the "cylc set" command to succeed "bar"
def method(schd):
schd.pool.set_prereqs_and_outputs(
{TaskTokens('20000101T0000Z', 'bar')},
prereqs=[],
outputs=['succeeded'],
flow=['1'],
)

id_ = flow(conf)
schd = scheduler(id_)
schd = scheduler(id_, run_mode='live')
schd: Scheduler
async with start(schd):
itask = schd.pool.get_tasks()[0]
foo = schd.pool._get_task_by_id('20000101T0000Z/foo')
assert foo

# fake a real submission failure
# NOTE: yes, all of these things are needed for a valid test!
# Try removing the "force=True" added in this commit and ensure the
# "clock-expire" test fails before changing anything here!
foo.submit_num += 1
foo.run_mode = RunMode.LIVE
schd.task_job_mgr._set_retry_timers(foo, foo.tdef.rtconfig)
schd.task_events_mgr.process_message(foo, 0, 'started')
schd.task_events_mgr.process_message(foo, 0, 'xxx')
schd.task_events_mgr.process_message(foo, 0, 'failed')
schd.task_events_mgr._retry_task(foo, 0)

# the task should start as "waiting(queued)"
assert itask.state(TASK_STATUS_WAITING, is_queued=True)
assert foo.state(TASK_STATUS_WAITING, is_queued=True)

# expire the task via whichever method we are testing
method(schd)

# the task should enter the "expired" state
assert itask.state(TASK_STATUS_EXPIRED, is_queued=False)
assert foo.state(TASK_STATUS_EXPIRED, is_queued=False)

# the task should also have been removed from the queue
assert not schd.pool.task_queue_mgr.remove_task(itask)
assert not schd.pool.task_queue_mgr.remove_task(foo)


async def test_clock_expire_with_sequential_xtriggers(
Expand Down