Skip to content

Commit ce2ace9

Browse files
jonassorgenfreiPaulSchweizergrische
authored
Add type hints and make mypy compatible (#192)
* add type hints enable mypy check enable test and linting check on pull request drop python2 support Closes #189 * split push and pull request * fix typo in mypy file * Adding to the typehints * Remove old py2 code * Add mypy to pre-commit and github pipeline * Improving the type hints of the Node decorator * fix mypy * fix python3.9 issues * fix python import issues * Update .github/workflows/pytest.yml Co-authored-by: Grische <[email protected]> * mypy python version matrix * only explicit versios * fix mypy version matrix --------- Co-authored-by: Paul Schweizer <[email protected]> Co-authored-by: Grische <[email protected]>
1 parent 596e4f3 commit ce2ace9

File tree

9 files changed

+442
-261
lines changed

9 files changed

+442
-261
lines changed

.github/workflows/mypy.yml

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
name: Mypy
2+
3+
on:
4+
push:
5+
branches:
6+
- "master"
7+
- "main"
8+
pull_request:
9+
branches:
10+
- "master"
11+
- "main"
12+
workflow_dispatch:
13+
14+
jobs:
15+
mypy:
16+
name: Mypy
17+
runs-on: ubuntu-latest
18+
strategy:
19+
matrix:
20+
python-version: ["3.9", "3.10", "3.11", "3.x"]
21+
steps:
22+
- name: Check out repository code
23+
uses: actions/checkout@v5
24+
- name: Set up Python ${{ matrix.python-version }}
25+
uses: actions/setup-python@v6
26+
with:
27+
python-version: ${{ matrix.python-version }}
28+
- name: Install Poetry
29+
run: |
30+
python -m pip install --upgrade poetry wheel
31+
32+
- name: Install dependencies
33+
run: |
34+
poetry install
35+
36+
- name: Run mypy type checks
37+
run: |
38+
poetry run mypy ./flowpipe

.pre-commit-config.yaml

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,18 @@ repos:
77
rev: 5.12.0
88
hooks:
99
- id: isort
10-
- repo: https://github.com/python-poetry/poetry
11-
rev: 1.7.0
10+
- repo: local
1211
hooks:
1312
- id: poetry-check
13+
name: poetry-check
14+
entry: poetry check
15+
language: system
16+
pass_filenames: false
1417
- id: poetry-lock
15-
- repo: local
16-
hooks:
18+
name: poetry-lock
19+
entry: poetry lock
20+
language: system
21+
pass_filenames: false
1722
- id: unittests
1823
name: unittests
1924
language: system
@@ -24,3 +29,9 @@ repos:
2429
language: system
2530
entry: poetry run pylint ./flowpipe
2631
pass_filenames: false
32+
- id: mypy
33+
name: mypy (flowpipe)
34+
entry: poetry run mypy flowpipe
35+
language: system
36+
pass_filenames: false
37+
files: ^flowpipe/.*\.py$

flowpipe/evaluator.py

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,28 @@
11
"""Classes to evaluate flowpipe Graphs in various ways."""
22

3+
from __future__ import annotations
4+
35
import logging
46
import time
57
from concurrent import futures
68
from multiprocessing import Manager, Process
79
from pickle import PicklingError
10+
from typing import TYPE_CHECKING
811

912
from .errors import FlowpipeMultiprocessingError
1013

14+
if TYPE_CHECKING: # pragma: no cover
15+
from .graph import Graph
16+
from .node import INode
17+
1118
log = logging.getLogger(__name__)
1219

1320

1421
class Evaluator:
1522
"""An engine to evaluate a Graph."""
1623

1724
@staticmethod
18-
def _evaluation_sequence(graph):
25+
def _evaluation_sequence(graph: Graph) -> list[INode]:
1926
"""Sort Nodes into a sequential, flat execution order.
2027
2128
Replicated here for flexibility; defaults to Graph's implementation.
@@ -27,18 +34,20 @@ def _evaluation_sequence(graph):
2734
"""
2835
return graph.evaluation_sequence
2936

30-
def _nodes_to_evaluate(self, graph, skip_clean):
37+
def _nodes_to_evaluate(
38+
self, graph: Graph, skip_clean: bool
39+
) -> list[INode]:
3140
"""Get the nodes to evaluate, in order."""
3241
nodes = self._evaluation_sequence(graph)
3342
if skip_clean:
3443
nodes = [n for n in nodes if n.is_dirty]
3544
return nodes
3645

37-
def _evaluate_nodes(self, nodes):
46+
def _evaluate_nodes(self, nodes: list[INode]) -> None:
3847
"""Perform the actual node evaluation."""
3948
raise NotImplementedError # pragma: no cover
4049

41-
def evaluate(self, graph, skip_clean=False):
50+
def evaluate(self, graph: Graph, skip_clean: bool = False) -> None:
4251
"""Evaluate the graph.
4352
4453
Args:
@@ -55,7 +64,7 @@ def evaluate(self, graph, skip_clean=False):
5564
class LinearEvaluator(Evaluator):
5665
"""Evaluate the graph linearly in a single thread."""
5766

58-
def _evaluate_nodes(self, nodes):
67+
def _evaluate_nodes(self, nodes: list[INode]) -> None:
5968
"""Evaluate the graph linearly in a single thread.
6069
6170
Args:
@@ -69,18 +78,17 @@ def _evaluate_nodes(self, nodes):
6978
class ThreadedEvaluator(Evaluator):
7079
"""Evaluate each node in a separate thread."""
7180

72-
def __init__(self, max_workers=None):
81+
def __init__(self, max_workers: int | None = None):
7382
"""Intialize with the graph and how many threads to use.
7483
7584
Args:
76-
graph (flowpipe.Graph): The graph to evaluate.
7785
max_workers (int): The number of threads to use in parallel,
7886
defaults to the futures.ThreadPoolExecutor default.
7987
8088
"""
8189
self.max_workers = max_workers
8290

83-
def _evaluate_nodes(self, nodes):
91+
def _evaluate_nodes(self, nodes: list[INode]) -> None:
8492
"""Evaluate each node in a separate thread.
8593
8694
Args:
@@ -90,11 +98,11 @@ def _evaluate_nodes(self, nodes):
9098
# create copy to prevent side effects
9199
nodes_to_evaluate = list(nodes)
92100

93-
def node_runner(node):
101+
def node_runner(node: INode) -> INode:
94102
node.evaluate()
95103
return node
96104

97-
running_futures = {}
105+
running_futures: dict = {}
98106
with futures.ThreadPoolExecutor(max_workers=self.max_workers) as tpe:
99107
while nodes_to_evaluate or running_futures:
100108
log.debug(
@@ -148,7 +156,7 @@ def node_runner(node):
148156
class LegacyMultiprocessingEvaluator(Evaluator):
149157
"""Evaluate nodes in separate processes."""
150158

151-
def __init__(self, submission_delay=0.1):
159+
def __init__(self, submission_delay: float = 0.1):
152160
"""Initialize with the graph and the delay between launching nodes.
153161
154162
Args:
@@ -158,14 +166,14 @@ def __init__(self, submission_delay=0.1):
158166
"""
159167
self.submission_delay = submission_delay
160168

161-
def _evaluate_nodes(self, nodes):
169+
def _evaluate_nodes(self, nodes: list[INode]):
162170
# create copy to prevent side effects
163171
nodes_to_evaluate = list(nodes)
164172
manager = Manager()
165173
nodes_data = manager.dict()
166-
processes = {}
174+
processes: dict[str, Process] = {}
167175

168-
def upstream_ready(node):
176+
def upstream_ready(node: INode) -> bool:
169177
"""Check whether all upstream nodes have been evaluated."""
170178
for upstream in node.upstream_nodes:
171179
if upstream in nodes_to_evaluate:
@@ -208,7 +216,7 @@ def upstream_ready(node):
208216
time.sleep(self.submission_delay)
209217

210218

211-
def _evaluate_node_in_process(identifier, nodes_data):
219+
def _evaluate_node_in_process(identifier: str, nodes_data: dict):
212220
"""Evaluate a node when multiprocessing.
213221
214222
1. Deserializing the node from the given nodes_data dict
@@ -253,7 +261,7 @@ def _evaluate_node_in_process(identifier, nodes_data):
253261
nodes_data[identifier] = data
254262

255263

256-
def _update_node(node, data):
264+
def _update_node(node: INode, data: dict):
257265
"""Apply the plug values of the data dict to the node object."""
258266
for name, input_plug in data["inputs"].items():
259267
node.inputs[name].value = input_plug["value"]

flowpipe/event.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
33
They an be used to observe the evaluation process.
44
"""
5+
56
import logging
7+
from collections.abc import Callable
68

79
log = logging.getLogger(__name__)
810

@@ -14,38 +16,38 @@ class Event:
1416
Please note that the integrity of the listeners is not enforced or checked.
1517
"""
1618

17-
def __init__(self, name):
19+
def __init__(self, name: str):
1820
"""Initialize the list of listeners
1921
2022
Args:
2123
name (str): The (unique) name of the signal
2224
"""
2325
self.name = name
24-
self._listeners = []
26+
self._listeners:list[Callable] = []
2527

26-
def emit(self, *args, **kwargs):
28+
def emit(self, *args, **kwargs) -> None:
2729
"""Call all the listeners with the given args and kwargs."""
2830
for listener in self._listeners:
2931
listener(*args, **kwargs)
3032

31-
def register(self, listener):
33+
def register(self, listener: Callable) -> None:
3234
"""Register the given function object if it is not yet registered."""
3335
if not self.is_registered(listener):
3436
self._listeners.append(listener)
3537

36-
def deregister(self, listener):
38+
def deregister(self, listener: Callable) -> None:
3739
"""Deregister the given function object if it is registered."""
3840
if self.is_registered(listener):
3941
self._listeners.pop(self._listeners.index(listener))
4042
log.debug("%s deregistered", listener)
4143
else:
4244
log.exception("%s was never registered", listener)
4345

44-
def is_registered(self, listener):
46+
def is_registered(self, listener: Callable) -> bool:
4547
"""Whether the given function object is already registered."""
4648
return listener in self._listeners
4749

48-
def clear(self):
50+
def clear(self) -> None:
4951
"""Remove all listeners from this event."""
5052
for listener in self._listeners:
5153
self.deregister(listener)

0 commit comments

Comments
 (0)