3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
[Unreleased]
### Added
### Changed
- When restoring the sampler with the pool, use an updated value of `queue_size` based on the pool size
- Use `chunksize=1` for the dynesty pool, as that is better behaved for `queue_size > nthreads` and unequal durations of function evaluations
- When starting dynesty with a multiprocessing pool, dynesty now tries to use the `_processes` keyword to determine how many CPUs are available. This should reduce the need for manual `queue_size` specification
### Fixed

[3.0.0 - 2025-10-04]
2 changes: 1 addition & 1 deletion docs/source/faq.rst
@@ -434,7 +434,7 @@ it's likely that I might not be able to help all that much.

**How to decide on the number of processes in a pool and how to set queue_size**

Assuming that you decided on the number of live-points K that you want to use and that the likelihood evaluation is not very quick, you should use as many processes as you can up to around K. The queue_size should be equal the number of processes. If you are using the the number of processes that M is smaller than K, you may want to use :math:`M=K//2` or :math:`M=K//3` i.e integer fractions. So if you are using 1024 live-points all powers of two up to 1024 would be good choice for the number of processes.
Assuming that you have decided on the number of live-points K that you want to use and that the likelihood evaluation is not very quick, you should use as many processes as you can, up to around K. The queue_size should be equal to the number of processes. If the number of processes M is smaller than K, you may want to use :math:`M=K//2` or :math:`M=K//3`, i.e. integer fractions. So if you are using 1024 live-points, all powers of two up to 1024 would be good choices for the number of processes. If your likelihood evaluations take a highly variable amount of time, you may benefit from a queue_size that is significantly larger than the number of CPUs (but queue_size should always be < K).
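
To make this concrete, here is a minimal sketch (the toy Gaussian likelihood, the 8-process pool, and the 1024 live points are illustrative values, not recommendations):

```python
import numpy as np
import multiprocessing as mp
import dynesty

ndim = 3

def loglike(x):
    # toy Gaussian likelihood; real problems are usually much slower
    return -0.5 * np.sum(x**2)

def prior_transform(u):
    # map the unit cube to [-10, 10] in each dimension
    return 20.0 * u - 10.0

if __name__ == '__main__':
    nlive = 1024   # K live points
    nprocs = 8     # M processes, an integer fraction of K
    with mp.Pool(nprocs) as pool:
        sampler = dynesty.NestedSampler(loglike, prior_transform, ndim,
                                        nlive=nlive, pool=pool,
                                        queue_size=nprocs)
        sampler.run_nested()
```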


**I would like to run dynesty across multiple nodes on a cluster. How do I do that ?**
3 changes: 3 additions & 0 deletions docs/source/quickstart.rst
@@ -654,6 +654,9 @@ If you used the pool in the sampler and you want to use the pool after restoring
# resume
sampler.run_nested(resume=True)

You should be careful when restoring the sampler with a pool on a machine with a different number of CPUs.
The original queue_size will still be used unless it was 1 before.
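
For example, a minimal sketch of resuming with a fresh pool (the checkpoint filename and pool size are placeholders):

```python
import multiprocessing as mp
import dynesty

if __name__ == '__main__':
    with mp.Pool(8) as pool:
        # the queue_size stored in the checkpoint is kept unless it was 1;
        # otherwise it is inferred from the new pool
        sampler = dynesty.NestedSampler.restore('dynesty.save', pool=pool)
        sampler.run_nested(resume=True)
```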

Checkpointing may be helpful if you are running dynesty on an HPC system with a queue that limits the wall-time your jobs can run.
There is, however, an important reminder: you should *NOT* use checkpointing for persistence. I.e. if you want to save the results of the sampling, you should save the samples, weights, or the results object, rather than the whole nested sampling object (as checkpointing does). The reason for this is that the checkpoint files are not guaranteed to be compatible between dynesty versions (even minor ones).
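
For instance, here is a minimal sketch of persisting the results instead of the checkpoint (the filename is a placeholder):

```python
import pickle

results = sampler.results   # samples, weights, logz, etc.
with open('my_run_results.pkl', 'wb') as f:
    pickle.dump(results, f)
```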

29 changes: 2 additions & 27 deletions py/dynesty/dynesty.py
@@ -18,7 +18,8 @@
from .bounding import BOUND_LIST
from . import bounding
from .dynamicsampler import DynamicSampler
from .utils import (LogLikelihood, get_random_generator, get_nonbounded)
from .utils import (LogLikelihood, get_random_generator, get_nonbounded,
_parse_pool_queue)

__all__ = ["NestedSampler", "DynamicNestedSampler", "_function_wrapper"]

@@ -199,32 +200,6 @@ def _get_enlarge_bootstrap(sample, enlarge, bootstrap):
'sense unless bootstrap=0 or enlarge = 1')


def _parse_pool_queue(pool, queue_size):
"""
Common functionality of interpretign the pool and queue_size
arguments to Dynamic and static nested samplers
"""
if queue_size is not None and queue_size < 1:
raise ValueError("The queue must contain at least one element!")
elif (queue_size == 1) or (pool is None and queue_size is None):
mapper = map
queue_size = 1
elif pool is not None:
mapper = pool.map
if queue_size is None:
try:
queue_size = pool.size
except AttributeError as e:
raise ValueError("Cannot initialize `queue_size` because "
"`pool.size` has not been provided. Please"
"define `pool.size` or specify `queue_size` "
"explicitly.") from e
else:
raise ValueError("`queue_size > 1` but no `pool` provided.")

return mapper, queue_size


def _check_first_update(first_update):
"""
Verify that the first_update dictionary is valid
2 changes: 1 addition & 1 deletion py/dynesty/pool.py
@@ -154,7 +154,7 @@ def map(self, F, x):
F: function
x: iterable
"""
return self.pool.map(F, x)
return self.pool.map(F, x, chunksize=1)

def __exit__(self, exc_type, exc_val, exc_tb):
try:
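
For background on the `chunksize=1` change above, here is an illustrative sketch outside of dynesty: by default `multiprocessing.Pool.map` groups the inputs into chunks, so a chunk only finishes once its slowest item does, while `chunksize=1` hands items to workers one at a time. The sleep times below are made up to mimic unequal likelihood evaluation durations, and the exact speedup will depend on the system.

```python
import time
import multiprocessing as mp

def work(x):
    # the first few evaluations are much slower, mimicking
    # likelihood calls with very unequal run times
    time.sleep(0.5 if x < 4 else 0.01)
    return x

if __name__ == '__main__':
    items = list(range(32))
    with mp.Pool(4) as pool:
        t0 = time.time()
        pool.map(work, items)                  # default chunking groups items
        t1 = time.time()
        pool.map(work, items, chunksize=1)     # items dispatched one at a time
        t2 = time.time()
    print(f'default: {t1 - t0:.2f}s   chunksize=1: {t2 - t1:.2f}s')
```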
47 changes: 43 additions & 4 deletions py/dynesty/utils.py
@@ -2146,10 +2146,22 @@ def restore_sampler(fname, pool=None):
f'The dynesty version in the checkpoint file ({save_ver})'
f'does not match the current dynesty version'
f'({DYNESTY_VERSION}). That is *NOT* guaranteed to work')
if pool is not None:
mapper = pool.map
else:
mapper = map

queue_size_old = getattr(sampler, 'queue_size', None)
assert queue_size_old is not None # I don't think it could ever happen
try:
# we first try to get the new queue_size
# that may fail if the pool has no information about the size
mapper, queue_size_new = _parse_pool_queue(pool, None)
except ValueError:
# if first failed we are using the old queue_size
mapper, queue_size_new = _parse_pool_queue(pool, queue_size_old)

if queue_size_new != queue_size_old and queue_size_old != 1:
warnings.warn(
f'Restoring the sampler with queue_size {queue_size_old}')
queue_size_new = queue_size_old

if hasattr(sampler, 'sampler'):
# This is the case of the dynamic sampler
# this would be better written using isinstance()
@@ -2168,6 +2180,7 @@
for cursamp in samplers:
cursamp.mapper = mapper
cursamp.pool = pool
cursamp.queue_size = queue_size_new
return sampler


@@ -2206,3 +2219,29 @@ def save_sampler(sampler, fname):
except: # noqa
pass
raise


def _parse_pool_queue(pool, queue_size):
"""
Common functionality of interpreting the pool and queue_size
arguments to Dynamic and static nested samplers
"""
if queue_size is not None and queue_size < 1:
raise ValueError("The queue must contain at least one element!")
if pool is None:
if queue_size is not None and queue_size > 1:
raise ValueError("`queue_size > 1` but no `pool` provided.")
mapper = map
queue_size = 1
elif pool is not None:
mapper = pool.map
if queue_size is None:
queue_size = getattr(pool, '_processes', None) or getattr(
pool, 'size', None)
if queue_size is None:
raise ValueError(
"Cannot initialize `queue_size` because "
"`pool.size` or `pool._processes` has not been provided. "
"Please define `pool.size` or specify `queue_size` "
"explicitly.")
return mapper, queue_size
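
An illustrative sketch of the new inference path (note that `_parse_pool_queue` is an internal helper and `_processes` is a private attribute of `multiprocessing.pool.Pool`):

```python
import multiprocessing as mp
from dynesty.utils import _parse_pool_queue

if __name__ == '__main__':
    with mp.Pool(4) as pool:
        mapper, queue_size = _parse_pool_queue(pool, None)
        # queue_size falls back to pool._processes when not given explicitly
        print(queue_size)   # 4
```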
21 changes: 20 additions & 1 deletion tests/test_pool.py
@@ -178,7 +178,7 @@ def test_pool_args2():
terminator(pool)


@pytest.mark.parametrize('sample', ['slice', 'rwalk', 'rslice'])
@pytest.mark.parametrize('sample', ['slice', 'rwalk', 'rslice', 'unif'])
def test_pool_samplers(sample):
# this is to test how the samplers are dealing with queue_size>1
rstate = get_rstate()
@@ -223,3 +223,22 @@ def test_usepool(func):
queue_size=100)
sampler.run_nested(maxiter=10000, print_progress=printing)
terminator(pool)


@pytest.mark.parametrize('queue_size', [None, 2])
def test_pool_queue_size(queue_size):
# this is to test how the samplers are dealing with specified
# or unspecified queue_size
rstate = get_rstate()

ctx = mp.get_context('spawn')
with ctx.Pool(2) as pool:
sampler = dynesty.NestedSampler(loglike_gau,
prior_transform_gau,
ndim,
nlive=nlive,
sample='rslice',
pool=pool,
queue_size=queue_size,
rstate=rstate)
sampler.run_nested(print_progress=printing, dlogz=10)
71 changes: 71 additions & 0 deletions tests/test_resume.py
@@ -232,6 +232,77 @@ def test_resume(dynamic, delay_frac, with_pool, dyn_pool):
pass


@pytest.mark.parametrize(
"delay_frac",
[.2, .5, .75, .9],
)
@pytest.mark.xdist_group(name="resume_group")
def test_resume_queue_size(delay_frac):
"""
Test we can interrupt and resume nested runs
Note that I used xdist_group here in order to guarantee that if all the
tests are run in parallel, this one is executed in one thread because
I want to only use one getlogz() call.
"""
fname = get_fname(inspect.currentframe().f_code.co_name)

save_every = 1
cache_dt, cache_logz = getlogz(fname, save_every)
npool = 2
curdt, curlogz = [_[False, True] for _ in [cache_dt, cache_logz]]
save_every = min(save_every, curdt / 10)
curdt *= delay_frac

# For spawn context, we need to account for startup time
ready_file = fname + '.ready'

try:
# Always use spawn context to match actual usage
fit_proc = mp.get_context('spawn').Process(
target=fit_main,
args=(fname, False, save_every, npool, False, NEFF0, ready_file))
start_time = time.time()
fit_proc.start()

# Wait for spawn process to be ready before starting timer
while not os.path.exists(ready_file):
time.sleep(0.01)
if time.time() - start_time > 5: # Safety timeout
raise RuntimeError("Process failed to start")
# Account for startup time in the timeout
startup_time = time.time() - start_time
actual_timeout = curdt + startup_time

fit_proc.join(actual_timeout)
# Proceed to terminate only if the process did not finish in time.
if fit_proc.is_alive():
print('terminating', file=sys.stderr)
fit_proc.terminate()
if np.allclose(delay_frac, .2) and not os.path.exists(fname):
warnings.warn(
"The checkpoint file was not created I'm skipping the test"
)
return

with mp.get_context('spawn').Pool(npool + 1) as pool:
fit_resume(fname, False, curlogz, pool=pool)
else:
assert fit_proc.exitcode == 0
finally:
try:
os.unlink(fname)
except: # noqa
pass
try:
os.unlink(fname + '.tmp')
except: # noqa
pass
try:
os.unlink(ready_file)
except: # noqa
pass


@pytest.mark.parametrize("dynamic", [False, True])
def test_save(dynamic):
"""