Skip to content

Commit 0b5332c

Browse files
stevebachmeier and zmbc authored
Sbachmei/mic 5786/embarrassingly parallel sections (#189)
* rename user_configurable attr to directly_implemented
* break reliance on splitting and aggregating node being the same
* Add a new custom-schema integration test (#173)
* set e.p.'s step parent; remove i/o slots during e.p. step construction
* simplify EmbarrassinglyParallelStep construction
* Sbachmei/mic 5987/implement splitter steps and aggregator steps (#183)
* save here: first working cut
* remove unused splitter and aggregator attrs from I/OSlots
* remove old commented out code
* bugfix InputSlot.__eq__
* fix broken tests; add docstrings
* add new StandaloneStep class for IOSteps, SplitterSteps, and AggregatorSteps
* small tweaks
* fix doc build failures
* rename splitter_step_name to splitter_node_name
* remove unused arg to _check_embarrassingly_parallel_details
* assert we only have one output slot when aggregating and stop looping
* modify docstring
* special-case e.p. steps to prevent duplicative naming conventions
* test against embarrassingly parallel parallel steps (#185)
* test against embarrassingly parallel parallel steps
* use string representation of input_files list
* add test against embarrassingly parallel loop steps (#186)
* add test against embarrassingly parallel HierarchicalSteps (#187)
* add test against embarrassingly parallel HierarchicalSteps
* Update tests/integration/test_compositions.py

Co-authored-by: Zeb Burke-Conte <zmbc@users.noreply.github.com>

---------

Co-authored-by: Zeb Burke-Conte <zmbc@users.noreply.github.com>

* changelog

---------

Co-authored-by: Zeb Burke-Conte <zmbc@users.noreply.github.com>
1 parent 624ee5d commit 0b5332c

34 files changed

Lines changed: 2065 additions & 1143 deletions

CHANGELOG.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
**0.1.14 - 5/1/25**
2+
3+
- Add support for EmbarrassinglyParallelSteps to accept sections (i.e. non-leaf steps)
4+
15
**0.1.13 - 4/21/25**
26

37
- remove graphviz from doc build install

setup.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@
5656
# TODO MIC-4963: Resolve quoting issue and remove pin
5757
"snakemake-interface-executor-plugins<9.0.0",
5858
"snakemake-executor-plugin-slurm",
59+
# Type stubs
60+
"pandas-stubs",
61+
"pyarrow-stubs",
5962
]
6063

6164
setup_requires = ["setuptools_scm"]

src/easylink/configuration.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -274,10 +274,10 @@ def _validate_environment(self) -> dict[Any, Any]:
274274

275275

276276
def load_params_from_specification(
277-
pipeline_specification: str,
278-
input_data: str,
279-
computing_environment: str | None,
280-
results_dir: str,
277+
pipeline_specification: str | Path,
278+
input_data: str | Path,
279+
computing_environment: str | Path | None,
280+
results_dir: str | Path,
281281
) -> dict[str, Any]:
282282
"""Gathers together all specification data.
283283
@@ -325,7 +325,7 @@ def _load_input_data_paths(
325325

326326

327327
def _load_computing_environment(
328-
computing_environment_specification_path: str | None,
328+
computing_environment_specification_path: str | Path | None,
329329
) -> dict[Any, Any]:
330330
"""Loads the computing environment specification file and returns the contents as a dict."""
331331
if not computing_environment_specification_path:

src/easylink/graph_components.py

Lines changed: 48 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@
1818

1919
import networkx as nx
2020

21+
from easylink.implementation import (
22+
NullAggregatorImplementation,
23+
NullImplementation,
24+
NullSplitterImplementation,
25+
)
26+
2127
if TYPE_CHECKING:
2228
from easylink.implementation import Implementation
2329
from easylink.step import Step
@@ -42,48 +48,38 @@ class InputSlot:
4248
env_var: str | None
4349
"""The environment variable that is used to pass a list of data filepaths to
4450
an ``Implementation``."""
45-
validator: Callable[[str], None] = field(compare=False)
51+
validator: Callable[[str], None] | None = field(compare=False)
4652
"""A function that validates the input data being passed into the pipeline via
4753
this ``InputSlot``. If the data is invalid, the function should raise an exception
4854
with a descriptive error message which will then be reported to the user.
4955
**Note that the function *must* be defined in the** :mod:`easylink.utilities.validation_utils`
5056
**module!**"""
51-
splitter: Callable[[list[str], str, Any], None] | None = field(
52-
default=None, compare=False
53-
)
54-
"""A function that splits the incoming data to this ``InputSlot`` into smaller
55-
pieces. The primary purpose of this functionality is to run sections of the
56-
pipeline in an embarrassingly parallel manner. **Note that the function *must*
57-
be defined in the **:mod:`easylink.utilities.splitter_utils`** module!**"""
5857

5958
def __eq__(self, other: Any) -> bool | NotImplementedType:
60-
"""Checks if two ``InputSlots`` are equal.
61-
62-
Two ``InputSlots`` are considered equal if their names, ``env_vars``, and
63-
names of their ``validators`` and ``splitters`` are all the same.
64-
"""
59+
"""Checks if two ``InputSlots`` are equal."""
6560
if not isinstance(other, InputSlot):
6661
return NotImplemented
67-
splitter_name = self.splitter.__name__ if self.splitter else None
68-
other_splitter_name = other.splitter.__name__ if other.splitter else None
62+
validator_name = self.validator.__name__ if self.validator else None
63+
other_validator_name = other.validator.__name__ if other.validator else None
6964
return (
7065
self.name == other.name
7166
and self.env_var == other.env_var
72-
and self.validator.__name__ == other.validator.__name__
73-
and splitter_name == other_splitter_name
67+
and validator_name == other_validator_name
7468
)
7569

7670
def __hash__(self) -> int:
77-
"""Hashes an ``InputSlot``.
78-
79-
The hash is based on the name of the ``InputSlot``, its ``env_var``, and
80-
the names of its ``validator`` and ``splitter``.
81-
"""
82-
splitter_name = self.splitter.__name__ if self.splitter else None
83-
return hash((self.name, self.env_var, self.validator.__name__, splitter_name))
71+
"""Hashes an ``InputSlot``."""
72+
validator_name = self.validator.__name__ if self.validator else None
73+
return hash(
74+
(
75+
self.name,
76+
self.env_var,
77+
validator_name,
78+
)
79+
)
8480

8581

86-
@dataclass()
82+
@dataclass(frozen=True)
8783
class OutputSlot:
8884
"""A single output slot from a specific node.
8985
@@ -104,31 +100,6 @@ class OutputSlot:
104100

105101
name: str
106102
"""The name of the ``OutputSlot``."""
107-
aggregator: Callable[[list[str], str], None] = field(default=None, compare=False)
108-
"""A function that aggregates all of the generated data to be passed out via this
109-
``OutputSlot``. The primary purpose of this functionality is to run sections
110-
of the pipeline in an embarrassingly parallel manner. **Note that the function
111-
*must* be defined in the **:py:mod:`easylink.utilities.aggregator_utils`** module!**"""
112-
113-
def __eq__(self, other: Any) -> bool | NotImplementedType:
114-
"""Checks if two ``OutputSlots`` are equal.
115-
116-
Two ``OutputSlots`` are considered equal if their names and the names of their
117-
``aggregators`` are the same.
118-
"""
119-
if not isinstance(other, OutputSlot):
120-
return NotImplemented
121-
aggregator_name = self.aggregator.__name__ if self.aggregator else None
122-
other_aggregator_name = other.aggregator.__name__ if other.aggregator else None
123-
return self.name == other.name and aggregator_name == other_aggregator_name
124-
125-
def __hash__(self) -> int:
126-
"""Hashes an ``OutputSlot``.
127-
128-
The hash is based on the name of the ``OutputSlot`` and the name of its ``aggregator``.
129-
"""
130-
aggregator_name = self.aggregator.__name__ if self.aggregator else None
131-
return hash((self.name, aggregator_name))
132103

133104

134105
@dataclass(frozen=True)
@@ -263,7 +234,33 @@ class ImplementationGraph(nx.MultiDiGraph):
263234
def implementation_nodes(self) -> list[str]:
264235
"""The topologically sorted list of ``Implementation`` names."""
265236
ordered_nodes = list(nx.topological_sort(self))
266-
return [node for node in ordered_nodes if node != "input_data" and node != "results"]
237+
# Remove nodes that do not actually have implementations
238+
null_implementations = [
239+
node
240+
for node in ordered_nodes
241+
if isinstance(self.nodes[node]["implementation"], NullImplementation)
242+
]
243+
return [node for node in ordered_nodes if node not in null_implementations]
244+
245+
@property
246+
def splitter_nodes(self) -> list[str]:
247+
"""The topologically sorted list of splitter nodes (which have no implementations)."""
248+
ordered_nodes = list(nx.topological_sort(self))
249+
return [
250+
node
251+
for node in ordered_nodes
252+
if isinstance(self.nodes[node]["implementation"], NullSplitterImplementation)
253+
]
254+
255+
@property
256+
def aggregator_nodes(self) -> list[str]:
257+
"""The topologically sorted list of aggregator nodes (which have no implementations)."""
258+
ordered_nodes = list(nx.topological_sort(self))
259+
return [
260+
node
261+
for node in ordered_nodes
262+
if isinstance(self.nodes[node]["implementation"], NullAggregatorImplementation)
263+
]
267264

268265
@property
269266
def implementations(self) -> list[Implementation]:

src/easylink/implementation.py

Lines changed: 70 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,20 @@
99
1010
"""
1111

12+
from __future__ import annotations
13+
1214
from collections.abc import Iterable
1315
from pathlib import Path
16+
from typing import TYPE_CHECKING
1417

1518
from layered_config_tree import LayeredConfigTree
1619

17-
from easylink.graph_components import InputSlot, OutputSlot
1820
from easylink.utilities import paths
1921
from easylink.utilities.data_utils import load_yaml
2022

23+
if TYPE_CHECKING:
24+
from easylink.graph_components import InputSlot, OutputSlot
25+
2126

2227
class Implementation:
2328
"""A representation of an actual container that will be executed for a :class:`~easylink.step.Step`.
@@ -43,8 +48,8 @@ def __init__(
4348
self,
4449
schema_steps: list[str],
4550
implementation_config: LayeredConfigTree,
46-
input_slots: Iterable["InputSlot"] = (),
47-
output_slots: Iterable["OutputSlot"] = (),
51+
input_slots: Iterable[InputSlot] = (),
52+
output_slots: Iterable[OutputSlot] = (),
4853
is_embarrassingly_parallel: bool = False,
4954
):
5055
self.name = implementation_config.name
@@ -137,9 +142,8 @@ def outputs(self) -> dict[str, list[str]]:
137142
class NullImplementation:
138143
"""A partial :class:`Implementation` interface when no container is needed to run.
139144
140-
The primary use case for this class is when adding an
141-
:class:`~easylink.step.IOStep` - which does not have a corresponding
142-
``Implementation`` - to an :class:`~easylink.graph_components.ImplementationGraph`
145+
The primary use case for this class is to be able to add a :class:`~easylink.step.Step`
146+
that does *not* have a corresponding ``Implementation`` to an :class:`~easylink.graph_components.ImplementationGraph`
143147
since adding any new node requires an object with :class:`~easylink.graph_components.InputSlot`
144148
and :class:`~easylink.graph_components.OutputSlot` names.
145149
@@ -151,13 +155,14 @@ class NullImplementation:
151155
All required ``InputSlots``.
152156
output_slots
153157
All required ``OutputSlots``.
158+
154159
"""
155160

156161
def __init__(
157162
self,
158163
name: str,
159-
input_slots: Iterable["InputSlot"] = (),
160-
output_slots: Iterable["OutputSlot"] = (),
164+
input_slots: Iterable[InputSlot] = (),
165+
output_slots: Iterable[OutputSlot] = (),
161166
):
162167
self.name = name
163168
"""The name of this ``NullImplementation``."""
@@ -172,6 +177,61 @@ def __init__(
172177
is a constituent. This is definitionally None."""
173178

174179

180+
class NullSplitterImplementation(NullImplementation):
181+
"""A type of :class:`NullImplementation` specifically for :class:`SplitterSteps<easylink.step.SplitterStep>`.
182+
183+
See ``NullImplementation`` for inherited attributes.
184+
185+
Parameters
186+
----------
187+
splitter_func_name
188+
The name of the splitter function to use.
189+
190+
"""
191+
192+
def __init__(
193+
self,
194+
name: str,
195+
input_slots: Iterable[InputSlot],
196+
output_slots: Iterable[OutputSlot],
197+
splitter_func_name: str,
198+
):
199+
super().__init__(name, input_slots, output_slots)
200+
self.splitter_func_name = splitter_func_name
201+
"""The name of the splitter function to use."""
202+
203+
204+
class NullAggregatorImplementation(NullImplementation):
205+
"""A type of :class:`NullImplementation` specifically for :class:`AggregatorSteps<easylink.step.AggregatorStep>`.
206+
207+
See ``NullImplementation`` for inherited attributes.
208+
209+
Parameters
210+
----------
211+
aggregator_func_name
212+
The name of the aggregation function to use.
213+
splitter_node_name
214+
The name of the :class:`~easylink.step.SplitterStep` and its corresponding
215+
:class:`NullSplitterImplementation` that did the splitting.
216+
217+
"""
218+
219+
def __init__(
220+
self,
221+
name: str,
222+
input_slots: Iterable[InputSlot],
223+
output_slots: Iterable[OutputSlot],
224+
aggregator_func_name: str,
225+
splitter_node_name: str,
226+
):
227+
super().__init__(name, input_slots, output_slots)
228+
self.aggregator_func_name = aggregator_func_name
229+
"""The name of the aggregation function to use."""
230+
self.splitter_node_name = splitter_node_name
231+
"""The name of the :class:`~easylink.step.SplitterStep` and its corresponding
232+
:class:`NullSplitterImplementation` that did the splitting."""
233+
234+
175235
class PartialImplementation:
176236
"""One part of a combined implementation that spans multiple :class:`Steps<easylink.step.Step>`.
177237
@@ -205,8 +265,8 @@ def __init__(
205265
self,
206266
combined_name: str,
207267
schema_step: str,
208-
input_slots: Iterable["InputSlot"] = (),
209-
output_slots: Iterable["OutputSlot"] = (),
268+
input_slots: Iterable[InputSlot] = (),
269+
output_slots: Iterable[OutputSlot] = (),
210270
):
211271
self.combined_name = combined_name
212272
"""The name of the combined implementation of which this ``PartialImplementation``

0 commit comments

Comments
 (0)