Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/6625.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Efficiency improvement: avoid storing duplicate information on graph triggers.
55 changes: 44 additions & 11 deletions cylc/flow/task_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from typing import TYPE_CHECKING, Tuple
from typing import (
TYPE_CHECKING,
Optional,
Tuple,
)

from cylc.flow.cycling.loader import (
get_interval,
Expand All @@ -38,21 +42,28 @@ class TaskTrigger:
"""Class representing an upstream dependency.

Args:
task_name (str): The name of the upstream task.
cycle_point_offset (str): String representing the offset of the
task_name: The name of the upstream task.
cycle_point_offset: String representing the offset of the
upstream task (e.g. -P1D) if this dependency is not an absolute
one. Else None.
output (str): The task state / output for this trigger e.g. succeeded.
output: The task state / output for this trigger e.g. succeeded.

"""

__slots__ = ['task_name', 'cycle_point_offset', 'output',
'offset_is_irregular', 'offset_is_absolute',
'offset_is_from_icp', 'initial_point']

def __init__(self, task_name, cycle_point_offset, output,
offset_is_irregular=False, offset_is_absolute=False,
offset_is_from_icp=False, initial_point=None):
def __init__(
self,
task_name: str,
cycle_point_offset: Optional[str],
output: str,
offset_is_irregular: bool = False,
offset_is_absolute: bool = False,
offset_is_from_icp: bool = False,
initial_point: 'Optional[PointBase]' = None,
):
self.task_name = task_name
self.cycle_point_offset = cycle_point_offset
self.output = output
Expand All @@ -64,9 +75,10 @@ def __init__(self, task_name, cycle_point_offset, output,
# 2000, 20000101T0600Z, 2000-01-01T06:00+00:00, ...
# AND NON-ABSOLUTE IRREGULAR:
# -PT6H+P1D, T00, ...
if (self.offset_is_irregular and any(
self.cycle_point_offset.startswith(c)
for c in ['P', '+', '-', 'T'])):
if self.offset_is_irregular and any(
self.cycle_point_offset.startswith(c) # type: ignore[union-attr]
for c in ['P', '+', '-', 'T']
):
self.offset_is_absolute = False

def get_parent_point(self, from_point):
Expand Down Expand Up @@ -147,7 +159,28 @@ def __str__(self):
else:
return '%s:%s' % (self.task_name, self.output)

__repr__ = __str__
def __repr__(self) -> str:
"""
>>> TaskTrigger('', '', '')
<TaskTrigger ...>
"""
return f"<{type(self).__name__} {self}>"

def __hash__(self) -> int:
return hash((
self.task_name,
self.cycle_point_offset,
self.output,
self.offset_is_irregular,
self.offset_is_from_icp,
self.offset_is_absolute,
self.initial_point,
))

def __eq__(self, other: object) -> bool:
if not isinstance(other, TaskTrigger):
return NotImplemented
return hash(self) == hash(other)

@staticmethod
def standardise_name(name):
Expand Down
6 changes: 3 additions & 3 deletions cylc/flow/taskdef.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def __init__(self, name, rtcfg, start_point, initial_point):
self.dependencies: Dict[SequenceBase, List[Dependency]] = {}
self.outputs = {} # {output: (message, is_required)}
self.graph_children: Dict[
SequenceBase, Dict[str, List[Tuple[str, TaskTrigger]]]
SequenceBase, Dict[str, Set[Tuple[str, TaskTrigger]]]
] = {}
self.graph_parents: Dict[
SequenceBase, Set[Tuple[str, TaskTrigger]]
Expand Down Expand Up @@ -250,8 +250,8 @@ def add_graph_child(
self.graph_children.setdefault(
sequence, {}
).setdefault(
trigger.output, []
).append((taskname, trigger))
trigger.output, set()
).add((taskname, trigger))

def add_graph_parent(
self, trigger: 'TaskTrigger', parent: str, sequence: 'SequenceBase'
Expand Down
1 change: 1 addition & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ addopts = --verbose
# disable pytest-tornasync because it conflicts with pytest-asyncio's auto mode
-p no:tornado
-m "not linkcheck"
verbosity_assertions = 2
testpaths =
cylc/flow/
tests/unit/
Expand Down
1 change: 1 addition & 0 deletions tests/integration/test_graphql.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ async def test_edges(harness):
'graphql',
{'request_string': 'query { edges { id } }'}
)
ret['edges'].sort(key=lambda x: x['id'])
assert ret == {
'edges': [
{'id': id_}
Expand Down
46 changes: 46 additions & 0 deletions tests/integration/test_taskdef.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from cylc.flow.config import WorkflowConfig
from cylc.flow.pathutil import get_workflow_run_dir
from cylc.flow.scheduler_cli import RunOptions
from cylc.flow.task_outputs import TASK_OUTPUT_SUCCEEDED
from cylc.flow.workflow_files import WorkflowFiles


def test_graph_children(flow):
    """TaskDef.graph_children should not include duplicates.

    A parameterised sibling on the left-hand side of the same graph line
    must not cause the downstream child of "foo" to be recorded repeatedly.

    https://github.com/cylc/cylc-flow/issues/6619#issuecomment-2668932069
    """
    scheduling = {'graph': {'R1': 'foo | bar<n> => fin'}}
    task_parameters = {'n': '1..3'}
    wid = flow({
        'scheduling': scheduling,
        'task parameters': task_parameters,
    })
    run_dir = get_workflow_run_dir(wid, WorkflowFiles.FLOW_FILE)
    config = WorkflowConfig(wid, run_dir, RunOptions())

    # Only one sequence is defined, so take the children of the first one.
    children_by_output = list(config.taskdefs['foo'].graph_children.values())[0]
    child_names = [
        name for name, _ in children_by_output[TASK_OUTPUT_SUCCEEDED]
    ]
    assert child_names == ['fin']
19 changes: 17 additions & 2 deletions tests/unit/test_task_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,16 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from cylc.flow.cycling.loader import get_point, get_sequence
from cylc.flow.task_trigger import TaskTrigger, Dependency
from cylc.flow.cycling.integer import IntegerPoint
from cylc.flow.cycling.loader import (
get_point,
get_sequence,
)
from cylc.flow.task_outputs import TaskOutputs
from cylc.flow.task_trigger import (
Dependency,
TaskTrigger,
)


def test_check_with_cycle_point():
Expand Down Expand Up @@ -171,3 +178,11 @@ def test_str(set_cycling_type):

trigger = TaskTrigger('name', None, 'output')
assert str(trigger) == 'name:output'


def test_eq():
    """Triggers built from the same arguments are equal; others are not."""
    args = ('foo', '+P1', 'succeeded')
    reference = TaskTrigger(*args)
    twin = TaskTrigger(*args)
    with_initial_point = TaskTrigger(*args, initial_point=IntegerPoint('1'))

    assert reference == twin
    assert reference != with_initial_point
Loading