[WIP][POC] Basic from_arrow_dataset API #1175

Draft · wants to merge 6 commits into main
71 changes: 71 additions & 0 deletions dask_expr/_collection.py
@@ -5492,6 +5492,77 @@ def read_parquet(
    )


def from_arrow_dataset(
    dataset,
    columns=None,
    filters=None,
    blocksize=None,
    path_column=None,
    fragment_to_table_options=None,
    table_to_dataframe_options=None,
    custom_backend_options=None,
):
"""Read a PyArrow Dataset into a Dask DataFrame

Parameters
----------
dataset : pyarrow.dataset.Dataset
Arrow Dataset object to convert to Dask DataFrame.
columns : str or list, default None
Field name(s) to read in as columns in the output. By default all
non-index fields will be read (as determined by the arrow Dataset
schema). Provide a single field name instead of a list to
read in the data as a Series.
    filters : Union[List[Tuple[str, str, Any]], List[List[Tuple[str, str, Any]]]], default None
        List of filters to apply, like ``[[('col1', '==', 0), ...], ...]``.
        Using this argument results in row-wise filtering of the final partitions.

        Predicates can be expressed in disjunctive normal form (DNF). This means that
        the inner-most tuple describes a single column predicate. These inner predicates
        are combined with an AND conjunction into a larger predicate. The outer-most
        list then combines all of the combined predicates with an OR disjunction.

        Predicates can also be expressed as a flat ``List[Tuple]``, which is evaluated
        as an AND conjunction. To express OR, use the ``List[List[Tuple]]`` notation.
        See the Examples section below for an illustrative filter.
    blocksize : int, str or None, default None
        The desired size of each output ``DataFrame`` partition in terms of
        approximate (uncompressed) storage space. If None (the default),
        each file will be mapped to a distinct partition.
    path_column : str, default None
        Optional column name to use for additional path information. By default,
        a path column will not be included in the output DataFrame.
    fragment_to_table_options : dict, default None
        Optional arguments for fragment-to-table conversion.
    table_to_dataframe_options : dict, default None
        Optional arguments for table-to-dataframe conversion.
    custom_backend_options : dict, default None
        Optional backend arguments. These options are passed to the underlying
        ``FromArrowDataset.read_fragments`` implementation.

    Examples
    --------
    >>> from pyarrow.dataset import dataset # doctest: +SKIP
    >>> ds = dataset('/my/parquet/dataset', format='parquet') # doctest: +SKIP
    >>> df = dd.from_arrow_dataset(ds, blocksize="100MiB") # doctest: +SKIP
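    The field names below (``'col1'``, ``'year'``) and the ``'path'`` column
    name are illustrative only. A single string for ``columns`` returns a
    Series, and ``filters`` uses the DNF notation described above:

    >>> s = dd.from_arrow_dataset(ds, columns="col1") # doctest: +SKIP
    >>> df = dd.from_arrow_dataset(
    ...     ds,
    ...     filters=[[("year", ">", 2020), ("col1", "==", 0)]],
    ...     path_column="path",
    ... ) # doctest: +SKIP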
"""
    from dask_expr.io.arrow import FromArrowDataset

    return new_collection(
        FromArrowDataset(
            dataset,
            columns=_convert_to_list(columns),
            filters=filters,
            blocksize=blocksize,
            path_column=path_column,
            fragment_to_table_options=fragment_to_table_options,
            table_to_dataframe_options=table_to_dataframe_options,
            custom_backend_options=custom_backend_options,
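            # A single string for ``columns`` yields a Series instead of a DataFrame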
            _series=isinstance(columns, str),
        )
    )


def concat(
    dfs,
    axis=0,