Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changes.d/6817.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fixes two rare bugs associated with reloading the workflow configuration
after removing tasks or xtriggers.
89 changes: 77 additions & 12 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,37 @@
WORKFLOW: {'log_records': 10}
}

# Map internal runtime config setting names to PbRuntime protobuf field names.
# Keys are the workflow-config spellings; values are the protobuf attribute
# names (note some are identical, others swap spaces/hyphens for underscores).
RUNTIME_CFG_MAP_TO_FIELD = {
    'completion': 'completion',
    'directives': 'directives',
    'environment': 'environment',
    'env-script': 'env_script',
    'err-script': 'err_script',
    'execution polling intervals': 'execution_polling_intervals',
    'execution retry delays': 'execution_retry_delays',
    'execution time limit': 'execution_time_limit',
    'exit-script': 'exit_script',
    'init-script': 'init_script',
    'outputs': 'outputs',
    'post-script': 'post_script',
    'platform': 'platform',
    'pre-script': 'pre_script',
    'run mode': 'run_mode',
    'script': 'script',
    'submission polling intervals': 'submission_polling_intervals',
    'submission retry delays': 'submission_retry_delays',
    'work sub-directory': 'work_sub_dir',
}
# Settings whose list values are serialised to a single string via listjoin().
RUNTIME_LIST_JOINS = {
    'execution polling intervals',
    'execution retry delays',
    'submission polling intervals',
    'submission retry delays',
}
# Settings whose mapping values are serialised as JSON key/value pair lists.
RUNTIME_JSON_DUMPS = {'directives', 'environment', 'outputs'}
# Settings coerced to str() before assignment to the protobuf field.
RUNTIME_STRINGIFYS = {'execution time limit'}


def setbuff(obj, key, value):
"""Set an attribute on a protobuf object.
Expand Down Expand Up @@ -318,6 +349,41 @@ def runtime_from_config(rtconfig):
)


def runtime_from_partial(rtconfig, runtimeold: Optional[PbRuntime] = None):
    """Populate a PbRuntime message from a partial (or full) runtime config.

    Potentially slower than the non-partial variant because of the setattr
    calls, but it does not require all expected fields to be present.

    Args:
        rtconfig: Mapping of runtime config setting names to values.
        runtimeold: Existing runtime message to copy as the base, so that
            fields absent from ``rtconfig`` retain their previous values.

    Returns:
        PbRuntime: The populated runtime message.
    """
    runtime = PbRuntime()
    if runtimeold is not None:
        runtime.CopyFrom(runtimeold)
    for key, val in rtconfig.items():
        # Ignore unset values and settings with no protobuf counterpart.
        field = RUNTIME_CFG_MAP_TO_FIELD.get(key)
        if field is None or val is None:
            continue
        if key in RUNTIME_LIST_JOINS:
            setattr(runtime, field, listjoin(val))
        elif key in RUNTIME_JSON_DUMPS:
            # Serialise mappings as a JSON list of {'key': ..., 'value': ...}.
            pairs = [
                {'key': k, 'value': v}
                for k, v in val.items()
            ]
            setattr(runtime, field, json.dumps(pairs))
        elif key == 'platform' and isinstance(val, dict):
            # A platform dict is reduced to its name; tolerate absent/odd keys.
            with suppress(KeyError, TypeError):
                setattr(runtime, field, val['name'])
        elif key in RUNTIME_STRINGIFYS:
            setattr(runtime, field, str(val or ''))
        else:
            setattr(runtime, field, val)
    return runtime


def reset_protobuf_object(msg_class, msg_orig):
"""Reset object to clear memory build-up."""
# See: https://github.com/protocolbuffers/protobuf/issues/19674
Expand Down Expand Up @@ -1573,6 +1639,9 @@ def _process_internal_task_proxy(
ext_trig.satisfied = satisfied

for label, satisfied in itask.state.xtriggers.items():
# Reload may have removed xtrigger of orphan task
if label not in self.schd.xtrigger_mgr.xtriggers.functx_map:
continue
sig = self.schd.xtrigger_mgr.get_xtrig_ctx(
itask, label).get_signature()
xtrig = tproxy.xtriggers[f'{label}={sig}']
Expand Down Expand Up @@ -1656,13 +1725,9 @@ def insert_job(
)
# Not all fields are populated with some submit-failures,
# so use task cfg as base.
j_cfg = pdeepcopy(self._apply_broadcasts_to_runtime(
tp_tokens,
self.schd.config.cfg['runtime'][tproxy.name]
))
for key, val in job_conf.items():
j_cfg[key] = val
j_buf.runtime.CopyFrom(runtime_from_config(j_cfg))
j_buf.runtime.CopyFrom(
runtime_from_partial(job_conf, tproxy.runtime)
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Used instead because it can pick up the existing orphan runtime, or the runtime newly created post-reload.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: this is covered by insert_job testing of tests/i/test_data_store_mgr.py.

)

# Add in log files.
j_buf.job_log_dir = get_task_job_log(
Expand Down Expand Up @@ -2326,16 +2391,16 @@ def delta_broadcast(self):
self.updates_pending = True

def _generate_broadcast_node_deltas(self, node_data, node_type):
cfg = self.schd.config.cfg
rt_cfg = self.schd.config.cfg['runtime']
# NOTE: node_data may change during operation so make a copy
# see https://github.com/cylc/cylc-flow/pull/6397
for node_id, node in list(node_data.items()):
# Avoid removed tasks with deltas queued during reload.
if node.name not in rt_cfg:
continue
tokens = Tokens(node_id)
new_runtime = runtime_from_config(
self._apply_broadcasts_to_runtime(
tokens,
cfg['runtime'][node.name]
)
self._apply_broadcasts_to_runtime(tokens, rt_cfg[node.name])
)
new_sruntime = new_runtime.SerializeToString(
deterministic=True
Expand Down
4 changes: 2 additions & 2 deletions cylc/flow/network/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ class ResponseDict(TypedDict, total=False):
different versions of Cylc 8.
"""
data: object
"""For most Cylc commands that issue GQL mutations, the data field will
look like:
"""For most Cylc commands that issue GraphQL mutations, the data field
will look like:
data: {
<mutationName1>: {
result: [
Expand Down
18 changes: 5 additions & 13 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
FAMILIES,
FAMILY_PROXIES,
JOBS,
RUNTIME_CFG_MAP_TO_FIELD,
TASK_PROXIES,
TASKS,
)
Expand Down Expand Up @@ -874,23 +875,14 @@ class Meta:


RUNTIME_FIELD_TO_CFG_MAP = {
**{
k: k.replace('_', ' ') for k in Runtime.__dict__
if not k.startswith('_')
},
'init_script': 'init-script',
'env_script': 'env-script',
'err_script': 'err-script',
'exit_script': 'exit-script',
'pre_script': 'pre-script',
'post_script': 'post-script',
'work_sub_dir': 'work sub-directory',
v: k
for k, v in RUNTIME_CFG_MAP_TO_FIELD.items()
}
"""Map GQL Runtime fields' names to workflow config setting names."""
"""Map Pb/GraphQL Runtime fields' names to workflow config setting names."""


def runtime_schema_to_cfg(runtime: dict) -> dict:
"""Covert GQL Runtime field names to workflow config setting names and
    """Convert GraphQL Runtime field names to workflow config setting names and
perform any necessary processing on the values."""
# We have to manually lowercase the run_mode field because we don't define
# a proper schema for BroadcastSetting (it's just GenericScalar) so
Expand Down
4 changes: 3 additions & 1 deletion tests/integration/test_data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,11 @@ def job_config(schd):
'script': 'sleep 5; echo "I come in peace"',
'work_d': None,
'directives': {},
'environment': {},
'environment': {"FOO": "foo"},
'param_var': {},
'platform': {'name': 'platform'},
'execution retry delays': [10.0, 20.0],
'execution time limit': 30.0,
}


Expand Down
63 changes: 63 additions & 0 deletions tests/integration/test_reload.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,69 @@ async def test_reload_global_platform_group(
assert platform['meta']['x'] == '2'


async def test_orphan_reload(
    flow,
    scheduler,
    start,
    log_filter,
):
    """Reload should not fail because of orphaned tasks.

    Exercises these reload-with-orphans behaviours:
    - Broadcast deltas generated after reload.
      https://github.com/cylc/cylc-flow/issues/6814
    - Removal of both xtrigger and associated active/incomplete task.
      https://github.com/cylc/cylc-flow/issues/6815

    (Orphans being active/incomplete tasks removed from reloaded workflow cfg.)
    """
    cfg_before = {
        'scheduling': {
            'initial cycle point': '20010101T0000Z',
            'graph': {
                'R1': '''
                    foo => bar
                    @wall_clock => bar
                '''
            }
        }
    }
    # The reloaded config drops both bar and its xtrigger.
    cfg_after = {
        'scheduling': {
            'initial cycle point': '20010101T0000Z',
            'graph': {
                'R1': 'foo'
            }
        }
    }
    wid = flow(cfg_before)
    schd = scheduler(wid)
    async with start(schd):
        # Succeed foo to spawn bar, then fail bar so it stays incomplete.
        itask_foo = schd.pool._get_task_by_id('20010101T0000Z/foo')
        schd.pool.task_events_mgr.process_message(
            itask_foo, 'INFO', 'succeeded')
        itask_bar = schd.pool._get_task_by_id('20010101T0000Z/bar')
        schd.pool.task_events_mgr.process_message(itask_bar, 'INFO', 'failed')

        # Persist the task pool so reload restores bar as an orphan.
        schd.workflow_db_mgr.put_task_pool(schd.pool)

        # Rewrite the workflow without bar and the xtrigger, then reload.
        flow(cfg_after, workflow_id=wid)
        await commands.run_cmd(commands.reload_workflow(schd))

        # Broadcast deltas over the orphaned task must not blow up.
        schd.data_store_mgr.delta_broadcast()

        # The reload itself should have completed successfully.
        assert log_filter(contains='Reload completed')


async def test_data_store_tproxy(flow, scheduler, start):
"""Check N>0 task proxy in data store has correct info on reload.

Expand Down
11 changes: 4 additions & 7 deletions tests/unit/network/test_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,12 @@ def test_sort_args(elements, sort_args, expected_result):
assert elements == expected_result


@pytest.mark.parametrize(
'field_name', RUNTIME_FIELD_TO_CFG_MAP.keys()
)
def test_runtime_field_to_cfg_map(field_name: str):
def test_runtime_field_to_cfg_map():
"""Ensure the Runtime type's fields can be mapped back to the workflow
config."""
cfg_name = RUNTIME_FIELD_TO_CFG_MAP[field_name]
assert field_name in Runtime.__dict__
assert WORKFLOW_SPEC.get('runtime', '__MANY__', cfg_name)
assert set(RUNTIME_FIELD_TO_CFG_MAP) == set(Runtime._meta.fields)
for cfg_name in RUNTIME_FIELD_TO_CFG_MAP.values():
assert WORKFLOW_SPEC.get('runtime', '__MANY__', cfg_name)


@pytest.mark.parametrize('runtime_dict,expected', [
Expand Down
Loading