Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/6926.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed info missing from UI for submit-failed tasks.
18 changes: 4 additions & 14 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
Set,
TYPE_CHECKING,
Tuple,
Union,
)
import zlib

Expand Down Expand Up @@ -1612,37 +1611,27 @@ def _apply_broadcasts_to_runtime(self, tokens, rtconfig):

def insert_job(
self,
name: str,
cycle_point: Union['PointBase', str],
itask: 'TaskProxy',
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please try to avoid passing itask objects to the data store where possible.

We have had to do this in a couple of places, but we don't need to update the remaining interfaces to match.

In theory, we are supposed to be able to populate the data store out of the data base (without the Scheduler or its runtime objects, e.g. TaskProxy) so we can provide offline data.

In truth that isn't possible right now, but we should try to reduce the pain of refactor when the time comes.

status: str,
job_conf: dict,
):
) -> None:
"""Insert job into data-store.

Args:
name: Corresponding task name.
cycle_point: Cycle point string
status: The task's state.
job_conf:
Dictionary of job configuration used to generate
the job script.
(see TaskJobManager._prep_submit_task_job_impl)

Returns:

None

"""
if status not in JOB_STATUS_SET:
# Ignore task-only states e.g. preparing
# https://github.com/cylc/cylc-flow/issues/4994
return

sub_num = job_conf['submit_num']
tp_tokens = self.id_.duplicate(
cycle=str(cycle_point),
task=name,
)
tp_tokens = self.id_.duplicate(itask.tokens)
tproxy: Optional[PbTaskProxy]
tp_id, tproxy = self.store_node_fetcher(tp_tokens)
if not tproxy:
Expand All @@ -1665,6 +1654,7 @@ def insert_job(
execution_time_limit=job_conf.get('execution_time_limit'),
platform=job_conf['platform']['name'],
job_runner_name=job_conf.get('job_runner_name'),
job_id=itask.summary.get('submit_method_id'),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note, there is no Job ID for a submission failure caused by a platform lookup error because no job submission was made.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.get() will return None in this case, which is fine

)
# Not all fields are populated with some submit-failures,
# so use task cfg as base.
Expand Down
21 changes: 6 additions & 15 deletions cylc/flow/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -794,13 +794,7 @@ def process_message(
self._process_message_submitted(itask, event_time, forced)
self.spawn_children(itask, TASK_OUTPUT_SUBMITTED, forced)

# ... but either way update the job ID in the job proxy (it only
# comes in via the submission message).
Comment on lines -797 to -798
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now it is done by data_store_mgr.insert_job()

if itask.run_mode != RunMode.SIMULATION:
self.data_store_mgr.delta_job_attr(
itask, 'job_id', itask.summary['submit_method_id']
)
else:
if itask.run_mode == RunMode.SIMULATION:
# In simulation mode submitted implies started:
self.spawn_children(itask, TASK_OUTPUT_STARTED, forced)

Expand Down Expand Up @@ -1465,7 +1459,6 @@ def _process_message_submit_failed(
"time_submit_exit": event_time,
"submit_status": 1,
})
itask.summary['submit_method_id'] = None
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't know why the job ID was being wiped on submit-failure

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know either, but there may have been a reason; if this isn't part of the bugfix, please bump it to a separate PR.

Copy link
Copy Markdown
Member

@oliver-sanders oliver-sanders Aug 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, is this the job ID or the job submit method?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Job ID in the job runner, it's part 3 of the bugfix

LOG.error(f"[{itask}] {self.EVENT_SUBMIT_FAILED}")
if (
forced
Expand Down Expand Up @@ -1503,6 +1496,7 @@ def _process_message_submit_failed(
self._insert_task_job(
itask, event_time, self.JOB_SUBMIT_FAIL_FLAG, forced=forced)
self.data_store_mgr.delta_job_state(itask, TASK_STATUS_SUBMIT_FAILED)
self.data_store_mgr.delta_job_time(itask, 'submitted', event_time)
self._reset_job_timers(itask)

return no_retries
Expand Down Expand Up @@ -1589,11 +1583,9 @@ def _insert_task_job(
except IndexError:
# we do not have access to the job config (e.g. Scheduler
# crashed) - https://github.com/cylc/cylc-flow/pull/6326
job_id = itask.tokens.duplicate(
job=itask.submit_num
).relative_id
LOG.warning(
f'Could not find the job configuration for "{job_id}".'
'Could not find the job configuration for '
f'"{itask.job_tokens.relative_id}".'
)
itask.jobs.append({"submit_num": itask.submit_num})
job_conf = itask.jobs[-1]
Expand All @@ -1610,8 +1602,7 @@ def _insert_task_job(

# insert job into data store
self.data_store_mgr.insert_job(
itask.tdef.name,
itask.point,
itask,
job_status,
{
**job_conf,
Expand All @@ -1634,7 +1625,7 @@ def _insert_task_job(
# preparation started due to intelligent host (and or
# platform) selection
'platform_name': itask.platform['name'],
}
},
)

def _setup_job_logs_retrieval(self, itask, event) -> None:
Expand Down
35 changes: 23 additions & 12 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -1286,6 +1286,12 @@ def _prep_submit_task_job(
# bad_hosts:
self.bad_hosts -= exc.hosts_consumed
self._set_retry_timers(itask, rtconfig)
# Provide dummy platform otherwise it will incorrectly show as
# the default localhost platform in the data store:
itask.platform = {
'name': rtconfig['platform'],
'job runner': '',
}
self._prep_submit_task_job_error(itask, msg, exc)
return False

Expand Down Expand Up @@ -1337,15 +1343,20 @@ def _prep_submit_task_job_error(
itask.is_manual_submit = False
# job failed in preparation i.e. is really preparation-failed rather
# than submit-failed
# provide a dummy job config - this info will be added to the data
# store
try_num = itask.get_try_num()
itask.jobs.append({
'task_id': itask.identity,
'platform': itask.platform,
'submit_num': itask.submit_num,
'try_num': try_num,
})
if not itask.jobs or (
itask.jobs[-1]['submit_num'] != itask.submit_num
):
# provide a dummy job config - this info will be added to the data
# store
itask.jobs.append({
'task_id': itask.identity,
'platform': itask.platform,
'job_runner_name': itask.platform['job runner'],
'submit_num': itask.submit_num,
'try_num': try_num,
'flow_nums': itask.flow_nums,
})
# create a DB entry for the submit-failed job
self.workflow_db_mgr.put_insert_task_jobs(
itask,
Expand Down Expand Up @@ -1415,10 +1426,10 @@ def get_execution_time_limit(

def get_job_conf(
self,
itask,
rtconfig,
job_file_path=None,
job_d=None,
itask: 'TaskProxy',
rtconfig: dict,
job_file_path: Optional[str] = None,
job_d: Optional[str] = None,
):
"""Return a job config.

Expand Down
4 changes: 3 additions & 1 deletion tests/integration/network/test_graphql.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,10 +366,12 @@ async def test_edges(harness):


async def test_jobs(harness):
schd: Scheduler
schd, client, w_tokens = harness

# add a job
schd.data_store_mgr.insert_job('a', '1', 'submitted', job_config(schd))
itask = schd.pool._get_task_by_id('1/a')
schd.data_store_mgr.insert_job(itask, 'submitted', job_config(schd))
schd.data_store_mgr.update_data_structure()
j_tokens = w_tokens.duplicate(
cycle='1',
Expand Down
8 changes: 6 additions & 2 deletions tests/integration/test_data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,15 +316,18 @@ async def test_delta_task_held(mod_harness):

def test_insert_job(mod_harness):
"""Test method that adds a new job to the store."""
schd: Scheduler
schd, data = mod_harness
assert len(schd.data_store_mgr.added[JOBS]) == 0
schd.data_store_mgr.insert_job('foo', '1', 'submitted', job_config(schd))
itask = schd.pool.get_tasks()[0]
schd.data_store_mgr.insert_job(itask, 'submitted', job_config(schd))
assert len(schd.data_store_mgr.added[JOBS]) == 1
assert ext_id(schd) in schd.data_store_mgr.added[JOBS]


def test_insert_db_job(mod_harness, job_db_row):
"""Test method that adds a new job from the db to the store."""
schd: Scheduler
schd, data = mod_harness
assert len(schd.data_store_mgr.added[JOBS]) == 1
schd.data_store_mgr.insert_db_job(0, job_db_row)
Expand All @@ -334,6 +337,7 @@ def test_insert_db_job(mod_harness, job_db_row):

def test_delta_job_msg(mod_harness):
"""Test method adding messages to job element."""
schd: Scheduler
schd, data = mod_harness
j_id = ext_id(schd)
tokens = Tokens(j_id)
Expand Down Expand Up @@ -724,7 +728,7 @@ def _patch_remove(*args, **kwargs):
assert itask
itask.submit_num += 1
one.data_store_mgr.insert_job(
itask.tdef.name, itask.point, itask.state.status, {'submit_num': 1}
itask, itask.state.status, {'submit_num': 1}
)
await one.update_data_structure()

Expand Down
67 changes: 58 additions & 9 deletions tests/integration/test_task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
EventKey,
TaskJobLogsRetrieveContext,
)
from cylc.flow.task_state import (
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMIT_FAILED,
)

from .test_workflow_events import TEMPLATES

Expand Down Expand Up @@ -126,7 +130,7 @@ async def test__insert_task_job(flow, one_conf, scheduler, start, validate):


async def test__always_insert_task_job(
flow, scheduler, mock_glbl_cfg, start, run
flow, scheduler, mock_glbl_cfg, start
):
"""Insert Task Job _Always_ inserts a task into the data store.

Expand All @@ -144,20 +148,22 @@ async def test__always_insert_task_job(
[platforms]
[[broken1]]
hosts = no-such-host-1
job runner = abc
[[broken2]]
hosts = no-such-host-2
job runner = def
[platform groups]
[[broken]]
[[broken_group]]
platforms = broken1
"""
mock_glbl_cfg('cylc.flow.platforms.glbl_cfg', global_config)

id_ = flow({
'scheduling': {'graph': {'R1': 'broken & broken2'}},
'scheduling': {'graph': {'R1': 'foo & bar'}},
'runtime': {
'root': {'submission retry delays': 'PT10M'},
'broken': {'platform': 'broken'},
'broken2': {'platform': 'broken2'}
'foo': {'platform': 'broken_group'},
'bar': {'platform': 'broken2'}
}
})

Expand All @@ -174,14 +180,57 @@ async def test__always_insert_task_job(
)

# Both jobs are in the data store with submit-failed state:
ds_jobs = schd.data_store_mgr.data[schd.id][JOBS]
updates = {
k.split('//')[-1]: v.state
for k, v in schd.data_store_mgr.data[schd.id][JOBS].items()
id_.split('//')[-1]: (job.state, job.platform, job.job_runner_name)
for id_, job in ds_jobs.items()
}
assert updates == {
'1/broken/01': 'submit-failed',
'1/broken2/01': 'submit-failed'
'1/foo/01': ('submit-failed', 'broken_group', ''),
'1/bar/01': ('submit-failed', 'broken2', 'def'),
}
for job in ds_jobs.values():
assert job.submitted_time


async def test__submit_failed_job_id(flow, scheduler, start, db_select):
"""If a job is killed in the submitted state, the job ID should still be
in the DB/data store.

See https://github.com/cylc/cylc-flow/pull/6926
"""
async def get_ds_job_id(schd: Scheduler):
await schd.update_data_structure()
return list(schd.data_store_mgr.data[schd.id][JOBS].values())[0].job_id

id_ = flow('foo')
schd: Scheduler = scheduler(id_)
job_id = '1234'
async with start(schd):
itask = schd.pool.get_tasks()[0]
itask.state_reset(TASK_STATUS_PREPARING)
itask.submit_num = 1
itask.summary['submit_method_id'] = job_id
schd.workflow_db_mgr.put_insert_task_jobs(itask, {})
schd.task_events_mgr.process_message(
itask, 'INFO', schd.task_events_mgr.EVENT_SUBMITTED
)
assert await get_ds_job_id(schd) == job_id

schd.task_events_mgr.process_message(
itask, 'CRITICAL', schd.task_events_mgr.EVENT_SUBMIT_FAILED
)
assert itask.state(TASK_STATUS_SUBMIT_FAILED)
assert await get_ds_job_id(schd) == job_id

assert db_select(schd, False, 'task_jobs', 'job_id', 'submit_status') == [
(job_id, 1)
]

# Restart and check data store again:
schd = scheduler(id_)
async with start(schd):
assert await get_ds_job_id(schd) == job_id


async def test__process_message_failed_with_retry(one, start, log_filter):
Expand Down
6 changes: 2 additions & 4 deletions tests/integration/tui/test_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,7 @@ async def workflow(
# mark 1/a/01 as failed
job_1 = schd.tokens.duplicate(cycle='1', task='a', job='01')
schd.data_store_mgr.insert_job(
'a',
IntegerPoint('1'),
itask,
TASK_STATUS_SUCCEEDED,
{'submit_num': 1, 'platform': {'name': 'x'}}
)
Expand All @@ -160,8 +159,7 @@ async def workflow(
itask.submit_num = 2
job_2 = schd.tokens.duplicate(cycle='1', task='a', job='02')
schd.data_store_mgr.insert_job(
'a',
IntegerPoint('1'),
itask,
TASK_STATUS_SUCCEEDED,
{'submit_num': 2, 'platform': {'name': 'x'}}
)
Expand Down
Loading