diff --git a/CHANGELOG.md b/CHANGELOG.md index 3cf0f6d0..64ca4ee2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), [Unreleased] ### Added ### Changed +- When restoring the sampler with the pool, use an updated value of `queue_size` based on the pool size +- Use `chunksize=1` for the dynesty pool, as that is better behaved for `queue_size > nthreads` and unequal durations of function evaluations +- When starting dynesty with a multiprocessing pool, dynesty now tries to use the `_processes` attribute to determine how many CPUs are available. This should reduce the need for manual `queue_size` specification ### Fixed [3.0.0 - 2025-10-04] diff --git a/docs/source/faq.rst b/docs/source/faq.rst index 10e5047a..74ed0baa 100644 --- a/docs/source/faq.rst +++ b/docs/source/faq.rst @@ -434,7 +434,7 @@ it's likely that I might not be able to help all that much. **How to decide on the number of processes in a pool and how to set queue_size** -Assuming that you decided on the number of live-points K that you want to use and that the likelihood evaluation is not very quick, you should use as many processes as you can up to around K. The queue_size should be equal the number of processes. If you are using the the number of processes that M is smaller than K, you may want to use :math:`M=K//2` or :math:`M=K//3` i.e integer fractions. So if you are using 1024 live-points all powers of two up to 1024 would be good choice for the number of processes. +Assuming that you decided on the number of live-points K that you want to use and that the likelihood evaluation is not very quick, you should use as many processes as you can up to around K. The queue_size should be equal to the number of processes. If you are using a number of processes M that is smaller than K, you may want to use :math:`M=K//2` or :math:`M=K//3`, i.e. integer fractions. 
So if you are using 1024 live-points, all powers of two up to 1024 would be a good choice for the number of processes. If you have likelihood functions that take a highly variable amount of time to evaluate, you may benefit from using a queue_size that is significantly larger than the number of CPUs (but queue_size should always be < K) **I would like to run dynesty across multiple nodes on a cluster. How do I do that ?** diff --git a/docs/source/quickstart.rst b/docs/source/quickstart.rst index 7e101d69..73f7fede 100644 --- a/docs/source/quickstart.rst +++ b/docs/source/quickstart.rst @@ -654,6 +654,9 @@ If you used the pool in the sampler and you want to use the pool after restoring # resume sampler.run_nested(resume=True) +You should be careful when restoring the sampler on a machine with a different number of CPUs when using a pool. +We will still use the original queue_size unless it was 1 before. + The checkpointing may be helpful if you are running dynesty on HPC with a queue system that has a limit on a wall-time that your jobs can run. There is a however an important reminder that should *NOT* use checkpointing for persistence. I.e. if you want to save the results of the sampling, you should save samples, weights or the results object, rather than the whole nested sampling object (as checkpointing does). The reason for this is that the checkpoint files are not guaranteed to be compatible between dynesty versions (even minor ones). diff --git a/py/dynesty/dynesty.py b/py/dynesty/dynesty.py index 4484ac75..e030eb46 100644 --- a/py/dynesty/dynesty.py +++ b/py/dynesty/dynesty.py @@ -18,7 +18,8 @@ from .bounding import BOUND_LIST from . 
import bounding from .dynamicsampler import DynamicSampler -from .utils import (LogLikelihood, get_random_generator, get_nonbounded) +from .utils import (LogLikelihood, get_random_generator, get_nonbounded, + _parse_pool_queue) __all__ = ["NestedSampler", "DynamicNestedSampler", "_function_wrapper"] @@ -199,32 +200,6 @@ def _get_enlarge_bootstrap(sample, enlarge, bootstrap): 'sense unless bootstrap=0 or enlarge = 1') -def _parse_pool_queue(pool, queue_size): - """ - Common functionality of interpretign the pool and queue_size - arguments to Dynamic and static nested samplers - """ - if queue_size is not None and queue_size < 1: - raise ValueError("The queue must contain at least one element!") - elif (queue_size == 1) or (pool is None and queue_size is None): - mapper = map - queue_size = 1 - elif pool is not None: - mapper = pool.map - if queue_size is None: - try: - queue_size = pool.size - except AttributeError as e: - raise ValueError("Cannot initialize `queue_size` because " - "`pool.size` has not been provided. Please" - "define `pool.size` or specify `queue_size` " - "explicitly.") from e - else: - raise ValueError("`queue_size > 1` but no `pool` provided.") - - return mapper, queue_size - - def _check_first_update(first_update): """ Verify that the first_update dictionary is valid diff --git a/py/dynesty/pool.py b/py/dynesty/pool.py index 30b6ce0e..d46095b4 100644 --- a/py/dynesty/pool.py +++ b/py/dynesty/pool.py @@ -154,7 +154,7 @@ def map(self, F, x): F: function x: iterable """ - return self.pool.map(F, x) + return self.pool.map(F, x, chunksize=1) def __exit__(self, exc_type, exc_val, exc_tb): try: diff --git a/py/dynesty/utils.py b/py/dynesty/utils.py index cf8fbe38..021c6a93 100644 --- a/py/dynesty/utils.py +++ b/py/dynesty/utils.py @@ -2146,10 +2146,22 @@ def restore_sampler(fname, pool=None): f'The dynesty version in the checkpoint file ({save_ver})' f'does not match the current dynesty version' f'({DYNESTY_VERSION}). 
That is *NOT* guaranteed to work') - if pool is not None: - mapper = pool.map - else: - mapper = map + + queue_size_old = getattr(sampler, 'queue_size', None) + assert queue_size_old is not None # I don't think it could ever happen + try: + # we first try to get the new queue_size + # that may fail if the pool has no information about the size + mapper, queue_size_new = _parse_pool_queue(pool, None) + except ValueError: + # if first failed we are using the old queue_size + mapper, queue_size_new = _parse_pool_queue(pool, queue_size_old) + + if queue_size_new != queue_size_old and queue_size_old != 1: + warnings.warn( + f'Restoring the sampler with queue_size {queue_size_old}') + queue_size_new = queue_size_old + if hasattr(sampler, 'sampler'): # This is the case of the dynamic sampler # this is better be written as isinstanceof() @@ -2168,6 +2180,7 @@ def restore_sampler(fname, pool=None): for cursamp in samplers: cursamp.mapper = mapper cursamp.pool = pool + cursamp.queue_size = queue_size_new return sampler @@ -2206,3 +2219,29 @@ def save_sampler(sampler, fname): except: # noqa pass raise + + +def _parse_pool_queue(pool, queue_size): + """ + Common functionality of interpreting the pool and queue_size + arguments to Dynamic and static nested samplers + """ + if queue_size is not None and queue_size < 1: + raise ValueError("The queue must contain at least one element!") + if pool is None: + if queue_size is not None and queue_size > 1: + raise ValueError("`queue_size > 1` but no `pool` provided.") + mapper = map + queue_size = 1 + elif pool is not None: + mapper = pool.map + if queue_size is None: + queue_size = getattr(pool, '_processes', None) or getattr( + pool, 'size', None) + if queue_size is None: + raise ValueError( + "Cannot initialize `queue_size` because " + "`pool.size` or `pool._processes` has not been provided. 
" + "Please define `pool.size` or specify `queue_size` " + "explicitly.") + return mapper, queue_size diff --git a/tests/test_pool.py b/tests/test_pool.py index 3aa8f64d..166c3953 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -178,7 +178,7 @@ def test_pool_args2(): terminator(pool) -@pytest.mark.parametrize('sample', ['slice', 'rwalk', 'rslice']) +@pytest.mark.parametrize('sample', ['slice', 'rwalk', 'rslice', 'unif']) def test_pool_samplers(sample): # this is to test how the samplers are dealing with queue_size>1 rstate = get_rstate() @@ -223,3 +223,22 @@ def test_usepool(func): queue_size=100) sampler.run_nested(maxiter=10000, print_progress=printing) terminator(pool) + + +@pytest.mark.parametrize('queue_size', [None, 2]) +def test_pool_queue_size(queue_size): + # this is to test how the samplers are dealing with specified + # or unspecified queue_size + rstate = get_rstate() + + ctx = mp.get_context('spawn') + with ctx.Pool(2) as pool: + sampler = dynesty.NestedSampler(loglike_gau, + prior_transform_gau, + ndim, + nlive=nlive, + sample='rslice', + pool=pool, + queue_size=queue_size, + rstate=rstate) + sampler.run_nested(print_progress=printing, dlogz=10) diff --git a/tests/test_resume.py b/tests/test_resume.py index feab133e..db8657fb 100644 --- a/tests/test_resume.py +++ b/tests/test_resume.py @@ -232,6 +232,77 @@ def test_resume(dynamic, delay_frac, with_pool, dyn_pool): pass +@pytest.mark.parametrize( + "delay_frac", + [.2, .5, .75, .9], +) +@pytest.mark.xdist_group(name="resume_group") +def test_resume_queue_size(delay_frac): + """ + Test we can interrupt and resume nested runs + Note that I used xdist_group here in order to guarantee that if all the + tests are run in parallel, this one is executed in one thread because + I want to only use one getlogz() call. 
+ """ + fname = get_fname(inspect.currentframe().f_code.co_name) + + save_every = 1 + cache_dt, cache_logz = getlogz(fname, save_every) + npool = 2 + curdt, curlogz = [_[False, True] for _ in [cache_dt, cache_logz]] + save_every = min(save_every, curdt / 10) + curdt *= delay_frac + + # For spawn context, we need to account for startup time + ready_file = fname + '.ready' + + try: + # Always use spawn context to match actual usage + fit_proc = mp.get_context('spawn').Process( + target=fit_main, + args=(fname, False, save_every, npool, False, NEFF0, ready_file)) + start_time = time.time() + fit_proc.start() + + # Wait for spawn process to be ready before starting timer + while not os.path.exists(ready_file): + time.sleep(0.01) + if time.time() - start_time > 5: # Safety timeout + raise RuntimeError("Process failed to start") + # Account for startup time in the timeout + startup_time = time.time() - start_time + actual_timeout = curdt + startup_time + + fit_proc.join(actual_timeout) + # Proceed to terminate only if the process did not finish in time. + if fit_proc.is_alive(): + print('terminating', file=sys.stderr) + fit_proc.terminate() + if np.allclose(delay_frac, .2) and not os.path.exists(fname): + warnings.warn( + "The checkpoint file was not created I'm skipping the test" + ) + return + + with mp.get_context('spawn').Pool(npool + 1) as pool: + fit_resume(fname, False, curlogz, pool=pool) + else: + assert fit_proc.exitcode == 0 + finally: + try: + os.unlink(fname) + except: # noqa + pass + try: + os.unlink(fname + '.tmp') + except: # noqa + pass + try: + os.unlink(ready_file) + except: # noqa + pass + + @pytest.mark.parametrize("dynamic", [False, True]) def test_save(dynamic): """