Skip to content

Simplify scale API // Cluster + Adaptive class hierarchy change #5080

Open
@fjetter

Description

@fjetter

We currently have a heterogeneous set of implementations involving clusters and their scaling capabilities. Taking the LocalCluster as an example, with adaptive scaling, this involves many different and subtly similar classes

In [1]: from distributed import LocalCluster

In [2]: LocalCluster.__mro__
Out[2]:
(distributed.deploy.local.LocalCluster,
 distributed.deploy.spec.SpecCluster,
 distributed.deploy.cluster.Cluster,
 object)

and on top there is the adaptive implementation with distributed.deploy.adaptive.Adaptive and distributed.deploy.adaptive_core.AdaptiveCore

There are the following methods capable of scaling

  • SpecCluster.scale
  • Cluster.scale (NotImplemented)
  • async SpecCluster.scale_down
  • async Adaptive.scale_up
  • async Adaptive.scale_down
  • async AdaptiveCore.scale_up (NotImplementedError)
  • async AdaptiveCore.scale_down (NotImplementedError)
  • async AdaptiveCore.adapt

The only obvious and full implementation of scale is in SpecCluster but this implementation has the shortcoming that it is not smart, i.e. upon downscaling it will remove random workers. Clusters which do not inherit from SpecCluster cannot benefit from manual scaling by default but they can use adaptive scaling to a certain extend since scale_down is implemented in Adaptive. By implementing Cluster.scale_up a cluster would be enabled for fully adaptive, smart scaling but no manual Cluster.scale

There is an unobvious but smart implementation available as part of AdaptiveCore.adapt, see

target = await self.safe_target()
recommendations = await self.recommendations(target)
if recommendations["status"] != "same":
self.log.append((time(), dict(recommendations)))
status = recommendations.pop("status")
if status == "same":
return
if status == "up":
await self.scale_up(**recommendations)
if status == "down":
await self.scale_down(**recommendations)

which uses the method AdaptiveCore.recommendations to determine a smart scaling decision taking into account scheduler information and planned but not yet observed workers. This allows for an elegant and smart scaling implementation with the only requirement that the attributes plan, requested, observed and the method workers_to_close (default available in scheduler) are implemented. The attributes are already implemented for Cluster.

    async def scale(self, target):
        recommendations = await self.recommendations(target)

        if recommendations["status"] != "same":
            self.log.append((time(), dict(recommendations)))

        status = recommendations.pop("status")
        if status == "same":
            return
        if status == "up":
            await self.scale_up(**recommendations)
        if status == "down":
            await self.scale_down(**recommendations)

Moving the recommendations logic up the class hierarchies into Cluster would allow us to implement one Cluster.scale which should serve all/most use cases.
This would either require us to redefine the Adaptive interface or copy code. Since I'm a bit confused about the class hierarchies, the above mentioned scale* methods and about various minor implementations I would like to propose a slightly new interface.

  • I would like to merge the AdaptiveCore and Adaptive classes since the core class is not functional and I imagine subclassing is typically done using Adaptive
  • All scale* methods will be implemented as part of Cluster. These methods are not supposed to implement smart logic about picking the "right" workers but are merely there to communicate with scheduler and resource manager to do the actual scaling. How the scaling is performed should use the same logic whether adaptivity is enabled or not.
  • Cluster.scale will be implemented using the current Adaptive.recommendations + scale_{up/down}. Therefore, the recommendations method will also be part of the Cluster together with workers_to_close which will simply query the scheduler.
  • adaptive = True / Cluster.adapt will simply start a PC which calls Cluster.scale(await Adaptive.safe_target)
  • The only user facing API for manual scaling will be the generic Cluster.scale(target: int). Subclasses are required to at least implement scale_up(workers: List[str])
  • That all reduces the entire Adaptive class to a single function defining the adaptive target to be put into a PC. This is user customisable but will default to scheduler.adaptive_target

The only thing I'm not entirely sure about is the workers_to_close method since I don't have a good feeling about whether this is actually something people want to override.

To summarise, all components to provide a generic and smart Cluster.scale implementation are there but the pieces are scattered all over the place. I believe by implementing the above suggestions, the class hierarchy would become simpler and scaling and adaptivity would become more accessible.

Thoughts?

cc @jacobtomlinson @marcosmoyano @mrocklin

Metadata

Metadata

Assignees

No one assigned

    Labels

    adaptiveAll things relating to adaptive scalingdiscussionDiscussing a topic with no specific actions yet

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions