-
Notifications
You must be signed in to change notification settings - Fork 7.3k
Description
What happened + What you expected to happen
ray.data.read_parquet() fails with ArrowNotImplementedError: Nested data conversions not implemented for chunked array outputs when reading Parquet files containing nested column types (list, map, struct with nested fields).
The error occurs in the ReadParquet operator before any user code executes. The expectation is that read_parquet() should be able to read Parquet files with nested types without error.
Stack Trace
ray.exceptions.RayTaskError(ArrowNotImplementedError): ray::ReadParquet() (pid=2617, ip=10.150.213.97)
File "ray/data/_internal/execution/operators/map_transformer.py", line 102, in call
for b_out in map_transformer.apply_transform(iter(blocks), ctx):
File "ray/data/_internal/execution/operators/map_transformer.py", line 84, in shapeblocks
yield from self._post_process(results)
File "ray/data/_internal/execution/operators/map_transformer.py", line 412, in applytransform
for result in results:
File "ray/data/_internal/planner/plan_read_op.py", line 107, in do_read
yield from read_task()
File "ray/data/datasource/datasource.py", line 218, in call
yield from result
File "ray/data/_internal/datasource/parquet_datasource.py", line 472, in read_fragments
for table in iterate_with_retry(
File "ray/data/_internal/util.py", line 1434, in iterate_with_retry
raise e from None
File "ray/data/_internal/util.py", line 1415, in iterate_with_retry
for item_index, item in enumerate(iterable):
File "ray/data/_internal/datasource/parquet_datasource.py", line 527, in read_batches_from
for batch in fragment.to_batches(
File "pyarrow/_dataset.pyx", line 3934, in _iterator
File "pyarrow/_dataset.pyx", line 3524, in pyarrow._dataset.TaggedRecordBatchIterator.next
File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status
File "pyarrow/error.pxi", line 92, in pyarrow.lib.check_status
pyarrow.lib.ArrowNotImplementedError: Nested data conversions not implemented for chunked array outputs
Likely Root Cause
This appears to be a known upstream PyArrow limitation; see:
- https://issues.apache.org/jira/browse/ARROW-5030
- [Python] read_row_group fails with Nested data conversions not implemented for chunked array outputs apache/arrow#21526
- [C++][Parquet] Support nested data conversions for chunked array apache/arrow#32723
When Parquet row groups contain nested columns whose binary/string data exceeds PyArrow's internal chunking threshold (~2GB), the binary array builder splits the data into chunks. The nested array reconstruction code (WrapIntoListArray) cannot handle chunked arrays, raising ArrowNotImplementedError.
Versions / Dependencies
Ray: 2.50.0
PyArrow: 19.0.1
Python: 3.9
Reproduction script
import os
import random
import string
import tempfile
import pyarrow as pa
import pyarrow.parquet as pq
def random_bytes(length: int) -> bytes:
    """Return *length* bytes of OS-provided random data."""
    return os.urandom(length)
def random_string(length: int) -> str:
    """Return a random alphanumeric string of exactly *length* characters."""
    alphabet = string.ascii_letters + string.digits
    # random.choices draws all k characters in one call, matching the
    # original's RNG consumption exactly.
    return "".join(random.choices(alphabet, k=length))
def create_large_nested_parquet(output_dir: str) -> str:
    """Write one Parquet file whose nested list<struct> columns are packed
    into a single row group large enough (~2GB+) to trigger PyArrow's
    chunked-array limitation for nested data.  Returns *output_dir*.
    """
    os.makedirs(output_dir, exist_ok=True)

    total_rows = 2500
    payload_len = 100_000
    per_row_items = 10

    print(
        f"Creating {total_rows} rows with "
        f"~{per_row_items * payload_len / 1024 / 1024:.1f}MB "
        f"nested data per row..."
    )
    print(
        "Target row group size: "
        f"~{total_rows * per_row_items * payload_len / 1024 / 1024 / 1024:.1f}GB"
    )

    template_ids = [f"tmpl_{row:06d}" for row in range(total_rows)]
    page_indices = [row % 10 for row in range(total_rows)]
    export_ids = [f"exp_{(row // 100):04d}" for row in range(total_rows)]

    print("Generating media data (this may take a minute)...")
    media_rows = []
    for row in range(total_rows):
        # Dict-literal field order matches the original so the sequence of
        # random_string/randint draws is identical.
        media_rows.append(
            [
                {
                    "url": f"https://cdn.example.com/{random_string(50)}.bin",
                    "content": random_string(payload_len),
                    "width": random.randint(100, 4000),
                    "height": random.randint(100, 4000),
                    "mime_type": "application/octet-stream",
                }
                for _ in range(per_row_items)
            ]
        )
        if (row + 1) % 500 == 0:
            print(f" Generated {row + 1}/{total_rows} rows...")

    print("Generating fonts data...")
    font_rows = []
    for _ in range(total_rows):
        font_rows.append(
            [
                {
                    "family": random_string(200),
                    "data": random_string(50_000),
                    "style": "normal",
                    "weight": 400,
                }
                for _ in range(5)
            ]
        )

    media_struct = pa.struct(
        [
            ("url", pa.string()),
            ("content", pa.string()),
            ("width", pa.int32()),
            ("height", pa.int32()),
            ("mime_type", pa.string()),
        ]
    )
    font_struct = pa.struct(
        [
            ("family", pa.string()),
            ("data", pa.string()),
            ("style", pa.string()),
            ("weight", pa.int32()),
        ]
    )
    schema = pa.schema(
        [
            ("template_id", pa.string()),
            ("page_index", pa.int32()),
            ("export_id", pa.string()),
            ("media", pa.list_(media_struct)),
            ("fonts", pa.list_(font_struct)),
        ]
    )

    table = pa.table(
        {
            "template_id": template_ids,
            "page_index": page_indices,
            "export_id": export_ids,
            "media": media_rows,
            "fonts": font_rows,
        },
        schema=schema,
    )

    file_path = os.path.join(output_dir, "part-00000.parquet")
    print("Writing parquet (single row group)...")
    # A single row group holding every row keeps all the nested data in one
    # group, which is what pushes the binary builders past the ~2GB threshold.
    pq.write_table(
        table,
        file_path,
        row_group_size=total_rows,
        use_dictionary=False,
    )

    file_size_mb = os.path.getsize(file_path) / 1024 / 1024
    print(f"Wrote {file_path} ({total_rows} rows, {file_size_mb:.0f}MB on disk)")
    return output_dir
def test_pyarrow_direct(data_dir: str) -> bool:
    """Exercise fragment.to_batches() at several batch sizes.

    Returns True if any batch size raises (bug observed), False otherwise.
    """
    import pyarrow.dataset as ds

    print("\n=== Test 1: PyArrow fragment.to_batches() directly ===")
    dataset = ds.dataset(data_dir, format="parquet")
    for fragment in dataset.get_fragments():
        for size in (100, 500, 1000, 2500):
            try:
                rows_seen = 0
                for record_batch in fragment.to_batches(batch_size=size):
                    rows_seen += record_batch.num_rows
                print(f" batch_size={size}: OK ({rows_seen} rows)")
            except Exception as exc:
                print(f" batch_size={size}: ERROR: {type(exc).__name__}: {exc}")
                return True
    return False
def test_pyarrow_read_table(data_dir: str) -> bool:
    """Read every parquet file with pq.read_table().

    Returns True if the read raises (bug observed), False otherwise.
    """
    print("\n=== Test 2: pq.read_table() directly ===")
    try:
        import glob

        for path in sorted(glob.glob(os.path.join(data_dir, "*.parquet"))):
            loaded = pq.read_table(path)
            print(f" {path}: OK ({loaded.num_rows} rows)")
        return False
    except Exception as exc:
        print(f" ERROR: {type(exc).__name__}: {exc}")
        return True
def test_pyarrow_read_table_to_pandas(data_dir: str) -> bool:
    """Read every parquet file and convert it to a pandas DataFrame.

    Returns True if the read or conversion raises, False otherwise.
    """
    print("\n=== Test 3: pq.read_table().to_pandas() ===")
    try:
        import glob

        for path in sorted(glob.glob(os.path.join(data_dir, "*.parquet"))):
            frame = pq.read_table(path).to_pandas()
            print(f" {path}: OK ({len(frame)} rows)")
        return False
    except Exception as exc:
        print(f" ERROR: {type(exc).__name__}: {exc}")
        return True
def test_ray_data(data_dir: str) -> bool:
    """Iterate all batches of ray.data.read_parquet().

    Returns True if iteration raises (bug reproduced), False otherwise.
    """
    import ray
    import ray.data

    ray.init(ignore_reinit_error=True)
    print("\n=== Test 4: ray.data.read_parquet() ===")
    ctx = ray.data.DataContext.get_current()
    # Large target block size so Ray does not split the oversized row group.
    ctx.target_max_block_size = int(1.5 * 1024 * 1024 * 1024)
    try:
        dataset = ray.data.read_parquet(data_dir)
        for _ in dataset.iter_batches(batch_format="pyarrow", batch_size=100):
            pass
        print(" OK: Iterated all batches without error")
        print(f" Ray: {ray.__version__}, PyArrow: {pa.__version__}")
        return False
    except Exception as exc:
        print(f" ERROR (reproduced!): {type(exc).__name__}: {exc}")
        print(f" Ray: {ray.__version__}, PyArrow: {pa.__version__}")
        return True
def test_ray_data_map_batches_pandas(data_dir: str) -> bool:
    """read_parquet + map_batches(batch_format='pandas'), matching the
    customer's code path.

    Returns True if the pipeline raises (bug reproduced), False otherwise.
    """
    import ray
    import ray.data

    ray.init(ignore_reinit_error=True)
    print("\n=== Test 5: ray.data.read_parquet() + map_batches(pandas) ===")
    print(" (This matches the customer's code path)")
    ctx = ray.data.DataContext.get_current()
    ctx.target_max_block_size = int(1.5 * 1024 * 1024 * 1024)

    def passthrough(batch):
        # No-op transform: any failure comes from the read path, not user code.
        return batch

    try:
        pipeline = ray.data.read_parquet(data_dir).map_batches(
            passthrough, batch_format="pandas", batch_size=500
        )
        total = pipeline.count()
        print(f" OK: Processed {total} rows without error")
        return False
    except Exception as exc:
        print(f" ERROR (reproduced!): {type(exc).__name__}: {exc}")
        return True
if __name__ == "__main__":
    print(f"PyArrow version: {pa.__version__}")
    print()
    with tempfile.TemporaryDirectory(prefix="repro_13727_") as tmpdir:
        data_dir = os.path.join(tmpdir, "nested_parquet")
        create_large_nested_parquet(data_dir)

        # Run every probe even after one reproduces, so the output shows
        # which read paths are affected.
        probes = (
            test_pyarrow_direct,
            test_pyarrow_read_table,
            test_pyarrow_read_table_to_pandas,
            test_ray_data,
            test_ray_data_map_batches_pandas,
        )
        reproduced = False
        for probe in probes:
            reproduced = probe(data_dir) or reproduced

        banner = "=" * 60
        if reproduced:
            print("\n" + banner)
            print("Bug REPRODUCED!")
            print(banner)
        else:
            print("\n" + banner)
            print("Bug did NOT reproduce.")
            print()
            print("The ~2GB threshold may not have been reached.")
            print("To increase data size, edit the script and increase:")
            print(" - num_rows (currently 2500)")
            print(" - large_payload_size (currently 100KB)")
            print(" - items_per_row (currently 10)")
            print()
            print("Or try increasing to 5000 rows:")
            print(" num_rows = 5000 # ~5GB nested data")
            print(banner)
Issue Severity
None
Metadata
Metadata
Assignees
Labels
Type
Projects
Status