
add cluster spinup & job management utilities #53

@delgadom

Description

Cluster spinup

Spin up a cluster and (optionally) wait for workers to appear, with a progress bar. It can also be used as a context manager that spins the cluster down after the job finishes.

import time

import rhg_compute_tools.kubernetes as rhgk

try:
    from tqdm.auto import tqdm
except ImportError:
    from tqdm import tqdm_notebook as tqdm


class setup_cluster(object):
    '''
    Scales up a dask cluster, with the option to block until the workers are available

    If ``setup_cluster`` is used as a context manager, the workers will be spun down
    upon exit. This does not automatically wait for submitted tasks to finish, so
    the code inside the ``with`` block must itself block until all jobs are
    complete (e.g. with ``wait_for_futures``). Because the workers are shut down
    as soon as the block exits, failed tasks can be hard to debug. It is
    therefore recommended not to use this pattern for prototyping, and to give
    tasks their own mechanism for error handling and, possibly, reporting.

    Parameters
    ----------
    nworkers : int
        Number of workers to create
    cluster_creator : func, optional
        Cluster creation function. Must return a ``(client, cluster)`` tuple,
        where ``cluster`` has ``scale`` and ``close`` methods. Default is
        :py:func:`rhg_compute_tools.kubernetes.get_cluster`.
    cluster_kwargs : dict, optional
        Keyword arguments passed to ``cluster_creator``. Default ``{}``
    block : bool, optional
        Whether to block until all workers have come online. Default ``True``.
    pbar : bool, optional
        Whether to create a tqdm progress bar displaying worker spinup. Ignored
        if ``block`` is ``False``. Default ``True``.
    pbar_kwargs : dict, optional
        Keyword arguments passed to :py:func:`tqdm.auto.tqdm`

    Examples
    --------
    Can be used as a helper to scale up workers:

    .. code-block:: python

        >>> s = setup_cluster(10)
        >>> client, cluster = s.scale_and_wait_for_workers()  # doctest: +SKIP
        100%|██████████| 10/10 [00:12<00:00,  1.20s/it]
        >>> futures = client.map(lambda x: x**2, range(20))  # doctest: +SKIP

    Alternatively, can be used as a context manager:

    .. code-block:: python

        >>> with setup_cluster(10, pbar_kwargs={'desc': 'workers'}) as (client, cluster):  # doctest: +SKIP
        ...     futures = client.map(lambda x: x**2, range(10))
        ...     wait_for_futures(futures, pbar_kwargs={'desc': 'jobs'})
        ...
        workers: 100%|██████████| 10/10 [00:12<00:00,  1.20s/it]
        jobs: 100%|██████████| 10/10 [00:01<00:00,  9.83it/s]
    '''
    def __init__(
            self,
            nworkers,
            cluster_creator=None,
            cluster_kwargs=None,
            block=True,
            pbar=True,
            pbar_kwargs=None):
        self.nworkers = nworkers
        self.cluster_creator = (
            cluster_creator if cluster_creator is not None else rhgk.get_cluster)
        self.cluster_kwargs = cluster_kwargs if cluster_kwargs is not None else {}
        self.block = block
        self.pbar = pbar
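        self.pbar_kwargs = pbar_kwargs if pbar_kwargs is not None else {}

    def scale_and_wait_for_workers(self):
        '''
        Create the cluster, request ``nworkers`` workers and, if ``block`` is
        ``True``, wait until all of them have connected. Returns
        ``(client, cluster)``.
        '''
        self.client, self.cluster = self.cluster_creator(**self.cluster_kwargs)
        self.cluster.scale(self.nworkers)

        if self.block:
            # the progress bar is optional; blocking does not depend on it
            pbar = (
                tqdm(total=self.nworkers, **self.pbar_kwargs)
                if self.pbar else None)

            while True:
                # client.ncores() has one entry per connected worker
                nworkers = len(self.client.ncores())
                if pbar is not None:
                    pbar.n = min(nworkers, self.nworkers)
                    pbar.refresh()
                if nworkers >= self.nworkers:
                    break
                time.sleep(0.2)

            if pbar is not None:
                pbar.close()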

        return self.client, self.cluster
        
    def __enter__(self):
        return self.scale_and_wait_for_workers()

    def __exit__(self, *args, **kwargs):
        self.cluster.scale(0)
        self.client.close()
        self.cluster.close()
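
The blocking step in ``scale_and_wait_for_workers`` is a polling loop: ask the scheduler how many workers are connected, update the bar, sleep, repeat. As written it never gives up if workers fail to start. A minimal sketch of the same loop with a timeout, assuming only that the client exposes ``ncores()`` returning a dict with one entry per connected worker (as ``distributed.Client.ncores`` does); the helper name ``wait_for_n_workers`` and its ``timeout``/``poll_interval`` parameters are hypothetical, not part of rhg_compute_tools:

```python
import time


def wait_for_n_workers(client, n, timeout=300.0, poll_interval=0.2, pbar=None):
    """Block until at least ``n`` workers are connected (hypothetical helper).

    ``client`` only needs an ``ncores()`` method returning a dict with one
    entry per connected worker. ``pbar`` may be any tqdm-like object with an
    ``n`` attribute and a ``refresh()`` method. Returns the number of
    connected workers, or raises ``TimeoutError`` if fewer than ``n`` appear
    within ``timeout`` seconds.
    """
    deadline = time.monotonic() + timeout
    while True:
        connected = len(client.ncores())
        if pbar is not None:
            # never let the bar overshoot its total
            pbar.n = min(connected, n)
            pbar.refresh()
        if connected >= n:
            return connected
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"only {connected} of {n} workers connected after {timeout}s")
        time.sleep(poll_interval)
```

Inside ``scale_and_wait_for_workers`` this could stand in for the ``while True`` loop, e.g. ``wait_for_n_workers(self.client, self.nworkers, pbar=pbar)``, so a stuck spinup surfaces as an exception instead of hanging the notebook.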

Task management

Wait for futures to complete, with a progress bar

def wait_for_futures(futures, pbar_kwargs=None):
    '''
    Blocking progress bar for dask futures

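    Provides a progress bar that blocks the Python interpreter until
    all futures are completed. Failed futures are counted and shown in
    the progress bar's postfix.

    Parameters
    ----------
    futures : list or dask collection
        List of dask futures, or a persisted dask collection such as a
        DataFrame or Array (any object with a ``dask`` attribute whose
        futures can be found with ``distributed.futures_of``)
    pbar_kwargs : dict, optional
        Keyword arguments passed to the :py:func:`tqdm.auto.tqdm` constructor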
    '''
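    from distributed import as_completed, futures_of

    if pbar_kwargs is None:
        pbar_kwargs = {}

    if hasattr(futures, 'dask'):
        # a persisted collection: pull out the futures backing its partitions
        futures = futures_of(futures)

    # materialize so len() works for generators and dict views alike
    futures = list(futures)
    pbar = tqdm(as_completed(futures), total=len(futures), **pbar_kwargs)

    errors = 0
    for f in pbar:
        # count failed tasks and display the running total
        if f.status == 'error':
            errors += 1
            pbar.set_postfix({'errors': errors})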
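
The loop in ``wait_for_futures`` consumes futures in completion order and keeps a running error count. The same pattern can be tried without a cluster using the standard library's ``concurrent.futures.as_completed``. This is a swapped-in stand-in for ``distributed.as_completed``, and stdlib futures report failures through ``exception()`` rather than a ``status`` attribute; the helper names ``count_errors`` and ``_square_or_fail`` are hypothetical:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def count_errors(futures):
    """Consume futures as they finish and return (n_done, n_errors)."""
    futures = list(futures)
    done = errors = 0
    for f in as_completed(futures):
        done += 1
        # a stdlib future whose task raised stores the exception, not a result
        if f.exception() is not None:
            errors += 1
    return done, errors


def _square_or_fail(x):
    # fail on multiples of 5 so some tasks error out
    if x % 5 == 0:
        raise ValueError(f"bad input {x}")
    return x ** 2


with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(_square_or_fail, x) for x in range(20)]
    done, errors = count_errors(futures)

print(done, errors)  # 20 4
```

Counting errors rather than raising on the first one keeps the bar moving to completion, which matches the intent of reporting failures without interrupting the batch.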

Other useful features

  • Task progress save/recovery (maybe a job for parameterize_jobs)
  • Job caching utilities (a decorator, probably?)
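
For the job-caching idea, one possible shape is a decorator that stores each call's result on disk under a hash of its arguments and skips recomputation when that file already exists. This would also give a crude form of progress recovery, since a rerun only recomputes jobs whose results are missing. The sketch below is hypothetical (``cache_to_dir`` is not an existing rhg_compute_tools function), uses only ``pickle``, ``hashlib`` and ``os`` from the standard library, and assumes arguments have a stable ``repr``; on a cluster, ``cache_dir`` would need to live on storage every worker can reach:

```python
import functools
import hashlib
import os
import pickle


def cache_to_dir(cache_dir):
    """Hypothetical decorator: cache a function's results as pickle files."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # key on the function name plus the repr of its arguments
            key_src = repr((func.__qualname__, args, sorted(kwargs.items())))
            key = hashlib.sha256(key_src.encode()).hexdigest()
            path = os.path.join(cache_dir, key + '.pkl')
            if os.path.exists(path):
                with open(path, 'rb') as f:
                    return pickle.load(f)
            result = func(*args, **kwargs)
            os.makedirs(cache_dir, exist_ok=True)
            # write to a temp file, then rename, so an interrupted job never
            # leaves a half-written cache entry behind
            tmp_path = path + '.tmp'
            with open(tmp_path, 'wb') as f:
                pickle.dump(result, f)
            os.replace(tmp_path, path)
            return result
        return wrapper
    return decorator
```

Keying on ``repr`` is the simplest choice but breaks for arguments whose ``repr`` includes memory addresses; a real utility would likely need an explicit key function.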

Metadata

Assignees

No one assigned

    Labels

    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests
