Skip to content

Commit e708b33

Browse files
committed
Avoid spurious logging on restart with waiting parentless tasks.
1 parent 920c315 commit e708b33

File tree

2 files changed

+55
-9
lines changed

2 files changed

+55
-9
lines changed

cylc/flow/task_pool.py

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -266,12 +266,22 @@ def db_add_new_flow_rows(self, itask: TaskProxy) -> None:
266266
self.workflow_db_mgr.put_insert_task_outputs(itask)
267267

268268
def add_to_pool(self, itask) -> None:
269-
"""Add a task to the pool."""
269+
"""Add a task to the pool, if not already added."""
270270

271271
self.active_tasks.setdefault(itask.point, {})
272+
if itask.identity in self.active_tasks[itask.point]:
273+
# This is normal on restart with waiting parentless tasks that
274+
# auto-spawned to the runahead limit before shutdown (e.g. if
275+
# you shut down immediately after starting up paused). Each
276+
# will be resurrected from the DB again *and* try to auto-spawn
277+
# again because we don't record that parentless tasks spawned.
278+
# (But debug-log it anyway for any unexpected cases).
279+
LOG.debug(f"{itask.identity} not added to n=0: already exists")
280+
return None
281+
272282
self.active_tasks[itask.point][itask.identity] = itask
273283
self.active_tasks_changed = True
274-
LOG.debug(f"[{itask}] added to the n=0 window")
284+
LOG.info(f"[{itask}] added to the n=0 window")
275285

276286
self.create_data_store_elements(itask)
277287

@@ -1834,13 +1844,20 @@ def spawn_task(
18341844
if (
18351845
prev_status is not None
18361846
and not itask.state.outputs.get_completed_outputs()
1847+
and not self.config.experimental.expire_triggers
18371848
):
1838-
# If itask has any history in this flow but no completed outputs
1839-
# we can infer it has just been deliberately removed (N.B. not
1840-
# by `cylc remove`), so don't immediately respawn it.
1841-
# TODO (follow-up work):
1842-
# - this logic fails if task removed after some outputs completed
1843-
LOG.info(f"Not respawning {point}/{name} - task was removed")
1849+
# If itask has history but no completed outputs, it was removed
1850+
# by suicide trigger (not by `cylc remove` which erases history).
1851+
# This logic fails if suicided after completion of outputs!
1852+
1853+
# NOTE: redoing suicide triggers as expire triggers fixed this.
1854+
# TODO: remove this code once that's no longer experimental.
1855+
# Until then, this code also prevents double-spawning of waiting
1856+
# parentless tasks at restart if experimental expire triggers are
1857+
# off (but that is also handled properly without this block - by
1858+
# not adding tasks to the pool if already added (see comments in
1859+
# the add_to_pool method).
1860+
LOG.debug(f"Not respawning {point}/{name}")
18441861
return None
18451862

18461863
if prev_status in TASK_STATUSES_FINAL:

tests/integration/test_task_pool.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1951,13 +1951,14 @@ async def test_fast_respawn(
19511951
foo = task_pool.get_task(IntegerPoint("1"), "foo")
19521952

19531953
# remove it from the pool
1954+
# (this is not the higher level "cylc remove" that would erase history)
19541955
task_pool.remove(foo)
19551956
assert foo not in task_pool.get_tasks()
19561957

19571958
# attempt to spawn it again
19581959
itask = task_pool.spawn_task("foo", IntegerPoint("1"), {1})
19591960
assert itask is None
1960-
assert "Not respawning 1/foo - task was removed" in caplog.text
1961+
assert "Not respawning 1/foo" in caplog.text
19611962

19621963

19631964
async def test_remove_active_task(
@@ -2576,3 +2577,31 @@ def list_spawn_task_calls():
25762577
assert db_select(
25772578
schd, True, 'task_outputs', 'outputs', cycle='1', name='foo'
25782579
) == [('{"x": "(manually completed)"}',)]
2580+
2581+
2582+
async def test_add_to_pool(
2583+
flow, scheduler, start, caplog
2584+
):
2585+
"""It should log attempts to add the same task again."""
2586+
id_ = flow(
2587+
{
2588+
'scheduling': {
2589+
'graph': {
2590+
'R1': 'a',
2591+
},
2592+
},
2593+
}
2594+
)
2595+
schd = scheduler(id_)
2596+
2597+
async with start(schd):
2598+
caplog.set_level(logging.DEBUG, CYLC_LOG)
2599+
2600+
# 1/a should be pre-spawned (parentless)
2601+
a_1 = schd.pool.get_task(IntegerPoint('1'), 'a')
2602+
assert a_1
2603+
2604+
# add it again
2605+
schd.pool.add_to_pool(a_1)
2606+
2607+
assert "1/a not added to n=0: already exists" in caplog.text

0 commit comments

Comments
 (0)