
Commit b4c0cfb

Backport PR #2172 on branch 0.12.x (fix: remove global lock on zarr dense stores from dask) (#2180)
1 parent e30a8cc commit b4c0cfb

8 files changed: +50 −35 lines changed


.github/workflows/test-cpu.yml

Lines changed: 2 additions & 2 deletions
@@ -43,7 +43,7 @@ jobs:
     strategy:
       matrix:
         env: ${{ fromJSON(needs.get-environments.outputs.envs) }}
-        io_mark: ["zarr_io", "not zarr_io"]
+        io_mark: ["zarr_io", "not zarr_io", "dask_distributed"] # dask_distributed should not be run with -n auto as it uses a client with processes
     env: # environment variables for use in codecov’s env_vars tagging
       ENV_NAME: ${{ matrix.env.name }}
       IO_MARK: ${{ matrix.io_mark }}
@@ -72,7 +72,7 @@ jobs:
         env:
           COVERAGE_PROCESS_START: ${{ github.workspace }}/pyproject.toml
         run: |
-          hatch run ${{ matrix.env.name }}:run-cov -v --color=yes -n auto --junitxml=test-data/test-results.xml -m "${{ matrix.io_mark }}" ${{ matrix.env.args }}
+          hatch run ${{ matrix.env.name }}:run-cov -v --color=yes ${{ matrix.io_mark != 'dask_distributed' && '-n auto' || '' }} --junitxml=test-data/test-results.xml -m "${{ matrix.io_mark }}" ${{ matrix.env.args }}
           hatch run ${{ matrix.env.name }}:cov-combine
           hatch run ${{ matrix.env.name }}:coverage xml
 

docs/release-notes/2172.bug.md

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+{func}`dask.array.store` was producing corrupted data with zarr v3 + the distributed scheduler + a lock (which we used internally): see {ref}`dask/dask#12109`. Dense arrays were therefore potentially being stored with corrupted data. The solution is to remove the lock for newer versions of dask; because older versions cannot store the data without the lock, dask versions older than `2025.4.0` are no longer supported for writing dense data. {user}`ilan-gold`
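
For context, a minimal sketch of the storage pattern involved (illustrative only: the store path, shapes, and chunking are invented; it assumes zarr-python v3, dask >= 2025.4.0, and a distributed client):

import dask.array as da
import zarr
from dask.distributed import Client

if __name__ == "__main__":
    client = Client(processes=True)  # distributed scheduler with worker processes

    x = da.random.random((1_000, 100), chunks=(100, 100))
    g = zarr.open_group("example.zarr", mode="w")  # hypothetical on-disk store
    z = g.require_array("X", shape=x.shape, dtype=x.dtype)

    # No explicit lock is passed, which is what this change does internally.
    # On dask older than 2025.4.0 this write path is not supported for dense data.
    da.store(x, z)
    client.close()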

pyproject.toml

Lines changed: 5 additions & 1 deletion
@@ -174,7 +174,11 @@ testpaths = [
 ]
 # For some reason this effects how logging is shown when tests are run
 xfail_strict = true
-markers = [ "gpu: mark test to run on GPU", "zarr_io: mark tests that involve zarr io" ]
+markers = [
+  "gpu: mark test to run on GPU",
+  "zarr_io: mark tests that involve zarr io",
+  "dask_distributed: tests that need a distributed client with multiple processes",
+]
 
 [tool.ruff]
 src = [ "src" ]
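
For reference, a hedged sketch of selecting only these tests programmatically (equivalent to `pytest -m dask_distributed` on the command line; the CI change above does the same while dropping `-n auto`):

import pytest

# Run only tests that need a multi-process distributed client,
# deliberately without pytest-xdist's "-n auto" parallelism.
raise SystemExit(pytest.main(["-m", "dask_distributed"]))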

src/anndata/_io/specs/methods.py

Lines changed: 16 additions & 25 deletions
@@ -495,31 +495,10 @@ def write_chunked_dense_array_to_group(
 
 @_REGISTRY.register_write(ZarrGroup, views.DaskArrayView, IOSpec("array", "0.2.0"))
 @_REGISTRY.register_write(ZarrGroup, DaskArray, IOSpec("array", "0.2.0"))
-def write_basic_dask_zarr(
-    f: ZarrGroup,
-    k: str,
-    elem: DaskArray,
-    *,
-    _writer: Writer,
-    dataset_kwargs: Mapping[str, Any] = MappingProxyType({}),
-):
-    import dask.array as da
-
-    dataset_kwargs = dataset_kwargs.copy()
-    dataset_kwargs = zarr_v3_compressor_compat(dataset_kwargs)
-    if is_zarr_v2():
-        g = f.require_dataset(k, shape=elem.shape, dtype=elem.dtype, **dataset_kwargs)
-    else:
-        g = f.require_array(k, shape=elem.shape, dtype=elem.dtype, **dataset_kwargs)
-    da.store(elem, g, lock=GLOBAL_LOCK)
-
-
-# Adding this separately because h5py isn't serializable
-# https://github.com/pydata/xarray/issues/4242
 @_REGISTRY.register_write(H5Group, views.DaskArrayView, IOSpec("array", "0.2.0"))
 @_REGISTRY.register_write(H5Group, DaskArray, IOSpec("array", "0.2.0"))
-def write_basic_dask_h5(
-    f: H5Group,
+def write_basic_dask_dask_dense(
+    f: ZarrGroup | H5Group,
     k: str,
     elem: DaskArray,
     *,
@@ -529,11 +508,23 @@ def write_basic_dask_h5(
     import dask.array as da
     import dask.config as dc
 
-    if dc.get("scheduler", None) == "dask.distributed":
+    is_distributed = dc.get("scheduler", None) == "dask.distributed"
+    is_h5 = isinstance(f, H5Group)
+    if is_distributed and is_h5:
         msg = "Cannot write dask arrays to hdf5 when using distributed scheduler"
         raise ValueError(msg)
 
-    g = f.require_dataset(k, shape=elem.shape, dtype=elem.dtype, **dataset_kwargs)
+    dataset_kwargs = dataset_kwargs.copy()
+    if not is_h5:
+        dataset_kwargs = zarr_v3_compressor_compat(dataset_kwargs)
+    # See https://github.com/dask/dask/issues/12109
+    if Version(version("dask")) < Version("2025.4.0") and is_distributed:
+        msg = "Writing dense data with a distributed scheduler to zarr could produce corrupted data with a Lock and will error without one when dask is older than 2025.4.0: https://github.com/dask/dask/issues/12109"
+        raise RuntimeError(msg)
+    if is_zarr_v2() or is_h5:
+        g = f.require_dataset(k, shape=elem.shape, dtype=elem.dtype, **dataset_kwargs)
+    else:
+        g = f.require_array(k, shape=elem.shape, dtype=elem.dtype, **dataset_kwargs)
     da.store(elem, g)
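
A hedged, user-level sketch of the write path this function serves (the file name and array sizes are invented; assumes dask >= 2025.4.0 when a distributed client is active):

import anndata as ad
import dask.array as da
from dask.distributed import Client

if __name__ == "__main__":
    with Client(processes=True):  # distributed scheduler
        adata = ad.AnnData(X=da.random.random((500, 50), chunks=(100, 50)))
        # Dispatches to the registered dask writer above; on dask < 2025.4.0 with a
        # distributed scheduler this now raises RuntimeError instead of silently
        # writing potentially corrupted data.
        adata.write_zarr("dense_example.zarr")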

tests/conftest.py

Lines changed: 7 additions & 4 deletions
@@ -118,11 +118,14 @@ def local_cluster_addr(
     # Adapted from https://pytest-xdist.readthedocs.io/en/latest/how-to.html#making-session-scoped-fixtures-execute-only-once
     import dask.distributed as dd
 
-    def make_cluster() -> dd.LocalCluster:
-        return dd.LocalCluster(n_workers=1, threads_per_worker=1)
+    def make_cluster(worker_id: str) -> dd.LocalCluster:
+        # If we're not using multiple pytest-xdist workers, let the cluster have multiple workers.
+        return dd.LocalCluster(
+            n_workers=1 if worker_id != "master" else 2, threads_per_worker=1
+        )
 
     if worker_id == "master":
-        with make_cluster() as cluster:
+        with make_cluster(worker_id) as cluster:
             yield cluster.scheduler_address
             return
 
@@ -138,7 +141,7 @@ def make_cluster() -> dd.LocalCluster:
         yield address
         return
 
-    with make_cluster() as cluster:
+    with make_cluster(worker_id) as cluster:
         fn.write_text(cluster.scheduler_address)
         lock.release()
         yield cluster.scheduler_address
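
A hedged sketch of how a test consumes this fixture (the test body is hypothetical; the decorators mirror the ones added in the test files below):

import pytest


@pytest.mark.xdist_group("dask")
@pytest.mark.dask_distributed
def test_uses_shared_cluster(local_cluster_addr: str) -> None:
    import dask.distributed as dd

    with dd.Client(local_cluster_addr):
        ...  # dask work here runs on the session-scoped LocalCluster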

tests/lazy/test_concat.py

Lines changed: 1 addition & 0 deletions
@@ -218,6 +218,7 @@ def test_concat_to_memory_var(
 
 
 @pytest.mark.xdist_group("dask")
+@pytest.mark.dask_distributed
 def test_concat_data_with_cluster_to_memory(
     adata_remote: AnnData, join: Join_T, local_cluster_addr: str
 ) -> None:

tests/test_dask.py

Lines changed: 14 additions & 2 deletions
@@ -4,11 +4,14 @@
 
 from __future__ import annotations
 
+from importlib.metadata import version
+from pathlib import Path
 from typing import TYPE_CHECKING
 
 import numpy as np
 import pandas as pd
 import pytest
+from packaging.version import Version
 from scipy import sparse
 
 import anndata as ad
@@ -107,6 +110,7 @@ def test_dask_write(adata, tmp_path, diskfmt):
 
 
 @pytest.mark.xdist_group("dask")
+@pytest.mark.dask_distributed
 def test_dask_distributed_write(
     adata: AnnData,
     tmp_path: Path,
@@ -126,8 +130,15 @@ def test_dask_distributed_write(
     adata.obsm["b"] = da.random.random((M, 10))
     adata.varm["a"] = da.random.random((N, 10))
     orig = adata
-    if diskfmt == "h5ad":
-        with pytest.raises(ValueError, match=r"Cannot write dask arrays to hdf5"):
+    is_h5 = diskfmt == "h5ad"
+    is_corrupted_dask = Version(version("dask")) < Version("2025.4.0")
+    if is_corrupted_dask or is_h5:
+        with pytest.raises(
+            ValueError if is_h5 else RuntimeError,
+            match=r"Cannot write dask arrays to hdf5"
+            if is_h5
+            else r"Writing dense data with a distributed scheduler to zarr",
+        ):
             ad.io.write_elem(g, "", orig)
         return
     ad.io.write_elem(g, "", orig)
@@ -140,6 +151,7 @@ def test_dask_distributed_write(
 
     assert_equal(curr.varm["a"], orig.varm["a"])
     assert_equal(curr.obsm["a"], orig.obsm["a"])
+    assert_equal(curr.X, orig.X)
 
     assert isinstance(curr.X, np.ndarray)
     assert isinstance(curr.obsm["a"], np.ndarray)

tests/test_io_elementwise.py

Lines changed: 4 additions & 1 deletion
@@ -334,8 +334,11 @@ def test_read_lazy_subsets_nd_dask(store, n_dims, chunks):
 
 
 @pytest.mark.xdist_group("dask")
+@pytest.mark.dask_distributed
 def test_read_lazy_h5_cluster(
-    sparse_format: Literal["csr", "csc"], tmp_path: Path, local_cluster_addr: str
+    sparse_format: Literal["csr", "csc"],
+    tmp_path: Path,
+    local_cluster_addr: str,
 ) -> None:
     import dask.distributed as dd
 