Commit 763b33b

hekaisheng authored and Xuye (Chris) Qin committed
[BACKPORT] Support DataFrame read_csv (#807) (#826)
1 parent d418032 commit 763b33b

13 files changed: +517 -18 lines changed

mars/_version.py

Lines changed: 1 addition & 1 deletion
@@ -16,7 +16,7 @@
 import os
 import sys
 
-version_info = (0, 2, 2)
+version_info = (0, 2, 3)
 _num_index = max(idx if isinstance(v, int) else 0
                  for idx, v in enumerate(version_info))
 __version__ = '.'.join(map(str, version_info[:_num_index + 1])) + \

mars/config.py

Lines changed: 5 additions & 1 deletion
@@ -303,12 +303,14 @@ def validate(x):
 # the number of combined chunks in tree reduction or tree add
 default_options.register_option('combine_size', 4, validator=is_integer, serialize=True)
 
+# the default chunk store size
+default_options.register_option('chunk_store_limit', 128 * 1024 ** 2, validator=is_numeric)
+
 # deploy
 default_options.register_option('deploy.open_browser', True, validator=is_bool)
 
 # Tensor
 default_options.register_option('tensor.chunk_size', None, validator=any_validator(is_null, is_integer), serialize=True)
-default_options.register_option('tensor.chunk_store_limit', 128 * 1024 ** 2, validator=is_numeric)
 default_options.register_option('tensor.rechunk.threshold', 4, validator=is_integer, serialize=True)
 default_options.register_option('tensor.rechunk.chunk_size_limit', int(1e8), validator=is_integer, serialize=True)
 
@@ -373,3 +375,5 @@ def __setattr__(self, key, value):
 
 
 options = OptionsProxy()
+
+options.redirect_option('tensor.chunk_store_limit', 'chunk_store_limit')
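
This change promotes chunk_store_limit from a tensor-only option to a global one, since DataFrame chunks are now sized by it as well; redirect_option keeps the old name alive as an alias. A minimal sketch of the intended compatibility behavior, assuming the redirected name reads through to the new option:

    from mars.config import options

    # The global option is now the canonical location.
    options.chunk_store_limit = 256 * 1024 ** 2  # raise the 128 MB default to 256 MB

    # Legacy code using the tensor-scoped name sees the same value.
    assert options.tensor.chunk_store_limit == options.chunk_store_limit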

mars/dataframe/__init__.py

Lines changed: 2 additions & 1 deletion
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
-# Copyright 1999-2018 Alibaba Group Holding Ltd.
+# Copyright 1999-2020 Alibaba Group Holding Ltd.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
 from .datasource.from_tensor import dataframe_from_tensor
 from .datasource.from_records import from_records
 from .utils import concat_tileable_chunks, get_fetch_op_cls, get_fuse_op_cls
+from .datasource.read_csv import read_csv
 from .fetch import DataFrameFetch, DataFrameFetchShuffle
 
 from . import arithmetic
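
With this re-export, read_csv becomes reachable from the top level of mars.dataframe. A minimal usage sketch, assuming a default local session ('data.csv' is an illustrative path):

    import mars.dataframe as md

    # Builds a lazy DataFrameReadCSV operand; only a small head sample
    # is read eagerly, for dtype and column inference.
    df = md.read_csv('data.csv')

    # Tiling and the chunk-wise reads happen at execution time.
    print(df.execute())
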
mars/dataframe/datasource/read_csv.py

Lines changed: 239 additions & 0 deletions

@@ -0,0 +1,239 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 1999-2020 Alibaba Group Holding Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from io import BytesIO
+
+import pandas as pd
+import numpy as np
+
+from ... import opcodes as OperandDef
+from ...config import options
+from ...utils import parse_readable_size, lazy_import
+from ...serialize import StringField, DictField, ListField, Int32Field, Int64Field, AnyField
+from ...filesystem import open_file, file_size, glob
+from ..utils import parse_index, build_empty_df
+from ..operands import DataFrameOperand, DataFrameOperandMixin, ObjectType
+
+
+cudf = lazy_import('cudf', globals=globals())
+
+
+def _find_chunk_start_end(f, offset, size):
+    f.seek(offset)
+    if f.tell() == 0:
+        start = 0
+    else:
+        f.readline()
+        start = f.tell()
+    f.seek(offset + size)
+    f.readline()
+    end = f.tell()
+    return start, end
+
+
+class DataFrameReadCSV(DataFrameOperand, DataFrameOperandMixin):
+    _op_type_ = OperandDef.READ_CSV
+
+    _path = AnyField('path')
+    _names = ListField('names')
+    _sep = StringField('sep')
+    _header = AnyField('header')
+    _index_col = Int32Field('index_col')
+    _compression = StringField('compression')
+    _offset = Int64Field('offset')
+    _size = Int64Field('size')
+
+    _storage_options = DictField('storage_options')
+
+    def __init__(self, path=None, names=None, sep=None, header=None, index_col=None, compression=None,
+                 offset=None, size=None, gpu=None, storage_options=None, **kw):
+        super(DataFrameReadCSV, self).__init__(_path=path, _names=names, _sep=sep, _header=header,
+                                               _index_col=index_col, _compression=compression,
+                                               _offset=offset, _size=size, _gpu=gpu,
+                                               _storage_options=storage_options,
+                                               _object_type=ObjectType.dataframe, **kw)
+
+    @property
+    def path(self):
+        return self._path
+
+    @property
+    def names(self):
+        return self._names
+
+    @property
+    def sep(self):
+        return self._sep
+
+    @property
+    def header(self):
+        return self._header
+
+    @property
+    def index_col(self):
+        return self._index_col
+
+    @property
+    def compression(self):
+        return self._compression
+
+    @property
+    def offset(self):
+        return self._offset
+
+    @property
+    def size(self):
+        return self._size
+
+    @property
+    def storage_options(self):
+        return self._storage_options
+
+    @classmethod
+    def _tile_compressed(cls, op):
+        # Compression does not support breaking into small parts
+        df = op.outputs[0]
+        chunk_op = op.copy().reset_key()
+        chunk_op._offset = 0
+        chunk_op._size = file_size(op.path)
+        shape = df.shape
+        new_chunk = chunk_op.new_chunk(None, shape=shape, index=(0, 0), index_value=df.index_value,
+                                       columns_value=df.columns_value, dtypes=df.dtypes)
+        new_op = op.copy()
+        nsplits = ((np.nan,), (df.shape[1],))
+        return new_op.new_dataframes(None, df.shape, dtypes=df.dtypes,
+                                     index_value=df.index_value,
+                                     columns_value=df.columns_value,
+                                     chunks=[new_chunk], nsplits=nsplits)
+
+    @classmethod
+    def tile(cls, op):
+        if op.compression:
+            return cls._tile_compressed(op)
+
+        df = op.outputs[0]
+        chunk_bytes = df.extra_params.chunk_bytes
+        chunk_bytes = int(parse_readable_size(chunk_bytes)[0])
+
+        paths = op.path if isinstance(op.path, (tuple, list)) else [op.path]
+
+        out_chunks = []
+        index_num = 0
+        for path in paths:
+            total_bytes = file_size(path)
+            offset = 0
+            for _ in range(int(np.ceil(total_bytes * 1.0 / chunk_bytes))):
+                chunk_op = op.copy().reset_key()
+                chunk_op._path = path
+                chunk_op._offset = offset
+                chunk_op._size = min(chunk_bytes, total_bytes - offset)
+                shape = (np.nan, len(df.dtypes))
+                new_chunk = chunk_op.new_chunk(None, shape=shape, index=(index_num, 0), index_value=df.index_value,
+                                               columns_value=df.columns_value, dtypes=df.dtypes)
+                out_chunks.append(new_chunk)
+                index_num += 1
+                offset += chunk_bytes
+
+        new_op = op.copy()
+        nsplits = ((np.nan,) * len(out_chunks), (df.shape[1],))
+        return new_op.new_dataframes(None, df.shape, dtypes=df.dtypes,
+                                     index_value=df.index_value,
+                                     columns_value=df.columns_value,
+                                     chunks=out_chunks, nsplits=nsplits)
+
+    @classmethod
+    def execute(cls, ctx, op):
+        xdf = cudf if op.gpu else pd
+        out_df = op.outputs[0]
+        csv_kwargs = op.extra_params.copy()
+
+        with open_file(op.path, compression=op.compression, storage_options=op.storage_options) as f:
+            if op.compression is not None:
+                # As we specify names and dtype, we need to skip header rows
+                csv_kwargs['skiprows'] = 1 if op.header == 'infer' else op.header
+                df = xdf.read_csv(BytesIO(f.read()), sep=op.sep, names=op.names, index_col=op.index_col,
+                                  dtype=out_df.dtypes.to_dict(), **csv_kwargs)
+            else:
+                start, end = _find_chunk_start_end(f, op.offset, op.size)
+                f.seek(start)
+                b = BytesIO(f.read(end - start))
+                if end == start:
+                    # the last chunk may be empty
+                    df = build_empty_df(out_df.dtypes)
+                else:
+                    if start == 0:
+                        # The first chunk contains the header
+                        # As we specify names and dtype, we need to skip header rows
+                        csv_kwargs['skiprows'] = 1 if op.header == 'infer' else op.header
+                    df = xdf.read_csv(b, sep=op.sep, names=op.names, index_col=op.index_col,
+                                      dtype=out_df.dtypes.to_dict(), **csv_kwargs)
+        ctx[out_df.key] = df
+
+    def __call__(self, index_value=None, columns_value=None, dtypes=None, chunk_bytes=None):
+        shape = (np.nan, len(dtypes))
+        return self.new_dataframe(None, shape, dtypes=dtypes, index_value=index_value,
+                                  columns_value=columns_value, chunk_bytes=chunk_bytes)
+
+
+def read_csv(path, names=None, sep=',', index_col=None, compression=None, header='infer', dtype=None,
+             chunk_bytes=None, gpu=None, head_bytes='100k', head_lines=None, storage_options=None, **kwargs):
+    """
+    Read comma-separated values (CSV) file(s) into a DataFrame.
+    :param path: file path(s).
+    :param names: list of column names to use. If the file contains no header row,
+        you should explicitly pass header=None. Duplicates in this list are not allowed.
+    :param sep: delimiter to use, ',' by default.
+    :param index_col: column(s) to use as the row labels of the DataFrame, given as string name or column index.
+    :param compression: for on-the-fly decompression of on-disk data.
+    :param header: row number(s) to use as the column names, and the start of the data.
+    :param dtype: data type for data or columns, e.g. {'a': np.float64, 'b': np.int32, 'c': 'Int64'}.
+        Use str or object together with suitable na_values settings to preserve and not interpret dtype.
+    :param chunk_bytes: number of bytes per chunk.
+    :param gpu: whether to read into a cudf DataFrame.
+    :param head_bytes: number of bytes to read from the head of the file, mainly for dtype and column inference.
+    :param head_lines: number of lines to read from the head of the file, mainly for dtype and column inference.
+    :param storage_options: options for the storage connection.
+    :param kwargs: additional keyword arguments forwarded to the underlying read_csv calls.
+    :return: Mars DataFrame.
+    """
+    # infer dtypes and columns
+    if isinstance(path, (list, tuple)):
+        file_path = path[0]
+    else:
+        file_path = glob(path)[0]
+    with open_file(file_path, compression=compression, storage_options=storage_options) as f:
+        if head_lines is not None:
+            b = b''.join([f.readline() for _ in range(head_lines)])
+        else:
+            head_bytes = int(parse_readable_size(head_bytes)[0])
+            head_start, head_end = _find_chunk_start_end(f, 0, head_bytes)
+            f.seek(head_start)
+            b = f.read(head_end - head_start)
+        mini_df = pd.read_csv(BytesIO(b), sep=sep, index_col=index_col, dtype=dtype, names=names, header=header)
+
+    if isinstance(mini_df.index, pd.RangeIndex):
+        index_value = parse_index(pd.RangeIndex(0))
+    else:
+        index_value = parse_index(mini_df.index)
+    columns_value = parse_index(mini_df.columns, store_data=True)
+    if index_col and not isinstance(index_col, int):
+        index_col = list(mini_df.columns).index(index_col)
+    names = list(mini_df.columns)
+    op = DataFrameReadCSV(path=path, names=names, sep=sep, header=header, index_col=index_col,
+                          compression=compression, gpu=gpu, storage_options=storage_options, **kwargs)
+    chunk_bytes = chunk_bytes or options.chunk_store_limit
+    return op(index_value=index_value, columns_value=columns_value,
+              dtypes=mini_df.dtypes, chunk_bytes=chunk_bytes)
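
A note on _find_chunk_start_end, which is what makes the byte-range split safe: each chunk's nominal range [offset, offset + size) is snapped forward to newline boundaries, so every CSV record lands in exactly one chunk. A standalone sketch of the behavior (the helper mirrors the diff; the sample bytes are illustrative):

    from io import BytesIO

    def find_chunk_start_end(f, offset, size):
        # Unless the chunk starts at byte 0, the offset may have landed
        # mid-record: skip ahead to the next line boundary.
        f.seek(offset)
        if f.tell() == 0:
            start = 0
        else:
            f.readline()
            start = f.tell()
        # Extend the end past offset + size to the next newline so the
        # last record in the range is kept whole.
        f.seek(offset + size)
        f.readline()
        end = f.tell()
        return start, end

    data = BytesIO(b'a,b\n1,2\n3,4\n5,6\n')  # 16 bytes, nominal 6-byte chunks
    print(find_chunk_start_end(data, 0, 6))   # (0, 8): header plus first row
    print(find_chunk_start_end(data, 6, 6))   # (8, 16): both remaining rows
    print(find_chunk_start_end(data, 12, 4))  # (16, 16): empty trailing chunk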

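Two details of execute() are also worth calling out: every chunk is parsed with the dtypes inferred up front from the file head, which keeps column types consistent across chunks, and only the chunk starting at byte 0 carries the header row, which is skipped rather than re-parsed because names and dtype are passed explicitly. The same handling in plain pandas terms (the sample bytes are illustrative):

    from io import BytesIO

    import pandas as pd

    names = ['a', 'b']
    dtypes = {'a': 'int64', 'b': 'int64'}

    first_chunk = BytesIO(b'a,b\n1,2\n')  # starts at byte 0, so it carries the header
    later_chunk = BytesIO(b'3,4\n5,6\n')  # later byte ranges never see a header

    # names/dtype are supplied explicitly, so the header row is skipped, not parsed.
    df0 = pd.read_csv(first_chunk, names=names, dtype=dtypes, skiprows=1)
    df1 = pd.read_csv(later_chunk, names=names, dtype=dtypes)

    print(pd.concat([df0, df1], ignore_index=True))
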
mars/dataframe/datasource/tests/test_datasource.py

Lines changed: 25 additions & 1 deletion
@@ -13,11 +13,14 @@
 # limitations under the License.
 
 from weakref import ReferenceType
-import mars.tensor as mt
+import os
+import tempfile
+import shutil
 
 import numpy as np
 import pandas as pd
 
+import mars.tensor as mt
 from mars import opcodes as OperandDef
 from mars.graph import DAG
 from mars.tests.core import TestBase
@@ -26,6 +29,7 @@
 from mars.dataframe.datasource.series import from_pandas as from_pandas_series
 from mars.dataframe.datasource.from_tensor import dataframe_from_tensor, series_from_tensor
 from mars.dataframe.datasource.from_records import from_records
+from mars.dataframe.datasource.read_csv import read_csv, DataFrameReadCSV
 
 
 class Test(TestBase):
@@ -371,3 +375,23 @@ def testFromRecords(self):
         pd.testing.assert_index_equal(df.chunks[1].index_value.to_pandas(), pd.RangeIndex(3, 6))
         pd.testing.assert_index_equal(df.chunks[2].index_value.to_pandas(), pd.RangeIndex(6, 9))
         pd.testing.assert_index_equal(df.chunks[3].index_value.to_pandas(), pd.RangeIndex(9, 10))
+
+    def testReadCSV(self):
+        tempdir = tempfile.mkdtemp()
+        file_path = os.path.join(tempdir, 'test.csv')
+        try:
+            df = pd.DataFrame(np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]), columns=['a', 'b', 'c'], dtype=np.int64)
+            df.to_csv(file_path)
+            mdf = read_csv(file_path, index_col=0, chunk_bytes=10)
+            self.assertIsInstance(mdf.op, DataFrameReadCSV)
+            self.assertEqual(mdf.shape[1], 3)
+            pd.testing.assert_index_equal(df.columns, mdf.columns_value.to_pandas())
+
+            mdf.tiles()
+            self.assertEqual(len(mdf.chunks), 4)
+            for chunk in mdf.chunks:
+                pd.testing.assert_index_equal(df.columns, chunk.columns_value.to_pandas())
+                pd.testing.assert_series_equal(df.dtypes, chunk.dtypes)
+        finally:
+            shutil.rmtree(tempdir)
+
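
The expected chunk count in testReadCSV falls straight out of tile(): each file is split into ceil(total_bytes / chunk_bytes) chunks. A back-of-the-envelope check, assuming pandas writes the test frame with its default CSV formatting:

    import math

    # ',a,b,c\n' header plus three rows like '0,1,2,3\n' -> 7 + 3 * 8 = 31 bytes
    file_bytes = len(',a,b,c\n' + '0,1,2,3\n' + '1,4,5,6\n' + '2,7,8,9\n')
    print(math.ceil(file_bytes / 10))  # 4, matching assertEqual(len(mdf.chunks), 4)

The fourth nominal range starts at byte 30 and snaps to (31, 31), which is exactly the empty-chunk case execute() covers with build_empty_df.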
