-
Notifications
You must be signed in to change notification settings - Fork 27
Expand file tree
/
Copy pathpyarrow.py
More file actions
36 lines (26 loc) · 1.07 KB
/
pyarrow.py
File metadata and controls
36 lines (26 loc) · 1.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from __future__ import annotations
import asyncio
from collections.abc import AsyncIterator, Iterator
from typing import Any
# One dataset row as produced by RecordBatch.to_pylist(): a plain dict.
# Keys are presumably column names mapping to cell values, but both sides
# are left as Any here — pyarrow decides the concrete types.
RecordType = dict[Any, Any]
async def read_dataset_records(path: str, batch_size: int | None) -> AsyncIterator[RecordType]:
    """Asynchronously yield row dicts from the parquet dataset at *path*.

    Thin async adapter over :func:`read_dataset_records_sync`: each pull of
    the underlying (blocking) iterator is handed to a worker thread via
    ``asyncio.to_thread`` so the event loop never blocks on file I/O.

    Args:
        path: Dataset location, forwarded verbatim to the sync reader.
        batch_size: Rows per batch, or ``None`` for pyarrow's default;
            forwarded verbatim to the sync reader.
    """
    sync_iter = read_dataset_records_sync(path=path, batch_size=batch_size)

    def _advance() -> RecordType | None:
        # StopIteration must not escape across the thread boundary, so
        # exhaustion is translated into a None sentinel instead.
        return next(sync_iter, None)

    while (record := await asyncio.to_thread(_advance)) is not None:
        yield record
def read_dataset_records_sync(path: str, batch_size: int | None = None) -> Iterator[RecordType]:
    """Lazily yield row dicts from the parquet dataset at *path*.

    Args:
        path: File or directory path accepted by ``pyarrow.dataset.dataset``.
        batch_size: Rows per batch forwarded to ``Dataset.to_batches``.
            When ``None`` (now also the default, so callers may omit the
            argument) the keyword is not passed at all, preserving pyarrow's
            own default batch size.

    Returns:
        Iterator over rows, each a dict as produced by
        ``RecordBatch.to_pylist()``.
    """
    # 'ds' rather than 'pd': the original alias collided with the ubiquitous
    # pandas convention and misled readers.
    import pyarrow.dataset as ds  # pylint: disable=import-outside-toplevel

    # Build kwargs conditionally so pyarrow's default applies when the
    # caller did not specify a batch size.
    kwargs: dict[str, int] = {}
    if batch_size is not None:
        kwargs['batch_size'] = batch_size

    dataset = ds.dataset(source=path, format='parquet')
    for batch in dataset.to_batches(**kwargs):  # type: ignore[arg-type]
        yield from batch.to_pylist()