Description
For a while I've been playing around with this prototype repo which implements Dask Runners for HPC systems. I'm motivated to reduce the fragmentation and confusion around tooling in the Dask HPC community, so perhaps this new code should live here.
In dask/community#346 I wrote up the difference between Dask Clusters and Dask Runners. The TL;DR is that a Cluster creates the scheduler and worker tasks directly, for example dask_jobqueue.SLURMCluster
submits jobs to SLURM for each worker. A Dask Runner is different because it is invoked from within an existing allocation and populates that job with Dask processes. This the same as how Dask MPI works.
SlurmRunner
Example
If I write a Python script and call it with srun -n 6 python myscript.py
the script will be invoked by Slurm 6 times in parallel on 6 different nodes/cores on the HPC. The Dask Runner class then uses the Slurm process ID environment variable to decide what role reach process should play and uses the shared filesystem to bootstrap communications with a scheduler file.
# myscript.py
from dask.distributed import Client
from dask_hpc_runner import SlurmRunner
# When entering the SlurmRunner context manager processes will decide if they should be
# the client, schdeduler or a worker.
# Only process ID 1 executes the contents of the context manager.
# All other processes start the Dask components and then block here forever.
with SlurmRunner(scheduler_file="/path/to/shared/filesystem/scheduler-{job_id}.json") as runner:
# The runner object contains the scheduler address info and can be used to construct a client.
with Client(runner) as client:
# Wait for all the workers to be ready before continuing.
client.wait_for_workers(runner.n_workers)
# Then we can submit some work to the Dask scheduler.
assert client.submit(lambda x: x + 1, 10).result() == 11
assert client.submit(lambda x: x + 1, 20, workers=2).result() == 21
# When process ID 1 exits the SlurmRunner context manager it sends a graceful shutdown to the Dask processes.
Should this live in dask-jobqueue
?
I'm at the point of trying to decide where this code should live within the Dask ecosystem. So far I have implemented MPIRunner
and SlurmRunner
as a proof-of-concept. It would be very straight forward to write runners for other batch systems provided it is possible to detect the process ID/rank from the environment.
I can imagine users choosing between SLURMCluster
and SlurmRunner
depending on their use case and how they want to deploy Dask. There are pros/cons to each deployment model, for example the cluster can adaptively scale, but the runner only requires a single job submission which will guarantee better node locality. So perhaps it makes sense for SlurmRunner
to live here in dask-jobqueue
and we can use documentation to help users choose the right one for them? (We can make the name casing more consistent).
The MPIRunner
and SlurmRunner
share a common base class, so I'm not sure if that means MPIRunner
should also live here, or whether we should accept some code duplication and put it in dask-mpi
?
Alternatively my prototype repo could just move to dask-contrib
and become a separate project?
Or we could roll all of dask-jobqueue
, dask-mpi
and the new dask-hpc-runners
into a single dask-hpc
package? Or pull everything into dask-jobqueue
?
The Dask HPC tooling is currently very fragmented and I'm keen to make things better, not worse. But I'm very keen to hear opinions from folks like @guillaumeeb @lesteve @jrbourbeau @kmpaul @mrocklin on what we should do here.