When calling `dask.compute` within a task, it ignores the default/current `Client` mechanism and always seems to use a `Client` that is managed by the `Worker`:
```python
from distributed import Client, LocalCluster, worker_client
from dask.base import get_scheduler


def launch_cluster(id):
    # Runs as a task on a worker of the outer cluster and starts a nested LocalCluster
    with LocalCluster(n_workers=1, processes=False, dashboard_address=":0") as cluster:
        cluster_client = cluster.get_client()
        # Even if one instantiates the client, it is not picked
        # cluster_client = Client(cluster, set_as_default=True)
        # It will not even pick the correct client if used with the `as_current` contextmanager
        # with cluster_client.as_current():
        compute_default = get_scheduler()
        # Bound methods are recreated on every access, so compare with == rather than `is`
        assert compute_default == cluster_client.get, f"Dask compute selects wrong client. Cluster client {cluster_client} compute client {compute_default}"


if __name__ == "__main__":
    # Outer cluster: launch_cluster is submitted as a task to one of its workers
    with Client(n_workers=1, threads_per_worker=1) as c:
        f = c.submit(launch_cluster, 0)
        f.result()
```
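A side note on checking which scheduler was selected: `get_scheduler` hands back a bound method such as `client.get`, and Python creates a fresh bound-method object on every attribute access, so an identity (`is`) comparison between two accesses fails even for the same client. Equality (`==`) or checking `__self__` is reliable. A minimal stand-alone illustration, where `FakeClient` is a hypothetical stand-in for `distributed.Client`:

```python
class FakeClient:
    """Hypothetical stand-in for distributed.Client; only .get matters here."""

    def get(self, dsk, keys):
        return None


client = FakeClient()
other = FakeClient()

first = client.get   # each access builds a new bound-method object
second = client.get

print(first is second)           # False: two distinct objects
print(first == second)           # True: same __self__ and __func__
print(first.__self__ is client)  # True: identifies the owning client
print(first == other.get)        # False: bound to a different client
```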
`dask.base.get_scheduler` calls into `distributed.worker.get_client`, which always prefers the worker's own client whenever no address is provided.
As a result, the default client is rejected and the current client is not even considered; instead, a new client connected to the scheduler the worker belongs to is always instantiated.
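The precedence described above can be modelled in a few lines. This is a simplified sketch, not the actual `distributed` source: `resolve_client`, `FakeWorker`, `FakeClient` and the addresses are hypothetical, and only the ordering (worker client first when no address is given, current client afterwards) is taken from the description.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeWorker:
    """Hypothetical worker: knows its scheduler address and owns a client."""
    scheduler_address: str
    worker_client: str


@dataclass
class FakeClient:
    """Hypothetical client, e.g. the one attached to the nested LocalCluster."""
    name: str
    scheduler_address: str


def resolve_client(address: Optional[str],
                   worker: Optional[FakeWorker],
                   current: Optional[FakeClient]) -> str:
    """Simplified model of the lookup order described in this issue."""
    # 1. Inside a task a worker exists. Without an address (the get_scheduler
    #    case), the worker's client is returned unconditionally.
    if worker is not None and (address is None or address == worker.scheduler_address):
        return worker.worker_client
    # 2. The current/default client is only consulted if step 1 did not return.
    if current is not None:
        return current.name
    raise LookupError("no client available")


worker = FakeWorker("tcp://outer-scheduler", "worker client")
cluster_client = FakeClient("cluster client", "inproc://local-cluster")

# Inside the task: the LocalCluster client is current, yet it is never consulted.
print(resolve_client(None, worker, cluster_client))  # worker client
# Outside any task the current client would be returned.
print(resolve_client(None, None, cluster_client))    # cluster client
```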
The expected behavior is for `dask.compute`/`get_scheduler` to automatically pick the correct `Client`, i.e. the one connected to the `LocalCluster`.
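One way to express that expectation, again as a hypothetical model that uses nothing from `distributed` (the function name and string client names are illustrative only): a client the user has explicitly made current or default takes precedence, and the worker's own client is only a fallback.

```python
from typing import Optional


def resolve_client_expected(worker_client: Optional[str],
                            current_client: Optional[str]) -> str:
    """Hypothetical precedence matching the expected behavior in this issue."""
    # An explicitly current/default client (e.g. the LocalCluster client) wins.
    if current_client is not None:
        return current_client
    # Otherwise fall back to the worker's own client.
    if worker_client is not None:
        return worker_client
    raise LookupError("no client available")


print(resolve_client_expected("worker client", "cluster client"))  # cluster client
print(resolve_client_expected("worker client", None))              # worker client
```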