Description
Describe the issue:
While using client.map
with async functions, I noticed a different behavior when I tried to pass in keyword arguments to the function. In this case, the function doesn't seem to run, but instead the coroutine object returned by calling the async function is returned.
It does work as expected if I don't pass in any keyword arguments, or use a normal (non-async) function.
The differing results, containing coroutine objects instead of values, lead to various errors in code that tries to use the results. The example below shows some TypeError: cannot pickle 'coroutine' object
errors, but I think they are only a consequence of the unexpected results from mapping the async functions with keyword arguments.
The differing results can be observed in the output of the print_data
function below. Without keyword arguments, or using a normal function, everything works, and the output looks something like:
DATA OUTPUT: 3+True
DATA OUTPUT: 1+True
DATA OUTPUT: 2+True
['1+True', '2+True', '3+True']
DATA OUTPUT: 3@False
DATA OUTPUT: 1@False
DATA OUTPUT: 2@False
['1@False', '2@False', '3@False']
(values with +
are from the normal/sync function, values with @
are from the async function)
However, mapping the async function with keyword argument some_setting=True
leads to a different output, before printing lots of other errors and tracebacks:
DATA OUTPUT: 1+True
DATA OUTPUT: 3+True
DATA OUTPUT: 2+True
DATA OUTPUT: <coroutine object async_computation at 0x00000128C84C2810>
DATA OUTPUT: <coroutine object async_computation at 0x0000022F03209080>
DATA OUTPUT: <coroutine object async_computation at 0x000001A97403D080>
['1+True', '2+True', '3+True']
The difference is only whether the keyword argument some_setting=True
is passed, or not (see commented line in the code below).
Is this behavior expected? I found it quite surprising and hard to track down in a larger code base.
(Just for completeness, await
ing the coroutine objects results in the correct values, but it seems weird that passing keyword arguments requires a different usage of the result)
Minimal Complete Verifiable Example:
import asyncio
from dask.distributed import Client
def sync_computation(index, some_setting=False):
return f"{index}+{some_setting}"
async def async_computation(index, some_setting=False):
await asyncio.sleep(1)
return f"{index}@{some_setting}"
def print_data(data):
print("DATA OUTPUT:", data)
return data
def schedule_and_run_computations(client):
all_ids = [1, 2, 3]
sync_computed_data = client.map(sync_computation, all_ids, some_setting=True)
## ERROR - does not work with `some_setting=True`:
async_computed_data = client.map(async_computation, all_ids, some_setting=True)
## but it does work without kwargs:
# async_computed_data = client.map(async_computation, all_ids)
sync_data = client.map(print_data, sync_computed_data)
async_data = client.map(print_data, async_computed_data)
print(client.gather(sync_data))
print(client.gather(async_data))
if __name__ == "__main__":
client = Client()
schedule_and_run_computations(client)
Environment:
- Dask version: 2023.10.0 (same for distributed)
- Python version: 3.11.6
- Operating System: Windows 10
- Install method (conda, pip, source): pip