Skip to content

Commit 920f8a9

Browse files
authored
fix: use threads scheduler in da.store (#2183)
1 parent 52344db commit 920f8a9

3 files changed

Lines changed: 3 additions & 25 deletions

File tree

docs/release-notes/2183.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Unblock version restriction on `dask` distributed writing by using threading scheduler always (see {pr}`2172`) {user}`ilan-gold`

src/anndata/_io/specs/methods.py

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -506,26 +506,16 @@ def write_basic_dask_dask_dense(
506506
dataset_kwargs: Mapping[str, Any] = MappingProxyType({}),
507507
):
508508
import dask.array as da
509-
import dask.config as dc
510-
511-
is_distributed = dc.get("scheduler", None) == "dask.distributed"
512-
is_h5 = isinstance(f, H5Group)
513-
if is_distributed and is_h5:
514-
msg = "Cannot write dask arrays to hdf5 when using distributed scheduler"
515-
raise ValueError(msg)
516509

517510
dataset_kwargs = dataset_kwargs.copy()
511+
is_h5 = isinstance(f, H5Group)
518512
if not is_h5:
519513
dataset_kwargs = zarr_v3_compressor_compat(dataset_kwargs)
520-
# See https://github.com/dask/dask/issues/12109
521-
if Version(version("dask")) < Version("2025.4.0") and is_distributed:
522-
msg = "Writing dense data with a distributed scheduler to zarr could produce corrupted data with a Lock and will error without one when dask is older than 2025.4.0: https://github.com/dask/dask/issues/12109"
523-
raise RuntimeError(msg)
524514
if is_zarr_v2() or is_h5:
525515
g = f.require_dataset(k, shape=elem.shape, dtype=elem.dtype, **dataset_kwargs)
526516
else:
527517
g = f.require_array(k, shape=elem.shape, dtype=elem.dtype, **dataset_kwargs)
528-
da.store(elem, g)
518+
da.store(elem, g, scheduler="threads")
529519

530520

531521
@_REGISTRY.register_read(H5Array, IOSpec("array", "0.2.0"))

tests/test_dask.py

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,12 @@
44

55
from __future__ import annotations
66

7-
from importlib.metadata import version
87
from pathlib import Path
98
from typing import TYPE_CHECKING
109

1110
import numpy as np
1211
import pandas as pd
1312
import pytest
14-
from packaging.version import Version
1513

1614
import anndata as ad
1715
from anndata._core.anndata import AnnData
@@ -135,17 +133,6 @@ def test_dask_distributed_write(
135133
adata.obsm["b"] = da.random.random((M, 10))
136134
adata.varm["a"] = da.random.random((N, 10))
137135
orig = adata
138-
is_h5 = diskfmt == "h5ad"
139-
is_corrupted_dask = Version(version("dask")) < Version("2025.4.0")
140-
if is_corrupted_dask or is_h5:
141-
with pytest.raises(
142-
ValueError if is_h5 else RuntimeError,
143-
match=r"Cannot write dask arrays to hdf5"
144-
if is_h5
145-
else r"Writing dense data with a distributed scheduler to zarr",
146-
):
147-
ad.io.write_elem(g, "", orig)
148-
return
149136
ad.io.write_elem(g, "", orig)
150137
# TODO: See https://github.com/zarr-developers/zarr-python/issues/2716
151138
g = as_group(pth, mode="r")

0 commit comments

Comments
 (0)