
Support external tasks #8199


Open · wants to merge 12 commits into main
Conversation

@GueroudjiAmal GueroudjiAmal commented Sep 20, 2023

Closes #8070

  • Tests added / passed
  • Passes pre-commit run --all-files

@GPUtester
Collaborator

Can one of the admins verify this patch?

Admins can comment ok to test to allow this one PR to run or add to allowlist to allow all future PRs from the same author to run.

@GueroudjiAmal
Author

GueroudjiAmal commented Sep 20, 2023

Here's a working external task.
Essentially, when we scatter data with the external flag set to True,
we make sure the corresponding transitions are triggered on the scheduler.

Motivation:

This makes it possible to submit tasks to Dask that will be run externally; since their keys are known, we can still submit task graphs that depend on them.

Usage:

  • One Client creates and submits a graph that depends on these external tasks; the keys are known in advance and are reused when the data is scattered.
  • Another Client scatters data under the same "key".
  • Once the scatter completes, the worker handles the task differently: we have new external_task_events, transitions, and so on.
  • The worker informs the scheduler that the task has transitioned from the "external" state to "memory".
  • The scheduler handles it like any finished task, so it triggers the transitions of the tasks that depend on it.
  • Any client that requested the result, or has tasks depending on it, is notified when needed.
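The steps above amount to a small state machine: a task is registered under a known key in an "external" state, an out-of-band producer later supplies the data, and the task then transitions to "memory", unblocking its dependents. As a conceptual illustration only (this is a toy model, not the actual distributed scheduler internals; all names here are made up), the handshake can be sketched like this:

```python
import threading

class ExternalTaskModel:
    """Toy model of the 'external' -> 'memory' transition described above.
    NOT the distributed scheduler; it only illustrates the idea."""

    def __init__(self):
        self._data = {}
        self._events = {}   # key -> threading.Event, set when data arrives
        self.state = {}     # key -> "external" | "memory"

    def register_external(self, key):
        # Analogous to Future(key, external=True): the key is known, the data is not.
        self.state[key] = "external"
        self._events[key] = threading.Event()

    def scatter(self, key, value):
        # Analogous to the external client's scatter(..., external=True):
        # store the data and transition the task to "memory".
        self._data[key] = value
        self.state[key] = "memory"
        self._events[key].set()

    def result(self, key, timeout=None):
        # Analogous to client.compute(...).result(): block until the data arrives.
        if not self._events[key].wait(timeout):
            raise TimeoutError(key)
        return self._data[key]

model = ExternalTaskModel()
model.register_external("external_key")
assert model.state["external_key"] == "external"
# A producer thread plays the role of the external application:
threading.Thread(target=model.scatter, args=("external_key", 42)).start()
print(model.result("external_key", timeout=5))   # 42
print(model.state["external_key"])               # memory
```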

Use cases:

Coupling Dask to any running producer of data,
e.g. a running MPI simulation that produces data incrementally.
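To make the incremental-producer pattern concrete, here is a minimal stand-in (plain Python, no Dask or MPI; the chunk keys and the queue-based handoff are purely illustrative assumptions): the producer publishes numbered chunks under agreed-upon keys as they are computed, and the consumer picks each one up without waiting for the whole simulation to finish.

```python
import queue
import threading

# Stand-in for an MPI simulation: produces numbered chunks over time.
# Keys like "chunk-0", "chunk-1", ... play the role of the agreed external keys.
def simulation(out, steps):
    for i in range(steps):
        out.put((f"chunk-{i}", [i] * 3))   # (key, data) pairs, incrementally
    out.put(None)                          # sentinel: end of simulation

received = {}
q = queue.Queue()
threading.Thread(target=simulation, args=(q, 4)).start()

# The consumer side (Dask, in the real workflow) handles each chunk by key
# as soon as it is produced.
while (item := q.get()) is not None:
    key, data = item
    received[key] = data

print(sorted(received))   # ['chunk-0', 'chunk-1', 'chunk-2', 'chunk-3']
```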

@GueroudjiAmal GueroudjiAmal marked this pull request as draft September 20, 2023 22:03
@github-actions
Contributor

github-actions bot commented Sep 20, 2023

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

       27 files ±0        27 suites ±0     10h 51m 54s ⏱️ −53m 8s
  3 952 tests +18      3 776 ✔️ −47      109 💤 −1     67 +66
 49 709 runs  +221   46 659 ✔️ −538   2 289 💤 −1   761 +760

For more details on these failures, see this check.

Results for commit 9b7ce18. ± Comparison against base commit 2f04dcb.

This pull request removes 4 and adds 22 tests. Note that renamed tests count towards both.
distributed.diagnostics.tests.test_scheduler_plugin ‑ test_register_plugin_pickle_disabled
distributed.shuffle.tests.test_rechunk ‑ test_rechunk_with_single_output_chunk_raises
distributed.tests.test_scheduler ‑ test_run_on_scheduler_disabled
distributed.tests.test_worker ‑ test_gpu_executor
distributed.dashboard.tests.test_scheduler_bokeh ‑ test_FinePerformanceMetrics_shuffle
distributed.deploy.tests.test_subprocess ‑ test_subprocess_cluster_does_not_depend_on_logging
distributed.diagnostics.tests.test_cudf_diagnostics
distributed.shuffle.tests.test_merge ‑ test_merge_does_not_deadlock_if_worker_joins
distributed.shuffle.tests.test_metrics
distributed.shuffle.tests.test_metrics ‑ test_dataframe
distributed.shuffle.tests.test_metrics ‑ test_rechunk
distributed.shuffle.tests.test_rechunk ‑ test_cull_p2p_rechunk_independent_partitions
distributed.shuffle.tests.test_rechunk ‑ test_cull_p2p_rechunk_overlapping_partitions
distributed.shuffle.tests.test_rechunk ‑ test_partial_rechunk_homogeneous_distribution
…

♻️ This comment has been updated with latest results.

@GueroudjiAmal GueroudjiAmal changed the title External Support external tasks Sep 21, 2023
@GueroudjiAmal
Author

Example:

We can test this with 2 clients, or with 1 Client and 1 Bridge (a class that connects an
external code directly to Dask workers, without using clients or going through the
scheduler, for performance and scalability reasons).

Dask_client.py

from distributed import Client, Future
from dask import delayed
import dask.array as da

client = Client("tcp://...")
# We create a future for data that will be generated by an external source
external_future = Future("external_key", external=True, inform=True)
# We describe the data that will be associated with this future, here a dask.array
external_dask_array = da.from_delayed(delayed(external_future), shape=(10, 10), dtype=float)
"""
We submit a task graph that uses this external data.
Internally this creates an *external* task rather than a released one, which prevents the
scheduler from sending this empty task to the workers.
The task switches to the in-memory state when the external application
scatters the data to a worker.
The worker then informs the scheduler about this new state.
"""
external_sum = external_dask_array.sum()
# The client blocks here until the scatter is performed and the result is available.
res = client.compute(external_sum).result()
print(res)

We can use the Client class to send data to the workers, here is the code:

External_client.py

from distributed import Client
import numpy as np

# Connect a client to the same scheduler as the previous one
external_client = Client("tcp://...")
"""
Now we send an array to a worker connected to that scheduler:
- we need the same key,
- direct has to be activated, because we don't want to go through the scheduler,
- we have to provide a list of workers,
- and we have to activate the external boolean.
"""
external_future = external_client.scatter(np.ones((10, 10)), keys=["external_key"],
                                          direct=True, workers=["tcp://127.0.0.1:33251"],
                                          external=True)

# Now the first Client is unblocked and should get the result.

Another possibility is to use the dedicated Bridge class, here is the code:

External_bridge.py

from distributed import Bridge
import numpy as np

# Connect a bridge directly to the associated worker
external_bridge = Bridge(workers=["tcp://..."])
"""
Now we send an array directly to the connected worker:
- we need the same key,
- all the other parameters of the default scatter keep their default values.
"""
external_future = external_bridge.scatter(np.ones((10, 10)), keys=["external_key"])

# Now the first Client is unblocked and should get the result.

@GueroudjiAmal
Author

Here is our paper showing a typical use case of external tasks in HPC/ML workflows: Dask-Extended External Tasks for HPC/ML In transit Workflows

@GueroudjiAmal GueroudjiAmal marked this pull request as ready for review November 27, 2023 17:21
@jbigot

jbigot commented Dec 12, 2023

Adding such a feature would be of extreme interest to us at CEA. We would really love to be able to use Dask to process data produced by numerical simulations, and as demonstrated in @GueroudjiAmal's paper, this feature is of critical importance to getting this use case working efficiently.

Please let us know what we can do to help get this into Dask. We can help with testing, working together to improve the API, and so on.

Successfully merging this pull request may close these issues.

Make Dask support External tasks
3 participants