From 80109dc57ad4f3c9767ba118d7bd2eaf765023d4 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 10 Dec 2024 13:31:16 -0800 Subject: [PATCH 1/5] add basic from_arrow_dataset api --- dask_expr/_collection.py | 28 ++++++ dask_expr/io/arrow.py | 209 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 237 insertions(+) create mode 100644 dask_expr/io/arrow.py diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py index 9ece42f61..16b07c74c 100644 --- a/dask_expr/_collection.py +++ b/dask_expr/_collection.py @@ -5508,6 +5508,34 @@ def read_parquet( ) +def from_arrow_dataset( + dataset, + columns=None, + filters=None, + blocksize="default", +): + """Read a PyArrow Dataset into a Dask DataFrame""" + import pyarrow.parquet as pq + + from dask_expr.io.arrow import FromArrowDataset + + if filters is not None: + filters = pq.filters_to_expression(filters) + + if blocksize == "default": + blocksize = None + + return new_collection( + FromArrowDataset( + dataset, + columns=_convert_to_list(columns), + filters=filters, + blocksize=blocksize, + _series=isinstance(columns, str), + ) + ) + + def concat( dfs, axis=0, diff --git a/dask_expr/io/arrow.py b/dask_expr/io/arrow.py new file mode 100644 index 000000000..d5027cb05 --- /dev/null +++ b/dask_expr/io/arrow.py @@ -0,0 +1,209 @@ +from __future__ import annotations + +import math +from functools import cached_property + +import numpy as np +import pyarrow as pa +from dask._task_spec import Task +from dask.typing import Key +from dask.utils import funcname, parse_bytes + +from dask_expr._util import _convert_to_list, _tokenize_deterministic +from dask_expr.io import BlockwiseIO, PartitionsFiltered +from dask_expr.io.parquet import _maybe_adjust_cpu_count + + +class FromArrowDataset(PartitionsFiltered, BlockwiseIO): + _parameters = [ + "dataset", + "columns", + "filters", + "blocksize", + "_partitions", + "_series", + ] + _defaults = { + "columns": None, + "filters": None, + "blocksize": None, + "_partitions": None, + "_series": False, + } + + _absorb_projections = True + _filter_passthrough = False + _scan_options = None + + @property + def columns(self): + columns_operand = self.operand("columns") + if columns_operand is None: + return list(self._meta.columns) + else: + return _convert_to_list(columns_operand) + + @cached_property + def _name(self): + return ( + "from-dataset" + + "-" + + _tokenize_deterministic(funcname(type(self)), *self.operands[:-1]) + ) + + @cached_property + def _meta(self): + meta = self.dataset.schema.empty_table().to_pandas() + columns = _convert_to_list(self.operand("columns")) + if self._series: + assert len(columns) > 0 + return meta[columns[0]] + elif columns is not None: + return meta[columns] + return meta + + @cached_property + def _mean_file_size(self): + return np.mean( + [ + frag.filesystem.get_file_info(frag.path).size + for frag in self.fragments[:3] + ] + ) + + @cached_property + def _plan(self): + # TODO: Use metadata for Parquet + num_files = len(self.fragments) + splits, stride = 1, 1 + blocksize = self.operand("blocksize") + if blocksize: + file_size = self._mean_file_size + if file_size > blocksize: + # Split large files + splits = math.ceil(file_size / blocksize) + else: + # Aggregate small files + stride = max(int(blocksize / file_size), 1) + if splits > 1: + count = num_files * splits + else: + count = math.ceil(num_files / stride) + return { + "count": count, + "splits": splits, + "stride": stride, + } + + def _divisions(self): + return (None,) * (self._plan["count"] + 1) + + @staticmethod + 
def _table_to_pandas(table): + return table.to_pandas() + + @cached_property + def fragments(self): + if self.filters is not None: + return np.array(list(self.dataset.get_fragments(filter=self.filters))) + return np.array(list(self.dataset.get_fragments())) + + @classmethod + def _read_fragments( + cls, + fragments, + columns, + filters, + schema, + fragment_to_table_options, + table_to_pandas_options, + split_range, + ): + fragment_to_table_options = fragment_to_table_options or {} + table_to_pandas_options = table_to_pandas_options or {} + return cls._table_to_pandas( + pa.concat_tables( + [ + cls._fragment_to_table( + fragment, + filters=filters, + columns=columns, + schema=schema, + split_range=split_range, + **fragment_to_table_options, + ) + for fragment in fragments + ], + promote_options="permissive", + ), + **table_to_pandas_options, + ) + + def _filtered_task(self, name: Key, index: int) -> Task: + columns = self.columns.copy() + schema = self.dataset.schema.remove_metadata() + splits, stride = self._plan["splits"], self._plan["stride"] + if splits > 1: + frag_index = int(index / splits) + split_range = (index % splits, splits) + else: + frag_index = index * stride + split_range = None + return Task( + name, + self._read_fragments, + self.fragments[frag_index : frag_index + stride], + columns=columns, + filters=self.filters, + schema=schema, + fragment_to_table_options=None, + table_to_pandas_options=None, + split_range=split_range, + ) + + @classmethod + def _fragment_to_table( + cls, + fragment, + filters, + columns, + schema, + split_range, + **fragment_to_table_options, + ): + _maybe_adjust_cpu_count() + options = { + "columns": columns, + "filter": filters, + "batch_size": 10_000_000, + "fragment_scan_options": cls._scan_options, + "use_threads": True, + } + options.update(fragment_to_table_options) + if split_range: + total_rows = fragment.count_rows(filter=filters) + n_rows = int(total_rows / split_range[1]) + skip_rows = n_rows * split_range[0] + if split_range[0] == (split_range[1] - 1): + end = total_rows + else: + end = skip_rows + n_rows + return fragment.take( + range(skip_rows, end), + **options, + ) + else: + return fragment.to_table( + schema=schema, + **options, + ) + + +class FromArrowDatasetParquet(FromArrowDataset): + _scan_options = pa.dataset.ParquetFragmentScanOptions( + pre_buffer=True, + cache_options=pa.CacheOptions( + hole_size_limit=parse_bytes("4 MiB"), + range_size_limit=parse_bytes("32.00 MiB"), + ), + ) From b7a1f6f97dd037153f23a8288d42e1fef55513d2 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 13 Dec 2024 07:54:57 -0800 Subject: [PATCH 2/5] add path_column support --- dask_expr/_collection.py | 18 ++-- dask_expr/io/arrow.py | 176 +++++++++++++++++++++++++++++++-------- 2 files changed, 152 insertions(+), 42 deletions(-) diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py index 16b07c74c..84e7ccff3 100644 --- a/dask_expr/_collection.py +++ b/dask_expr/_collection.py @@ -5513,24 +5513,24 @@ def from_arrow_dataset( columns=None, filters=None, blocksize="default", + path_column=None, + fragment_to_table_options=None, + table_to_pandas_options=None, + custom_backend_options=None, ): """Read a PyArrow Dataset into a Dask DataFrame""" - import pyarrow.parquet as pq - from dask_expr.io.arrow import FromArrowDataset - if filters is not None: - filters = pq.filters_to_expression(filters) - - if blocksize == "default": - blocksize = None - return new_collection( FromArrowDataset( dataset, columns=_convert_to_list(columns), filters=filters, - 
blocksize=blocksize, + blocksize=(None if blocksize == "default" else blocksize), + path_column=path_column, + fragment_to_table_options=fragment_to_table_options, + table_to_pandas_options=table_to_pandas_options, + custom_backend_options=custom_backend_options, _series=isinstance(columns, str), ) ) diff --git a/dask_expr/io/arrow.py b/dask_expr/io/arrow.py index d5027cb05..b0ac1aa95 100644 --- a/dask_expr/io/arrow.py +++ b/dask_expr/io/arrow.py @@ -1,25 +1,73 @@ from __future__ import annotations +import enum import math +from enum import IntEnum from functools import cached_property import numpy as np import pyarrow as pa +import pyarrow.parquet as pq from dask._task_spec import Task from dask.typing import Key from dask.utils import funcname, parse_bytes +from dask_expr._expr import Index, Projection, determine_column_projection from dask_expr._util import _convert_to_list, _tokenize_deterministic from dask_expr.io import BlockwiseIO, PartitionsFiltered from dask_expr.io.parquet import _maybe_adjust_cpu_count +class PartitionFlavor(IntEnum): + """Flavor of file:partition mapping.""" + + SINGLE_FILE = enum.auto() # 1:1 mapping between files and partitions + SPLIT_FILES = enum.auto() # Split each file into >1 partition + FUSED_FILES = enum.auto() # Fuse multiple files into each partition + + +class PartitionPlan: + """Partition-mappiing plan.""" + + __slots__ = ("factor", "flavor", "count") + factor: int + flavor: PartitionFlavor + count: int + + def __init__(self, factor: int, flavor: PartitionFlavor, file_count: int) -> None: + if flavor == PartitionFlavor.SINGLE_FILE and factor != 1: + raise ValueError(f"Expected factor == 1 for {flavor}, got: {factor}") + self.factor = factor + self.flavor = flavor + if flavor == PartitionFlavor.SINGLE_FILE: + self.count = file_count + elif flavor == PartitionFlavor.SPLIT_FILES: + self.count = file_count * factor + elif flavor == PartitionFlavor.FUSED_FILES: + self.count = math.ceil(file_count / factor) + else: + raise ValueError(f"{flavor} not a supported PartitionFlavor") + + +def _pa_filters(filters): + # Simple utility to covert filters to + # a pyarrow-compatible expression + if filters is None: + return None + else: + return pq.filters_to_expression(filters) + + class FromArrowDataset(PartitionsFiltered, BlockwiseIO): _parameters = [ "dataset", "columns", "filters", "blocksize", + "path_column", + "fragment_to_table_options", + "table_to_pandas_options", + "custom_backend_options", "_partitions", "_series", ] @@ -27,6 +75,10 @@ class FromArrowDataset(PartitionsFiltered, BlockwiseIO): "columns": None, "filters": None, "blocksize": None, + "path_column": None, + "fragment_to_table_options": None, + "table_to_pandas_options": None, + "custom_backend_options": None, "_partitions": None, "_series": False, } @@ -43,17 +95,24 @@ def columns(self): else: return _convert_to_list(columns_operand) + @cached_property + def pa_filters(self): + return _pa_filters(self.filters) + @cached_property def _name(self): - return ( - "from-dataset" - + "-" - + _tokenize_deterministic(funcname(type(self)), *self.operands[:-1]) + return "from-dataset-" + _tokenize_deterministic( + funcname(type(self)), *self.operands[:-1] ) @cached_property def _meta(self): - meta = self.dataset.schema.empty_table().to_pandas() + schema = self.dataset.schema + if self.path_column is not None: + if self.path_column in schema.names: + raise ValueError(f"{self.path_column} column already exists in schema.") + schema = schema.append(pa.field(self.path_column, pa.string())) + meta = 
schema.empty_table().to_pandas() columns = _convert_to_list(self.operand("columns")) if self._series: assert len(columns) > 0 @@ -72,31 +131,36 @@ def _mean_file_size(self): ) @cached_property - def _plan(self): - # TODO: Use metadata for Parquet + def _plan(self) -> PartitionPlan: num_files = len(self.fragments) - splits, stride = 1, 1 + plan = PartitionPlan( + factor=1, + flavor=PartitionFlavor.SINGLE_FILE, + file_count=num_files, + ) # Default plan blocksize = self.operand("blocksize") - if blocksize: + if blocksize is not None: + blocksize = parse_bytes(blocksize) + # TODO: Use metadata for Parquet file_size = self._mean_file_size if file_size > blocksize: # Split large files - splits = math.ceil(file_size / blocksize) + plan = PartitionPlan( + factor=math.ceil(file_size / blocksize), + flavor=PartitionFlavor.SPLIT_FILES, + file_count=num_files, + ) else: # Aggregate small files - stride = max(int(blocksize / file_size), 1) - if splits > 1: - count = num_files * splits - else: - count = math.ceil(num_files / stride) - return { - "count": count, - "splits": splits, - "stride": stride, - } + plan = PartitionPlan( + factor=max(int(blocksize / file_size), 1), + flavor=PartitionFlavor.FUSED_FILES, + file_count=num_files, + ) + return plan def _divisions(self): - return (None,) * (self._plan["count"] + 1) + return (None,) * (self._plan.count + 1) @staticmethod def _table_to_pandas(table): @@ -104,23 +168,28 @@ def _table_to_pandas(table): @cached_property def fragments(self): - if self.filters is not None: - return np.array(list(self.dataset.get_fragments(filter=self.filters))) + if self.pa_filters is not None: + return np.array(list(self.dataset.get_fragments(filter=self.pa_filters))) return np.array(list(self.dataset.get_fragments())) @classmethod - def _read_fragments( + def read_fragments( cls, fragments, columns, filters, schema, + path_column, fragment_to_table_options, table_to_pandas_options, + custom_backend_options, split_range, ): + """Read list of fragments into DataFrame partitions.""" fragment_to_table_options = fragment_to_table_options or {} table_to_pandas_options = table_to_pandas_options or {} + if custom_backend_options: + raise ValueError(f"Unsupported options: {custom_backend_options}") return cls._table_to_pandas( pa.concat_tables( [ @@ -130,6 +199,7 @@ def _read_fragments( columns=columns, schema=schema, split_range=split_range, + path_column=path_column, **fragment_to_table_options, ) for fragment in fragments @@ -142,22 +212,27 @@ def _read_fragments( def _filtered_task(self, name: Key, index: int) -> Task: columns = self.columns.copy() schema = self.dataset.schema.remove_metadata() - splits, stride = self._plan["splits"], self._plan["stride"] - if splits > 1: + flavor = self._plan.flavor + if flavor == PartitionFlavor.SPLIT_FILES: + splits = self._plan.factor + stride = 1 frag_index = int(index / splits) split_range = (index % splits, splits) else: + stride = self._plan.factor frag_index = index * stride split_range = None return Task( name, - self._read_fragments, + self.read_fragments, self.fragments[frag_index : frag_index + stride], columns=columns, filters=self.filters, schema=schema, - fragment_to_table_options=None, - table_to_pandas_options=None, + path_column=self.path_column, + fragment_to_table_options=self.fragment_to_table_options, + table_to_pandas_options=self.table_to_pandas_options, + custom_backend_options=self.custom_backend_options, split_range=split_range, ) @@ -169,12 +244,17 @@ def _fragment_to_table( columns, schema, split_range, + 
path_column, **fragment_to_table_options, ): _maybe_adjust_cpu_count() options = { - "columns": columns, - "filter": filters, + "columns": ( + columns + if path_column is None + else [c for c in columns if c != path_column] + ), + "filter": _pa_filters(filters), "batch_size": 10_000_000, "fragment_scan_options": cls._scan_options, "use_threads": True, @@ -188,15 +268,45 @@ def _fragment_to_table( end = total_rows else: end = skip_rows + n_rows - return fragment.take( + table = fragment.take( range(skip_rows, end), **options, ) else: - return fragment.to_table( + table = fragment.to_table( schema=schema, **options, ) + if path_column is None: + return table + else: + return table.append_column( + path_column, pa.array([fragment.path] * len(table), pa.string()) + ) + + @property + def _fusion_compression_factor(self): + # TODO: Deal with column-projection adjustments + return 1 + + def _simplify_up(self, parent, dependents): + if isinstance(parent, Index): + # Column projection + columns = determine_column_projection(self, parent, dependents) + columns = [col for col in self.columns if col in columns] + if set(columns) == set(self.columns): + return + return Index( + self.substitute_parameters({"columns": columns, "_series": False}) + ) + + if isinstance(parent, Projection): + return super()._simplify_up(parent, dependents) + + def _simplify_down(self): + file_format = self.dataset.format.default_extname + if file_format == "parquet": + return FromArrowDatasetParquet(*self.operands) class FromArrowDatasetParquet(FromArrowDataset): From 69207482b6061efc791639f4a6a4d5e0c5e74300 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 13 Dec 2024 11:13:35 -0800 Subject: [PATCH 3/5] add docstring --- dask_expr/_collection.py | 53 ++++++++++++++++++-- dask_expr/io/arrow.py | 104 +++++++++++++++++++++++++++++++-------- 2 files changed, 132 insertions(+), 25 deletions(-) diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py index 84e7ccff3..ffe78b9b9 100644 --- a/dask_expr/_collection.py +++ b/dask_expr/_collection.py @@ -5512,13 +5512,56 @@ def from_arrow_dataset( dataset, columns=None, filters=None, - blocksize="default", + blocksize=None, path_column=None, fragment_to_table_options=None, - table_to_pandas_options=None, + table_to_dataframe_options=None, custom_backend_options=None, ): - """Read a PyArrow Dataset into a Dask DataFrame""" + """Read a PyArrow Dataset into a Dask DataFrame + + Parameters + ---------- + dataset : pyarrow.dataset.Dataset + Arrow Dataset object to convert to Dask DataFrame. + columns : str or list, default None + Field name(s) to read in as columns in the output. By default all + non-index fields will be read (as determined by the arrow Dataset + schema). Provide a single field name instead of a list to + read in the data as a Series. + filters : Union[List[Tuple[str, str, Any]], List[List[Tuple[str, str, Any]]]], default None + List of filters to apply, like ``[[('col1', '==', 0), ...], ...]``. + Using this argument will result in row-wise filtering of the final partitions. + + Predicates can be expressed in disjunctive normal form (DNF). This means that + the inner-most tuple describes a single column predicate. These inner predicates + are combined with an AND conjunction into a larger predicate. The outer-most + list then combines all of the combined filters with an OR disjunction. + + Predicates can also be expressed as a ``List[Tuple]``. These are evaluated + as an AND conjunction. 
To express OR in predicates, one must use the + (preferred for "pyarrow") ``List[List[Tuple]]`` notation. + blocksize : int, str or None, default None + The desired size of each output ``DataFrame`` partition in terms of + approximate (uncompressed) storage space. If None is specified + (the default) each file will be mapped to a distinct partition. + path_column: str, default None + Optional column name to use for additional path information. By default, + a path column will not be included in the output DataFrame. + fragment_to_table_options: dict, default None + Optional arguments for fragment-to-table conversion. + table_to_dataframe_options: dict, default None + Optional arguments for table-to-dataframe conversion. + custom_backend_options: dict, default None + Optional backend arguments. These options are passed to the underlying + ``FromArrowDataset.read_fragments`` implementation. + + Examples + -------- + >>> from pyarrow.dataset import dataset # doctest: +SKIP + >>> ds = dataset('/my/parquet/dataset', format='parquet') # doctest: +SKIP + >>> df = dd.from_arrow_dataset(ds, blocksize="100MiB") # doctest: +SKIP + """ from dask_expr.io.arrow import FromArrowDataset return new_collection( @@ -5526,10 +5569,10 @@ def from_arrow_dataset( dataset, columns=_convert_to_list(columns), filters=filters, - blocksize=(None if blocksize == "default" else blocksize), + blocksize=blocksize, path_column=path_column, fragment_to_table_options=fragment_to_table_options, - table_to_pandas_options=table_to_pandas_options, + table_to_dataframe_options=table_to_dataframe_options, custom_backend_options=custom_backend_options, _series=isinstance(columns, str), ) diff --git a/dask_expr/io/arrow.py b/dask_expr/io/arrow.py index b0ac1aa95..477c3dd96 100644 --- a/dask_expr/io/arrow.py +++ b/dask_expr/io/arrow.py @@ -66,7 +66,7 @@ class FromArrowDataset(PartitionsFiltered, BlockwiseIO): "blocksize", "path_column", "fragment_to_table_options", - "table_to_pandas_options", + "table_to_dataframe_options", "custom_backend_options", "_partitions", "_series", @@ -77,7 +77,7 @@ class FromArrowDataset(PartitionsFiltered, BlockwiseIO): "blocksize": None, "path_column": None, "fragment_to_table_options": None, - "table_to_pandas_options": None, + "table_to_dataframe_options": None, "custom_backend_options": None, "_partitions": None, "_series": False, @@ -162,8 +162,8 @@ def _plan(self) -> PartitionPlan: def _divisions(self): return (None,) * (self._plan.count + 1) - @staticmethod - def _table_to_pandas(table): + @classmethod + def _table_to_dataframe(cls, table): return table.to_pandas() @cached_property @@ -181,16 +181,16 @@ def read_fragments( schema, path_column, fragment_to_table_options, - table_to_pandas_options, + table_to_dataframe_options, custom_backend_options, split_range, ): """Read list of fragments into DataFrame partitions.""" fragment_to_table_options = fragment_to_table_options or {} - table_to_pandas_options = table_to_pandas_options or {} + table_to_dataframe_options = table_to_dataframe_options or {} if custom_backend_options: raise ValueError(f"Unsupported options: {custom_backend_options}") - return cls._table_to_pandas( + return cls._table_to_dataframe( pa.concat_tables( [ cls._fragment_to_table( @@ -206,7 +206,7 @@ def read_fragments( ], promote_options="permissive", ), - **table_to_pandas_options, + **table_to_dataframe_options, ) def _filtered_task(self, name: Key, index: int) -> Task: @@ -231,11 +231,36 @@ def _filtered_task(self, name: Key, index: int) -> Task: schema=schema, 
path_column=self.path_column, fragment_to_table_options=self.fragment_to_table_options, - table_to_pandas_options=self.table_to_pandas_options, + table_to_dataframe_options=self.table_to_dataframe_options, custom_backend_options=self.custom_backend_options, split_range=split_range, ) + @classmethod + def _partial_fragment_to_table( + cls, + fragment, + schema, + filters, + split_index, + split_count, + options, + ): + # Partial-file read (default/catch-all logic) + filters = _pa_filters(filters) + total_rows = fragment.count_rows(filter=filters) + n_rows = int(total_rows / split_count) + skip_rows = n_rows * split_index + if split_index == (split_count - 1): + end = total_rows + else: + end = skip_rows + n_rows + return fragment.take( + range(skip_rows, end), + filter=filters, + **options, + ) + @classmethod def _fragment_to_table( cls, @@ -254,27 +279,23 @@ def _fragment_to_table( if path_column is None else [c for c in columns if c != path_column] ), - "filter": _pa_filters(filters), "batch_size": 10_000_000, "fragment_scan_options": cls._scan_options, "use_threads": True, } options.update(fragment_to_table_options) if split_range: - total_rows = fragment.count_rows(filter=filters) - n_rows = int(total_rows / split_range[1]) - skip_rows = n_rows * split_range[0] - if split_range[0] == (split_range[1] - 1): - end = total_rows - else: - end = skip_rows + n_rows - table = fragment.take( - range(skip_rows, end), - **options, + table = cls._partial_fragment_to_table( + fragment, + schema, + filters, + *split_range, + options, ) else: table = fragment.to_table( schema=schema, + filter=_pa_filters(filters), **options, ) if path_column is None: @@ -317,3 +338,46 @@ class FromArrowDatasetParquet(FromArrowDataset): range_size_limit=parse_bytes("32.00 MiB"), ), ) + + @classmethod + def _partial_fragment_to_table( + cls, + fragment, + filters, + schema, + split_index, + split_count, + options, + ): + # Parquet-specific partial read. + # (try to align reads with row-groups) + total_row_groups = fragment.num_row_groups + if total_row_groups < split_count: + # Cannot align with row-groups. 
+ # Use catch-all logic + return FromArrowDataset._partial_fragment_to_table( + fragment, + filters, + schema, + split_index, + split_count, + options, + ) + + # Align with row-groups + rg_stride = total_row_groups // split_count + skip_rgs = rg_stride * split_index + rgs = list(range(total_row_groups)) + new_rgs = rgs[skip_rgs : skip_rgs + rg_stride] + if split_index == (split_count - 1): + new_rgs = rgs[skip_rgs:] + filters = _pa_filters(filters) + return fragment.subset( + fiter=filters, + schema=schema, + row_group_ids=new_rgs, + ).to_table( + schema=schema, + filter=filters, + **options, + ) From 930777c5d119a1c4eed2fe8e63f230f0199b4566 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 13 Dec 2024 11:19:59 -0800 Subject: [PATCH 4/5] maybe avoid concat --- dask_expr/io/arrow.py | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/dask_expr/io/arrow.py b/dask_expr/io/arrow.py index 477c3dd96..1f0a40e15 100644 --- a/dask_expr/io/arrow.py +++ b/dask_expr/io/arrow.py @@ -190,22 +190,25 @@ def read_fragments( table_to_dataframe_options = table_to_dataframe_options or {} if custom_backend_options: raise ValueError(f"Unsupported options: {custom_backend_options}") + tables = [ + cls._fragment_to_table( + fragment, + filters=filters, + columns=columns, + schema=schema, + split_range=split_range, + path_column=path_column, + **fragment_to_table_options, + ) + for fragment in fragments + ] return cls._table_to_dataframe( pa.concat_tables( - [ - cls._fragment_to_table( - fragment, - filters=filters, - columns=columns, - schema=schema, - split_range=split_range, - path_column=path_column, - **fragment_to_table_options, - ) - for fragment in fragments - ], + tables, promote_options="permissive", - ), + ) + if len(tables) > 1 + else tables, **table_to_dataframe_options, ) From e37ca796300e93015b6ef17f25fbf1f8e486f306 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 16 Dec 2024 09:38:42 -0800 Subject: [PATCH 5/5] use byte ranges for csv and json --- dask_expr/io/arrow.py | 120 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 118 insertions(+), 2 deletions(-) diff --git a/dask_expr/io/arrow.py b/dask_expr/io/arrow.py index 1f0a40e15..9c24ee886 100644 --- a/dask_expr/io/arrow.py +++ b/dask_expr/io/arrow.py @@ -11,6 +11,7 @@ from dask._task_spec import Task from dask.typing import Key from dask.utils import funcname, parse_bytes +from fsspec.utils import read_block from dask_expr._expr import Index, Projection, determine_column_projection from dask_expr._util import _convert_to_list, _tokenize_deterministic @@ -208,7 +209,7 @@ def read_fragments( promote_options="permissive", ) if len(tables) > 1 - else tables, + else tables[0], **table_to_dataframe_options, ) @@ -329,10 +330,107 @@ def _simplify_up(self, parent, dependents): def _simplify_down(self): file_format = self.dataset.format.default_extname - if file_format == "parquet": + if file_format == "csv": + return FromArrowDatasetCSV(*self.operands) + elif file_format == "json": + return FromArrowDatasetJSON(*self.operands) + elif file_format == "parquet": return FromArrowDatasetParquet(*self.operands) +class FromArrowDatasetCSV(FromArrowDataset): + @classmethod + def _partial_fragment_to_table( + cls, + fragment, + schema, + filters, + split_index, + split_count, + options, + ): + # Calculate byte range for this read + path = fragment.path + filesystem = fragment.filesystem + size = filesystem.get_file_info(path).size + nbytes = size // split_count + offset = nbytes * split_index + if 
split_index == (split_count - 1): + nbytes = size - offset + + # Handle header and delimiter + add_header = b"" + row_delimiter = b"\n" + scan_options = fragment.format.default_fragment_scan_options + column_names = scan_options.column_names + skip_rows = scan_options.skip_rows + if split_index: + if not column_names and not skip_rows: + add_header = _read_byte_block( + path, + filesystem, + 0, + 1, + delimiter=row_delimiter, + ) + for _ in range(skip_rows): + add_header += row_delimiter + + # Read partial fragment + return fragment.format.make_fragment( + pa.py_buffer( + add_header + + _read_byte_block( + path, + filesystem, + offset, + nbytes, + delimiter=row_delimiter, + ) + ) + ).to_table( + filter=filters, + **options, + ) + + +class FromArrowDatasetJSON(FromArrowDataset): + @classmethod + def _partial_fragment_to_table( + cls, + fragment, + schema, + filters, + split_index, + split_count, + options, + ): + # Calculate byte range for this read + path = fragment.path + filesystem = fragment.filesystem + size = filesystem.get_file_info(path).size + nbytes = size // split_count + offset = nbytes * split_index + if split_index == (split_count - 1): + nbytes = size - offset + + # Read partial fragment + return fragment.format.make_fragment( + pa.py_buffer( + _read_byte_block( + path, + filesystem, + offset, + nbytes, + delimiter=b"\n", + ) + ) + ).to_table( + filter=filters, + **options, + ) + + class FromArrowDatasetParquet(FromArrowDataset): _scan_options = pa.dataset.ParquetFragmentScanOptions( pre_buffer=True, @@ -384,3 +482,21 @@ def _partial_fragment_to_table( filter=filters, **options, ) + + +def _read_byte_block( + path, + filesystem, + offset, + nbytes, + delimiter=None, +): + # Use fsspec to read in a delimited byte range + with filesystem.open_input_file(path) as f: + block = read_block( + f, + offset, + nbytes, + delimiter, + ) + return block
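A minimal usage sketch of the ``from_arrow_dataset`` API added in this series, following the docstring in patch 3. The directory ``./data`` and the filter column ``x`` are hypothetical, and the sketch assumes the function is re-exported from the ``dask_expr`` package namespace like the other IO helpers (otherwise import it from ``dask_expr._collection``):

import pyarrow.dataset as ds

import dask_expr as dx

# Build an Arrow Dataset over a (hypothetical) local Parquet directory.
dataset = ds.dataset("./data", format="parquet")

# Map the dataset to a Dask DataFrame:
# - blocksize="100MiB" targets ~100 MiB partitions (large files are split,
#   small files are fused -- see FromArrowDataset._plan)
# - path_column="path" appends the originating file path to every row
# - filters uses the same DNF notation accepted by pyarrow.parquet
df = dx.from_arrow_dataset(
    dataset,
    filters=[("x", ">", 0)],
    blocksize="100MiB",
    path_column="path",
)

print(df.npartitions)
print(df.head())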
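The number of output partitions for a given ``blocksize`` follows the rule in ``FromArrowDataset._plan`` (patch 2): files larger than the blocksize are split, smaller files are fused. The snippet below is a standalone illustration of that rule under the assumption that the mean file size is already known (the expression samples the first three fragments via ``_mean_file_size``); it is not the class itself:

import math

from dask.utils import parse_bytes


def plan_partitions(mean_file_size, n_files, blocksize=None):
    # Mirror the blocksize rule from FromArrowDataset._plan (illustration only).
    if blocksize is None:
        # Default: one partition per file
        return {"flavor": "SINGLE_FILE", "factor": 1, "count": n_files}
    blocksize = parse_bytes(blocksize)
    if mean_file_size > blocksize:
        # Split each large file into ceil(file_size / blocksize) partitions
        factor = math.ceil(mean_file_size / blocksize)
        return {"flavor": "SPLIT_FILES", "factor": factor, "count": n_files * factor}
    # Fuse up to floor(blocksize / file_size) small files into one partition
    factor = max(int(blocksize / mean_file_size), 1)
    return {"flavor": "FUSED_FILES", "factor": factor, "count": math.ceil(n_files / factor)}


# Ten ~256 MiB files with a 100 MiB target: each file is split three ways.
print(plan_partitions(parse_bytes("256 MiB"), 10, "100MiB"))
# {'flavor': 'SPLIT_FILES', 'factor': 3, 'count': 30}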
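Patch 5 splits CSV and JSON files by raw byte range and relies on ``fsspec.utils.read_block`` to snap each range to newline boundaries, so no record is cut in half; this is also why the CSV path re-attaches the header for every split after the first (when the scan options do not already supply column names). A small in-memory illustration of that byte-range behavior, using a hypothetical four-row CSV and calling ``read_block`` directly rather than the fragment machinery:

import io

from fsspec.utils import read_block

data = b"a,b\n1,2\n3,4\n5,6\n7,8\n"  # header plus four rows
size = len(data)
split_count = 2

blocks = []
for split_index in range(split_count):
    # Same arithmetic as FromArrowDatasetCSV._partial_fragment_to_table
    nbytes = size // split_count
    offset = nbytes * split_index
    if split_index == (split_count - 1):
        nbytes = size - offset
    # read_block extends/clips the range to the next b"\n", so rows stay whole
    blocks.append(read_block(io.BytesIO(data), offset, nbytes, delimiter=b"\n"))

print(blocks)
# Expected: [b'a,b\n1,2\n3,4\n', b'5,6\n7,8\n'] -- the second block carries no
# header, which is what the add_header logic in the patch compensates for.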