[zephyr] Fix load_parquet memory: use ParquetFile, drop dataset API#4344

Merged
ravwojdyla merged 9 commits into main from rav-fixup-parquet-reader
Apr 6, 2026
Conversation

@ravwojdyla-agent
Contributor

Summary

  • Use pq.ParquetFile.read_row_group() for all load_parquet paths (plain, filter, row-range, combined), eliminating pyarrow.dataset entirely
  • Fixes upstream memory leak (apache/arrow#39808) where dataset.to_batches() accumulates the entire file in Arrow's memory pool (8.7 GB RSS for a 2 GB file)
  • Filter pushdown is applied post-hoc via table.filter() per row group — loses statistics-based row-group skipping but keeps O(row_group) memory instead of O(file)

Closes #4325

Test plan

  • [x] New test_readers.py with 7 tests: plain read, column projection, row range, filter, filter+row-range, empty file, and a guard that pyarrow.dataset is never imported
  • [ ] Verify on a real large parquet file that RSS stays low

🤖 Generated with Claude Code

ravwojdyla and others added 2 commits April 1, 2026 17:45
…aset API

pyarrow.dataset.to_batches() loads the entire file into Arrow's memory
pool upfront (~8.7GB RSS for a 2GB parquet file). ParquetFile.read_row_group()
reads one row group at a time (~0.3GB RSS for the same file).

Use the fast path for the common case (no filter, no row range). Fall back
to dataset API only when filter pushdown or row-range reads are needed.

Also fix write_batch -> write_table in parquet writer (_accumulate_tables
yields Table, not RecordBatch).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Extend the row-group-by-row-group ParquetFile approach to handle
filter and row-range reads too, eliminating pyarrow.dataset entirely.
This avoids the upstream memory leak (apache/arrow#39808) for all
load_parquet call sites, not just the plain-read fast path.

Add test_readers.py covering plain, column projection, row-range,
filter, combined filter+row-range, empty files, and a guard that
pyarrow.dataset is never imported.

Closes #4325

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@ravwojdyla-agent added the bug (Something isn't working) and agent-generated (Created by automation/agent) labels Apr 2, 2026

@chatgpt-codex-connector (bot) left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 3a16feb2da


Comment thread on lib/zephyr/src/zephyr/readers.py (outdated)
schema_names = dataset.schema.names
if not schema_names:
return
table = pf.read_row_group(i, columns=columns)


P1 Badge Apply filter before projecting row-group columns

load_parquet now reads each row group with columns=columns and applies table.filter(pa_filter) afterward. In pushed-down pipelines that combine filter + select (e.g. .filter(col("score") > 70).select("id")), _compute_file_pushdown can pass columns=["id"] while the predicate still references score, so this path drops the predicate column before filtering and causes table.filter to fail (or mis-evaluate). The prior dataset.to_table(columns=..., filter=...) flow did not have this ordering problem because the scanner could read predicate columns without projecting them to output.


ravwojdyla and others added 2 commits April 1, 2026 18:36
…huffle

Extract a shared `iter_parquet_row_groups` utility that reads Parquet
files row-group-by-row-group via `pq.ParquetFile`. Supports column
projection, row ranges, post-read filtering, and statistics-based row
group skipping via `equality_predicates`.

Refactor both `load_parquet` and `ScatterParquetIterator` to use it,
eliminating `pyarrow.dataset` from the entire zephyr read path.
The scatter reader uses equality predicates on (shard_idx, chunk_idx)
for statistics-based pushdown — equivalent to dataset Scanner predicate
pushdown since the scatter writer writes one pair per row group.

Also handle the case where a filter references columns not in the
projection by reading the union and projecting after filtering.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…oup)

Compares memory usage (RSS, Arrow pool) and wall-time for reading
Parquet files via pyarrow.dataset vs pq.ParquetFile.read_row_group,
with and without filters. Generates synthetic files of configurable size.

Usage: uv run python tests/benchmark_parquet_reader.py --size-gb 2

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

ravwojdyla and others added 2 commits April 1, 2026 18:54
Use os.urandom-based text columns so on-disk size tracks target.
Add --row-group-mb CLI flag for controlling row group size.
Estimate rows from on-disk bytes (not Arrow bytes) for accurate targeting.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Subprocess isolation gives accurate RSS/Arrow-pool numbers — the
previous in-process approach let memory from earlier readers inflate
later baselines.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
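The subprocess-isolation idea can be sketched like this (an assumed approach, not the benchmark's exact code; `measure_rss` is a hypothetical helper, and `ru_maxrss` is kilobytes on Linux but bytes on macOS):

```python
# Sketch of subprocess-isolated peak-RSS measurement: each snippet runs
# in a fresh interpreter, so earlier allocations cannot inflate later
# baselines. `measure_rss` is illustrative, not the benchmark's real code.
import subprocess
import sys
import textwrap


def measure_rss(snippet: str) -> int:
    """Run `snippet` in a fresh Python process; return its peak RSS."""
    prog = textwrap.dedent(f"""\
        import resource
        {snippet}
        print(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)
    """)
    out = subprocess.run(
        [sys.executable, "-c", prog], capture_output=True, text=True, check=True
    )
    return int(out.stdout.strip())


rss = measure_rss("data = bytearray(50 * 1024 * 1024)")  # ~50 MB touched
```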
@ravwojdyla-agent
Contributor Author

## 🤖 Benchmark (updated): subprocess-isolated, 1 GB file, ~100 MB row groups

Previous numbers had cross-contamination from running readers in the same process. Updated to spawn a fresh subprocess per reader:

0.99 GB on disk (1,013,128 rows, 10 row groups of ~100 MB each):

| Reader | Rows | Wall (s) | RSS delta (GB) | Arrow pool (MB) |
|---|---|---|---|---|
| dataset | 1,013,128 | 0.68 | 2.132 | 12.9 |
| parquet_file | 1,013,128 | 0.16 | 0.251 | 0.0 |
| iter_row_groups | 1,013,128 | 0.16 | 0.337 | 0.0 |
| dataset+filter | 507,194 | 0.01 | 0.055 | 0.1 |
| row_groups+filter | 507,194 | 0.17 | 0.051 | 0.0 |

Unfiltered reads

  • 4.25x faster wall-time (0.16s vs 0.68s)
  • 6.3x less RSS (0.34 GB vs 2.13 GB) — dataset allocates >2x the file size
  • Arrow pool: dataset leaks 12.9 MB, row-group reader leaks nothing

Filtered reads (select 2 numeric columns, filter score > 50)

  • dataset+filter is faster (0.01s vs 0.17s) because it has true row-level predicate pushdown during decode
  • Our row_groups+filter reads the full row group then filters post-hoc — correct and memory-friendly, but slower for selective filters on small column subsets
  • Memory is comparable and negligible for both

Takeaway

The big win is the unfiltered / full-column read path, which is the common case for Zephyr ETL pipelines. The filtered path trades some wall-time for row-group-bounded memory, and dropping pyarrow.dataset eliminates its memory leak entirely.

Reproduce: uv run python lib/zephyr/tests/benchmark_parquet_reader.py --size-gb 1 --row-group-mb 100

@ravwojdyla
Contributor

Filtered reads (select 2 numeric columns, filter score > 50)

These are unfortunate, I was promised predicate pushdown!

The benchmark, though, randomly shuffles the values, so we should try something more representative of the actual workflow, where whole row groups can be skipped.

ravwojdyla and others added 2 commits April 1, 2026 19:31
Write files with constant shard_idx and 3 chunk_idx values per row
group, matching the scatter file layout. Compare dataset Scanner
predicate pushdown vs iter_parquet_row_groups with equality_predicates
+ row_filter for selecting a single (shard, chunk) pair.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Each row group has exactly one (shard_idx, chunk_idx) pair with
exact statistics (min == max), matching the real scatter writer.

Five readers:
- dataset_full / rowgroups_full: full scan comparison
- dataset_scatter / rowgroups_scatter: single-chunk read with pushdown
- rowgroups_no_skip: baseline without statistics skipping

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@ravwojdyla-agent
Contributor Author

## 🤖 Benchmark (final): exact scatter layout, 1 GB, subprocess-isolated

Rewrote the benchmark to match the real scatter file layout: 10 shards × 3 chunks = 30 row groups, each with exactly one (shard_idx, chunk_idx) pair (min == max in statistics). 0.99 GB on disk, ~28K rows per row group.

Full scan (the load_parquet path)

| Reader | Rows | Wall (s) | RSS delta (GB) |
|---|---|---|---|
| dataset_full | 846,510 | 0.78 | 1.076 |
| rowgroups_full | 846,510 | 0.17 | 0.222 |

4.6x faster, 4.8x less memory. This is the main win — most zephyr ETL pipelines do unfiltered full scans.

Scatter read (the ScatterParquetIterator path) — select 1 of 30 row groups

| Reader | Rows | Wall (s) | RSS (GB) | How it works |
|---|---|---|---|---|
| dataset_scatter | 28,217 | 0.00 | 0.027 | Scanner predicate pushdown (row-level) |
| rowgroups_scatter | 28,217 | 0.00 | 0.024 | Statistics skip (row-group-level, exact) |
| rowgroups_no_skip | 28,217 | 0.19 | 0.094 | No skipping: reads all 30 RGs, filters post-hoc |

dataset_scatter and rowgroups_scatter are equivalent for the scatter layout because each row group has exactly one (shard_idx, chunk_idx) — statistics are exact so row-group-level skipping = row-level pushdown. The no_skip baseline shows the cost without any pushdown.

Summary

  • For full scans: massive win in both speed and memory
  • For scatter reads: parity with dataset Scanner thanks to exact statistics — no row-level pushdown needed when the scatter writer guarantees one pair per row group
  • pyarrow.dataset is now fully eliminated from the zephyr read path

Reproduce: uv run python lib/zephyr/tests/benchmark_parquet_reader.py --size-gb 1 --row-group-mb 100

@ravwojdyla ravwojdyla requested a review from rjpower April 2, 2026 01:53
@ravwojdyla
Contributor

@rjpower ptal

return True


def iter_parquet_row_groups(
Collaborator


the duplicate eq_predicates and row_filter feels easy to accidentally screw up. can we extract the row equality predicates?

  # Assumes `pc` is pyarrow.compute, `pa` is pyarrow, and `ee_pb2` is the
  # Substrait ExtendedExpression protobuf module.
  def _extract_equality_predicates(
      expr: pc.Expression, schema: pa.Schema
  ) -> dict[str, object]:
      """Extract field==scalar pairs from a pc.Expression via Substrait."""
      msg = ee_pb2.ExtendedExpression()
      msg.ParseFromString(bytes(expr.to_substrait(schema, allow_arrow_extensions=True)))

      func_names = {
          ext.extension_function.function_anchor: ext.extension_function.name
          for ext in msg.extensions
          if ext.HasField("extension_function")
      }
      field_names = list(msg.base_schema.names)
      result: dict[str, object] = {}

      def _walk(node):
          if not node.HasField("scalar_function"):
              return
          sf = node.scalar_function
          fname = func_names.get(sf.function_reference, "")
          if fname == "and":
              for arg in sf.arguments:
                  if arg.HasField("value"):
                      _walk(arg.value)
          elif fname == "equal" and len(sf.arguments) == 2:
              field_arg = lit_arg = None
              for a in (a.value for a in sf.arguments):
                  if a.HasField("selection"):
                      field_arg = a
                  elif a.HasField("literal"):
                      lit_arg = a
              if field_arg and lit_arg:
                  idx = field_arg.selection.direct_reference.struct_field.field
                  lit = lit_arg.literal
                  for typ in ("i64", "i32", "i16", "i8", "fp64", "fp32", "string", "boolean"):
                      if lit.HasField(typ):
                          result[field_names[idx]] = getattr(lit, typ)
                          break

      _walk(msg.referred_expr[0].expression)
      return result

alternatively, i'd suggest getting rid of the row filter for now -- just pick one?

@ravwojdyla
Contributor

@claude address above, keep it simple

@claude
Contributor

claude Bot commented Apr 3, 2026

Claude finished @ravwojdyla's task in 3m 57s


Addressing review feedback

  • Read current readers.py and shuffle.py to understand the duplicate eq_predicates / row_filter logic
  • Simplify: remove row_filter from iter_parquet_row_groups, move post-hoc filtering to load_parquet
  • This also fixes the Codex P1 about filter-before-projection ordering
  • Run pre-commit and tests (all 11 tests pass ✅)
  • Commit and push

What changed: Removed the row_filter parameter from iter_parquet_row_groups entirely. The function now only has equality_predicates for statistics-based row-group skipping. Post-hoc filtering (including the filter-column-union + project-after logic) is handled in load_parquet at the call site.

No caller used both parameters simultaneously — load_parquet used row_filter, ScatterParquetIterator used equality_predicates — so this is a clean separation with no behavioral change.

Move post-hoc filtering responsibility to the caller (load_parquet)
instead of the shared iter_parquet_row_groups function. This eliminates
the duplicate row_filter/equality_predicates parameters and fixes the
filter-before-projection ordering bug flagged by Codex.

No caller used both parameters simultaneously — load_parquet uses
row_filter, ScatterParquetIterator uses equality_predicates — so the
simplification has no behavioral impact.

Co-authored-by: Rafal Wojdyla <ravwojdyla@users.noreply.github.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@ravwojdyla ravwojdyla merged commit 55d8e2b into main Apr 6, 2026
36 checks passed
@ravwojdyla ravwojdyla deleted the rav-fixup-parquet-reader branch April 6, 2026 18:37
Helw150 pushed a commit that referenced this pull request Apr 8, 2026
…4344)

Co-authored-by: Rafal Wojdyla <ravwojdyla@gmail.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: claude[bot] <41898282+claude[bot]@users.noreply.github.com>
Co-authored-by: Rafal Wojdyla <ravwojdyla@users.noreply.github.com>
Development

Successfully merging this pull request may close these issues.

[zephyr] load_parquet uses 8.7 GB RSS for a 2 GB file due to pyarrow.dataset API

3 participants