
Commit 69a647f

Remove custom read-csv stuff
1 parent 77d0f89 commit 69a647f

4 files changed: +26 additions, -193 deletions

ci/environment.yml

Lines changed: 1 addition & 1 deletion
@@ -13,4 +13,4 @@ dependencies:
   - xarray
   - pip:
     - git+https://github.com/dask/distributed
-    - git+https://github.com/dask/dask
+    - git+https://github.com/phofl/dask@read-csv-legacy

dask_expr/_collection.py

Lines changed: 23 additions & 39 deletions
@@ -5131,23 +5131,17 @@ def read_csv(
     path,
     *args,
     header="infer",
-    dtype_backend=None,
     storage_options=None,
     **kwargs,
 ):
-    from dask_expr.io.csv import ReadCSV
+    from dask.dataframe.io.csv import read_csv as _read_csv
 
-    if not isinstance(path, str):
-        path = stringify_path(path)
-    return new_collection(
-        ReadCSV(
-            path,
-            dtype_backend=dtype_backend,
-            storage_options=storage_options,
-            kwargs=kwargs,
-            header=header,
-            dataframe_backend="pandas",
-        )
+    return _read_csv(
+        path,
+        *args,
+        header=header,
+        storage_options=storage_options,
+        **kwargs,
     )
 
 
@@ -5156,23 +5150,18 @@ def read_table(
     *args,
     header="infer",
     usecols=None,
-    dtype_backend=None,
     storage_options=None,
     **kwargs,
 ):
-    from dask_expr.io.csv import ReadTable
+    from dask.dataframe.io.csv import read_table as _read_table
 
-    if not isinstance(path, str):
-        path = stringify_path(path)
-    return new_collection(
-        ReadTable(
-            path,
-            columns=usecols,
-            dtype_backend=dtype_backend,
-            storage_options=storage_options,
-            kwargs=kwargs,
-            header=header,
-        )
+    return _read_table(
+        path,
+        *args,
+        header=header,
+        storage_options=storage_options,
+        usecols=usecols,
+        **kwargs,
     )
 
 
@@ -5181,23 +5170,18 @@ def read_fwf(
     *args,
     header="infer",
     usecols=None,
-    dtype_backend=None,
     storage_options=None,
     **kwargs,
 ):
-    from dask_expr.io.csv import ReadFwf
+    from dask.dataframe.io.csv import read_fwf as _read_fwf
 
-    if not isinstance(path, str):
-        path = stringify_path(path)
-    return new_collection(
-        ReadFwf(
-            path,
-            columns=usecols,
-            dtype_backend=dtype_backend,
-            storage_options=storage_options,
-            kwargs=kwargs,
-            header=header,
-        )
+    return _read_fwf(
+        path,
+        *args,
+        header=header,
+        storage_options=storage_options,
+        usecols=usecols,
+        **kwargs,
     )
 
 
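The net effect of the hunks above: read_csv, read_table, and read_fwf in dask-expr become thin wrappers that forward directly to the corresponding dask.dataframe.io.csv readers, and dtype_backend is no longer a named parameter of the wrappers, so it now travels through **kwargs. A minimal usage sketch, not taken from the commit (the file path is invented, and it assumes a pandas version that understands dtype_backend):

from dask_expr import read_csv

# After this commit the call below is forwarded to dask.dataframe.io.csv.read_csv;
# dtype_backend is no longer intercepted by a custom ReadCSV expression and simply
# rides along in **kwargs down to the pandas reader.
df = read_csv("example.csv", dtype_backend="pyarrow")
print(df.head())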
dask_expr/io/csv.py

Lines changed: 0 additions & 151 deletions
@@ -1,154 +1,3 @@
-import functools
-import operator
-
-from dask._task_spec import Task
-from dask.typing import Key
-
-from dask_expr._expr import Projection
-from dask_expr._util import _convert_to_list
-from dask_expr.io.io import BlockwiseIO, PartitionsFiltered
-
-
-class ReadCSV(PartitionsFiltered, BlockwiseIO):
-    _parameters = [
-        "filename",
-        "columns",
-        "header",
-        "dtype_backend",
-        "_partitions",
-        "storage_options",
-        "kwargs",
-        "_series",
-        "dataframe_backend",
-    ]
-    _defaults = {
-        "columns": None,
-        "header": "infer",
-        "dtype_backend": None,
-        "kwargs": None,
-        "_partitions": None,
-        "storage_options": None,
-        "_series": False,
-        "dataframe_backend": "pandas",
-    }
-    _absorb_projections = True
-
-    @functools.cached_property
-    def operation(self):
-        from dask.dataframe.io import read_csv
-
-        return read_csv
-
-    @functools.cached_property
-    def _ddf(self):
-        from dask import config
-
-        # Temporary hack to simplify logic
-        with config.set({"dataframe.backend": self.dataframe_backend}):
-            kwargs = (
-                {"dtype_backend": self.dtype_backend}
-                if self.dtype_backend is not None
-                else {}
-            )
-            if self.kwargs is not None:
-                kwargs.update(self.kwargs)
-
-            columns = _convert_to_list(self.operand("columns"))
-            if columns is None:
-                pass
-            elif "include_path_column" in self.kwargs:
-                flag = self.kwargs["include_path_column"]
-                if flag is True:
-                    column_to_remove = "path"
-                elif isinstance(flag, str):
-                    column_to_remove = flag
-                else:
-                    column_to_remove = None
-
-                columns = [c for c in columns if c != column_to_remove]
-
-            if not columns:
-                meta = self.operation(
-                    self.filename,
-                    header=self.header,
-                    storage_options=self.storage_options,
-                    **kwargs,
-                )._meta
-                columns = [list(meta.columns)[0]]
-
-            usecols = kwargs.pop("usecols", None)
-            if usecols is not None and columns is not None:
-                columns = [col for col in columns if col in usecols]
-            elif usecols:
-                columns = usecols
-
-            return self.operation(
-                self.filename,
-                usecols=columns,
-                header=self.header,
-                storage_options=self.storage_options,
-                **kwargs,
-            )
-
-    @functools.cached_property
-    def _meta(self):
-        return self._ddf._meta
-
-    def _simplify_up(self, parent, dependents):
-        if isinstance(parent, Projection):
-            kwargs = self.kwargs
-            # int usecols are positional, so block projections
-            if kwargs.get("usecols", None) is not None and isinstance(
-                kwargs.get("usecols")[0], int
-            ):
-                return
-        return super()._simplify_up(parent, dependents)
-
-    @functools.cached_property
-    def columns(self):
-        columns_operand = self.operand("columns")
-        if columns_operand is None:
-            try:
-                return list(self._ddf._meta.columns)
-            except AttributeError:
-                return []
-        else:
-            return _convert_to_list(columns_operand)
-
-    def _divisions(self):
-        return self._ddf.divisions
-
-    @functools.cached_property
-    def _tasks(self):
-        from dask._task_spec import convert_legacy_graph
-
-        return list(convert_legacy_graph(self._ddf.dask.to_dict()).values())
-
-    def _filtered_task(self, name: Key, index: int) -> Task:
-        if self._series:
-            return Task(name, operator.getitem, self._tasks[index], self.columns[0])
-        t = self._tasks[index]
-        if t.key != name:
-            return Task(name, lambda x: x, t)
-        return t
-
-
-class ReadTable(ReadCSV):
-    @functools.cached_property
-    def operation(self):
-        from dask.dataframe.io import read_table
-
-        return read_table
-
-
-class ReadFwf(ReadCSV):
-    @functools.cached_property
-    def operation(self):
-        from dask.dataframe.io import read_fwf
-
-        return read_fwf
-
-
 def to_csv(
     df,
     filename,
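One piece of reasoning from the removed ReadCSV._simplify_up is worth keeping in mind: integer usecols select columns by position, which is why the expression refused to push column projections into them. A short illustration with plain pandas (the file name is invented for the example):

import pandas as pd

# usecols given as ints is positional, usecols given as strings is by name.
# Rewriting an integer usecols after a column projection could silently change
# which columns get read, hence the guard in the removed _simplify_up.
first_column = pd.read_csv("example.csv", usecols=[0])
named_column = pd.read_csv("example.csv", usecols=["x"])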

dask_expr/io/tests/test_io.py

Lines changed: 2 additions & 2 deletions
@@ -23,7 +23,7 @@
     read_parquet,
 )
 from dask_expr._expr import Expr, Replace
-from dask_expr.io import FromArray, FromMap, ReadCSV, ReadParquet, parquet
+from dask_expr.io import FromArray, FromMap, ReadParquet, parquet
 from dask_expr.tests._util import _backend_library
 
 # Set DataFrame backend for this module
@@ -257,7 +257,7 @@ def test_to_dask_array(optimize):
 
 @pytest.mark.parametrize(
     "fmt,read_func,read_cls",
-    [("parquet", read_parquet, ReadParquet), ("csv", read_csv, ReadCSV)],
+    [("parquet", read_parquet, ReadParquet), ("csv", read_csv, FromMap)],
 )
 def test_combine_similar(tmpdir, fmt, read_func, read_cls):
     pdf = pd.DataFrame(
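As the parametrization change suggests, reading CSVs through dask-expr is now expected to surface as a FromMap-backed expression rather than ReadCSV. A hedged round-trip sketch, not taken from the test suite (data and path invented; it only assumes read_csv stays importable from dask_expr and that compute() returns pandas):

import pandas as pd
from dask_expr import read_csv

pdf = pd.DataFrame({"x": [1, 2, 3], "y": ["a", "b", "c"]})
pdf.to_csv("roundtrip.csv", index=False)

df = read_csv("roundtrip.csv")  # delegates to dask.dataframe's CSV reader
result = df.compute()
assert result.reset_index(drop=True).equals(pdf)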
