This repository was archived by the owner on Feb 8, 2023. It is now read-only.

Commit 3564752

BUG: Fix read_parquet with latest pyarrow (#135)
1 parent 634e1ef commit 3564752

File tree

4 files changed: +43 -32 lines

.github/workflows/platform-ci.yml

Lines changed: 2 additions & 1 deletion

@@ -55,6 +55,7 @@ jobs:
         if: ${{ matrix.with-kubernetes }}
         with:
           driver: none
+          kubernetes-version: v1.23.12
         uses: medyagh/setup-minikube@master

       - name: Install dependencies
@@ -105,7 +106,7 @@ jobs:
             rm -fr /tmp/etcd-$ETCD_VER-linux-amd64.tar.gz /tmp/etcd-download-test
           fi
           if [ -n "$WITH_RAY" ] || [ -n "$WITH_RAY_DAG" ] || [ -n "$WITH_RAY_DEPLOY" ]; then
-            pip install "xgboost_ray" "protobuf<4"
+            pip install "xgboost_ray" "protobuf<4" "sqlalchemy<2"
            # Ray Datasets need pyarrow>=6.0.1
            pip install "pyarrow>=6.0.1"
           fi

azure-pipelines.yml

Lines changed: 4 additions & 0 deletions

@@ -64,6 +64,10 @@ jobs:
         else
           pip install numpy scipy cython
         fi
+        if [[ "$(mars.test.module)" == "dataframe" ]]; then
+          pip install sqlalchemy\<2
+        fi
+
         pip install -e ".[dev,extra]"
         pip install virtualenv flaky

mars/dataframe/datasource/read_parquet.py

Lines changed: 36 additions & 30 deletions

@@ -15,8 +15,7 @@
 # limitations under the License.

 import os
-import pickle
-from typing import List, Tuple
+from typing import Dict
 from urllib.parse import urlparse

 import numpy as np
@@ -44,7 +43,6 @@
     StringField,
     Int32Field,
     Int64Field,
-    BytesField,
 )
 from ...utils import is_object_dtype, lazy_import
 from ..arrays import ArrowStringDtype
@@ -116,8 +114,8 @@ def read_group_to_pandas(
     def read_partitioned_to_pandas(
         self,
         f,
-        partitions: "pq.ParquetPartitions",
-        partition_keys: List[Tuple],
+        partitions: Dict,
+        partition_keys: Dict,
         columns=None,
         nrows=None,
         use_arrow_dtype=None,
@@ -126,11 +124,10 @@ def read_partitioned_to_pandas(
         raw_df = self.read_to_pandas(
             f, columns=columns, nrows=nrows, use_arrow_dtype=use_arrow_dtype, **kwargs
         )
-        for i, (name, index) in enumerate(partition_keys):
-            dictionary = partitions[i].dictionary
-            value = dictionary[index]
-            raw_df[name] = pd.Series(
-                value.as_py(),
+        for col, value in partition_keys.items():
+            dictionary = partitions[col]
+            raw_df[col] = pd.Series(
+                value,
                 dtype=pd.CategoricalDtype(categories=dictionary.tolist()),
                 index=raw_df.index,
             )
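
For context, the rewritten loop above works against plain dicts (column name to dictionary of values, and column name to this file's value) instead of the legacy pyarrow ParquetPartitions object. A minimal standalone sketch of the same idea follows; the column names and sample data are made up for illustration and are not taken from the Mars code or tests.

import pandas as pd
import pyarrow as pa

# Columns stored in the file itself; partition columns are not materialized there.
raw_df = pd.DataFrame({"value": [1.0, 2.0, 3.0]})

# Column name -> dictionary of all observed partition values (as provided by
# dataset.partitioning.dictionaries). Values are kept as strings here because
# the keys below are parsed from "col=value" path segments.
partitions = {"year": pa.array(["2021", "2022"]), "month": pa.array(["01", "02"])}

# Column name -> value for this particular file,
# e.g. parsed from ".../year=2022/month=01/part-0.parquet".
partition_keys = {"year": "2022", "month": "01"}

for col, value in partition_keys.items():
    dictionary = partitions[col]
    # Broadcast the single partition value over the frame as a categorical column.
    raw_df[col] = pd.Series(
        value,
        dtype=pd.CategoricalDtype(categories=dictionary.tolist()),
        index=raw_df.index,
    )

print(raw_df.dtypes)  # value: float64, year and month: category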
@@ -249,8 +246,8 @@ def read_group_to_cudf(
     def read_partitioned_to_cudf(
         cls,
         file,
-        partitions: "pq.ParquetPartitions",
-        partition_keys: List[Tuple],
+        partitions: Dict,
+        partition_keys: Dict,
         columns=None,
         nrows=None,
         **kwargs,
@@ -262,11 +259,13 @@ def read_partitioned_to_cudf(
         t = t.slice(0, nrows) if nrows is not None else t
         t = pa.table(t.columns, names=t.column_names)
         raw_df = cudf.DataFrame.from_arrow(t)
-        for i, (name, index) in enumerate(partition_keys):
-            dictionary = partitions[i].dictionary
-            codes = cudf.core.column.as_column(index, length=len(raw_df))
-            raw_df[name] = cudf.core.column.build_categorical_column(
-                categories=dictionary.tolist(),
+        for col, value in partition_keys.items():
+            dictionary = partitions[col].tolist()
+            codes = cudf.core.column.as_column(
+                dictionary.index(value), length=len(raw_df)
+            )
+            raw_df[col] = cudf.core.column.build_categorical_column(
+                categories=dictionary,
                 codes=codes,
                 size=codes.size,
                 offset=codes.offset,
@@ -295,8 +294,8 @@ class DataFrameReadParquet(
     merge_small_files = BoolField("merge_small_files")
     merge_small_file_options = DictField("merge_small_file_options")
     # for chunk
-    partitions = BytesField("partitions", default=None)
-    partition_keys = ListField("partition_keys", default=None)
+    partitions = DictField("partitions", default=None)
+    partition_keys = DictField("partition_keys", default=None)
     num_group_rows = Int64Field("num_group_rows", default=None)
     # as read meta may be too time-consuming when number of files is large,
     # thus we only read first file to get row number and raw file size
@@ -325,21 +324,30 @@ def _tile_partitioned(cls, op: "DataFrameReadParquet"):
         out_df = op.outputs[0]
         shape = (np.nan, out_df.shape[1])
         dtypes = cls._to_arrow_dtypes(out_df.dtypes, op)
-        dataset = pq.ParquetDataset(op.path)
+        dataset = pq.ParquetDataset(op.path, use_legacy_dataset=False)

         path_prefix = _parse_prefix(op.path)

         chunk_index = 0
         out_chunks = []
         first_chunk_row_num, first_chunk_raw_bytes = None, None
-        for i, piece in enumerate(dataset.pieces):
+        for i, fragment in enumerate(dataset.fragments):
             chunk_op = op.copy().reset_key()
-            chunk_op.path = chunk_path = path_prefix + piece.path
-            chunk_op.partitions = pickle.dumps(dataset.partitions)
-            chunk_op.partition_keys = piece.partition_keys
+            chunk_op.path = chunk_path = path_prefix + fragment.path
+            relpath = os.path.relpath(chunk_path, op.path)
+            partition_keys = dict(
+                tuple(s.split("=")) for s in relpath.split(os.sep)[:-1]
+            )
+            chunk_op.partition_keys = partition_keys
+            chunk_op.partitions = dict(
+                zip(
+                    dataset.partitioning.schema.names, dataset.partitioning.dictionaries
+                )
+            )
             if i == 0:
-                first_chunk_raw_bytes = file_size(chunk_path, op.storage_options)
-                first_chunk_row_num = piece.get_metadata().num_rows
+                first_row_group = fragment.row_groups[0]
+                first_chunk_raw_bytes = first_row_group.total_byte_size
+                first_chunk_row_num = first_row_group.num_rows
             chunk_op.first_chunk_row_num = first_chunk_row_num
             chunk_op.first_chunk_raw_bytes = first_chunk_raw_bytes
             new_chunk = chunk_op.new_chunk(
@@ -458,11 +466,10 @@ def _tile(cls, op: "DataFrameReadParquet"):
     def _execute_partitioned(cls, ctx, op: "DataFrameReadParquet"):
         out = op.outputs[0]
         engine = get_engine(op.engine)
-        partitions = pickle.loads(op.partitions)
        with open_file(op.path, storage_options=op.storage_options) as f:
             ctx[out.key] = engine.read_partitioned_to_pandas(
                 f,
-                partitions,
+                op.partitions,
                 op.partition_keys,
                 columns=op.columns,
                 nrows=op.nrows,
@@ -516,10 +523,9 @@ def _cudf_read_parquet(cls, ctx: dict, op: "DataFrameReadParquet"):

         try:
             if op.partitions is not None:
-                partitions = pickle.loads(op.partitions)
                 ctx[out.key] = engine.read_partitioned_to_cudf(
                     file,
-                    partitions,
+                    op.partitions,
                     op.partition_keys,
                     columns=op.columns,
                     nrows=op.nrows,
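
The dicts consumed above are produced at tiling time from pyarrow's non-legacy dataset API, which replaces the removed .pieces and .partitions attributes with .fragments and .partitioning. A rough self-contained sketch of that API follows; the dataset path and layout are hypothetical, and whether the use_legacy_dataset=False keyword is accepted (or already the default) depends on the installed pyarrow version.

import os
import pyarrow.parquet as pq

path = "data/partitioned"  # hypothetical hive-partitioned dataset root

# The non-legacy dataset exposes .fragments and .partitioning instead of the
# removed .pieces and .partitions attributes.
dataset = pq.ParquetDataset(path, use_legacy_dataset=False)

# Column name -> pyarrow array holding that column's partition dictionary.
partitions = dict(
    zip(dataset.partitioning.schema.names, dataset.partitioning.dictionaries)
)

for fragment in dataset.fragments:
    # Recover this fragment's partition keys from its "col=value" path segments.
    relpath = os.path.relpath(fragment.path, path)
    partition_keys = dict(
        tuple(s.split("=")) for s in relpath.split(os.sep)[:-1]
    )

    # Row-group metadata replaces piece.get_metadata() for row/size estimates.
    first_row_group = fragment.row_groups[0]
    print(
        fragment.path,
        partition_keys,
        first_row_group.num_rows,
        first_row_group.total_byte_size,
    )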

setup.cfg

Lines changed: 1 addition & 1 deletion

@@ -67,7 +67,7 @@ dev =
     black
 extra =
     pillow>=7.0.0
-    pyarrow>=0.11.0,!=0.16.*
+    pyarrow>=5.0.0
     lz4>=1.0.0
     fsspec>=2022.7.1,!=2022.8.0
 kubernetes =
