Skip to content

Commit 37a5116

Browse files
authored
Introduce ToBackend expression (#1115)
1 parent 2b8f765 commit 37a5116

File tree

3 files changed

+32
-10
lines changed

3 files changed

+32
-10
lines changed

Diff for: dask_expr/_backends.py

+16-10
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@
44
import pandas as pd
55
from dask.backends import CreationDispatch
66
from dask.dataframe.backends import DataFrameBackendEntrypoint
7+
from dask.dataframe.dispatch import to_pandas_dispatch
78

89
from dask_expr._dispatch import get_collection_type
10+
from dask_expr._expr import ToBackend
911

1012
try:
1113
import sparse
@@ -32,25 +34,29 @@
3234
)
3335

3436

37+
class ToPandasBackend(ToBackend):
38+
@staticmethod
39+
def operation(df, options):
40+
return to_pandas_dispatch(df, **options)
41+
42+
def _simplify_down(self):
43+
if isinstance(self.frame._meta, (pd.DataFrame, pd.Series, pd.Index)):
44+
# We already have pandas data
45+
return self.frame
46+
47+
3548
class PandasBackendEntrypoint(DataFrameBackendEntrypoint):
3649
"""Pandas-Backend Entrypoint Class for Dask-Expressions
3750
3851
Note that all DataFrame-creation functions are defined
3952
and registered 'in-place'.
4053
"""
4154

42-
@classmethod
43-
def to_backend_dispatch(cls):
44-
from dask.dataframe.dispatch import to_pandas_dispatch
45-
46-
return to_pandas_dispatch
47-
4855
@classmethod
4956
def to_backend(cls, data, **kwargs):
50-
if isinstance(data._meta, (pd.DataFrame, pd.Series, pd.Index)):
51-
# Already a pandas-backed collection
52-
return data
53-
return data.map_partitions(cls.to_backend_dispatch(), **kwargs)
57+
from dask_expr._collection import new_collection
58+
59+
return new_collection(ToPandasBackend(data, kwargs))
5460

5561

5662
dataframe_creation_dispatch.register_backend("pandas", PandasBackendEntrypoint())

Diff for: dask_expr/_expr.py

+7
Original file line numberDiff line numberDiff line change
@@ -1303,6 +1303,13 @@ def operation(df):
13031303
return df.copy(deep=True)
13041304

13051305

1306+
class ToBackend(Elemwise):
1307+
_parameters = ["frame", "options"]
1308+
_projection_passthrough = True
1309+
_filter_passthrough = True
1310+
_preserves_partitioning_information = True
1311+
1312+
13061313
class RenameSeries(Elemwise):
13071314
_parameters = ["frame", "index", "sorted_index"]
13081315
_defaults = {"sorted_index": False}

Diff for: dask_expr/tests/test_collection.py

+9
Original file line numberDiff line numberDiff line change
@@ -2665,3 +2665,12 @@ def test_empty_from_pandas_projection():
26652665
df["foo"] = from_pandas(foo, npartitions=1)
26662666
pdf["foo"] = foo
26672667
assert_eq(df["foo"], pdf["foo"])
2668+
2669+
2670+
def test_to_backend_simplify():
2671+
with dask.config.set({"dataframe.backend": "pandas"}):
2672+
df = from_dict({"x": [1, 2, 3], "y": [4, 5, 6]}, npartitions=2)
2673+
df2 = df.to_backend("pandas")[["y"]]
2674+
assert str(df2.expr) != str(df[["y"]].expr)
2675+
df3 = df2.simplify()
2676+
assert str(df3.expr) == str(df[["y"]].expr)

0 commit comments

Comments
 (0)