Skip to content

Commit 05d258e

Browse files
Fix _search_for_backing_files_recursively() to support multifile parquet (#1127)
fix _search_for_backing_files_recursively() to support multifile parquet
1 parent d8bf265 commit 05d258e

1 file changed

Lines changed: 40 additions & 11 deletions

File tree

src/spatialdata/_io/_utils.py

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
from spatialdata._core.spatialdata import SpatialData
2929
from spatialdata._io.format import RasterFormatType, RasterFormatV01, RasterFormatV02, RasterFormatV03
30+
from spatialdata._logging import logger
3031
from spatialdata._utils import get_pyramid_levels
3132
from spatialdata.models._utils import (
3233
MappingToCoordinateSystem_t,
@@ -357,17 +358,45 @@ def _search_for_backing_files_recursively(subgraph: Any, files: list[str]) -> No
357358
# This occurs when for example points and images are mixed, the main task still starts with
358359
# read_parquet, but the execution happens through a subgraph which we iterate over to get the
359360
# actual read_parquet task.
360-
for task in v.args[0].values():
361-
# Recursively go through tasks, this is required because differences between dask versions.
362-
piece_dict = _find_piece_dict(task)
363-
if isinstance(piece_dict, dict) and "piece" in piece_dict:
364-
parquet_file, check0, check1 = piece_dict["piece"] # type: ignore[misc]
365-
if not parquet_file.endswith(".parquet") or check0 is not None or check1 is not None:
366-
raise ValueError(
367-
f"Unable to parse the parquet file from the dask subgraph {subgraph}. Please "
368-
f"report this bug."
369-
)
370-
files.append(os.path.realpath(parquet_file))
361+
#
362+
# v.args[0] has two known shapes:
363+
# dict – keys are task keys, values are Task objects (classic subgraph case)
364+
# list – list of piece dicts produced when aggregate_files=True aggregates multiple
365+
# parquet files into one partition; check0/check1 are row-group selectors
366+
# ([0], []) rather than None, so only the file extension is validated.
367+
args0 = v.args[0]
368+
if isinstance(args0, dict):
369+
for task in args0.values():
370+
# Recursively go through tasks, this is required because differences between dask
371+
# versions.
372+
piece_dict = _find_piece_dict(task)
373+
if isinstance(piece_dict, dict) and "piece" in piece_dict:
374+
parquet_file, check0, check1 = piece_dict["piece"] # type: ignore[misc]
375+
if (
376+
not parquet_file.endswith(".parquet")
377+
or check0 is not None
378+
or check1 is not None
379+
):
380+
raise ValueError(
381+
f"Unable to parse the parquet file from the dask subgraph {subgraph}. "
382+
f"Please report this bug."
383+
)
384+
files.append(os.path.realpath(parquet_file))
385+
elif isinstance(args0, list):
386+
for item in args0:
387+
if isinstance(item, dict) and "piece" in item:
388+
parquet_file = item["piece"][0]
389+
if not parquet_file.endswith(".parquet"):
390+
raise ValueError(
391+
f"Unable to parse the parquet file from the dask subgraph {subgraph}. "
392+
f"Please report this bug."
393+
)
394+
files.append(os.path.realpath(parquet_file))
395+
else:
396+
logger.warning(
397+
f"Unexpected type {type(args0)} for v.args[0] in the read_parquet task graph. "
398+
f"Backing files may not be detected correctly. Please report this as a bug."
399+
)
371400

372401

373402
def _backed_elements_contained_in_path(path: Path, object: SpatialData | SpatialElement | AnnData) -> list[bool]:

0 commit comments

Comments
 (0)