diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py
index db186de42..39d6ca5e1 100644
--- a/dask_expr/_collection.py
+++ b/dask_expr/_collection.py
@@ -5492,6 +5492,77 @@ def read_parquet(
     )
 
 
+def from_arrow_dataset(
+    dataset,
+    columns=None,
+    filters=None,
+    blocksize=None,
+    path_column=None,
+    fragment_to_table_options=None,
+    table_to_dataframe_options=None,
+    custom_backend_options=None,
+):
+    """Read a PyArrow Dataset into a Dask DataFrame
+
+    Parameters
+    ----------
+    dataset : pyarrow.dataset.Dataset
+        Arrow Dataset object to convert to a Dask DataFrame.
+    columns : str or list, default None
+        Field name(s) to read in as columns in the output. By default all
+        non-index fields will be read (as determined by the arrow Dataset
+        schema). Provide a single field name instead of a list to
+        read in the data as a Series.
+    filters : Union[List[Tuple[str, str, Any]], List[List[Tuple[str, str, Any]]]], default None
+        List of filters to apply, like ``[[('col1', '==', 0), ...], ...]``.
+        Using this argument will result in row-wise filtering of the final partitions.
+
+        Predicates can be expressed in disjunctive normal form (DNF). This means that
+        the inner-most tuple describes a single column predicate. These inner predicates
+        are combined with an AND conjunction into a larger predicate. The outer-most
+        list then combines all of the combined filters with an OR disjunction.
+
+        Predicates can also be expressed as a ``List[Tuple]``. These are evaluated
+        as an AND conjunction. To express OR in predicates, one must use the
+        (preferred) ``List[List[Tuple]]`` notation.
+    blocksize : int, str or None, default None
+        The desired size of each output ``DataFrame`` partition in terms of
+        approximate (uncompressed) storage space. If None (the default),
+        each file will be mapped to a distinct partition.
+    path_column : str, default None
+        Optional column name to use for additional path information. By default,
+        a path column will not be included in the output DataFrame.
+    fragment_to_table_options : dict, default None
+        Optional arguments for fragment-to-table conversion.
+    table_to_dataframe_options : dict, default None
+        Optional arguments for table-to-dataframe conversion.
+    custom_backend_options : dict, default None
+        Optional backend arguments. These options are passed to the underlying
+        ``FromArrowDataset.read_fragments`` implementation.
+
+    Examples
+    --------
+    >>> from pyarrow.dataset import dataset  # doctest: +SKIP
+    >>> ds = dataset('/my/parquet/dataset', format='parquet')  # doctest: +SKIP
+    >>> df = dd.from_arrow_dataset(ds, blocksize="100MiB")  # doctest: +SKIP
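+
+    Row-wise filtering can be combined with column projection. The following
+    example is illustrative only and assumes the dataset contains ``year``
+    and ``value`` fields:
+
+    >>> df = dd.from_arrow_dataset(
+    ...     ds,
+    ...     columns=["year", "value"],
+    ...     filters=[("year", ">", 2020)],
+    ... )  # doctest: +SKIP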
+    """
+    from dask_expr.io.arrow import FromArrowDataset
+
+    return new_collection(
+        FromArrowDataset(
+            dataset,
+            columns=_convert_to_list(columns),
+            filters=filters,
+            blocksize=blocksize,
+            path_column=path_column,
+            fragment_to_table_options=fragment_to_table_options,
+            table_to_dataframe_options=table_to_dataframe_options,
+            custom_backend_options=custom_backend_options,
+            _series=isinstance(columns, str),
+        )
+    )
+
+
 def concat(
     dfs,
     axis=0,
diff --git a/dask_expr/io/arrow.py b/dask_expr/io/arrow.py
new file mode 100644
index 000000000..9c24ee886
--- /dev/null
+++ b/dask_expr/io/arrow.py
@@ -0,0 +1,502 @@
+from __future__ import annotations
+
+import enum
+import math
+from enum import IntEnum
+from functools import cached_property
+
+import numpy as np
+import pyarrow as pa
+import pyarrow.dataset  # noqa: F401 (ensure the pa.dataset submodule used below is loaded)
+import pyarrow.parquet as pq
+from dask._task_spec import Task
+from dask.typing import Key
+from dask.utils import funcname, parse_bytes
+from fsspec.utils import read_block
+
+from dask_expr._expr import Index, Projection, determine_column_projection
+from dask_expr._util import _convert_to_list, _tokenize_deterministic
+from dask_expr.io import BlockwiseIO, PartitionsFiltered
+from dask_expr.io.parquet import _maybe_adjust_cpu_count
+
+
+class PartitionFlavor(IntEnum):
+    """Flavor of file:partition mapping."""
+
+    SINGLE_FILE = enum.auto()  # 1:1 mapping between files and partitions
+    SPLIT_FILES = enum.auto()  # Split each file into >1 partition
+    FUSED_FILES = enum.auto()  # Fuse multiple files into each partition
+
+
+class PartitionPlan:
+    """Partition-mapping plan."""
+
+    __slots__ = ("factor", "flavor", "count")
+    factor: int
+    flavor: PartitionFlavor
+    count: int
+
+    def __init__(self, factor: int, flavor: PartitionFlavor, file_count: int) -> None:
+        if flavor == PartitionFlavor.SINGLE_FILE and factor != 1:
+            raise ValueError(f"Expected factor == 1 for {flavor}, got: {factor}")
+        self.factor = factor
+        self.flavor = flavor
+        if flavor == PartitionFlavor.SINGLE_FILE:
+            self.count = file_count
+        elif flavor == PartitionFlavor.SPLIT_FILES:
+            self.count = file_count * factor
+        elif flavor == PartitionFlavor.FUSED_FILES:
+            self.count = math.ceil(file_count / factor)
+        else:
+            raise ValueError(f"{flavor} not a supported PartitionFlavor")
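+
+
+# Illustrative arithmetic for the plan above (comment only, not executed):
+# splitting each of 4 files into 3 pieces yields 12 partitions, while fusing
+# 4 files with a factor of 3 yields 2 partitions:
+#
+#     PartitionPlan(3, PartitionFlavor.SPLIT_FILES, file_count=4).count  # -> 12
+#     PartitionPlan(3, PartitionFlavor.FUSED_FILES, file_count=4).count  # -> 2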
+
+
+def _pa_filters(filters):
+    # Simple utility to convert filters to
+    # a pyarrow-compatible expression
+    if filters is None:
+        return None
+    else:
+        return pq.filters_to_expression(filters)
+
+
+class FromArrowDataset(PartitionsFiltered, BlockwiseIO):
+    _parameters = [
+        "dataset",
+        "columns",
+        "filters",
+        "blocksize",
+        "path_column",
+        "fragment_to_table_options",
+        "table_to_dataframe_options",
+        "custom_backend_options",
+        "_partitions",
+        "_series",
+    ]
+    _defaults = {
+        "columns": None,
+        "filters": None,
+        "blocksize": None,
+        "path_column": None,
+        "fragment_to_table_options": None,
+        "table_to_dataframe_options": None,
+        "custom_backend_options": None,
+        "_partitions": None,
+        "_series": False,
+    }
+
+    _absorb_projections = True
+    _filter_passthrough = False
+    _scan_options = None
+
+    @property
+    def columns(self):
+        columns_operand = self.operand("columns")
+        if columns_operand is None:
+            return list(self._meta.columns)
+        else:
+            return _convert_to_list(columns_operand)
+
+    @cached_property
+    def pa_filters(self):
+        return _pa_filters(self.filters)
+
+    @cached_property
+    def _name(self):
+        return "from-dataset-" + _tokenize_deterministic(
+            funcname(type(self)), *self.operands[:-1]
+        )
+
+    @cached_property
+    def _meta(self):
+        schema = self.dataset.schema
+        if self.path_column is not None:
+            if self.path_column in schema.names:
+                raise ValueError(f"{self.path_column} column already exists in schema.")
+            schema = schema.append(pa.field(self.path_column, pa.string()))
+        meta = schema.empty_table().to_pandas()
+        columns = _convert_to_list(self.operand("columns"))
+        if self._series:
+            assert len(columns) > 0
+            return meta[columns[0]]
+        elif columns is not None:
+            return meta[columns]
+        return meta
+
+    @cached_property
+    def _mean_file_size(self):
+        return np.mean(
+            [
+                frag.filesystem.get_file_info(frag.path).size
+                for frag in self.fragments[:3]
+            ]
+        )
+
+    @cached_property
+    def _plan(self) -> PartitionPlan:
+        num_files = len(self.fragments)
+        plan = PartitionPlan(
+            factor=1,
+            flavor=PartitionFlavor.SINGLE_FILE,
+            file_count=num_files,
+        )  # Default plan
+        blocksize = self.operand("blocksize")
+        if blocksize is not None:
+            blocksize = parse_bytes(blocksize)
+            # TODO: Use metadata for Parquet
+            file_size = self._mean_file_size
+            if file_size > blocksize:
+                # Split large files
+                plan = PartitionPlan(
+                    factor=math.ceil(file_size / blocksize),
+                    flavor=PartitionFlavor.SPLIT_FILES,
+                    file_count=num_files,
+                )
+            else:
+                # Aggregate small files
+                plan = PartitionPlan(
+                    factor=max(int(blocksize / file_size), 1),
+                    flavor=PartitionFlavor.FUSED_FILES,
+                    file_count=num_files,
+                )
+        return plan
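+
+    # Rough illustration of the sizing logic above (assumes the three sampled
+    # files average ~250 MiB): blocksize="100MiB" gives factor=ceil(250/100)=3
+    # with SPLIT_FILES, while ~20 MiB files give factor=int(100/20)=5 with
+    # FUSED_FILES.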
+
+    def _divisions(self):
+        return (None,) * (self._plan.count + 1)
+
+    @classmethod
+    def _table_to_dataframe(cls, table, **options):
+        return table.to_pandas(**options)
+
+    @cached_property
+    def fragments(self):
+        if self.pa_filters is not None:
+            return np.array(list(self.dataset.get_fragments(filter=self.pa_filters)))
+        return np.array(list(self.dataset.get_fragments()))
+
+    @classmethod
+    def read_fragments(
+        cls,
+        fragments,
+        columns,
+        filters,
+        schema,
+        path_column,
+        fragment_to_table_options,
+        table_to_dataframe_options,
+        custom_backend_options,
+        split_range,
+    ):
+        """Read a list of fragments into DataFrame partitions."""
+        fragment_to_table_options = fragment_to_table_options or {}
+        table_to_dataframe_options = table_to_dataframe_options or {}
+        if custom_backend_options:
+            raise ValueError(f"Unsupported options: {custom_backend_options}")
+        tables = [
+            cls._fragment_to_table(
+                fragment,
+                filters=filters,
+                columns=columns,
+                schema=schema,
+                split_range=split_range,
+                path_column=path_column,
+                **fragment_to_table_options,
+            )
+            for fragment in fragments
+        ]
+        return cls._table_to_dataframe(
+            pa.concat_tables(
+                tables,
+                promote_options="permissive",
+            )
+            if len(tables) > 1
+            else tables[0],
+            **table_to_dataframe_options,
+        )
+
+    def _filtered_task(self, name: Key, index: int) -> Task:
+        columns = self.columns.copy()
+        schema = self.dataset.schema.remove_metadata()
+        flavor = self._plan.flavor
+        if flavor == PartitionFlavor.SPLIT_FILES:
+            splits = self._plan.factor
+            stride = 1
+            frag_index = index // splits
+            split_range = (index % splits, splits)
+        else:
+            stride = self._plan.factor
+            frag_index = index * stride
+            split_range = None
+        return Task(
+            name,
+            self.read_fragments,
+            self.fragments[frag_index : frag_index + stride],
+            columns=columns,
+            filters=self.filters,
+            schema=schema,
+            path_column=self.path_column,
+            fragment_to_table_options=self.fragment_to_table_options,
+            table_to_dataframe_options=self.table_to_dataframe_options,
+            custom_backend_options=self.custom_backend_options,
+            split_range=split_range,
+        )
+
+    @classmethod
+    def _partial_fragment_to_table(
+        cls,
+        fragment,
+        schema,
+        filters,
+        split_index,
+        split_count,
+        options,
+    ):
+        # Partial-file read (default/catch-all logic)
+        filters = _pa_filters(filters)
+        total_rows = fragment.count_rows(filter=filters)
+        n_rows = int(total_rows / split_count)
+        skip_rows = n_rows * split_index
+        if split_index == (split_count - 1):
+            end = total_rows
+        else:
+            end = skip_rows + n_rows
+        return fragment.take(
+            range(skip_rows, end),
+            filter=filters,
+            **options,
+        )
+
+    @classmethod
+    def _fragment_to_table(
+        cls,
+        fragment,
+        filters,
+        columns,
+        schema,
+        split_range,
+        path_column,
+        **fragment_to_table_options,
+    ):
+        _maybe_adjust_cpu_count()
+        options = {
+            "columns": (
+                columns
+                if path_column is None
+                else [c for c in columns if c != path_column]
+            ),
+            "batch_size": 10_000_000,
+            "fragment_scan_options": cls._scan_options,
+            "use_threads": True,
+        }
+        options.update(fragment_to_table_options)
+        if split_range:
+            table = cls._partial_fragment_to_table(
+                fragment,
+                schema,
+                filters,
+                *split_range,
+                options,
+            )
+        else:
+            table = fragment.to_table(
+                schema=schema,
+                filter=_pa_filters(filters),
+                **options,
+            )
+        if path_column is None:
+            return table
+        else:
+            return table.append_column(
+                path_column, pa.array([fragment.path] * len(table), pa.string())
+            )
+
+    @property
+    def _fusion_compression_factor(self):
+        # TODO: Deal with column-projection adjustments
+        return 1
+
+    def _simplify_up(self, parent, dependents):
+        if isinstance(parent, Index):
+            # Column projection
+            columns = determine_column_projection(self, parent, dependents)
+            columns = [col for col in self.columns if col in columns]
+            if set(columns) == set(self.columns):
+                return
+            return Index(
+                self.substitute_parameters({"columns": columns, "_series": False})
+            )
+
+        if isinstance(parent, Projection):
+            return super()._simplify_up(parent, dependents)
+
+    def _simplify_down(self):
+        file_format = self.dataset.format.default_extname
+        if file_format == "csv":
+            return FromArrowDatasetCSV(*self.operands)
+        elif file_format == "json":
+            return FromArrowDatasetJSON(*self.operands)
+        elif file_format == "parquet":
+            return FromArrowDatasetParquet(*self.operands)
+
+
+class FromArrowDatasetCSV(FromArrowDataset):
+    @classmethod
+    def _partial_fragment_to_table(
+        cls,
+        fragment,
+        schema,
+        filters,
+        split_index,
+        split_count,
+        options,
+    ):
+        # Calculate byte range for this read
+        path = fragment.path
+        filesystem = fragment.filesystem
+        size = filesystem.get_file_info(path).size
+        nbytes = size // split_count
+        offset = nbytes * split_index
+        if split_index == (split_count - 1):
+            nbytes = size - offset
+
+        # Handle header and delimiter
+        add_header = b""
+        row_delimiter = b"\n"
+        scan_options = fragment.format.default_fragment_scan_options
+        column_names = scan_options.column_names
+        skip_rows = scan_options.skip_rows
+        if split_index:
+            if not column_names and not skip_rows:
+                add_header = _read_byte_block(
+                    path,
+                    filesystem,
+                    0,
+                    1,
+                    delimiter=row_delimiter,
+                )
+            for _ in range(skip_rows):
+                add_header += row_delimiter
+
+        # Read partial fragment
+        return fragment.format.make_fragment(
+            pa.py_buffer(
+                add_header
+                + _read_byte_block(
+                    path,
+                    filesystem,
+                    offset,
+                    nbytes,
+                    delimiter=row_delimiter,
+                )
+            )
+        ).to_table(
+            filter=_pa_filters(filters),
+            **options,
+        )
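+
+    # Illustration of the splitting above (assumes a headered CSV file and
+    # split_count=4): split 0 parses roughly the first size // 4 bytes as-is,
+    # while splits 1-3 parse their own newline-delimited byte range with the
+    # file's header row prepended (when the scan options do not already supply
+    # column names), so embedded column names are preserved for every chunk.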
+
+
+class FromArrowDatasetJSON(FromArrowDataset):
+    @classmethod
+    def _partial_fragment_to_table(
+        cls,
+        fragment,
+        schema,
+        filters,
+        split_index,
+        split_count,
+        options,
+    ):
+        # Calculate byte range for this read
+        path = fragment.path
+        filesystem = fragment.filesystem
+        size = filesystem.get_file_info(path).size
+        nbytes = size // split_count
+        offset = nbytes * split_index
+        if split_index == (split_count - 1):
+            nbytes = size - offset
+
+        # Read partial fragment
+        return fragment.format.make_fragment(
+            pa.py_buffer(
+                _read_byte_block(
+                    path,
+                    filesystem,
+                    offset,
+                    nbytes,
+                    delimiter=b"\n",
+                )
+            )
+        ).to_table(
+            filter=_pa_filters(filters),
+            **options,
+        )
+
+
+class FromArrowDatasetParquet(FromArrowDataset):
+    _scan_options = pa.dataset.ParquetFragmentScanOptions(
+        pre_buffer=True,
+        cache_options=pa.CacheOptions(
+            hole_size_limit=parse_bytes("4 MiB"),
+            range_size_limit=parse_bytes("32 MiB"),
+        ),
+    )
+
+    @classmethod
+    def _partial_fragment_to_table(
+        cls,
+        fragment,
+        schema,
+        filters,
+        split_index,
+        split_count,
+        options,
+    ):
+        # Parquet-specific partial read
+        # (try to align reads with row-groups)
+        total_row_groups = fragment.num_row_groups
+        if total_row_groups < split_count:
+            # Cannot align with row-groups.
+            # Use catch-all logic
+            return FromArrowDataset._partial_fragment_to_table(
+                fragment,
+                schema,
+                filters,
+                split_index,
+                split_count,
+                options,
+            )
+
+        # Align with row-groups
+        rg_stride = total_row_groups // split_count
+        skip_rgs = rg_stride * split_index
+        rgs = list(range(total_row_groups))
+        new_rgs = rgs[skip_rgs : skip_rgs + rg_stride]
+        if split_index == (split_count - 1):
+            new_rgs = rgs[skip_rgs:]
+        filters = _pa_filters(filters)
+        return fragment.subset(
+            filter=filters,
+            schema=schema,
+            row_group_ids=new_rgs,
+        ).to_table(
+            schema=schema,
+            filter=filters,
+            **options,
+        )
+
+
+def _read_byte_block(
+    path,
+    filesystem,
+    offset,
+    nbytes,
+    delimiter=None,
+):
+    # Use fsspec to read in a delimited byte range
+    with filesystem.open_input_file(path) as f:
+        block = read_block(
+            f,
+            offset,
+            nbytes,
+            delimiter,
+        )
+    return block
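+
+
+# Note on the helper above: fsspec's ``read_block`` snaps the requested byte
+# range to delimiter boundaries, e.g. (offset=0, nbytes=1000, delimiter=b"\n")
+# returns whole lines only, extending past byte 1000 to the next newline, so
+# rows are never split across partitions.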