adaptive_target: no more workers than runnable tasks #4155


Closed
twoertwein wants to merge 1 commit

Conversation

twoertwein
Contributor

@twoertwein twoertwein commented Oct 7, 2020

I think that the memory condition in Scheduler.adaptive_target is swapped. If the total memory used by all workers is less than half (or, for some reason, less than 60%) of the memory limit, shouldn't you spawn more workers (and not the other way round)?

If I understand the code correctly, adapt worked only when the memory limit was set to zero.
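For context, the memory branch of Scheduler.adaptive_target boils down to roughly the following (a paraphrased sketch of the logic under discussion, not the exact source; the memory_target helper and its argument are only illustrative):

def memory_target(workers):
    """Paraphrased sketch of the memory branch in Scheduler.adaptive_target.

    workers maps address -> WorkerState-like objects exposing memory_limit
    (the configured limit) and nbytes (memory currently in use).
    """
    limit = sum(ws.memory_limit for ws in workers.values())
    used = sum(ws.nbytes for ws in workers.values())
    if used > 0.6 * limit:
        # memory pressure: ask for roughly twice the current worker count
        return 2 * len(workers)
    return 0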

Fixes dask/dask-jobqueue#463

Edit:
Limit the number of workers by the number of runnable tasks
Fixes #4108
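A rough sketch of the second change, capping the target by the number of runnable tasks (the helper name and the choice of which task states count as "runnable" are illustrative, not the actual diff):

def cap_target(target, task_states, minimum=0):
    """Illustrative sketch: never request more workers than there are tasks
    that could actually run; 'processing' and 'no-worker' are assumed here
    to be the runnable states."""
    runnable = sum(1 for state in task_states if state in ("processing", "no-worker"))
    return max(minimum, min(target, runnable))

# e.g. cap_target(10, ["processing"] * 4) == 4, matching the test case below
# where 4 tasks should need at most 4 workers.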

@mrocklin
Member

mrocklin commented Oct 7, 2020

limit refers to our target maximal amount of memory. total refers to the amount of memory used currently. I think that the logic was correct before. Are you running into something that caused you to investigate this?

@twoertwein
Contributor Author

twoertwein commented Oct 7, 2020

limit refers to our target maximal amount of memory. total refers to the amount of memory used currently.

Doesn't this mean that we want limit > total to create new workers? Currently it is the opposite.

Are you running into something that caused you to investigate this?

Yes, the linked dask_jobqueue issue, but the same also happens for LocalCluster (see the test case test_adaptive_memory_limit). The important part is that a non-zero memory limit is set for the workers (it works if it is zero):

import time

from dask.distributed import Client, LocalCluster
from distributed.worker_client import get_worker


def test_fun():
    time.sleep(5)
    return get_worker().name


if __name__ == "__main__":
    # create simple task graph
    n = 4
    graph = {f"test_{i}": (test_fun,) for i in range(n)}
    targets = [f"test_{i}" for i in range(n)]

    workers = 10  # we shouldn't need more than n=4 workers

    # LocalCluster
    kwargs = {
        "n_workers": 1,
        "diagnostics_port": None,
        "threads_per_worker": 1,
        "memory_limit": "2G",
        "processes": True,
    }
    with LocalCluster(**kwargs) as cluster:
        cluster.adapt(minimum=1, maximum=workers)
        with Client(cluster) as client:
            client.register_worker_callbacks(
                lambda dask_worker: print("setup", dask_worker.name)
            )
            print(client.get(graph, targets))  # prints only 0s. If memory_limit=0, more workers are spawned

@twoertwein twoertwein force-pushed the master branch 3 times, most recently from 4472c05 to e186634 on October 12, 2020
@twoertwein
Contributor Author

I think the remaining tests would pass if #4108 were fixed.

@twoertwein twoertwein changed the title from "Fix memory condition in Scheduler.adaptive_target" to "Scheduler.adaptive_target: fix memory condition and do not spawn more workers than runnable tasks" on Oct 13, 2020
@twoertwein twoertwein force-pushed the master branch 3 times, most recently from 76c9952 to aa9a890 on October 13, 2020
@twoertwein twoertwein force-pushed the master branch 3 times, most recently from 99e9897 to e843342 on October 14, 2020
@twoertwein
Contributor Author

@mrocklin if you do not agree with the memory-related part, I can separate the "not spawning more jobs than runnable tasks" part into its own PR.

@twoertwein twoertwein force-pushed the master branch 2 times, most recently from 02af313 to e215c73 on October 16, 2020
@twoertwein twoertwein force-pushed the master branch 2 times, most recently from 4c5162e to 19af2d1 on November 8, 2020
@twoertwein twoertwein changed the title from "Scheduler.adaptive_target: fix memory condition and do not spawn more workers than runnable tasks" to "adaptive_target: no more workers than runnable tasks and fix memory condition" on Nov 8, 2020
@twoertwein twoertwein force-pushed the master branch 3 times, most recently from dc24900 to d0eae6c on November 18, 2020
@twoertwein twoertwein force-pushed the master branch 2 times, most recently from 56563b4 to d86172b on January 2, 2021
@twoertwein
Contributor Author

bump @mrocklin @guillaumeeb

Other people seem to encounter the same issue (SLURMCluster not adapting): esi-neuroscience/acme#14

I have been using this patch for more than a month on a SLURM cluster, and it addressed my main concern with using dask on a cluster (wasting resources):

  • it does not spawn more workers than necessary, and
  • it scales down the number of workers: I typically have one long-ish task left at the end, and there is no need to block resources with idling workers.

@mrocklin
Member

mrocklin commented Jan 3, 2021

limit refers to our target maximal amount of memory. total refers to the amount of memory used currently.

Doesn't this mean that we want limit > total to create new workers? Currently it is the opposite.

I don't think so. For example, if I have a cluster with 100TB of memory (limit=1e14) and it stores no data (total=0), then I think that we don't want to scale up.
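Plugging those numbers into the 60% condition makes the point concrete:

# mrocklin's example: a large cluster holding no data.
limit = 1e14  # 100 TB of configured memory across all workers
total = 0     # bytes currently stored
print(total > 0.6 * limit)  # False -> no memory pressure, so no memory-driven scale-up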

@twoertwein
Contributor Author

Thank you @mrocklin for your comments! I think I understand my misconception :)

I will create a separate PR for the number of runnable tasks (after I switch to TaskGroups) and then spend more time analyzing why LocalCluster/SLURMCluster do not spawn more workers in my setup. Based on my current understanding, it shouldn't be related to memory at all (it must be the CPU part).

if tasks_processing > cpu:
    break
else:
    cpu = min(tasks_processing, cpu)
Contributor Author

@twoertwein twoertwein Jan 3, 2021


This code prevented more workers from spawning for me. It is not needed anymore, as the code now limits the targeted number of workers by the number of runnable tasks.

limit = sum(ws.memory_limit for ws in self.workers.values())
used = sum(ws.nbytes for ws in self.workers.values())
memory = 0
if used > 0.6 * limit:
Contributor Author


It might be good to change this to used > 0.6 * limit and limit > 0. With no memory limit set, it always tries to increase the number of workers (that is why I initially thought the problem was related to memory).

@twoertwein twoertwein changed the title from "adaptive_target: no more workers than runnable tasks and fix memory condition" to "adaptive_target: no more workers than runnable tasks" on Jan 3, 2021
@twoertwein
Contributor Author

I think the changes in this PR do not address the root cause: self.total_occupancy doesn't seem to get updated well enough (neither for SLURMCluster nor for LocalCluster). I would expect that if a task sleeps for 30s, total_occupancy increases and at some point reaches 30. By default, the target duration is 5s, so as soon as total_occupancy is above 5s, dask should spawn more workers. Unfortunately, total_occupancy never goes above 2.0 in my test examples, even though the wall time is much larger.
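For reference, the occupancy-based part of the target is roughly total_occupancy divided by the target duration (a sketch of that relationship, not the exact scheduler code):

import math

def cpu_target(total_occupancy, target_duration=5.0):
    """Sketch: with the default 5s target duration, total_occupancy has to
    exceed 5 before a second worker is requested."""
    return math.ceil(total_occupancy / target_duration)

print(cpu_target(2.0))   # 1 -> stuck at one worker if occupancy never passes 5
print(cpu_target(30.0))  # 6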

If all tasks take much longer than spawning a worker, something like the following is an ugly workaround:

with dask.config.set({"distributed.adaptive.target-duration": 0.1}):
    ...

The only reason we don't see this behavior when no memory limit is set (limit = 0) is that the if condition for memory is always True for limit = 0.

Iterating over TaskGroup.status does not seem to cover all tasks. It might just be missing the 'no-worker' cases?
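For what it's worth, counting runnable tasks from the per-group state counters might look like the sketch below (assuming each group exposes a states mapping of state name -> count, as TaskGroup.states does; which states to include is exactly the open question above):

def count_runnable_tasks(task_groups):
    """Hedged sketch: sum per-state counts over all task groups for the
    states that still need a worker; 'no-worker' is included so those
    tasks are not missed."""
    return sum(
        tg.states.get("processing", 0) + tg.states.get("no-worker", 0)
        for tg in task_groups.values()
    )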

@twoertwein twoertwein closed this Jan 4, 2021
@guillaumeeb
Member

For the record, the initial problem was that the graph used for the computation did not contain enough information to enable adaptive scaling. See dask/dask-jobqueue#463.
