Description
Hi there, we ran into an issue in dask-jobqueue that is explained here: dask/dask-jobqueue#498.
To sum up: when we use adaptive mode with a `Cluster` object that starts several worker processes in each `Job`, and with the minimum number of workers set to 0, adaptive goes into an endless loop once tasks are submitted, starting and stopping jobs before the workers ever connect to the Scheduler.
I suspect this bug is also present with other `SpecCluster` implementations that allow for "grouped" workers in one `ProcessInterface`.
This can be reproduced (without any job queueing system, but with dask-jobqueue installed) with the following snippet:
```python
import time

from dask import delayed
from dask.distributed import Client, progress
from dask_jobqueue.local import LocalCluster


@delayed
def job(x):
    time.sleep(1)
    return x + 1


cluster = LocalCluster(
    cores=2,
    processes=2,
    name="multi-worker",
    memory="2GiB",
    walltime="1:00:00",
)
client = Client(cluster)
# Small interval and wait_count to simulate some queuing system startup time
cluster.adapt(maximum_jobs=6, interval="100ms", wait_count=1)

njobs = 1000
outputs = []
for i in range(njobs):
    output = job(i)
    outputs.append(output)

results = client.persist(outputs)
print("Running test...")
progress(results)
```
I've narrowed the problem to two places:
- https://github.com/dask/distributed/blob/main/distributed/scheduler.py#L7562: the Scheduler always returns a target of 1 worker if it has no connected workers. I'm not sure whether limiting the target to 1 in this case is actually useful.
- https://github.com/dask/distributed/blob/main/distributed/deploy/adaptive_core.py#L194: in the recommendations method, adaptive first asks to scale up (the target of 1 is greater than 0 connected workers), but right after it asks to scale down (because one job starts more than 1 worker, which exceeds the target of 1). That ends up stopping the job that has not yet started, together with all of its incoming workers (see https://github.com/dask/distributed/blob/main/distributed/deploy/adaptive.py#L202), and then it just loops. A rough sketch of this cycle is shown after this list.
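To make the oscillation easier to follow, here is a minimal, purely illustrative sketch of the cycle described above. It is not the actual distributed code: the clamped target of 1 and the 2-workers-per-job figure come from the scenario above, everything else is simplified.

```python
# Illustrative sketch of the adaptive loop described above, not distributed's code.
# Assumes one job starts 2 worker processes and that the Scheduler clamps its
# target to 1 while no worker is connected.
workers_per_job = 2
requested = 0  # number of workers we have asked the cluster manager to start

for cycle in range(4):
    connected = 0  # jobs are stopped before their workers ever connect
    target = 1     # Scheduler target with no connected workers (clamped to 1)
    if target > requested:
        requested = workers_per_job  # scale up: one job brings 2 pending workers
        print(f"cycle {cycle}: scale up, start a job ({requested} pending workers)")
    elif target < requested:
        requested = 0  # scale down: the pending job is stopped before it connects
        print(f"cycle {cycle}: scale down, stop the job that never connected")
```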
As a simple fix, we could modify the code in one or both of these places: either drop the clamping of the target to 1 when there are no workers (but that is probably there for a reason), or avoid retiring workers that have not yet connected while no workers have arrived at all. These are just suggestions; I'm really curious about any other propositions you might have.
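Purely as an illustration of the second suggestion, something along these lines could be tried on the user side by subclassing Adaptive. This is a hedged sketch based on my reading of adaptive_core.py: the subclass name is made up, and the `observed` attribute and the `{"status": ...}` return shape are assumptions about the current internals, not a proposed patch.

```python
from distributed.deploy import Adaptive


class PatientAdaptive(Adaptive):
    """Hypothetical Adaptive subclass: never scale down while nothing is connected."""

    def recommendations(self, target: int) -> dict:
        recs = super().recommendations(target)
        # `self.observed` (the set of workers actually connected) is assumed from
        # adaptive_core.py; if no worker has connected yet, keep pending jobs
        # alive instead of recommending a scale-down.
        if recs.get("status") == "down" and not self.observed:
            return {"status": "same"}
        return recs


# Usage (assuming the cluster's adapt() forwards an ``Adaptive`` class argument):
# cluster.adapt(Adaptive=PatientAdaptive, maximum_jobs=6, interval="100ms", wait_count=1)
```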
Environment:
- Dask version: 2022.9.0
- Python version: 3.9
- Operating System: CentOS 7
- Install method (conda, pip, source): conda