Description
Dask's remote task execution is very straightforward, using a function not dependent on an external package. However, the most common use case relies on existing installed libraries or project packages. There are two types of dependencies:
- pip/poetry installed packages
- dependencies located in the project source/package
To deliver both dependencies to workers, we do the following:
- We create a custom dask image that contains all required extra pip packages. The primary assumption is that we don't change dependencies often, and project-specific dask images can remain untouched for a while. So, we don't rebuild images often. However, to simplify the process, we use some automation that extracts all required packages with
poetry export -f requirements.txt --output requirements.txt
and builds a docker image remotely using the Kubernetes driver. PipInstall plugin is another way to do it, but it might slow down the cluster starting time till minutes. In our case, it takes less than a minute after image warmup on Kubernetes nodes. - The project source is more dynamic and requires to be uploaded each time we spin up a cluster. We use the existing client.upload_file() function that rely on UploadFile plugin plugin. To clarify, we keep running the cluster only during Python script execution and tear it down when the script finishes.
While we successfully solved the delivery of extra dependencies to remote worker nodes, this requires a deep understanding of Dask cluster deployment and extra helper functions that do not come with Dask out of the box. I propose improving the Developer's Experience in this direction. I would focus on local source delivery on worker nodes first. To be more specific:
- Creating a new function
upload_package(module: ModuleType)
as a complimentary function for existing upload_file(path). - egg file automated creation by a new function
upload_package()
. - Possibility to update existing
venv
packages like Dask-specific modules on remove worker nodes that should simplify the debug process. In the scope of #11160 investigation, I already proved that is possible (please see Can not process datasets created by the older version of Dask dask#11160 (comment))
We already have a working prototype of the Worker/Scheduler plugin that performs all the above described. If there is a demand for such a plugin, we look forward to contributing our source. Any comments and suggestions are very welcome 🤗
Here are some usage examples:
Project source uploading to all workers:
import my_project_source
cluster = KubeCluster()
client = cluster.get_client()
# Upload the entire project source to all worker nodes in a very convenient way
clients.register_plugin(UploadModule(my_project_source))
# It will be even more convenient with a new client function
client.upload_package(my_project_source)
We can replace part of the Dask source on all worker nodes for debugging purposes:
from dask.dataframe import backends
client.upload_package(backends)
Here is an example of an adjusted function:
dask/dask#11160 (comment)