Commit 50de05d

✨ Introduce maxtasksperchild to increase stability (#29)
1 parent 019ee26 commit 50de05d

4 files changed: 9 additions & 16 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion

```diff
@@ -11,7 +11,7 @@

 ### mapply vs. pandarallel vs. swifter

-Where [`pandarallel`](https://github.com/nalepae/pandarallel) only requires [`dill`](https://github.com/uqfoundation/dill) (and therefore has to rely on in-house multiprocessing and progressbars), [`swifter`](https://github.com/jmcarpenter2/swifter) relies on the heavy [`dask`](https://github.com/dask/dask) framework, converting to Dask DataFrames and back. In an attempt to find the golden mean, `mapply` is highly customizable and remains lightweight, leveraging the powerful [`pathos`](https://github.com/uqfoundation/pathos) framework, which shadows Python's built-in multiprocessing module using `dill` for universal pickling.
+Where [`pandarallel`](https://github.com/nalepae/pandarallel) only requires [`dill`](https://github.com/uqfoundation/dill) (and therefore has to rely on in-house multiprocessing and progressbars), [`swifter`](https://github.com/jmcarpenter2/swifter) relies on the heavy [`dask`](https://github.com/dask/dask) framework, converting to Dask DataFrames and back. In an attempt to find the golden mean, `mapply` is highly customizable and remains lightweight, leveraging [`tqdm`](https://github.com/tqdm/tqdm) and [`multiprocess`](https://github.com/uqfoundation/multiprocess), which shadows Python's built-in multiprocessing module using [`dill`](https://github.com/uqfoundation/dill) for universal pickling.


 ## Installation
```
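The README change swaps `pathos` for `multiprocess`, both of which build on `dill` so that objects the stdlib pickler rejects (lambdas, closures, interactively defined functions) can still be shipped to worker processes. A minimal stdlib-only sketch of the limitation being worked around (no `multiprocess` required):

```python
import pickle

# The stdlib pickler serializes functions by reference (module + qualname),
# so a lambda (qualname "<lambda>") cannot be round-tripped and therefore
# cannot be sent to plain multiprocessing workers. dill serializes the code
# object itself, which is what multiprocess relies on.
square = lambda x: x * x  # noqa: E731

try:
    pickle.dumps(square)
    stdlib_can_pickle_lambda = True
except (pickle.PicklingError, AttributeError):
    stdlib_can_pickle_lambda = False

print(stdlib_can_pickle_lambda)
```

`dill.dumps(square)` would succeed where `pickle.dumps` fails, which is why `multiprocess` can accept arbitrary callables in `mapply`.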

docs/source/conf.py

Lines changed: 1 addition & 1 deletion

```diff
@@ -181,6 +181,6 @@ def linkcode_resolve(  # noqa:CCR001
 intersphinx_mapping = {
     "python": ("https://docs.python.org/3", None),
     "pandas": ("https://pandas.pydata.org/pandas-docs/stable", None),
-    "pathos": ("https://pathos.readthedocs.io/en/latest", None),
+    "multiprocess": ("https://multiprocess.readthedocs.io/en/latest/", None),
     # "tqdm": ("https://tqdm.github.io/docs/tqdm", None),  # mkdocs not working
 }
```
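With the mapping pointed at the `multiprocess` docs, Sphinx cross-reference roles elsewhere in the project (such as the `:class:` reference in the `parallel.py` docstring) resolve against that project's objects inventory. A hypothetical reStructuredText usage that this mapping enables:

```
Workers are recycled by :class:`multiprocess.pool.Pool` after a fixed task quota.
```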

requirements/prod.txt

Lines changed: 1 addition & 1 deletion

```diff
@@ -1,3 +1,3 @@
-pathos>=0.2.0
+multiprocess
 psutil
 tqdm>=4.27  # from tqdm.auto import tqdm
```

src/mapply/parallel.py

Lines changed: 6 additions & 13 deletions

```diff
@@ -1,4 +1,4 @@
-"""Submodule containing code to distribute computation over multiple processes using :class:`pathos.multiprocessing.ProcessPool`.
+"""Submodule containing code to distribute computation over multiple processes using :class:`multiprocess.pool.Pool`.

 Standalone usage:
 ::
@@ -19,11 +19,12 @@ def some_heavy_computation(x, power):
 )
 """
 import logging
+import os
 from functools import partial
 from typing import Any, Callable, Iterable, Iterator, Optional

 import psutil
-from pathos.multiprocessing import ProcessPool
+from multiprocess.pool import Pool
 from tqdm.auto import tqdm as _tqdm

 logger = logging.getLogger(__name__)
@@ -37,6 +38,7 @@ def sensible_cpu_count() -> int:


 N_CORES = sensible_cpu_count()
+MAX_TASKS_PER_CHILD = os.environ.get("MAPPLY_MAX_TASKS_PER_CHILD", 4)


 def _choose_n_workers(n_chunks: Optional[int], n_workers: int) -> int:
@@ -78,10 +80,6 @@ def multiprocessing_imap(

     Yields:
         Results in same order as input iterable.
-
-    Raises:
-        Exception: Any error occurred during computation (will terminate the pool early).
-        KeyboardInterrupt: Any KeyboardInterrupt sent by the user (will terminate the pool early).
     """
     n_chunks: Optional[int] = tqdm(iterable, disable=True).__len__()  # doesn't exhaust
     func = partial(func, *args, **kwargs)
@@ -94,7 +92,7 @@ def multiprocessing_imap(
         stage = map(func, iterable)
     else:
         logger.debug("Starting ProcessPool with %d workers", n_workers)
-        pool = ProcessPool(n_workers)
+        pool = Pool(n_workers, maxtasksperchild=MAX_TASKS_PER_CHILD)

         stage = pool.imap(func, iterable)

@@ -103,12 +101,7 @@ def multiprocessing_imap(

     try:
         yield from stage
-    except (Exception, KeyboardInterrupt):
+    finally:
         if pool:
             logger.debug("Terminating ProcessPool")
             pool.terminate()
-        raise
-    finally:
-        if pool:
-            logger.debug("Closing ProcessPool")
-            pool.clear()
```
