Skip to content

Commit b1f3442

Browse files
authored
Merge pull request #6625 from MetRonnie/task-param
Fix duplicate task triggers
2 parents 0590bdd + 76ef1c7 commit b1f3442

File tree

6 files changed

+112
-16
lines changed

6 files changed

+112
-16
lines changed

changes.d/6625.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Efficiency improvement: avoid storing duplicate information on graph triggers.

cylc/flow/task_trigger.py

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,11 @@
1414
# You should have received a copy of the GNU General Public License
1515
# along with this program. If not, see <http://www.gnu.org/licenses/>.
1616

17-
from typing import TYPE_CHECKING, Tuple
17+
from typing import (
18+
TYPE_CHECKING,
19+
Optional,
20+
Tuple,
21+
)
1822

1923
from cylc.flow.cycling.loader import (
2024
get_interval,
@@ -38,21 +42,28 @@ class TaskTrigger:
3842
"""Class representing an upstream dependency.
3943
4044
Args:
41-
task_name (str): The name of the upstream task.
42-
cycle_point_offset (str): String representing the offset of the
45+
task_name: The name of the upstream task.
46+
cycle_point_offset: String representing the offset of the
4347
upstream task (e.g. -P1D) if this dependency is not an absolute
4448
one. Else None.
45-
output (str): The task state / output for this trigger e.g. succeeded.
49+
output: The task state / output for this trigger e.g. succeeded.
4650
4751
"""
4852

4953
__slots__ = ['task_name', 'cycle_point_offset', 'output',
5054
'offset_is_irregular', 'offset_is_absolute',
5155
'offset_is_from_icp', 'initial_point']
5256

53-
def __init__(self, task_name, cycle_point_offset, output,
54-
offset_is_irregular=False, offset_is_absolute=False,
55-
offset_is_from_icp=False, initial_point=None):
57+
def __init__(
58+
self,
59+
task_name: str,
60+
cycle_point_offset: Optional[str],
61+
output: str,
62+
offset_is_irregular: bool = False,
63+
offset_is_absolute: bool = False,
64+
offset_is_from_icp: bool = False,
65+
initial_point: 'Optional[PointBase]' = None,
66+
):
5667
self.task_name = task_name
5768
self.cycle_point_offset = cycle_point_offset
5869
self.output = output
@@ -64,9 +75,10 @@ def __init__(self, task_name, cycle_point_offset, output,
6475
# 2000, 20000101T0600Z, 2000-01-01T06:00+00:00, ...
6576
# AND NON-ABSOLUTE IRREGULAR:
6677
# -PT6H+P1D, T00, ...
67-
if (self.offset_is_irregular and any(
68-
self.cycle_point_offset.startswith(c)
69-
for c in ['P', '+', '-', 'T'])):
78+
if self.offset_is_irregular and any(
79+
self.cycle_point_offset.startswith(c) # type: ignore[union-attr]
80+
for c in ['P', '+', '-', 'T']
81+
):
7082
self.offset_is_absolute = False
7183

7284
def get_parent_point(self, from_point):
@@ -147,7 +159,28 @@ def __str__(self):
147159
else:
148160
return '%s:%s' % (self.task_name, self.output)
149161

150-
__repr__ = __str__
162+
def __repr__(self) -> str:
163+
"""
164+
>>> TaskTrigger('', '', '')
165+
<TaskTrigger ...>
166+
"""
167+
return f"<{type(self).__name__} {self}>"
168+
169+
def __hash__(self) -> int:
170+
return hash((
171+
self.task_name,
172+
self.cycle_point_offset,
173+
self.output,
174+
self.offset_is_irregular,
175+
self.offset_is_from_icp,
176+
self.offset_is_absolute,
177+
self.initial_point,
178+
))
179+
180+
def __eq__(self, other: object) -> bool:
181+
if not isinstance(other, TaskTrigger):
182+
return NotImplemented
183+
return hash(self) == hash(other)
151184

152185
@staticmethod
153186
def standardise_name(name):

cylc/flow/taskdef.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ def __init__(self, name, rtcfg, start_point, initial_point):
183183
self.dependencies: Dict[SequenceBase, List[Dependency]] = {}
184184
self.outputs = {} # {output: (message, is_required)}
185185
self.graph_children: Dict[
186-
SequenceBase, Dict[str, List[Tuple[str, TaskTrigger]]]
186+
SequenceBase, Dict[str, Set[Tuple[str, TaskTrigger]]]
187187
] = {}
188188
self.graph_parents: Dict[
189189
SequenceBase, Set[Tuple[str, TaskTrigger]]
@@ -250,8 +250,8 @@ def add_graph_child(
250250
self.graph_children.setdefault(
251251
sequence, {}
252252
).setdefault(
253-
trigger.output, []
254-
).append((taskname, trigger))
253+
trigger.output, set()
254+
).add((taskname, trigger))
255255

256256
def add_graph_parent(
257257
self, trigger: 'TaskTrigger', parent: str, sequence: 'SequenceBase'

tests/integration/test_graphql.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@ async def test_edges(harness):
287287
'graphql',
288288
{'request_string': 'query { edges { id } }'}
289289
)
290+
ret['edges'].sort(key=lambda x: x['id'])
290291
assert ret == {
291292
'edges': [
292293
{'id': id_}

tests/integration/test_taskdef.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
2+
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
3+
#
4+
# This program is free software: you can redistribute it and/or modify
5+
# it under the terms of the GNU General Public License as published by
6+
# the Free Software Foundation, either version 3 of the License, or
7+
# (at your option) any later version.
8+
#
9+
# This program is distributed in the hope that it will be useful,
10+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
# GNU General Public License for more details.
13+
#
14+
# You should have received a copy of the GNU General Public License
15+
# along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
17+
from cylc.flow.config import WorkflowConfig
18+
from cylc.flow.pathutil import get_workflow_run_dir
19+
from cylc.flow.scheduler_cli import RunOptions
20+
from cylc.flow.task_outputs import TASK_OUTPUT_SUCCEEDED
21+
from cylc.flow.workflow_files import WorkflowFiles
22+
23+
24+
def test_graph_children(flow):
25+
"""TaskDef.graph_children should not include duplicates.
26+
27+
https://github.com/cylc/cylc-flow/issues/6619#issuecomment-2668932069
28+
"""
29+
wid = flow({
30+
'scheduling': {
31+
'graph': {
32+
'R1': 'foo | bar<n> => fin',
33+
},
34+
},
35+
'task parameters': {
36+
'n': '1..3',
37+
},
38+
})
39+
config = WorkflowConfig(
40+
wid, get_workflow_run_dir(wid, WorkflowFiles.FLOW_FILE), RunOptions()
41+
)
42+
foo = config.taskdefs['foo']
43+
graph_children = list(foo.graph_children.values())[0]
44+
assert [name for name, _ in graph_children[TASK_OUTPUT_SUCCEEDED]] == [
45+
'fin'
46+
]

tests/unit/test_task_trigger.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,16 @@
1414
# You should have received a copy of the GNU General Public License
1515
# along with this program. If not, see <http://www.gnu.org/licenses/>.
1616

17-
from cylc.flow.cycling.loader import get_point, get_sequence
18-
from cylc.flow.task_trigger import TaskTrigger, Dependency
17+
from cylc.flow.cycling.integer import IntegerPoint
18+
from cylc.flow.cycling.loader import (
19+
get_point,
20+
get_sequence,
21+
)
1922
from cylc.flow.task_outputs import TaskOutputs
23+
from cylc.flow.task_trigger import (
24+
Dependency,
25+
TaskTrigger,
26+
)
2027

2128

2229
def test_check_with_cycle_point():
@@ -171,3 +178,11 @@ def test_str(set_cycling_type):
171178

172179
trigger = TaskTrigger('name', None, 'output')
173180
assert str(trigger) == 'name:output'
181+
182+
183+
def test_eq():
184+
args = ('foo', '+P1', 'succeeded')
185+
assert TaskTrigger(*args) == TaskTrigger(*args)
186+
assert TaskTrigger(*args) != TaskTrigger(
187+
*args, initial_point=IntegerPoint('1')
188+
)

0 commit comments

Comments (0)