Skip to content

Commit 40465f8

Browse files
committed
Tidy set-command logging etc.
1 parent 04b887c commit 40465f8

10 files changed

Lines changed: 140 additions & 103 deletions

File tree

cylc/flow/id.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -529,7 +529,11 @@ def duplicate(
529529
)
530530

531531

532-
def quick_relative_id(cycle: Union[str, int, 'PointBase'], task: str) -> str:
532+
def quick_relative_id(
533+
cycle: Union[str, int, 'PointBase'],
534+
task: str,
535+
output: Optional[str] = None
536+
) -> str:
533537
"""Generate a relative ID for a task.
534538
535539
This is a more efficient solution to `Tokens` for cases where
@@ -541,7 +545,10 @@ def quick_relative_id(cycle: Union[str, int, 'PointBase'], task: str) -> str:
541545
True
542546
543547
"""
544-
return f'{cycle}/{task}'
548+
if output is None:
549+
return f'{cycle}/{task}'
550+
else:
551+
return f'{cycle}/{task}:{output}'
545552

546553

547554
def _dict_strip(dictionary):

cylc/flow/prerequisite.py

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
KeysView,
2727
NamedTuple,
2828
Optional,
29-
Set,
3029
Tuple,
3130
Union,
3231
)
@@ -57,9 +56,12 @@ class PrereqTuple(NamedTuple):
5756
task: str
5857
output: str
5958

60-
def get_id(self) -> str:
59+
def get_id(self, show_output=False) -> str:
6160
"""Get the relative ID of the task in this prereq output."""
62-
return quick_relative_id(self.point, self.task)
61+
if show_output:
62+
return quick_relative_id(self.point, self.task, self.output)
63+
else:
64+
return quick_relative_id(self.point, self.task)
6365

6466
@staticmethod
6567
def coerce(tuple_: AnyPrereqTuple) -> 'PrereqTuple':
@@ -265,32 +267,27 @@ def satisfy_me(
265267
outputs: Iterable['Tokens'],
266268
mode: Optional[RunMode] = None,
267269
forced: bool = False,
268-
) -> 'Set[Tokens]':
270+
) -> None:
269271
"""Set the given outputs as satisfied (if they are not already).
270272
271-
Return outputs that match.
272-
273273
Args:
274274
outputs: List of outputs to satisfy.
275275
mode: Task run mode.
276276
forced: If True, records that this should not be undone by
277277
`cylc remove`.
278278
"""
279-
valid = set()
280279
for output in outputs:
281280
output_tuple = PrereqTuple(
282281
output['cycle'], output['task'], output['task_sel']
283282
)
284283
if output_tuple not in self._satisfied:
285284
continue
286-
valid.add(output)
287285
if not self._satisfied[output_tuple]:
288286
self[output_tuple] = (
289287
'force satisfied' if forced
290288
else 'satisfied by skip mode' if mode == RunMode.SKIP
291289
else 'satisfied naturally'
292290
)
293-
return valid
294291

295292
def api_dump(self) -> Optional[PbPrerequisite]:
296293
"""Return list of populated Protobuf data objects."""

cylc/flow/subprocctx.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,13 +199,20 @@ def update_command(self, workflow_run_dir):
199199
json.dumps(self.func_kwargs),
200200
workflow_run_dir]
201201

202-
def get_signature(self):
203-
"""Return the function call signature (as a string)."""
202+
def get_signature(self) -> str:
203+
"""Return the function call signature."""
204204
skeys = sorted(self.func_kwargs.keys())
205205
args = self.func_args + [
206206
"%s=%s" % (i, self.func_kwargs[i]) for i in skeys]
207207
return "%s(%s)" % (self.func_name, ", ".join([str(a) for a in args]))
208208

209+
def get_description(self, show_intvl: bool = False) -> str:
210+
"""Return a complete description of the function."""
211+
descr = f"{self.label} = {self.get_signature()}"
212+
if show_intvl:
213+
descr = f"call interval {self.intvl}s: {descr}"
214+
return descr
215+
209216
def dump(self) -> str:
210217
"""Output for logging."""
211218
return SubProcContext.__str__(self)

cylc/flow/task_pool.py

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
from cylc.flow.id_cli import contains_fnmatch
6161
from cylc.flow.id_match import filter_ids
6262
from cylc.flow.platforms import get_platform
63+
from cylc.flow.prerequisite import PrereqTuple
6364
from cylc.flow.run_modes import RunMode
6465
from cylc.flow.run_modes.skip import process_outputs as get_skip_mode_outputs
6566
from cylc.flow.task_action_timer import (
@@ -1916,7 +1917,9 @@ def _get_task_proxy_db_outputs(
19161917
self._load_historical_outputs(itask)
19171918
return itask
19181919

1919-
def _standardise_prereqs(self, prereqs: 'Iterable[str]') -> 'Set[Tokens]':
1920+
def _standardise_prereqs(
1921+
self, prereqs: 'Iterable[str]'
1922+
) -> 'Set[PrereqTuple]':
19201923
"""Extract task prerequistes from user input and standardise.
19211924
19221925
Weed out any xtrigger prerequisites.
@@ -1953,7 +1956,7 @@ def _standardise_prereqs(self, prereqs: 'Iterable[str]') -> 'Set[Tokens]':
19531956
LOG.warning(
19541957
f'Invalid prerequisite cycle point:\n{exc.args[0]}')
19551958
else:
1956-
_prereqs.add(pre.duplicate(task_sel=msg, cycle=cycle))
1959+
_prereqs.add(PrereqTuple(str(cycle), str(pre['task']), msg))
19571960
return _prereqs
19581961

19591962
def _standardise_outputs(
@@ -1975,7 +1978,7 @@ def _standardise_outputs(
19751978

19761979
def _get_prereq_params(
19771980
self, prereqs: 'Iterable[str]', tdef: 'TaskDef', point: 'PointBase'
1978-
) -> 'Tuple[bool, Set[Tokens], Dict[str, bool]]':
1981+
) -> 'Tuple[bool, Set[PrereqTuple], Dict[str, bool]]':
19791982
"""Convert input prerequisites to Tokens of just the valid ones.
19801983
19811984
And convert the (mutually exclusive) "['all']" shortcut to a bool.
@@ -2104,7 +2107,7 @@ def set_prereqs_and_outputs(
21042107

21052108
def _get_valid_prereqs(
21062109
self, prereqs: Iterable[str], tdef: 'TaskDef', point: 'PointBase'
2107-
) -> 'Set[Tokens]':
2110+
) -> 'Set[PrereqTuple]':
21082111
"""Validate CLI prerequisites and return associated task messages.
21092112
21102113
To set prerequisites, the user gives trigger names, but we need the
@@ -2120,24 +2123,23 @@ def _get_valid_prereqs(
21202123
"""
21212124
# Valid prerequisites as tokens (outputs as task messages).
21222125
valid_pre = {
2123-
Tokens(f"{key.point}/{key.task}:{key.output}", relative=True)
2126+
PrereqTuple(key.point, key.task, key.output)
21242127
for pre in tdef.get_prereqs(point)
21252128
for key in pre.keys()
21262129
}
21272130

2128-
# standardise, tokenise, and weed out xtrigger prerequisites
2131+
# standardise and weed out xtrigger prerequisites
21292132
req_pre = self._standardise_prereqs(prereqs)
21302133

21312134
for prereq in req_pre - valid_pre:
21322135
# But log bad ones with triggers, not messages.
21332136
trg = self.config.get_taskdef(
2134-
str(prereq["task"])
2135-
).get_output(prereq["task_sel"])
2137+
str(prereq.task)
2138+
).get_output(prereq.output)
21362139
LOG.warning(
21372140
f'{point}/{tdef.name} does not depend on '
2138-
f'"{prereq["cycle"]}/{prereq["task"]}:{trg}"'
2141+
f'"{prereq.point}/{prereq.task}:{trg}"'
21392142
)
2140-
21412143
return valid_pre & req_pre
21422144

21432145
def _get_valid_xtrigs(
@@ -2228,7 +2230,7 @@ def _set_outputs_itask(
22282230
def _set_prereqs_itask(
22292231
self,
22302232
itask: 'TaskProxy',
2231-
prereqs: 'Iterable[Tokens]',
2233+
prereqs: 'Iterable[PrereqTuple]',
22322234
xtrigs: 'Dict[str, bool]',
22332235
set_all: bool
22342236
) -> None:
@@ -2237,9 +2239,12 @@ def _set_prereqs_itask(
22372239
Designated flows should already be merged to the task proxy.
22382240
"""
22392241
if set_all:
2240-
itask.state.set_prerequisites_all_satisfied()
2242+
# (task prerequisites, not xtriggers)
2243+
itask.force_satisfy_all()
22412244
else:
2242-
itask.satisfy_me(prereqs, forced=True)
2245+
# task prerequisites
2246+
itask.force_satisfy(prereqs)
2247+
# xtriggers, including "all"
22432248
self.xtrigger_mgr.force_satisfy(itask, xtrigs)
22442249

22452250
if (
@@ -2253,7 +2258,7 @@ def _set_prereqs_tdef(
22532258
self,
22542259
point: 'PointBase',
22552260
taskdef: 'TaskDef',
2256-
prereqs: 'Iterable[Tokens]',
2261+
prereqs: 'Iterable[PrereqTuple]',
22572262
xtrigs: 'Dict[str, bool]',
22582263
flow_nums: 'FlowNums',
22592264
flow_wait: bool,

cylc/flow/task_proxy.py

Lines changed: 52 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -569,20 +569,63 @@ def state_reset(
569569

570570
def satisfy_me(
571571
self,
572-
task_messages: 'Iterable[Tokens]',
572+
outputs: 'Iterable[Tokens]',
573573
mode: Optional[RunMode] = RunMode.LIVE,
574-
forced: bool = False,
575-
) -> 'Set[Tokens]':
576-
"""Try to satisfy my prerequisites with given output messages.
574+
forced: bool = False, # TODO forced no longer needed here
575+
) -> None:
576+
"""Try to satisfy my prerequisites with given task output messages.
577+
578+
Output format: "cycle/task:message"
577579
578-
The task output messages are of the form "cycle/task:message"
579-
Log a warning for messages that I don't depend on.
580+
"""
581+
for prereq in (
582+
*self.state.prerequisites, *self.state.suicide_prerequisites
583+
):
584+
prereq.satisfy_me(outputs, mode=mode, forced=forced)
580585

581-
Return a set of unmatched task messages.
586+
def force_satisfy(self, prereqs: 'Iterable[PrereqTuple]') -> None:
587+
"""Force satisfy given task prerequisites.
588+
589+
Only called via "cylc set" command so no need to record run mode.
590+
591+
"""
592+
for prereq in self.state.prerequisites:
593+
for pre, state in prereq.items():
594+
# (PrereqTuple, False or "satisfied naturally" etc.)
595+
if pre not in prereqs:
596+
continue
597+
if not state:
598+
prereq[pre] = "force satisfied"
599+
LOG.info(
600+
f"[{self}] prerequisite satisfied (forced):"
601+
f" {pre.get_id(True)}"
602+
)
603+
else:
604+
LOG.info(
605+
f"[{self}] prerequisite already satisfied:"
606+
f" {pre.get_id(True)}"
607+
)
608+
609+
def force_satisfy_all(self):
610+
"""Force satisfy all task prerequisites.
611+
612+
Only called via "cylc set" command so no need to record run mode.
582613
583614
"""
584-
used = self.state.satisfy_me(task_messages, mode=mode, forced=forced)
585-
return set(task_messages) - used
615+
for prereq in self.state.prerequisites:
616+
for pre, state in prereq.items():
617+
# (PrereqTuple, False or "satisfied naturally" etc.)
618+
if not state:
619+
prereq[pre] = "force satisfied"
620+
LOG.info(
621+
f"[{self}] prerequisite satisfied (forced):"
622+
f" {pre.get_id(True)}"
623+
)
624+
else:
625+
LOG.info(
626+
f"[{self}] prerequisite already satisfied:"
627+
f" {pre.get_id(True)}"
628+
)
586629

587630
def clock_expire(self) -> bool:
588631
"""Return True if clock expire time is up, else False."""

cylc/flow/task_state.py

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,8 @@
1919
from typing import (
2020
TYPE_CHECKING,
2121
Dict,
22-
Iterable,
2322
List,
2423
Optional,
25-
Set,
2624
)
2725

2826
from cylc.flow.prerequisite import Prerequisite
@@ -40,9 +38,7 @@
4038

4139
if TYPE_CHECKING:
4240
from cylc.flow.cycling import PointBase
43-
from cylc.flow.id import Tokens
4441
from cylc.flow.prerequisite import PrereqTuple
45-
from cylc.flow.run_modes import RunMode
4642
from cylc.flow.taskdef import TaskDef
4743

4844

@@ -321,23 +317,6 @@ def __call__(
321317
)
322318
)
323319

324-
def satisfy_me(
325-
self,
326-
outputs: Iterable['Tokens'],
327-
mode: 'Optional[RunMode]',
328-
forced: bool = False,
329-
) -> Set['Tokens']:
330-
"""Try to satisfy my prerequisites with given outputs.
331-
332-
Return which outputs I actually depend on.
333-
"""
334-
valid: Set[Tokens] = set()
335-
for prereq in (*self.prerequisites, *self.suicide_prerequisites):
336-
valid.update(
337-
prereq.satisfy_me(outputs, mode=mode, forced=forced)
338-
)
339-
return valid
340-
341320
def xtriggers_all_satisfied(self):
342321
"""Return True if all xtriggers are satisfied."""
343322
return all(self.xtriggers.values())

0 commit comments

Comments
 (0)