Skip to content

Commit 0a997cd

Browse files
NJManganelliNick Manganellipre-commit-ci[bot]
authored
fix: set _max_workers in ThreadPoolExecutor with non-None initializer (#1254)
* Add _max_workers definition to ThreadPoolExecutor when arg is not None * Add test for ThreadPoolExecutor for None and 1 max_workers * style: pre-commit fixes --------- Co-authored-by: Nick Manganelli <[email protected]> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 599984e commit 0a997cd

File tree

2 files changed

+46
-0
lines changed

2 files changed

+46
-0
lines changed

Diff for: src/uproot/source/futures.py

+2
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,8 @@ def __init__(self, max_workers: int | None = None):
217217
import multiprocessing
218218

219219
self._max_workers = multiprocessing.cpu_count()
220+
else:
221+
self._max_workers = max_workers
220222

221223
self._work_queue = queue.Queue()
222224
self._workers = []

Diff for: tests/test_1254_test_threadpool_executor_for_dask.py

+44
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import pytest
2+
import skhep_testdata
3+
4+
import uproot
5+
6+
pytest.importorskip("pandas")
7+
8+
9+
def test_decompression_threadpool_executor_for_dask():
10+
11+
class TestThreadPoolExecutor(uproot.source.futures.ThreadPoolExecutor):
12+
def __init__(self, max_workers=None):
13+
super().__init__(max_workers=max_workers)
14+
self.submit_count = 0
15+
16+
def submit(self, task, /, *args, **kwargs):
17+
self.submit_count += 1
18+
super().submit(task, *args, **kwargs)
19+
20+
implicitexecutor = TestThreadPoolExecutor(max_workers=None)
21+
22+
a = uproot.dask(
23+
{skhep_testdata.data_path("uproot-sample-6.20.04-uncompressed.root"): "sample"},
24+
decompression_executor=implicitexecutor,
25+
)
26+
27+
a["i4"].compute()
28+
29+
assert implicitexecutor.max_workers > 0
30+
31+
assert implicitexecutor.submit_count > 0
32+
33+
explicitexecutor = TestThreadPoolExecutor(max_workers=1)
34+
35+
b = uproot.dask(
36+
{skhep_testdata.data_path("uproot-sample-6.20.04-uncompressed.root"): "sample"},
37+
decompression_executor=explicitexecutor,
38+
)
39+
40+
b["i4"].compute()
41+
42+
assert explicitexecutor.max_workers == 1
43+
44+
assert explicitexecutor.submit_count > 0

0 commit comments

Comments
 (0)