Description
The "Specify Workers in Compute/Persist" section of the Data Locality article seems to indicate that the workers keyword of Client.compute() accepts a dictionary mapping tasks or collections to worker addresses.
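For reference, the pattern the documentation appears to describe looks like the sketch below, where result stands in for any delayed object or collection and the address is made up for illustration.
# Sketch of the documented pattern (hypothetical address): map a task or
# collection to the worker address it should run on.
future = client.compute(result, workers={result: 'tcp://192.168.0.1:12345'})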
What happened:
In practice, the workers keyword does not accept a dictionary; both dictionary forms tried below raise errors.
What you expected to happen:
Expected that the workers
keyword would accept the dictionary AND the tasks would execute on the workers specified by the dictionary.
Minimal Complete Verifiable Example:
import dask
dask.__version__
OUT:
'2021.11.2'
from dask.distributed import Client
client = Client()
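# Collect the addresses of the workers in the local cluster.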
worker_addresses = [x for x in client.cluster.scheduler_info['workers']]
worker_addresses
OUT:
['tcp://127.0.0.1:35297',
'tcp://127.0.0.1:36365',
'tcp://127.0.0.1:41759',
'tcp://127.0.0.1:38571']
from time import sleep
def add(x, y):
    sleep(1)
    return x + y

def subtract(x, y):
    sleep(1)
    return x - y

def multiply(x, y):
    sleep(2)
    return x * y
from dask import delayed
x = 12
y = 8
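# Build a small delayed graph: multiply(add(x, y), subtract(x, y)).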
add_result = delayed(add)(x, y)
subtract_result = delayed(subtract)(x, y)
multiply_result = delayed(multiply)(add_result, subtract_result)
multiply_result.visualize()
OUT: (task graph image not shown)
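# Attempt 1: workers dict keyed by the Delayed objects themselves.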
futures = client.compute(multiply_result, workers={multiply_result: worker_addresses[0], add_result: worker_addresses[1]})
futures
OUT:
Future: multiply, status: pending, type: NoneType, key: multiply-4c43bba2-8553-4a09-a7af-63d78db6cf05
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/protocol/core.py", line 76, in dumps
frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/msgpack/__init__.py", line 35, in packb
return Packer(**kwargs).pack(o)
File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 298, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 295, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 229, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'Delayed' object
distributed.comm.utils - ERROR - can not serialize 'Delayed' object
Traceback (most recent call last):
File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/comm/utils.py", line 33, in _to_frames
return list(protocol.dumps(msg, **kwargs))
File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/protocol/core.py", line 76, in dumps
frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/msgpack/__init__.py", line 35, in packb
return Packer(**kwargs).pack(o)
File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 298, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 295, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 229, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'Delayed' object
distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/batched.py", line 93, in _background_send
nbytes = yield self.comm.write(
File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
value = future.result()
File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/comm/tcp.py", line 250, in write
frames = await to_frames(
File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/comm/utils.py", line 50, in to_frames
return _to_frames()
File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/comm/utils.py", line 33, in _to_frames
return list(protocol.dumps(msg, **kwargs))
File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/protocol/core.py", line 76, in dumps
frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/msgpack/__init__.py", line 35, in packb
return Packer(**kwargs).pack(o)
File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 298, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 295, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 229, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'Delayed' object
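# Attempt 2: workers dict keyed by strings ('multiply_result', 'add_result').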
futures = client.compute(multiply_result, workers={'multiply_result': worker_addresses[0], 'add_result': worker_addresses[1]})
futures
OUT:
Future: multiply, status: cancelled, type: NoneType, key: multiply-4c43bba2-8553-4a09-a7af-63d78db6cf05
distributed.core - ERROR - unhashable type: 'dict'
Traceback (most recent call last):
File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/core.py", line 596, in handle_stream
handler(**merge(extra, msg))
File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/scheduler.py", line 4528, in update_graph_hlg
return self.update_graph(
File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/scheduler.py", line 4765, in update_graph
w = self.coerce_address(w)
File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/scheduler.py", line 7353, in coerce_address
if addr in parent._aliases:
TypeError: unhashable type: 'dict'
distributed.core - ERROR - Exception while handling op register-client
Traceback (most recent call last):
File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/core.py", line 530, in handle_comm
result = await result
File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/scheduler.py", line 5359, in add_client
await self.handle_stream(comm=comm, extra={"client": client})
File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/core.py", line 596, in handle_stream
handler(**merge(extra, msg))
File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/scheduler.py", line 4528, in update_graph_hlg
return self.update_graph(
File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/scheduler.py", line 4765, in update_graph
w = self.coerce_address(w)
File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/scheduler.py", line 7353, in coerce_address
if addr in parent._aliases:
TypeError: unhashable type: 'dict'
tornado.application - ERROR - Exception in callback functools.partial(<function TCPServer._handle_connection.<locals>.<lambda> at 0x7f49d0c5a790>, <Task finished name='Task-101' coro=<BaseTCPListener._handle_stream() done, defined at /home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/comm/tcp.py:502> exception=TypeError("unhashable type: 'dict'")>)
Traceback (most recent call last):
File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/tornado/ioloop.py", line 741, in _run_callback
ret = callback()
File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/tornado/tcpserver.py", line 331, in <lambda>
gen.convert_yielded(future), lambda f: f.result()
File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/comm/tcp.py", line 519, in _handle_stream
await self.comm_handler(comm)
File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/core.py", line 530, in handle_comm
result = await result
File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/scheduler.py", line 5359, in add_client
await self.handle_stream(comm=comm, extra={"client": client})
File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/core.py", line 596, in handle_stream
handler(**merge(extra, msg))
File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/scheduler.py", line 4528, in update_graph_hlg
return self.update_graph(
File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/scheduler.py", line 4765, in update_graph
w = self.coerce_address(w)
File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/scheduler.py", line 7353, in coerce_address
if addr in parent._aliases:
TypeError: unhashable type: 'dict'
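# Attempt 3: workers as a plain list of addresses; this form is accepted without errors.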
futures = client.compute(multiply_result, workers=[worker_addresses[0]])
futures
OUT:
Future: multiply, status: pending, type: NoneType, key: multiply-4c43bba2-8553-4a09-a7af-63d78db6cf05
Anything else we need to know?:
Apart from fixing the documentation, is there an alternative way of calling the compute method that specifies which worker each intermediate task should run on?
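For comparison, one workaround sketched below is to submit each task individually and pin it with the per-call workers= argument of client.submit, reusing the functions and addresses from the example above; whether this is the recommended alternative is part of the question.
# Possible workaround (a sketch, not necessarily the recommended approach):
# submit each task separately and pin it to a worker with workers=.
# Futures can be passed straight into later submit calls, so the
# intermediate results stay on the cluster.
add_future = client.submit(add, x, y, workers=[worker_addresses[1]])
subtract_future = client.submit(subtract, x, y, workers=[worker_addresses[2]])
multiply_future = client.submit(multiply, add_future, subtract_future,
                                workers=[worker_addresses[0]])
multiply_future.result()  # (12 + 8) * (12 - 8) == 80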
Environment:
- Dask version: 2021.11.2
- Python version: 3.8.10
- Operating System: Linux (Ubuntu 20.04)
- Install method (conda, pip, source): pip