Who's responsible for owning futures in dask.dataframe.from_delayed? #9041

@TomAugspurger

Description

Describe the issue:

Over in rapidsai/dask-upstream-testing#40 (comment), I'm debugging an issue in cuml that cropped up between dask 2025.2.0 and main.

Minimal Complete Verifiable Example:

The following runs successfully with dask / distributed 2025.2.0:

uv run --with ipython --with "dask[dataframe]==2025.2.0" --with distributed==2025.2.0 --with pyarrow ipython

from distributed import Client
import pandas as pd
import dask.dataframe as dd

client = Client(n_workers=2)

def gen(i):
    return pd.DataFrame({"A": [i]}, index=[i])

# The futures returned by submit are the only live references to the results.
futures = [client.submit(gen, i) for i in range(3)]

# Empty frame with the right column names and dtypes, used as meta.
meta = gen(0)[:0]
df = dd.from_delayed(futures, meta)

df.compute()  # succeeds: the futures are still alive here

import gc

# Drop the only external references to the futures.
del futures
gc.collect()

df.compute()  # fails on main with FutureCancelledError

With dask / distributed main it fails:

❯ uv run --with ipython  --with "dask[complete] @ git+https://github.com/dask/dask" --with "distributed @ git+https://github.com/dask/distributed" --with pyarrow ipython
 Updated https://github.com/dask/distributed (6691e27d)
 Updated https://github.com/dask/dask (0fa5e18d5)

...

---------------------------------------------------------------------------
FutureCancelledError                      Traceback (most recent call last)
Cell In[1], line 22
     19 del futures
     20 gc.collect()
---> 22 df.compute()

File ~/.cache/uv/archive-v0/n2LAuIxOxeQzXo5nnHTn7/lib/python3.13/site-packages/dask/base.py:373, in DaskMethodsMixin.compute(self, **kwargs)
    349 def compute(self, **kwargs):
    350     """Compute this dask collection
    351
    352     This turns a lazy Dask collection into its in-memory equivalent.
   (...)    371     dask.compute
    372     """
--> 373     (result,) = compute(self, traverse=False, **kwargs)
    374     return result

File ~/.cache/uv/archive-v0/n2LAuIxOxeQzXo5nnHTn7/lib/python3.13/site-packages/dask/base.py:681, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    678     expr = expr.optimize()
    679     keys = list(flatten(expr.__dask_keys__()))
--> 681     results = schedule(expr, keys, **kwargs)
    683 return repack(results)

File ~/.cache/uv/archive-v0/n2LAuIxOxeQzXo5nnHTn7/lib/python3.13/site-packages/distributed/client.py:2395, in Client._gather(self, futures, errors, direct, local_worker)
   2393     exception = st.exception
   2394     traceback = st.traceback
-> 2395     raise exception.with_traceback(traceback)
   2396 if errors == "skip":
   2397     bad_keys.add(key)

FutureCancelledError: ('repartitiontofewer-1204992dafb94ff37742a003f8f0410b', 0) cancelled for reason: lost dependencies.

I'm not quite sure what's going on, but it seems like something changed such that from_delayed no longer captures references to the Future objects, so once they're garbage collected the scheduler releases the underlying tasks and the graph is no longer valid. I believe this is related to #8998, but I think they might be different enough to warrant separate issues.
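
One way to check the ownership change directly is to watch one of the futures with a weakref. This is just a diagnostic sketch (assuming distributed.Future instances support weak references), reusing the setup from the reproducer above:

import gc
import weakref

ref = weakref.ref(futures[0])  # track the first future without keeping it alive

df = dd.from_delayed(futures, meta)

del futures
gc.collect()

# If from_delayed (or the graph held by df) kept a reference to the Future,
# the weakref is still live. On main I'd expect this to print True, i.e.
# nothing is keeping the Future alive, while on 2025.2.0 it should print
# False because the graph embedded in df still holds the futures.
print(ref() is None)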

As for what to do, I'm not sure. The docstring at https://github.com/dask/dask/blob/0fa5e18d511c49f1a9cd5f98c675a9f6cd2fc02f/dask/dataframe/dask_expr/io/_delayed.py#L103-L108 explicitly calls out distributed.Future objects as supported inputs, so maybe this should continue to work. Alternatively, we could point people to dd.from_graph.
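
In the meantime, a workaround on the user side is to keep the futures referenced for as long as the collection is in use; a minimal sketch, reusing the names from the reproducer:

futures = [client.submit(gen, i) for i in range(3)]
df = dd.from_delayed(futures, meta)

df.compute()  # fine
df.compute()  # still fine: `futures` is still in scope

del futures   # only drop the references once df is no longer needed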
