
Broadcast-like operations are poorly scheduled (widely-shared dependencies) #6570

Opened by @gjoseph92

Graphs like this are not currently scheduled well:

. . . . . . . .   . . . . . . . .
|\|\|\|\|/|/|/|   |\|\|\|\|/|/|/|
| | | | a | | |   | | | | b | | |
* * * * * * * *   * * * * * * * *

The . tasks should definitely take into account the location of the * data when scheduling. But if we have 5 workers, every worker will hold some * data, while only 2 workers will hold an a or b. When scheduling the first few .s, there's a tug-of-war between the a and the *: which do we want to schedule near? We want a way to disregard the a.

Say (*, 0) completes first, and a is already complete, on a different worker. Each * is the same size as (or smaller than) a. We now schedule (., 0). If we choose to go to a, we might get a short-term gain, but we've taken a spot that could have gone to better use in the near future. Say the worker holding a is already running (*, 6). Now (., 6) may get scheduled on yet another worker, because (., 0) is already running where it should have gone, and the scheduler prioritizes "where can I start this task soonest" over "how can I minimize data transfer".

This can cascade through all the .s, until we've transferred most of the root tasks' data to other workers (on top of a, which must be transferred everywhere no matter what).

What could have been a nearly-zero-transfer operation is instead likely to transfer every piece of input data to a different worker, greatly increasing memory usage.
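To make the "short-term gain" above concrete, here is a toy cost comparison (the sizes, worker names, and cost model are illustrative assumptions, not scheduler internals): running (., 0) next to a minimizes bytes fetched for that one task, even though a's transfer is a cost that will be paid on every worker eventually.

# Toy cost comparison for scheduling (., 0); sizes and worker names are made up.
# `a` lives on W0, `(*, 0)` lives on W3, and `(*, 0)` is no bigger than `a`.
dep_sizes = {"a": 8_000_000, "(*, 0)": 4_000_000}
dep_location = {"a": "W0", "(*, 0)": "W3"}

def bytes_fetched_if_run_on(worker):
    """Bytes that must be transferred to `worker` before (., 0) can run there."""
    return sum(size for dep, size in dep_sizes.items() if dep_location[dep] != worker)

print(bytes_fetched_if_run_on("W0"))  # 4 MB: fetch (*, 0) -- the short-term win
print(bytes_fetched_if_run_on("W3"))  # 8 MB: fetch `a` -- looks worse, but `a`
                                      # has to reach every worker sooner or later,
                                      # so this cost is not really avoidable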

This pattern occurs anytime you broadcast one thing against another in a binary operation (which can happen with arrays, dataframes, bags, etc.):

import dask.array as da
a = da.random.random(100, chunks=10)
x = da.random.random(1)
r = (a[1:] * x)  # `[1:]` slicing prevents blockwise fusion
r.visualize(optimize_graph=True, collapse_outputs=True)

(Task graph visualization: each mul task depends on one chunk of a and on the single-chunk x.)

In the above case, the mul tasks will tend to "dogpile" onto the one worker that holds the middle random_sample task (x).

@crusaderky has also observed cases where this "dogpile" effect can cause what should be an embarrassingly parallel operation to be scheduled entirely on one worker, overwhelming it.
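One way to see the skew empirically is to persist the result on a small local cluster and count where the mul outputs ended up. This is a rough sketch; the cluster size and the key-name matching on "mul" are assumptions, and whether the dogpile reproduces depends on timing:

from collections import Counter
import dask.array as da
from dask.distributed import Client, wait

client = Client(n_workers=5, threads_per_worker=1)

a = da.random.random(100, chunks=10)
x = da.random.random(1)
r = (a[1:] * x).persist()
wait(r)

# who_has maps each in-memory key to the workers holding its data
who_has = client.who_has()
counts = Counter(
    w
    for key, workers in who_has.items()
    if "mul" in str(key)
    for w in workers
)
print(counts)  # a heavy skew toward the worker holding `x` indicates the dogpile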

#5325 was a heuristic attempt to fix this, but there are probably better ways to approach it.
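For reference, the general shape of that kind of heuristic, sketched in plain Python (the dependents-vs-workers cutoff and the data structures are assumptions for illustration, not #5325's actual code): when picking candidate workers for a task, skip dependencies that have far more dependents than there are workers, since they will end up on every worker anyway and carry no useful locality signal.

# Plain-Python sketch of a "disregard widely-shared dependencies" rule.
# The cutoff and data structures are illustrative, not the scheduler's real ones.

def candidate_workers(deps, n_workers):
    """deps: list of (workers_holding_dep, n_dependents_of_dep) pairs."""
    candidates = set()
    for holders, n_dependents in deps:
        if n_dependents >= n_workers:
            # This dependency (e.g. `a` or `x`) feeds so many tasks that it will
            # be replicated everywhere regardless; its location tells us nothing.
            continue
        candidates |= set(holders)
    return candidates

# For (., 0): `a` has 8 dependents and lives on W0; `(*, 0)` has 1 dependent, on W3.
deps = [({"W0"}, 8), ({"W3"}, 1)]
print(candidate_workers(deps, n_workers=5))  # {'W3'} -- schedule next to the `*` chunk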
