Skip to content

Commit b3b1ed9

Browse files
authored
BUG: fix globbed csv to chunked dataframe (#481)
* BUG: fix globbed csv to chunked dataframe
1 parent f3c2012 commit b3b1ed9

File tree

2 files changed

+34
-2
lines changed

2 files changed

+34
-2
lines changed

odo/backends/csv.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import pandas as pd
1717

18+
from dask.threaded import get as dsk_get
1819
import datashape
1920

2021
from datashape import discover, Record, Option
@@ -366,8 +367,23 @@ def resource_glob(uri, **kwargs):
366367
@convert.register(chunks(pd.DataFrame), (chunks(CSV), chunks(Temp(CSV))),
367368
cost=10.0)
368369
def convert_glob_of_csvs_to_chunks_of_dataframes(csvs, **kwargs):
369-
data = [partial(convert, chunks(pd.DataFrame), csv, **kwargs) for csv in csvs]
370-
return chunks(pd.DataFrame)(data)
370+
f = partial(convert, chunks(pd.DataFrame), **kwargs)
371+
372+
def df_gen():
373+
# build up a dask graph to run all of the `convert` calls concurrently
374+
375+
# use a list to hold the requested key names to ensure that we return
376+
# the results in the correct order
377+
p = []
378+
dsk = {}
379+
for n, csv_ in enumerate(csvs):
380+
key = 'p%d' % n
381+
dsk[key] = f, csv_
382+
p.append(key)
383+
384+
return concat(dsk_get(dsk, p))
385+
386+
return chunks(pd.DataFrame)(df_gen)
371387

372388

373389
@convert.register(Temp(CSV), (pd.DataFrame, chunks(pd.DataFrame)))

odo/backends/tests/test_csv.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,3 +462,19 @@ def test_string_n_convert(string_dshape):
462462
expected = pd.DataFrame(raw, columns=list('kn'))
463463
expected['k'] = pd.to_datetime(expected.k)
464464
tm.assert_frame_equal(result, expected)
465+
466+
467+
def test_globbed_csv_to_chunks_of_dataframe():
468+
header = 'a,b,c\n'
469+
d = {'a-1.csv': header + '1,2,3\n4,5,6\n',
470+
'a-2.csv': header + '7,8,9\n10,11,12\n'}
471+
472+
with filetexts(d):
473+
dfs = list(odo('a-*.csv', chunks(pd.DataFrame)))
474+
475+
assert len(dfs) == 2
476+
columns = 'a', 'b', 'c'
477+
tm.assert_frame_equal(dfs[0],
478+
pd.DataFrame([[1, 2, 3], [4, 5, 6]], columns=columns))
479+
tm.assert_frame_equal(dfs[1],
480+
pd.DataFrame([[7, 8, 9], [10, 11, 12]], columns=columns))

0 commit comments

Comments
 (0)