Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/6936.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixes workflow shutdown on reload with orphaned xtriggered task.
33 changes: 19 additions & 14 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -1666,15 +1666,20 @@ def insert_job(
platform=job_conf['platform']['name'],
job_runner_name=job_conf.get('job_runner_name'),
)
# Not all fields are populated with some submit-failures,
# so use task cfg as base.
j_cfg = pdeepcopy(self._apply_broadcasts_to_runtime(
tp_tokens,
self.schd.config.cfg['runtime'][tproxy.name]
))
for key, val in job_conf.items():
j_cfg[key] = val
j_buf.runtime.CopyFrom(runtime_from_config(j_cfg))

# Use config runtime, otherwise use orphan runtime.
if tproxy.name in self.schd.config.cfg['runtime']:
# Not all fields are populated with some submit-failures,
# so use task cfg as base.
j_cfg = pdeepcopy(self._apply_broadcasts_to_runtime(
tp_tokens,
self.schd.config.cfg['runtime'][tproxy.name]
))
for key, val in job_conf.items():
j_cfg[key] = val
j_buf.runtime.CopyFrom(runtime_from_config(j_cfg))
else:
j_buf.runtime.CopyFrom(tproxy.runtime)
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Toyed about with creating a function that is less rigid and can use existing store objects as a base..

def runtime_from_partial(rtconfig, runtimeold=None):
    """Populate runtime object from partial/full config.

    Potentially slower with all the setattr calls, but less argvar assignments
    than `runtime_from_config`.
    """
    runtime = PbRuntime()
    if runtimeold is not None:
        runtime.CopyFrom(runtimeold)
    for key, val in rtconfig.items():
        if key not in RUNTIME_MAPPING:
            continue
        elif key in RUNTIME_LIST_JOINS:
            setattr(runtime, RUNTIME_MAPPING[key], listjoin(val))
        elif key in RUNTIME_JSON_DUMPS:
            setattr(
                runtime,
                RUNTIME_MAPPING[key],
                json.dumps(
                    {'key': k, 'value': v}
                    for k, v in val.items()
                )
            )
        elif key in RUNTIME_TRY_ITEMS:
            try:
                setattr(
                    runtime,
                    RUNTIME_MAPPING[key],
                    val[RUNTIME_TRY_ITEMS[key]]
                )
            except (KeyError, TypeError):
                setattr(runtime, RUNTIME_MAPPING[key], val)
        elif key in RUNTIME_STRINGIFYS:
            setattr(runtime, RUNTIME_MAPPING[key], str(val or ''))
        else:
            setattr(runtime, RUNTIME_MAPPING[key], val)
    return runtime

but setattr doesn't seem to be playing nice with these objects (will find out why), and maybe it's slower anyway..

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test giving:

            elif key in RUNTIME_STRINGIFYS:
                setattr(runtime, RUNTIME_MAPPING[key], str(val or ''))
            else:
>               setattr(runtime, RUNTIME_MAPPING[key], val)
E               TypeError: bad argument type for built-in operation

protobuf objects can be a little finnicky about how fields are assigned/activated sometimes


# Add in log files.
j_buf.job_log_dir = get_task_job_log(
Expand Down Expand Up @@ -2338,16 +2343,16 @@ def delta_broadcast(self):
self.updates_pending = True

def _generate_broadcast_node_deltas(self, node_data, node_type):
cfg = self.schd.config.cfg
rtcfg = self.schd.config.cfg['runtime']
# NOTE: node_data may change during operation so make a copy
# see https://github.com/cylc/cylc-flow/pull/6397
for node_id, node in list(node_data.items()):
# In case of orphan, skip.
if node.name not in rtcfg:
continue
tokens = Tokens(node_id)
new_runtime = runtime_from_config(
self._apply_broadcasts_to_runtime(
tokens,
cfg['runtime'][node.name]
)
self._apply_broadcasts_to_runtime(tokens, rtcfg[node.name])
)
new_sruntime = new_runtime.SerializeToString(
deterministic=True
Expand Down
Loading