
Adaptive scaling went above the number of available tasks #4108

Open

Description

@mrocklin

Currently Dask supports adaptive scaling. This allows it to scale the number of workers based on load. The logic for how many workers to ask for is based on many things, like how much work we have, how much memory we need, and so on.
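For context, adaptive mode is enabled on a cluster object through its adapt method; a minimal sketch (the bounds here are illustrative):

from dask.distributed import LocalCluster

# Start with no workers and let adaptive scaling decide, within bounds
cluster = LocalCluster(n_workers=0)
cluster.adapt(minimum=0, maximum=100)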

I recently observed a cluster scale up beyond the number of available tasks. It shouldn't do this. I had three tasks, each of which took several minutes, and I saw my cluster scale up to ten workers. There should probably be a maximum applied somewhere that isn't taking effect. There appears to already be some code to do this (the intent is there), but perhaps there is a bug.
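A minimal sketch of the kind of workload involved (a local cluster and sleep-based tasks standing in for the real deployment; the numbers mirror the report above, not the original code):

import time

from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=0)
cluster.adapt(minimum=0, maximum=10)
client = Client(cluster)

def slow(i):
    time.sleep(300)  # each task takes several minutes
    return i

# Three long tasks: the desired worker count should never exceed three,
# yet the cluster was observed scaling up to ten workers
futures = client.map(slow, range(3))
client.gather(futures)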

The relevant code is here:

def adaptive_target(self, comm=None, target_duration=None):
    """Desired number of workers based on the current workload

    This looks at the current running tasks and memory use, and returns a
    number of desired workers.  This is often used by adaptive scheduling.

    Parameters
    ----------
    target_duration: str
        A desired duration of time for computations to take.  This affects
        how rapidly the scheduler will ask to scale.

    See Also
    --------
    distributed.deploy.Adaptive
    """
    if target_duration is None:
        target_duration = dask.config.get("distributed.adaptive.target-duration")
    target_duration = parse_timedelta(target_duration)

    # CPU
    cpu = math.ceil(
        self.total_occupancy / target_duration
    )  # TODO: threads per worker

    # Avoid a few long tasks from asking for many cores
    tasks_processing = 0
    for ws in self.workers.values():
        tasks_processing += len(ws.processing)

        if tasks_processing > cpu:
            break
    else:
        cpu = min(tasks_processing, cpu)

    if self.unrunnable and not self.workers:
        cpu = max(1, cpu)

    # Memory
    limit_bytes = {addr: ws.memory_limit for addr, ws in self.workers.items()}
    worker_bytes = [ws.nbytes for ws in self.workers.values()]
    limit = sum(limit_bytes.values())
    total = sum(worker_bytes)
    if total > 0.6 * limit:
        memory = 2 * len(self.workers)
    else:
        memory = 0

    target = max(memory, cpu)

    if target >= len(self.workers):
        return target
    else:  # Scale down?
        to_close = self.workers_to_close()
        return len(self.workers) - len(to_close)
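Working through the CPU branch with numbers shaped like the report above (a standalone sketch; the 600s per-task estimate and the 5s target duration are assumptions, not measurements):

import math

total_occupancy = 3 * 600.0  # three tasks, ~600s of estimated work each
target_duration = 5.0        # assumed target duration in seconds

cpu = math.ceil(total_occupancy / target_duration)  # ceil(1800 / 5) = 360

# The for/else cap above: with all three tasks processing on workers,
# the target should come back down to the task count
tasks_processing = 3
cpu = min(tasks_processing, cpu)

print(cpu)  # 3

If the inputs are what the code expects, the cap brings the target back down to three, so the observed scale-up to ten suggests the cap is being bypassed somewhere, perhaps while the tasks are not yet counted in ws.processing.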


Labels

    adaptive: All things relating to adaptive scaling
    good second issue: Clearly described, educational, but less trivial than "good first issue"
