Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ repos:
- tornado
- pyarrow
- urllib3
- git+https://github.com/dask/dask
- git+https://github.com/phofl/dask.git@io-tasks
# - git+https://github.com/dask/dask
- git+https://github.com/dask/zict

# Increase this value to clear the cache on GitHub actions if nothing else in this file
Expand Down
3 changes: 2 additions & 1 deletion continuous_integration/environment-3.10.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ dependencies:
# Temporary fix for https://github.com/jupyterlab/jupyterlab/issues/17012
- httpx<0.28.0
- pip:
- git+https://github.com/dask/dask
- git+https://github.com/phofl/dask.git@io-tasks
# - git+https://github.com/dask/dask
- git+https://github.com/dask/dask-expr
- git+https://github.com/dask/zict
- git+https://github.com/dask/crick # Only tested here
Expand Down
3 changes: 2 additions & 1 deletion continuous_integration/environment-3.11.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ dependencies:
# Temporary fix for https://github.com/jupyterlab/jupyterlab/issues/17012
- httpx<0.28.0
- pip:
- git+https://github.com/dask/dask
- git+https://github.com/phofl/dask.git@io-tasks
# - git+https://github.com/dask/dask
- git+https://github.com/dask/dask-expr
- git+https://github.com/dask/zict
# Revert after https://github.com/dask/distributed/issues/8614 is fixed
Expand Down
3 changes: 2 additions & 1 deletion continuous_integration/environment-3.12.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ dependencies:
# Temporary fix for https://github.com/jupyterlab/jupyterlab/issues/17012
- httpx<0.28.0
- pip:
- git+https://github.com/dask/dask
- git+https://github.com/phofl/dask.git@io-tasks
# - git+https://github.com/dask/dask
- git+https://github.com/dask/dask-expr
- git+https://github.com/dask/zict
# Revert after https://github.com/dask/distributed/issues/8614 is fixed
Expand Down
3 changes: 2 additions & 1 deletion continuous_integration/environment-3.13.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ dependencies:
# Temporary fix for https://github.com/jupyterlab/jupyterlab/issues/17012
- httpx<0.28.0
- pip:
- git+https://github.com/dask/dask
- git+https://github.com/phofl/dask.git@io-tasks
# - git+https://github.com/dask/dask
- git+https://github.com/dask/dask-expr
- git+https://github.com/dask/zict
# Revert after https://github.com/dask/distributed/issues/8614 is fixed
Expand Down
3 changes: 2 additions & 1 deletion continuous_integration/environment-mindeps.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ dependencies:
# Distributed depends on the latest version of Dask
- pip
- pip:
- git+https://github.com/dask/dask
- git+https://github.com/phofl/dask.git@io-tasks
# - git+https://github.com/dask/dask
# test dependencies
- pytest
- pytest-cov
Expand Down
4 changes: 4 additions & 0 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3083,6 +3083,10 @@ def is_rootish(self, ts: TaskState) -> bool:
and have few or no dependencies. Tasks may also be explicitly marked as rootish
to override this heuristic.
"""
# Check explicitly marked data producer tasks
if ts.run_spec and ts.run_spec.data_producer_task:
return True

if ts.resource_restrictions or ts.worker_restrictions or ts.host_restrictions:
return False
tg = ts.group
Expand Down
2 changes: 1 addition & 1 deletion distributed/shuffle/_rechunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ def partial_concatenate(
)
rec_cat_arg[old_partial_index] = t.ref()
else:
rec_cat_arg[old_partial_index] = TaskRef((input_name,) + old_global_index) # type: ignore[call-overload]
rec_cat_arg[old_partial_index] = TaskRef((input_name,) + old_global_index)

concat_task = Task(
(rechunk_name(token),) + global_new_index,
Expand Down
38 changes: 38 additions & 0 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import dask
from dask import bag, delayed
from dask.base import DaskMethodsMixin
from dask.core import flatten
from dask.highlevelgraph import HighLevelGraph, MaterializedLayer
from dask.utils import parse_timedelta, tmpfile, typename
Expand Down Expand Up @@ -5317,3 +5318,40 @@ async def test_alias_resolving_break_queuing(c, s, a):
while not s.tasks:
await asyncio.sleep(0.01)
assert sum([s.is_rootish(v) for v in s.tasks.values()]) == 18


@gen_cluster(client=True, nthreads=[("", 1)])
async def test_data_producer_tasks(c, s, a):
    """Check how many tasks are both rootish and flagged as data producers.

    A tiny custom collection submits a linear graph ``a -> b -> c -> d`` in
    which only ``b`` is created with ``data_producer_task=True``. Once the
    scheduler knows about the tasks, exactly two of them must satisfy both
    ``s.is_rootish(ts)`` and ``ts.run_spec.data_producer_task``.

    NOTE(review): only ``b`` is marked explicitly in the graph below; where
    the second matching task comes from is not visible here -- confirm
    against the dask task-spec / finalize logic.
    """
    from dask._task_spec import DataNode, Task, TaskRef

    def func(*args):
        return 100

    class MyArray(DaskMethodsMixin):
        # Minimal dask collection exposing a hand-written graph.

        def __dask_graph__(self):
            graph = {
                "a": DataNode("a", 10),
                # The only node explicitly flagged as a data producer.
                "b": Task("b", func, TaskRef("a"), data_producer_task=True),
                "c": Task("c", func, TaskRef("b")),
                "d": Task("d", func, TaskRef("c")),
            }
            return graph

        def __dask_keys__(self):
            return ["d"]

        def __dask_postcompute__(self):
            return func, ()

    arr = MyArray()
    # Keep a reference to the future so the submitted tasks stay alive.
    x = c.compute(arr)

    # Poll until the graph has reached the scheduler.
    while not s.tasks:
        await asyncio.sleep(0.01)

    rootish_producers = sum(
        s.is_rootish(ts) and ts.run_spec.data_producer_task
        for ts in s.tasks.values()
    )
    assert rootish_producers == 2
Loading