Skip to content

Undulating cluster auto-scaling #5931

Open
@orf

Description

@orf

What happened:

After nearing the end of a simple yet long running job with ~1200 workers at peak, the scheduler auto-scaler seems to become confused and start launching and destroying large numbers of workers.

What's concerning about this is that new workers joining the cluster seems to be expensive: the scheduler CPU spikes, and large numbers of concurrent workers connecting at the same time has caused us instability in the past. So this seems like a perfect storm: A long running, expensive job starts to end, then the scheduler essentially starts trying to DDOS itself by launching and destroying hundreds of workers every minute or so.

The lower number of workers is probably the correct number, given the number of tasks remaining:

Screenshot 2022-03-10 at 23 59 52

But the scheduler will spin up another ~400 workers that sit idle, before tearing them down again:

Screen.Recording.2022-03-10.at.23.51.33.mov

You can see this on Grafana, even though the numbers are slightly off and the resolution doesn't capture every peak and dip:

image

The task graph isn't complex: It's reading from parquet, repartitioning into small chunks, doing map_partitions, then splitting the results again before writing to parquet:

df = ddf.read_parquet("s3:/...")
df = df.repartition(4_462)
# Runtime for each partition: ~1 hour
df = df.map_partitions(expensive_function)
# Partitions produced are _big_, which can lead to OOM when writing parquet. So split it up again
df = df.repartition(df.npartitions * 3)
df.write_parquet("s3://...", compute=False).compute()

image

Anything else we need to know?:

I'm not able to run a dump state. I know this isn't helpful, but this is a very expensive and long running job and I can't take any chances that it may disrupt the job nearing completion. I can supply raw Dask logs from the scheduler via email or another method. I also can't get a raw client connection to the scheduler without a fair bit of effort.

I can probably run this job once more to collect some specific debug information if you tell me exactly what is needed: for example, would dumping the cluster state at the end of the run be OK? Or does it need to be in the middle of the scheduler exhibiting this issue?

Environment:

  • Dask version: 2022.2.1
  • Python version: 3.9
  • Operating System: Linux
  • Install method (conda, pip, source): pip

Metadata

Metadata

Assignees

No one assigned

    Labels

    adaptiveAll things relating to adaptive scaling

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions