adaptive_target: no more workers than runnable tasks #4155

Closed · wants to merge 1 commit
47 changes: 44 additions & 3 deletions distributed/deploy/tests/test_adaptive.py
@@ -6,10 +6,10 @@
import dask
import pytest

-from distributed import Client, wait, Adaptive, LocalCluster, SpecCluster, Worker
-from distributed.utils_test import gen_test, slowinc, clean
-from distributed.utils_test import loop, nodebug, cleanup  # noqa: F401
+from distributed import Adaptive, Client, LocalCluster, SpecCluster, Worker, wait
from distributed.metrics import time
+from distributed.utils_test import clean, cleanup, gen_test, loop, slowinc  # noqa: F401
+from distributed.worker_client import get_worker


@pytest.mark.asyncio
@@ -449,3 +449,44 @@ async def test_update_adaptive(cleanup):
        await asyncio.sleep(0.2)
        assert first.periodic_callback is None
        assert second.periodic_callback.is_running()


+@pytest.mark.asyncio
+async def test_adaptive_too_many_workers(cleanup):
+    """Make sure that no more workers are spawned than there are runnable tasks."""
+    n_tasks = 3
+    n_max_workers = 10
+
+    async with LocalCluster(
+        n_workers=0, threads_per_worker=1, memory_limit=0, asynchronous=True
+    ) as cluster:
+        cluster.adapt(minimum=0, maximum=n_max_workers, interval="100 ms")
+        async with Client(cluster, asynchronous=True) as client:
+            futures = client.map(sleep, range(n_tasks))
+            await client.gather(futures)
+
+            log = cluster._adaptive.log
+            for state in log:
+                if state[1]["status"] != "up":
+                    continue
+                assert state[1]["n"] <= n_tasks
+
+
+@pytest.mark.parametrize("memory_limit", (0, "1G"))
+@pytest.mark.asyncio
+async def test_adaptive_memory_limit(memory_limit, cleanup):
+    """Make sure that adapt() works even when a memory limit is set."""
+
+    def _test_fun(x):
+        sleep(3)
+        return get_worker().name
+
+    n_tasks = 3
+
+    async with LocalCluster(
+        n_workers=0, threads_per_worker=1, memory_limit=memory_limit, asynchronous=True
+    ) as cluster:
+        async with Client(cluster, asynchronous=True) as client:
+            cluster.adapt(minimum=1, maximum=n_tasks, interval="100 ms")
+            futures = client.map(_test_fun, range(n_tasks))
+            assert max(await client.gather(futures)) > 0
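
For readers who want to reproduce the behaviour outside the test suite, here is a minimal synchronous sketch of the same scenario. It is illustrative only and assumes a local machine with dask.distributed installed; the parameters mirror test_adaptive_too_many_workers above.

# Illustrative script (not part of the PR): with only 3 runnable tasks,
# adaptive scaling should never request more than 3 workers, even though
# the configured maximum is 10.
from time import sleep

from distributed import Client, LocalCluster

if __name__ == "__main__":
    cluster = LocalCluster(n_workers=0, threads_per_worker=1)
    cluster.adapt(minimum=0, maximum=10, interval="100 ms")
    client = Client(cluster)
    futures = client.map(sleep, range(3))  # 3 runnable tasks
    client.gather(futures)
    # every recorded scale-up decision should request at most 3 workers
    for state in cluster._adaptive.log:
        if state[1]["status"] == "up":
            assert state[1]["n"] <= 3
    client.close()
    cluster.close()
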
34 changes: 15 additions & 19 deletions distributed/scheduler.py
@@ -6201,31 +6201,27 @@ def adaptive_target(self, comm=None, target_duration=None):
            self.total_occupancy / target_duration
        )  # TODO: threads per worker

-        # Avoid a few long tasks from asking for many cores
-        ws: WorkerState
-        tasks_processing = 0
-        for ws in self.workers.values():
-            tasks_processing += len(ws._processing)
-
-            if tasks_processing > cpu:
-                break
-        else:
-            cpu = min(tasks_processing, cpu)
@twoertwein (Contributor, Author) commented on Jan 3, 2021:
This code prevented more workers from spawning for me. It is no longer needed, as the new code caps the targeted number of workers at the number of runnable tasks.

        if self.unrunnable and not self.workers:
            cpu = max(1, cpu)

-        # Memory
-        limit_bytes = {addr: ws._memory_limit for addr, ws in self.workers.items()}
-        worker_bytes = [ws._nbytes for ws in self.workers.values()]
-        limit = sum(limit_bytes.values())
-        total = sum(worker_bytes)
-        if total > 0.6 * limit:
+        # add more workers if more than 60% of memory is used
+        limit = sum(ws.memory_limit for ws in self.workers.values())
+        used = sum(ws.nbytes for ws in self.workers.values())
+        memory = 0
+        if used > 0.6 * limit:
@twoertwein (Contributor, Author) commented:
It might be good to change this to used > 0.6 * limit and limit > 0. With no memory limit set, limit is 0, so this branch always tries to increase the number of workers (which is why I initially thought the issue was related to memory).

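A minimal standalone sketch of that suggestion (illustrative only; memory_scale_up is a made-up helper name, not a function in distributed):

def memory_scale_up(used: int, limit: int, n_workers: int) -> int:
    """Memory-based worker target with the suggested limit > 0 guard."""
    if limit > 0 and used > 0.6 * limit:
        # more than 60% of the configured memory is in use
        return 2 * n_workers
    return 0

# With no memory limit configured (limit == 0), the guard keeps the memory
# target at 0 instead of always requesting more workers.
assert memory_scale_up(used=123, limit=0, n_workers=4) == 0
assert memory_scale_up(used=700_000_000, limit=1_000_000_000, n_workers=4) == 8
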
            memory = 2 * len(self.workers)
-        else:
-            memory = 0

        target = max(memory, cpu)

+        # avoid having more workers than runnable tasks
+        runnable_states = ("processing", "no-worker")
+        tasks = 0
+        for task_group in self.task_groups.values():
+            tasks += sum(task_group.states[state] for state in runnable_states)
+            if tasks >= target:
+                break
+        target = min(target, tasks)

        if target >= len(self.workers):
            return target
        else:  # Scale down?
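
To make the effect of the new capping step concrete, here is a self-contained toy version (capped_target is an invented name; the real logic lives inside Scheduler.adaptive_target and reads scheduler state directly):

def capped_target(cpu_target: int, memory_target: int, runnable_tasks: int) -> int:
    """Toy model of the end of adaptive_target(): cap the requested worker
    count at the number of runnable (processing / no-worker) tasks."""
    target = max(memory_target, cpu_target)
    return min(target, runnable_tasks)

# Ten CPUs' worth of work but only 3 runnable tasks -> ask for 3 workers, not 10.
assert capped_target(cpu_target=10, memory_target=0, runnable_tasks=3) == 3
# Memory pressure can still raise the target, but never past the task count.
assert capped_target(cpu_target=2, memory_target=8, runnable_tasks=5) == 5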