Description
Executive summary
- A worker may end up being completely unused for most of a computation if it was briefly paused at some point in its early stages.
- Work stealing does not correct imbalances caused by a worker pausing and then later unpausing.
- It is unwise to immediately exclude a worker from scheduling heuristics as soon as it pauses.
Expected (naive) behaviour
- When a worker reaches 85% process memory usage, it is paused.
- Its memory bar turns red in the GUI.
- It keeps computing any currently running tasks, but doesn't start any new ones from the worker-side queue.
- It is excluded from task assignment by the scheduler, meaning both new tasks and queued rootish tasks.
- As soon as process memory falls back below 85% (without any hysteresis cycle), it goes back to the running state: it starts any tasks that were in ready state in the worker-side queue, and the scheduler starts sending it new tasks again.
- Work stealing takes care of rebalancing the workload.
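These thresholds map onto standard distributed configuration keys. As a minimal sketch, assuming the configuration is applied wherever the workers are started (the 85% pause fraction comes from the description above; the other two values are the library defaults):
import dask

# Fractions of the worker's memory limit; the pause fraction matches the 85%
# figure used in this issue, the other two are the library defaults.
dask.config.set(
    {
        "distributed.worker.memory.target": 0.60,  # start spilling managed data
        "distributed.worker.memory.spill": 0.70,   # spill based on process memory
        "distributed.worker.memory.pause": 0.85,   # stop starting new tasks
    }
)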
What actually happens
If a worker is paused - even for just a fraction of a second - while a large number of independent, but not rootish, tasks land on the scheduler, then it will be excluded from scheduling and all of those tasks will be sent at once to the other workers.
Work stealing does not rebalance anything after the worker unpauses.
Use case
test_spilling in coiled_runtime is as follows:
import dask.array as da
import distributed

# 64 GiB of random data; running on a cluster of 5x8 GiB workers
a = da.random.random((92682, 92682))
a = a.persist()
distributed.wait(a)
b = a.sum().persist()
del a
b.compute()
The workflow is divided into three stages:
- (map) A wealth of rootish, independent tasks land on the scheduler at the same time. They produce more data than the cluster can hold in memory, which causes heavy spilling. Because they are rootish, they are sent to the workers at most 3 at a time.
- (map) Once the tasks are all in memory (or on disk), a wealth of non-rootish, independent tasks causes the previous tasks to be unspilled. As soon as each task from step 1 is consumed by step 2, it is released. Because they are non-rootish, they are sent to the workers' queues all at once, and subsequently rely on work stealing for optimal balancing.
- (reduce) Step 2 produces trivially sized output chunks, which are recursively aggregated.
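The structural difference between the stages can be seen on a toy version of the same graph. This is only an illustrative sketch (the scheduler's rootish heuristic looks at whole task groups, not at individual tasks): the random chunks have no dependencies, while each per-chunk sum depends on exactly one chunk of a:
import dask.array as da
from dask.core import get_dependencies

# Tiny stand-in for the 64 GiB array above
a = da.random.random((4, 4), chunks=(2, 2))
b = a.sum()

# Materialize the graph and count the dependencies of each task:
# random chunks have 0 (stage 1), per-chunk sums have 1 (stage 2),
# and the aggregation tasks have several (stage 3).
dsk = dict(b.__dask_graph__())
for key in dsk:
    print(key, len(get_dependencies(dsk, key)))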
In main, this use case never reaches the pause threshold. The reason is that there are only modest amounts of unmanaged memory involved, so well before process memory hits the pause threshold, managed memory hits the target threshold. This in turn blocks the event loop, which effectively puts a hard limit on how many tasks are in managed memory at any given time.
In the video below we can see this effect, as well as the clear split between phase 1 and phases 2-3:
test_spilling-main-steal-wait.mp4
#4424, however, makes it a lot easier (by design) to reach the pause threshold. With that PR, a task that transitions from executing to memory no longer blocks the event loop until older tasks have been spilled out; if the next task at the top of the ready queue does not have any spilled-out dependencies, it starts immediately while spilling/unspilling happens in a separate thread.
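As a rough, self-contained sketch of that behaviour (a toy illustration, not the code in the PR): eviction hands the slow disk write to a background thread, and the worker keeps starting ready tasks whose dependencies are still in fast memory:
import threading
import time

fast = {"x": 1, "y": 2, "z": 3}  # keys in fast (unspilled) memory
slow = {}                        # keys that have been written to disk

def write_to_disk(key, value):
    time.sleep(0.1)              # simulated disk latency
    slow[key] = value

# Evict an older key: drop it from fast memory and let a background thread
# do the slow disk write instead of blocking the main loop here.
value = fast.pop("z")
threading.Thread(target=write_to_disk, args=("z", value)).start()

# Meanwhile, keep starting ready tasks whose dependencies are not spilled out;
# only the task that needs "z" has to wait.
ready = [("t1", ["x"]), ("t2", ["y"]), ("t3", ["z"])]
for task, deps in ready:
    if all(dep in fast for dep in deps):
        print(f"{task}: starting immediately")
    else:
        print(f"{task}: waiting for a spilled-out dependency")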
In the video below we can see that tasks in stage 1 very quickly reach the pause threshold. This is because numpy.random.random produces data faster than the disks on the Coiled workers (AWS EC2 m6i.large) can write it out. The workers then start flickering in and out of paused state as the disk catches up. This behaviour is good and by design - this is exactly how the pause system should work, and it means that the worker only starts slowing down later, at a higher degree of memory pressure.
However, when phase 2 kicks in, most workers are still spilling in the background and are still paused. Although this lasts for just a second or two, it is enough for the scheduler to completely exclude them when scheduling many hundreds of non-rootish tasks. When they later go back to running state, work stealing does not rebalance the queue, causing a dramatic degradation in end-to-end runtime.
test_spilling-async-steal-nowait.mp4
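One way to watch the imbalance from the client (assuming a connected Client named client) is to look at how many tasks the scheduler has assigned to each worker:
# Tasks the scheduler considers assigned to each worker; after the unpause,
# the previously paused workers sit near zero while their peers hold hundreds
# of stage-2 tasks.
counts = {addr: len(tasks) for addr, tasks in client.processing().items()}
print(counts)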
I tried adding a brief pause on the client side between phase 1 and phase 2, waiting for all workers to unpause, and that caused the problem to disappear, achieving perfect work balance:
test_spilling-async-steal-wait.mp4
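A minimal sketch of that client-side wait, assuming a connected Client named client; wait_until_unpaused is a hypothetical helper (not an existing API) that polls each worker's status via Client.run:
import time

def wait_until_unpaused(client, timeout=120, poll_interval=0.5):
    """Block until no worker in the cluster reports being paused."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        # Client.run passes the Worker instance as `dask_worker` to the
        # function it executes on each worker.
        statuses = client.run(lambda dask_worker: dask_worker.status.name)
        if all(status != "paused" for status in statuses.values()):
            return
        time.sleep(poll_interval)
    raise TimeoutError("Some workers are still paused")

Calling such a helper between distributed.wait(a) and a.sum().persist() in the snippet above corresponds to the workaround described here.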
Proposed actions
First of all, we need to figure out why work stealing is not kicking in. This is clearly a bug.
Second, however, there is a consensus among developers that work stealing should be avoided where possible. To this end, I propose introducing a new worker state, pausing. A worker enters the pausing state when it passes the pause threshold.
After a quite generous amount of time - e.g. 10s - the worker transitions to paused if it has not fallen back below the pause threshold in the meantime; dropping back below the threshold resets the timer.
From the worker's point of view, pausing is the same as paused: tasks are left in the ready queue, and network transfers are throttled.
From the scheduler's perspective, pausing is the same as running: the worker will not be excluded from scheduling heuristics and it will remain in the running set.
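A toy sketch of the proposed hysteresis, not an implementation against the actual Worker class (the 85% threshold and 10s grace period are just the figures mentioned above):
import time

PAUSE_THRESHOLD = 0.85  # fraction of process memory, as in the description above
GRACE_PERIOD = 10.0     # seconds spent in "pausing" before actually pausing

status = "running"
pausing_since = None

def update_status(memory_fraction, now=None):
    """Advance the running -> pausing -> paused state machine by one step."""
    global status, pausing_since
    now = time.monotonic() if now is None else now
    if memory_fraction < PAUSE_THRESHOLD:
        # Dropping below the threshold resets the timer and resumes the worker.
        status, pausing_since = "running", None
    elif status == "running":
        # First crossing of the threshold: enter "pausing" and start the timer.
        # The scheduler keeps treating the worker as running at this point.
        status, pausing_since = "pausing", now
    elif status == "pausing" and now - pausing_since >= GRACE_PERIOD:
        # Still above the threshold after the grace period: actually pause,
        # which is when the scheduler would stop assigning tasks.
        status = "paused"

update_status(0.90)                              # running -> pausing
update_status(0.90, now=time.monotonic() + 11)   # pausing -> paused (grace elapsed)
update_status(0.70)                              # back to running, timer cleared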