Commit 6f1daf7

Merge pull request #506 from joshspeagle/pool_branch

Pool improvements

2 parents 8812f7e + 68c40fd

8 files changed: 144 additions & 34 deletions

File tree

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
@@ -7,6 +7,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 [Unreleased]
 ### Added
 ### Changed
+- When restoring the sampler with the pool, use an updated value of `queue_size` based on the pool size
+- Use `chunksize=1` for the dynesty pool, as that is better behaved for `queue_size > nthreads` and unequal durations of function evaluations
+- When starting dynesty with a multiprocessing pool, dynesty now tries to use the `_processes` keyword to determine how many CPUs are available. This should reduce the need for manual `queue_size` specification
 ### Fixed

 [3.0.0 - 2025-10-04]
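The last changelog entry describes duck-typed CPU detection. A minimal standalone sketch of that fallback chain (the helper name `infer_queue_size` is hypothetical, not dynesty API; `_processes` is a private CPython attribute of `multiprocessing.Pool`):

```python
import multiprocessing as mp


def infer_queue_size(pool, queue_size=None):
    # Hypothetical helper mirroring the behaviour described above:
    # an explicit queue_size always wins; otherwise probe the pool.
    if queue_size is not None:
        return queue_size
    # multiprocessing.Pool keeps its worker count in `_processes`;
    # schwimmbad-style pools expose `size` instead.
    size = getattr(pool, '_processes', None) or getattr(pool, 'size', None)
    if size is None:
        raise ValueError('specify queue_size explicitly')
    return size


if __name__ == '__main__':
    with mp.Pool(4) as pool:
        print(infer_queue_size(pool))  # 4
```

Because `_processes` is private, an explicit `queue_size` remains the robust option for exotic pools.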

docs/source/faq.rst

Lines changed: 1 addition & 1 deletion
@@ -434,7 +434,7 @@ it's likely that I might not be able to help all that much.

 **How to decide on the number of processes in a pool and how to set queue_size**

-Assuming that you decided on the number of live-points K that you want to use and that the likelihood evaluation is not very quick, you should use as many processes as you can up to around K. The queue_size should be equal the number of processes. If you are using the the number of processes that M is smaller than K, you may want to use :math:`M=K//2` or :math:`M=K//3` i.e integer fractions. So if you are using 1024 live-points all powers of two up to 1024 would be good choice for the number of processes.
+Assuming that you have decided on the number of live-points K that you want to use and that the likelihood evaluation is not very quick, you should use as many processes as you can, up to around K. The queue_size should equal the number of processes. If the number of processes M is smaller than K, you may want to use integer fractions, i.e. :math:`M=K//2` or :math:`M=K//3`. So if you are using 1024 live-points, all powers of two up to 1024 would be good choices for the number of processes. If your likelihood function takes a highly variable amount of time to evaluate, you may benefit from a queue_size that is significantly larger than the number of CPUs (but queue_size should always be < K).


 **I would like to run dynesty across multiple nodes on a cluster. How do I do that ?**
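The integer-fraction advice in the FAQ hunk above can be sketched as a small helper (the name `choose_nproc` is illustrative, not part of dynesty):

```python
def choose_nproc(nlive, ncpu):
    # Pick the largest integer fraction nlive // n that fits within
    # the available CPUs, per the FAQ advice above.
    if ncpu >= nlive:
        return nlive
    n = 2
    while nlive // n > ncpu:
        n += 1
    return nlive // n


print(choose_nproc(1024, 600))  # 512, i.e. K//2
print(choose_nproc(1024, 100))  # 93,  i.e. K//11
```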

docs/source/quickstart.rst

Lines changed: 3 additions & 0 deletions
@@ -654,6 +654,9 @@ If you used the pool in the sampler and you want to use the pool after restoring
     # resume
     sampler.run_nested(resume=True)

+You should be careful when restoring the sampler on a machine with a different number of CPUs when using a pool.
+We will still use the original queue_size unless it was 1 before.
+
 The checkpointing may be helpful if you are running dynesty on HPC with a queue system that has a limit on the wall-time that your jobs can run.
 There is, however, an important reminder that you should *NOT* use checkpointing for persistence. I.e. if you want to save the results of the sampling, you should save the samples, weights, or the results object, rather than the whole nested sampling object (as checkpointing does). The reason for this is that the checkpoint files are not guaranteed to be compatible between dynesty versions (even minor ones).
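The restore rule the quickstart note describes (keep the original queue_size unless it was 1) can be restated as a standalone function; `resolve_queue_size` is a hypothetical helper mirroring the logic in dynesty's `restore_sampler`, not dynesty API:

```python
import warnings


def resolve_queue_size(queue_size_old, queue_size_new):
    # Hypothetical helper restating the rule above: if the restored
    # checkpoint used a parallel queue, keep that original size even
    # when the new pool disagrees, and warn about it.
    if queue_size_new != queue_size_old and queue_size_old != 1:
        warnings.warn(
            f'Restoring the sampler with queue_size {queue_size_old}')
        return queue_size_old
    return queue_size_new


print(resolve_queue_size(8, 4))  # 8 (with a warning)
print(resolve_queue_size(1, 4))  # 4
```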

py/dynesty/dynesty.py

Lines changed: 2 additions & 27 deletions
@@ -18,7 +18,8 @@
 from .bounding import BOUND_LIST
 from . import bounding
 from .dynamicsampler import DynamicSampler
-from .utils import (LogLikelihood, get_random_generator, get_nonbounded)
+from .utils import (LogLikelihood, get_random_generator, get_nonbounded,
+                    _parse_pool_queue)

 __all__ = ["NestedSampler", "DynamicNestedSampler", "_function_wrapper"]

@@ -199,32 +200,6 @@ def _get_enlarge_bootstrap(sample, enlarge, bootstrap):
                      'sense unless bootstrap=0 or enlarge = 1')


-def _parse_pool_queue(pool, queue_size):
-    """
-    Common functionality of interpretign the pool and queue_size
-    arguments to Dynamic and static nested samplers
-    """
-    if queue_size is not None and queue_size < 1:
-        raise ValueError("The queue must contain at least one element!")
-    elif (queue_size == 1) or (pool is None and queue_size is None):
-        mapper = map
-        queue_size = 1
-    elif pool is not None:
-        mapper = pool.map
-        if queue_size is None:
-            try:
-                queue_size = pool.size
-            except AttributeError as e:
-                raise ValueError("Cannot initialize `queue_size` because "
-                                 "`pool.size` has not been provided. Please"
-                                 "define `pool.size` or specify `queue_size` "
-                                 "explicitly.") from e
-    else:
-        raise ValueError("`queue_size > 1` but no `pool` provided.")
-
-    return mapper, queue_size
-
-
 def _check_first_update(first_update):
     """
     Verify that the first_update dictionary is valid

py/dynesty/pool.py

Lines changed: 1 addition & 1 deletion
@@ -154,7 +154,7 @@ def map(self, F, x):
         F: function
         x: iterable
         """
-        return self.pool.map(F, x)
+        return self.pool.map(F, x, chunksize=1)

     def __exit__(self, exc_type, exc_val, exc_tb):
         try:
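Why `chunksize=1` helps: `multiprocessing.Pool.map` otherwise hands out tasks in contiguous chunks, so one slow likelihood evaluation can trap fast ones queued behind it in the same chunk. A toy scheduling model (not dynesty code; a simplified greedy simulation of chunked dispatch) makes the effect concrete:

```python
import heapq


def makespan(durations, nworkers, chunksize):
    # Toy model of pool.map: contiguous chunks of tasks are handed to
    # whichever worker frees up first; returns the total wall time.
    workers = [0.0] * nworkers
    heapq.heapify(workers)
    for i in range(0, len(durations), chunksize):
        t = heapq.heappop(workers)
        heapq.heappush(workers, t + sum(durations[i:i + chunksize]))
    return max(workers)


tasks = [10, 1, 1, 1, 1, 1, 1, 1]  # one slow evaluation among fast ones
print(makespan(tasks, 2, chunksize=4))  # 13.0: fast tasks stuck behind the slow one
print(makespan(tasks, 2, chunksize=1))  # 10.0: workers stay balanced
```

With equal task durations the two settings tie, which is why the change matters mainly for `queue_size > nthreads` and variable evaluation times, as the changelog notes.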

py/dynesty/utils.py

Lines changed: 43 additions & 4 deletions
@@ -2146,10 +2146,22 @@ def restore_sampler(fname, pool=None):
            f'The dynesty version in the checkpoint file ({save_ver})'
            f'does not match the current dynesty version'
            f'({DYNESTY_VERSION}). That is *NOT* guaranteed to work')
-    if pool is not None:
-        mapper = pool.map
-    else:
-        mapper = map
+
+    queue_size_old = getattr(sampler, 'queue_size', None)
+    assert queue_size_old is not None  # I don't think it could ever happen
+    try:
+        # we first try to get the new queue_size
+        # that may fail if the pool has no information about the size
+        mapper, queue_size_new = _parse_pool_queue(pool, None)
+    except ValueError:
+        # if first failed we are using the old queue_size
+        mapper, queue_size_new = _parse_pool_queue(pool, queue_size_old)
+
+    if queue_size_new != queue_size_old and queue_size_old != 1:
+        warnings.warn(
+            f'Restoring the sampler with queue_size {queue_size_old}')
+        queue_size_new = queue_size_old
+
     if hasattr(sampler, 'sampler'):
         # This is the case of the dynamic sampler
         # this is better be written as isinstanceof()
@@ -2168,6 +2180,7 @@ def restore_sampler(fname, pool=None):
     for cursamp in samplers:
         cursamp.mapper = mapper
         cursamp.pool = pool
+        cursamp.queue_size = queue_size_new
     return sampler


@@ -2206,3 +2219,29 @@ def save_sampler(sampler, fname):
        except:  # noqa
            pass
        raise
+
+
+def _parse_pool_queue(pool, queue_size):
+    """
+    Common functionality of interpreting the pool and queue_size
+    arguments to Dynamic and static nested samplers
+    """
+    if queue_size is not None and queue_size < 1:
+        raise ValueError("The queue must contain at least one element!")
+    if pool is None:
+        if queue_size is not None and queue_size > 1:
+            raise ValueError("`queue_size > 1` but no `pool` provided.")
+        mapper = map
+        queue_size = 1
+    elif pool is not None:
+        mapper = pool.map
+        if queue_size is None:
+            queue_size = getattr(pool, '_processes', None) or getattr(
+                pool, 'size', None)
+            if queue_size is None:
+                raise ValueError(
+                    "Cannot initialize `queue_size` because "
+                    "`pool.size` or `pool._processes` has not been provided. "
+                    "Please define `pool.size` or specify `queue_size` "
+                    "explicitly.")
+    return mapper, queue_size

tests/test_pool.py

Lines changed: 20 additions & 1 deletion
@@ -178,7 +178,7 @@ def test_pool_args2():
     terminator(pool)


-@pytest.mark.parametrize('sample', ['slice', 'rwalk', 'rslice'])
+@pytest.mark.parametrize('sample', ['slice', 'rwalk', 'rslice', 'unif'])
 def test_pool_samplers(sample):
     # this is to test how the samplers are dealing with queue_size>1
     rstate = get_rstate()
@@ -223,3 +223,22 @@ def test_usepool(func):
                            queue_size=100)
     sampler.run_nested(maxiter=10000, print_progress=printing)
     terminator(pool)
+
+
+@pytest.mark.parametrize('queue_size', [None, 2])
+def test_pool_queue_size(queue_size):
+    # this is to test how the samplers are dealing with specified
+    # or unspecified queue_size
+    rstate = get_rstate()
+
+    ctx = mp.get_context('spawn')
+    with ctx.Pool(2) as pool:
+        sampler = dynesty.NestedSampler(loglike_gau,
+                                        prior_transform_gau,
+                                        ndim,
+                                        nlive=nlive,
+                                        sample='rslice',
+                                        pool=pool,
+                                        queue_size=queue_size,
+                                        rstate=rstate)
+        sampler.run_nested(print_progress=printing, dlogz=10)

tests/test_resume.py

Lines changed: 71 additions & 0 deletions
@@ -232,6 +232,77 @@ def test_resume(dynamic, delay_frac, with_pool, dyn_pool):
         pass


+@pytest.mark.parametrize(
+    "delay_frac",
+    [.2, .5, .75, .9],
+)
+@pytest.mark.xdist_group(name="resume_group")
+def test_resume_queue_size(delay_frac):
+    """
+    Test we can interrupt and resume nested runs
+    Note that I used xdist_group here in order to guarantee that if all the
+    tests are run in parallel, this one is executed in one thread because
+    I want to only use one getlogz() call.
+    """
+    fname = get_fname(inspect.currentframe().f_code.co_name)
+
+    save_every = 1
+    cache_dt, cache_logz = getlogz(fname, save_every)
+    npool = 2
+    curdt, curlogz = [_[False, True] for _ in [cache_dt, cache_logz]]
+    save_every = min(save_every, curdt / 10)
+    curdt *= delay_frac
+
+    # For spawn context, we need to account for startup time
+    ready_file = fname + '.ready'
+
+    try:
+        # Always use spawn context to match actual usage
+        fit_proc = mp.get_context('spawn').Process(
+            target=fit_main,
+            args=(fname, False, save_every, npool, False, NEFF0, ready_file))
+        start_time = time.time()
+        fit_proc.start()
+
+        # Wait for spawn process to be ready before starting timer
+        while not os.path.exists(ready_file):
+            time.sleep(0.01)
+            if time.time() - start_time > 5:  # Safety timeout
+                raise RuntimeError("Process failed to start")
+        # Account for startup time in the timeout
+        startup_time = time.time() - start_time
+        actual_timeout = curdt + startup_time
+
+        fit_proc.join(actual_timeout)
+        # Proceed to terminate only if the process did not finish in time.
+        if fit_proc.is_alive():
+            print('terminating', file=sys.stderr)
+            fit_proc.terminate()
+            if np.allclose(delay_frac, .2) and not os.path.exists(fname):
+                warnings.warn(
+                    "The checkpoint file was not created I'm skipping the test"
+                )
+                return
+
+            with mp.get_context('spawn').Pool(npool + 1) as pool:
+                fit_resume(fname, False, curlogz, pool=pool)
+        else:
+            assert fit_proc.exitcode == 0
+    finally:
+        try:
+            os.unlink(fname)
+        except:  # noqa
+            pass
+        try:
+            os.unlink(fname + '.tmp')
+        except:  # noqa
+            pass
+        try:
+            os.unlink(ready_file)
+        except:  # noqa
+            pass
+
+
 @pytest.mark.parametrize("dynamic", [False, True])
 def test_save(dynamic):
     """

0 commit comments
