import functools
import operator

from dask_expr._util import _convert_to_list
from dask_expr.io.io import BlockwiseIO, PartitionsFiltered


class ReadCSV(PartitionsFiltered, BlockwiseIO):
    _parameters = [
        "filename",
        "columns",
        "header",
        "dtype_backend",
        "_partitions",
        "storage_options",
        "kwargs",
        "_cwd",  # needed for tokenization
        "_series",
    ]
    _defaults = {
        "columns": None,
        "header": "infer",
        "dtype_backend": None,
        "kwargs": None,
        "_partitions": None,
        "storage_options": None,
        "_series": False,
        "_cwd": None,
    }
    _absorb_projections = True

    @functools.cached_property
    def operation(self):
        from dask.dataframe.io import read_csv

        return read_csv

    @functools.cached_property
    def _ddf(self):
        # Temporary hack to simplify logic: build the legacy dask.dataframe
        # collection once, then reuse its metadata, divisions, and graph below.
        kwargs = (
            {"dtype_backend": self.dtype_backend}
            if self.dtype_backend is not None
            else {}
        )
        if self.kwargs is not None:
            kwargs.update(self.kwargs)

        columns = _convert_to_list(self.operand("columns"))
        if columns is None:
            pass
        elif self.kwargs is not None and "include_path_column" in self.kwargs:
            # ``include_path_column`` injects a synthetic column, so it must be
            # stripped from the projection before being passed to ``usecols``.
            # (The ``self.kwargs is not None`` guard avoids a TypeError when no
            # kwargs were supplied.)
            flag = self.kwargs["include_path_column"]
            if flag is True:
                column_to_remove = "path"
            elif isinstance(flag, str):
                column_to_remove = flag
            else:
                column_to_remove = None
            columns = [c for c in columns if c != column_to_remove]
            if not columns:
                # Stripping the path column left nothing to read; fall back to
                # the first column of the inferred metadata.
                meta = self.operation(
                    self.filename,
                    header=self.header,
                    storage_options=self.storage_options,
                    **kwargs,
                )._meta
                columns = [list(meta.columns)[0]]

        return self.operation(
            self.filename,
            usecols=columns,
            header=self.header,
            storage_options=self.storage_options,
            **kwargs,
        )

    @functools.cached_property
    def _meta(self):
        return self._ddf._meta

    @functools.cached_property
    def columns(self):
        columns_operand = self.operand("columns")
        if columns_operand is None:
            try:
                return list(self._ddf._meta.columns)
            except AttributeError:
                return []
        else:
            return _convert_to_list(columns_operand)

    def _divisions(self):
        return self._ddf.divisions

    @functools.cached_property
    def _tasks(self):
        return list(self._ddf.dask.to_dict().values())

    def _filtered_task(self, index: int):
        if self._series:
            # A single-column projection: extract that column from the
            # materialized partition.
            return (operator.getitem, self._tasks[index], self.columns[0])
        return self._tasks[index]
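

# Illustrative sketch (not part of the original module): how projection
# absorption shows up in ``_filtered_task``. Assumes the dask-expr package
# exposes ``read_csv`` at its top level; treat the names below as hypothetical.
#
#     import dask_expr as dx
#
#     df = dx.read_csv("data-*.csv")   # expression tree rooted at ReadCSV
#     s = df["x"]                      # single-column projection is absorbed
#                                      # (_series=True), so each partition task
#                                      # becomes:
#                                      # (operator.getitem, <read task>, "x")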


class ReadTable(ReadCSV):
    @functools.cached_property
    def operation(self):
        from dask.dataframe.io import read_table

        return read_table


class ReadFwf(ReadCSV):
    @functools.cached_property
    def operation(self):
        from dask.dataframe.io import read_fwf

        return read_fwf


def to_csv(
    df,
    filename,
    single_file=False,
    encoding="utf-8",
    mode="wt",
    name_function=None,
    compression=None,
    compute=True,
    scheduler=None,
    storage_options=None,
    header_first_partition_only=None,
    compute_kwargs=None,
    **kwargs,
):
    # Convert the expression-backed collection to a legacy dask.dataframe
    # collection and delegate to its ``to_csv`` implementation.
    from dask.dataframe.io.csv import to_csv as _to_csv

    return _to_csv(
        df.to_dask_dataframe(),
        filename,
        single_file=single_file,
        encoding=encoding,
        mode=mode,
        name_function=name_function,
        compression=compression,
        compute=compute,
        scheduler=scheduler,
        storage_options=storage_options,
        header_first_partition_only=header_first_partition_only,
        compute_kwargs=compute_kwargs,
        **kwargs,
    )
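

# Illustrative usage sketch (an assumption, not part of the original module):
# the dask-expr DataFrame is presumed to forward its ``to_csv`` method here,
# and ``df.to_dask_dataframe()`` converts the expression-backed collection back
# to a legacy dask.dataframe collection before writing.
#
#     import dask_expr as dx
#
#     df = dx.read_csv("input-*.csv", usecols=["a", "b"])
#     df.to_csv("out-*.csv", single_file=False)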