feat: add ignore_corrupt_files option to read_parquet, read_csv and read_iceberg #6520

Merged: rohitkulshreshtha merged 4 commits into Eventual-Inc:main from chenghuichen:ignore_corrupt on Jun 10, 2026

Conversation

@chenghuichen

@chenghuichen chenghuichen commented Mar 30, 2026

Contributor

Motivation

Large data lakes accumulate corrupt or missing files over time. Without a skip option, a single bad file aborts an entire overnight batch job.

A simple skip flag, however, is itself a data quality hazard — a job that appears to succeed while quietly dropping data is worse than one that fails loudly. This PR treats observability as a first-class requirement: every skipped file is surfaced as structured data via df.skipped_files: list[tuple[str, str]], available after any executing action. Pipeline code can iterate the (path, reason) pairs directly to alert, dead-letter queue, or audit log — skipped files are never silently discarded. The design goal is errors visible, impact contained, tooling to fix.

What gets skipped

| Category | Examples |
| --- | --- |
| Invalid format | Bad magic bytes, truncated footer, mismatched row/column counts |
| Corrupt data | Unreadable row group, bad CSV encoding, wrong field count |
| Missing file | Deleted between listing and reading (e.g. concurrent compaction) |
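
As an illustration of the "bad magic bytes" and "truncated footer" cases: a standard Parquet file (without an encrypted footer) begins and ends with the 4-byte marker `PAR1`, so broken framing can be spotted without parsing any metadata. The sketch below is not the check this PR implements in Rust; the helper name `parquet_framing_problem` is hypothetical.

```python
import os
from typing import Optional

PARQUET_MAGIC = b"PAR1"  # standard Parquet files start and end with these 4 bytes


def parquet_framing_problem(path: str) -> Optional[str]:
    """Return a short reason if the file's framing is broken, else None.

    Only the header and trailer are inspected. A None result does not prove
    that the footer metadata or the row groups are readable.
    """
    size = os.path.getsize(path)
    # Smallest possible layout: 4-byte magic + 4-byte footer length + 4-byte magic.
    if size < 12:
        return "file too small to be Parquet"
    with open(path, "rb") as f:
        head = f.read(4)
        f.seek(-4, os.SEEK_END)
        tail = f.read(4)
    if head != PARQUET_MAGIC:
        return "bad magic bytes at start of file"
    if tail != PARQUET_MAGIC:
        return "truncated footer (missing trailing magic bytes)"
    return None
```

A writer that crashed mid-flight typically leaves a file with a valid header and no trailing marker, which this check reports as a truncated footer.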

Network errors, timeouts, and permission errors are never swallowed — those should be retried or fixed, not silenced.

Observability

  • WARNING log per skipped file (path + reason)
  • df.skipped_files: list[tuple[str, str]] available after any executing action, for alerting or dead-letter queuing
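
A rough sketch of how pipeline code might consume these pairs for alerting. It operates on a plain list of `(path, reason)` tuples, which is the shape described for the property; it makes no Daft calls. The function name `summarize_skips` and the `max_skip_ratio` threshold are hypothetical, not part of this PR.

```python
from collections import defaultdict


def summarize_skips(skipped, total_files, max_skip_ratio=0.01):
    """Group skipped paths by reason and decide whether the run should still pass.

    `skipped` is a list of (path, reason) tuples. `max_skip_ratio` is an
    illustrative threshold chosen for this example, not a Daft setting.
    """
    by_reason = defaultdict(list)
    for path, reason in skipped:
        by_reason[reason].append(path)
    ratio = len(skipped) / total_files if total_files else 0.0
    return dict(by_reason), ratio <= max_skip_ratio


# Example input shaped like the documented property value.
skipped = [
    ("s3://bucket/a.parquet", "truncated footer"),
    ("s3://bucket/b.parquet", "truncated footer"),
    ("s3://bucket/c.csv", "wrong field count"),
]
by_reason, ok = summarize_skips(skipped, total_files=1000)
```

Failing the job when the ratio exceeds a threshold keeps the "fail loudly" behaviour for widespread corruption while tolerating a handful of bad files.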

What this PR does not do

  • No row-level _corrupt_record column (Parquet is binary columnar — there is no "raw corrupt row string" to preserve; for CSV, file-level skip is the right granularity for the ignore_corrupt_files semantic)
  • No global session config — ignore_corrupt_files is a per-call parameter; a session default can be added later if there is demand
  • Connectors backed by python_factory_func_scan_task (Lance, Paimon LSM-merge fallback, etc) are not covered. The SkippedFilesCollector lives in the Rust execution context and is unreachable from the Python callsite where those reads happen. A follow-up PR will introduce a sideband mechanism for this.

Tests

tests/io/test_ignore_corrupt_files.py — Parquet, CSV, and Iceberg cases covering: corrupt files skipped, default raises, schema inference fallback, correct COUNT(*), and df.skipped_files populated/empty as expected.

Docs

docs/connectors/ignore-corrupt-files.md

Related Issues

Closes #6468

@chenghuichen chenghuichen requested a review from a team as a code owner March 30, 2026 02:26
@github-actions github-actions Bot added the feat label Mar 30, 2026
@greptile-apps

greptile-apps Bot commented Mar 30, 2026

Contributor

Greptile Summary

This PR adds an ignore_corrupt_files option to read_parquet, read_csv, and read_iceberg, implementing two-level error filtering (file-open and per-chunk) in the Rust execution engine with a shared Arc<Mutex<Vec<(path, reason)>>> collector that surfaces skipped files as df.skipped_corrupt_files after collection.

  • Rust layer: ParquetSourceConfig and CsvSourceConfig gain ignore_corrupt_files; the collector is threaded through ScanTaskSource → forward_scan_task_stream → scan_task_reader → the per-format readers. ExecutionStats serialises the list via bincode for the distributed path.
  • Python layer: df.skipped_corrupt_files property on DataFrame delegates to ExecutionMetadata._py.skipped_corrupt_files; the property raises if the frame hasn't been collected, which matches the stated contract.
  • Schema inference: the glob scan operator now loops over remaining files when the first match is corrupt, returning a clear "all files corrupt" error instead of the previous misleading GlobNoMatch.
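
The schema-inference fallback described in the last bullet can be modelled in a few lines of Python. This is a sketch of the control flow only, assuming caller-supplied `infer_one` and `is_corrupt` callables; all names are hypothetical and the actual logic lives in the Rust glob scan operator.

```python
def infer_schema_with_fallback(paths, infer_one, is_corrupt):
    """Try each path in order and return the first schema that can be inferred.

    `infer_one(path)` returns a schema or raises. `is_corrupt(exc)` decides
    whether an exception means "skip this file" or "propagate".
    """
    if not paths:
        raise FileNotFoundError("glob matched no files")
    reasons = []
    for path in paths:
        try:
            return infer_one(path)
        except Exception as exc:
            if not is_corrupt(exc):
                raise  # e.g. permission or network errors are not swallowed
            reasons.append((path, str(exc)))
    # Distinct from the "no match" case above: files exist, none is readable.
    raise ValueError(f"all {len(paths)} matched files are corrupt: {reasons}")
```

Keeping the "no files matched" and "all files corrupt" errors separate is what avoids the misleading message mentioned above.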

Confidence Score: 3/5

The happy-path (single collect() with ignore_corrupt_files=False) is unchanged. The two issues found are scoped to the new feature itself: the parquet corruption classifier is too permissive for IO errors, and the skipped-file snapshot in multi-input-id plan scenarios includes data from concurrent queries.

Two real defects in the new feature's core contracts: is_parquet_corrupt would silently absorb non-corruption DaftError::IoError variants (e.g. BrokenPipe, Interrupted) that should surface as errors — contradicting the stated 'network errors are never swallowed' invariant. Additionally, the skipped_corrupt_files snapshot in try_finish's should_remove=false branch is plan-scoped rather than query-scoped, so in streaming or concurrent-query scenarios the caller sees skipped files that belong to other queries sharing the same plan fingerprint.
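
To illustrate the allowlist versus denylist distinction behind the first defect, here is a small Python analogy. The PR's classifiers are Rust functions over DaftError; the Python exception types and function names below are stand-ins chosen for illustration only.

```python
# Allowlist: only these exception types count as "the file itself is corrupt".
CORRUPT_TYPES = (EOFError, UnicodeDecodeError)


def is_corrupt_allowlist(exc: BaseException) -> bool:
    """Treat an error as corruption only if it is explicitly listed."""
    return isinstance(exc, CORRUPT_TYPES)


# Denylist: everything counts as corrupt unless explicitly excluded.
NOT_CORRUPT_TYPES = (PermissionError, TimeoutError)


def is_corrupt_denylist(exc: BaseException) -> bool:
    """Treat an error as corruption unless it is explicitly excluded."""
    return not isinstance(exc, NOT_CORRUPT_TYPES)


# A transient I/O failure that nobody remembered to exclude.
transient = BrokenPipeError("connection dropped mid-read")
swallowed_by_denylist = is_corrupt_denylist(transient)
swallowed_by_allowlist = is_corrupt_allowlist(transient)
```

With the denylist, any error kind that was not anticipated is silently treated as corruption; with the allowlist, unanticipated kinds propagate by default.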

src/daft-parquet/src/read.rs (is_parquet_corrupt IoError arm) and src/daft-local-execution/src/run.rs (try_finish should_remove=false branch)

Important Files Changed

| Filename | Overview |
| --- | --- |
| src/daft-parquet/src/read.rs | Adds ignore_corrupt_files and skipped_corrupt_files to ParquetReadOptions; implements two-level corrupt-file filtering (open errors and per-chunk errors); adds stream_parquet_count_pushdown with skip logic. is_parquet_corrupt uses an overly broad blocklist for DaftError::IoError compared to is_csv_corrupt's allowlist approach. |
| src/daft-local-execution/src/run.rs | Adds skipped_corrupt_files mutex to PlanState; propagates it through pipeline translation and surfaces it in try_finish. In the should_remove=false branch, snapshot is plan-scoped (all concurrent input_ids), not query-scoped, which can contaminate results in streaming/multi-batch scenarios. |
| daft/dataframe/dataframe.py | Adds skipped_corrupt_files property to DataFrame; raises if not materialized. Contains a redundant logger = logging.getLogger(__name__) at line 16 (already defined at line 88) inserted mid-import-block. |
| src/daft-csv/src/read.rs | Adds two-level corrupt-file filtering (file-open and per-chunk) for CSV; is_csv_corrupt correctly uses an allowlist (UnexpectedEof only for IoError). Per-chunk errors add the same path multiple times but deduplication in with_skipped_corrupt_files retains only the first reason. |
| src/daft-local-plan/src/results.rs | Adds skipped_corrupt_files field to ExecutionStats; dedup in with_skipped_corrupt_files uses path as the key (ignores reason for duplicates). Encode/decode correctly serialises the new field via bincode. |
| src/daft-scan/src/glob.rs | Schema-inference fallback now loops over remaining glob matches when the first file is corrupt; returns a clear 'all corrupt' error if no readable file is found. Previously noted GlobNoMatch issue has been fixed in this PR. |
| src/daft-local-execution/src/sources/scan_task.rs | Threads SkippedCorruptFilesCollector through ScanTaskSource, forward_scan_task_stream, and stream_scan_task to scan_task_reader::read_scan_task. Logic is clean; the Arc+Mutex collector is correctly cloned per spawned task. |
| src/daft-local-execution/src/sources/scan_task_reader.rs | Dispatches scan tasks to parquet/csv readers with the skipped_corrupt_files collector; JSON, WARC, and Text paths don't receive the collector (no ignore_corrupt_files support) which is consistent with the feature scope. |

Sequence Diagram

sequenceDiagram
    participant Py as Python (DataFrame)
    participant NE as NativeExecutor
    participant PS as PlanState
    participant STS as ScanTaskSource
    participant Reader as Parquet/CSV Reader
    participant Mutex as skipped_corrupt_files (Arc<Mutex>)

    Py->>NE: run(plan, input_id)
    NE->>PS: "create PlanState with Arc<Mutex>"
    PS->>STS: ScanTaskSource::new(skipped_corrupt_files.clone())
    loop For each scan task
        STS->>Reader: read_scan_task(url, skipped_corrupt_files.clone())
        alt "File is corrupt and ignore_corrupt_files=true"
            Reader->>Mutex: lock().push((path, reason))
            Reader-->>STS: Ok(empty_stream)
        else File readable
            Reader-->>STS: Ok(record_batch_stream)
        end
    end
    Py->>NE: try_finish(fingerprint, input_id)
    NE->>PS: task_handle.await (wait for all scan tasks)
    PS->>Mutex: lock().clone() to skipped
    NE-->>Py: ExecutionStats with skipped_corrupt_files
    Py->>Py: df.skipped_corrupt_files returns metadata.skipped_corrupt_files
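
The flow in the diagram, a shared collector that concurrent readers push into while returning an empty stream for corrupt files, can be modelled with Python threads. This is a toy analogy of the Arc<Mutex<...>> pattern; the class, the function signature, and the use of None to mark a corrupt file are all assumptions made for the example.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class SkippedFilesCollector:
    """Thread-safe list of (path, reason), analogous to Arc<Mutex<Vec<...>>>."""

    def __init__(self):
        self._lock = threading.Lock()
        self._items = []

    def push(self, path, reason):
        with self._lock:
            self._items.append((path, reason))

    def snapshot(self):
        with self._lock:
            return list(self._items)


def read_scan_task(path, files, collector, ignore_corrupt_files):
    """Return the rows for `path`, or an empty list if it is corrupt and skipping is on."""
    rows = files[path]
    if rows is None:  # None stands in for a corrupt file in this toy model
        if not ignore_corrupt_files:
            raise ValueError(f"corrupt file: {path}")
        collector.push(path, "corrupt")
        return []
    return rows


files = {"a": [1, 2], "b": None, "c": [3]}
collector = SkippedFilesCollector()
with ThreadPoolExecutor(max_workers=3) as pool:
    batches = list(pool.map(lambda p: read_scan_task(p, files, collector, True), files))
total_rows = sum(len(b) for b in batches)
```

The snapshot is taken only after every reader has finished, which mirrors the diagram's wait on the task handle before the collector is cloned.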

Reviews (2): Last reviewed commit: "ignore corrupt files"

Comment thread src/daft-csv/src/read.rs Outdated
Comment thread src/daft-scan/src/glob.rs
Comment thread src/daft-parquet/src/read.rs Outdated
@chenghuichen chenghuichen changed the title from "feat: add ignore_corrupt_files option to read_parquet and read_csv" to "feat: add ignore_corrupt_files option to read_parquet, read_csv and read_iceberg" Mar 30, 2026
@rohitkulshreshtha

Contributor

Hi @chenghuichen, thanks for putting this together!

A simple skip flag, however, is itself a data quality hazard — a job that appears to succeed while quietly dropping data is worse than one that fails loudly.

Fully agree with you.

It seems like the main problem here is job failure when the data is bad. Would something like #6446 be of interest to you?

For example, if it were possible to resume processing in a way that skips rows that have already been processed, would you be OK with failing the job? This would mean that the source needs to be fixed outside of Daft before Daft can process the data.

I ask because, although highlighting skipped files is a step towards not failing silently, failing the job is still louder and may still be preferable, for example from the perspective of a new Daft user who has no idea about file skipping.

@chenghuichen

Contributor Author

Thanks for the pointer — #6446 is a more principled solution to the underlying problem. Happy to close this PR if the team decides that's the right direction for #6468.

@everySympathy

everySympathy commented Mar 31, 2026

Collaborator

Hi @rohitkulshreshtha @chenghuichen
Even if we use skip_existing from #5931, this PR may still be needed, especially when the files are written to disk rather than an object store: we need the ignore_error flag to load the already-processed keys into memory without raising an error.

Suppose our Daft job A failed.
Job A wrote many Parquet files; some are complete, while others are incomplete and damaged because Job A failed partway through.
We then launch Job B, which reads the processed key columns from the Parquet files written by Job A. We pass the directory to read_parquet(). Because some of the files are damaged, read_parquet raises an error. With an ignore_error flag, read_parquet would skip the damaged files and return the key columns from the complete files.

@madvart madvart requested a review from desmondcheongzx April 7, 2026 20:35
@caican00

caican00 commented Apr 20, 2026

Contributor

read_json may also be a critical scenario, and ideally this option should cover JSON parsing as well. Consider the following malformed JSON object:

{
  "name": "lele",
  "age": 10

This is syntactically invalid: the closing brace is missing. When read_json attempts to read it, a parse error will inevitably occur. If the task cannot skip corrupted JSON files, the entire task will fail.
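
For reference, the failure can be reproduced with Python's standard json module. This is only a standalone illustration of file-level skipping with a recorded reason; it does not use Daft, and read_json has no such option in this PR. The helper name `try_parse` and the path are made up.

```python
import json

malformed = '{\n  "name": "lele",\n  "age": 10\n'  # closing brace is missing


def try_parse(path, text, skipped):
    """Parse one JSON document; on a syntax error record (path, reason) and return None."""
    try:
        return json.loads(text)
    except json.JSONDecodeError as exc:
        skipped.append((path, f"invalid JSON: {exc.msg}"))
        return None


skipped = []
result = try_parse("data/bad.json", malformed, skipped)
```

After the call, `result` is None and `skipped` holds one entry naming the bad file, which is the information a consumer needs to find damaged files without deleting anything.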

One might argue that users could simply delete the corrupted files manually. However, when dealing with a large number of damaged files, how can users identify each one individually? Moreover, as data consumers, deleting source data files is often not permitted — it's an illegitimate operation.

@desmondcheongzx

desmondcheongzx commented Apr 21, 2026

Collaborator

Thanks for pushing on this @chenghuichen, and sorry for the long silence. To directly answer your 3/31 question: we don't want to close this, we want to land it.

On the point @rohitkulshreshtha raised about Checkpoint V2 (#6446): looking at the proposal against this PR, I don't think they're substitutes. Checkpoint V2 filters rows by key via an anti-join over source inputs the store has seen succeed; it has no notion of whether a source file is parseable. The scenarios this PR targets (truncated parquet footer, missing file from concurrent compaction, damaged output from a prior Daft write that crashed mid-flight, which is the case @everySympathy flagged) hit the same corrupt file on every re-run, including runs that use a checkpoint. The two features compose rather than compete: checkpointing resumes progress, ignore_corrupt_files avoids re-aborting on the same bad file.

The silent-data-loss concern is real, but the design here addresses it head-on: default is False, opt-in per call, df.skipped_files surfaces every skip as structured data for alerting or dead-letter, and only file-level corruption errors are swallowed (network, permission, and timeout errors still propagate).

On @caican00's read_json request: valid case, but let's keep this PR scoped to parquet/csv/iceberg. @caican00 feel free to open a follow-up issue so we don't lose it.

The remaining blocker before I do a full review pass is a rebase onto main. Feel free to ping me once that's done!

@chenghuichen

Contributor Author

Thanks for the detailed feedback @desmondcheongzx! Rebased onto main and ready for review.

One small naming change is also included: renamed df.skipped_files → df.skipped_corrupt_files to make it clear that these are specifically files skipped due to corruption.

@chenghuichen

Contributor Author

@caican00 Thanks for the read_json use case — totally valid. Happy to add JSON support in a follow-up once this lands.

@desmondcheongzx desmondcheongzx left a comment

Collaborator

Thanks for rebasing @chenghuichen! Took a closer look at your PR. Good stuff!

Left some comments that I think we should address

Comment thread src/daft-parquet/src/read.rs Outdated
Comment thread docs/connectors/ignore-corrupt-files.md Outdated
Comment thread docs/connectors/generic-file-source-options.md
@codspeed-hq

codspeed-hq Bot commented Apr 24, 2026


Merging this PR will not alter performance

✅ 40 untouched benchmarks
⏩ 10 skipped benchmarks (footnote 1)


Comparing chenghuichen:ignore_corrupt (1639b94) with main (deb59f8)

Footnotes

  1. 10 benchmarks were skipped, so the baseline results were used instead. If they were deleted from the codebase, click here and archive them to remove them from the performance reports.

@chenghuichen

Contributor Author

@desmondcheongzx All updated.

@chenghuichen

Contributor Author

Rebased again.

@madvart

madvart commented May 12, 2026

Contributor

Hey @desmondcheongzx :) - Would you be able to have a look at this? Thanks!

@madvart

madvart commented May 26, 2026

Contributor

@desmondcheongzx - Checking to see if you are able to review this. Thanks!

@chenghuichen

chenghuichen commented May 28, 2026

Contributor Author

Rebased again. Reopened to re-trigger CI.

Comment thread src/daft-parquet/src/read.rs Outdated
Comment thread src/daft-parquet/src/read.rs Outdated
Comment thread src/daft-parquet/src/read.rs
Comment thread src/daft-parquet/src/read.rs
Comment thread tests/io/test_ignore_corrupt_files.py Outdated
Comment on lines +177 to +184
// Accumulate skipped files from completed tasks so they are available in export_metrics().
if let TaskEvent::Completed { ref stats, .. } = event
    && !stats.skipped_corrupt_files.is_empty()
    && let Ok(mut v) = self.skipped_corrupt_files.lock()
{
    v.extend(stats.skipped_corrupt_files.iter().cloned());
}

Contributor

This cross-task accumulation (and the dedup in export_metrics) is the distributed half of the feature, but all the tests run on the local runner — so this path isn't exercised. Since the whole point is surfacing skips reliably, the distributed merge feels like the riskiest place to leave uncovered. Is there a good way to get a test on it here, or does that need the multi-worker harness? Happy to defer if it's disproportionate, just want to make sure it's a conscious call rather than an accidental gap.
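
As a rough model of the merge being discussed: per-task lists are concatenated and then deduplicated by path, keeping the first reason seen (the behaviour the summary above attributes to with_skipped_corrupt_files). The Python function below is a hypothetical stand-in for reasoning about expected results, not the Rust implementation.

```python
def merge_skipped(per_task_lists):
    """Concatenate per-task (path, reason) lists, keeping the first reason seen per path."""
    merged = {}
    for task_list in per_task_lists:
        for path, reason in task_list:
            merged.setdefault(path, reason)  # dict preserves first-seen order
    return list(merged.items())


# Two tasks report the same file with different reasons; a third file is new.
task_1 = [("s3://bucket/a.csv", "unexpected EOF")]
task_2 = [("s3://bucket/a.csv", "wrong field count"), ("s3://bucket/b.csv", "unexpected EOF")]
merged = merge_skipped([task_1, task_2])
```

Because the first reason wins, the merged result depends on the order in which task completions arrive, which is one property a distributed test would need to account for.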

@rohitkulshreshtha

Contributor

Nicely done overall. Thanks for this contribution.

Some non-blocking minor things:

Code

  • daft/dataframe/dataframe.py:16, where logger = logging.getLogger(__name__) sits in the middle of the import block (between import warnings and from collections.abc ...); ruff/isort will likely flag it. Move it below the imports.
  • daft-csv/src/lib.rs:94,101 — the two unreachable!() after the io-kind check bet on csv/csv_async's internal invariant; a _ => External(...) fallback would avoid a hard panic if that ever shifts.
  • daft-scan/src/glob.rs:204-282 vs 304-382: the Parquet and CSV schema-inference fallbacks are near-identical blocks of about 80 lines each. They could be factored into one helper parameterized over the per-file inference call, so future fixes land once.
  • CorruptFile falls into the _ => DaftCoreException arm (common/error python.rs), so the all-corrupt raise shows the raw DaftError::CorruptFile ... prefix. Cosmetic.

Docs

  • read_iceberg gained the ignore_corrupt_files param but it's missing from the Args: docstring (parquet/csv document it).
  • The new page is titled "Generic File Source Options" / "apply to all file-based readers", but only parquet, csv, iceberg support it — read_json/read_warc/read_text don't. Slight overclaim.
  • test_ignore_corrupt_files.py module docstring mentions read_lance, but there are no lance tests (and the PR says lance is out of scope).
  • Worth a docs note: enabling ignore_corrupt_files disables count pushdown, so df.count() on a large Parquet dataset becomes a full read rather than metadata-only.

@chenghuichen chenghuichen force-pushed the ignore_corrupt branch 2 times, most recently from 81d5a57 to db5b237 Compare June 5, 2026 14:05
@chenghuichen

Contributor Author

@rohitkulshreshtha Thanks for the review!

Addressed the string-matching fragility, IoError denylist, and unreachable code — Error::Arrow now routes to CorruptFile at the source, is_parquet_corrupt uses an allowlist matching is_csv_corrupt, and the dead ignore_corrupt_files params are removed from stream_parquet_count_pushdown.

For partial reads: added a partial boolean to the tuple (path, reason, partial) so downstream consumers can distinguish fully-skipped files from files that emitted some rows before hitting corruption. Added a multi-row-group test that pins this behavior.

Count-pushdown stays disabled when ignore_corrupt_files is on — it only reads footer metadata and can't detect row-level corruption, so the counts would be wrong.
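
A toy model of the two points above: the partial flag, and why a footer-only count can disagree with what is actually readable. It assumes the reader stops at the first corrupt row group, which may not match the real reader's behaviour; the function name and the use of None to mark a corrupt group are made up for the example.

```python
def read_row_groups(path, row_groups):
    """Read row groups in order and stop at the first corrupt one.

    `row_groups` is a list where each entry is a list of rows, or None for a
    corrupt group. Returns (rows, skip_record), where skip_record is None or a
    (path, reason, partial) tuple.
    """
    rows = []
    for index, group in enumerate(row_groups):
        if group is None:
            partial = len(rows) > 0  # True if some rows were already emitted
            return rows, (path, f"unreadable row group {index}", partial)
        rows.extend(group)
    return rows, None


# Footer metadata claims 6 rows, but the second of three row groups is corrupt.
footer_row_count = 6
rows, skip = read_row_groups("f.parquet", [[1, 2], None, [5, 6]])
count_mismatch = footer_row_count != len(rows)
```

Here the file contributes 2 rows and is recorded with partial set to True, while a metadata-only count would have reported 6.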

Distributed test coverage: the cross-task accumulation and dedup in StatisticsManager isn't covered yet — we'd need a multi-worker test harness that doesn't exist today. Will follow up in a separate PR.

@chenghuichen

Contributor Author

CI failures are pre-existing flaky tests on main (HuggingFace 429 rate limits).

@rohitkulshreshtha rohitkulshreshtha left a comment

Contributor

Thanks, this all looks good — the typed-error classification (Error::Arrow → CorruptFile + the UnexpectedEof allowlist) is much more robust than the string matching, the dead count-pushdown branch is gone, and the partial flag + multi-row-group test cover the partial-read case I was worried about.

On the distributed coverage: totally fair to follow up separately. Thanks for driving this all the way.

@rohitkulshreshtha rohitkulshreshtha enabled auto-merge (squash) June 9, 2026 22:11
@rohitkulshreshtha rohitkulshreshtha merged commit f04de35 into Eventual-Inc:main Jun 10, 2026
35 checks passed
@chenghuichen

Contributor Author

Thank you very much!

chenghuichen added a commit to chenghuichen/Daft that referenced this pull request Jun 10, 2026
* origin/main: (115 commits)
  feat: add ignore_corrupt_files option to read_parquet, read_csv and read_iceberg (Eventual-Inc#6520)
  fix(deps): gate vllm to Linux so macOS/Windows resolve without CUDA wheels (Eventual-Inc#7095)
  fix: pass options in Gravitino PostgreSQL read method (Eventual-Inc#7047)
  feat(ray): Implement dynamic scale-in for RaySwordfishActor (Eventual-Inc#5903)
  feat(delta-lake): support column mapping for reads (Eventual-Inc#7005)
  feat(functions): add string distance/similarity functions (Eventual-Inc#7068)
  test(parquet): cover read_parquet edge cases (Eventual-Inc#7085)
  refactor(checkpoint): drop "seal" vocabulary from Rust API surface (Eventual-Inc#7078)
  fix(asof-join): use unknown clustering spec instead of hash           (Eventual-Inc#7075)
  docs: standardize Slack links to use daft.ai/slack (Eventual-Inc#7066)
  feat: add try_cast function for safe type conversion (Eventual-Inc#6960)
  refactor(file): rename File byte-range fields to position/size (Eventual-Inc#6747)
  fix(ray): configure worker startup timeout on runner (Eventual-Inc#7055)
  feat(shuffle): default flight shuffle compression to lz4 (Eventual-Inc#7071)
  feat(iceberg): support branch and tag reads (Eventual-Inc#7042)
  fix(shuffle): concat recordbatches before repartition (Eventual-Inc#7064)
  perf: update jemalloc 5.3.0 → 5.3.1 to fix muzzy decay performance bug (Eventual-Inc#7059)
  feat: thread assume_sorted_and_aligned_partitions parameter through ASOF join (Eventual-Inc#7067)
  fix(flight-shuffle): reduce coordinator memory to O(map_tasks + partitions) (Eventual-Inc#7056)
  refactor(distributed): rename needs_hash_repartition to can_skip_hash_repartition      (Eventual-Inc#7053)
  ...

# Conflicts:
#	daft/checkpoint.py
#	src/daft-distributed/src/pipeline_node/limit.rs
#	src/daft-distributed/src/pipeline_node/stage_checkpoint_keys.rs
#	src/daft-distributed/src/scheduling/task.rs
#	src/daft-local-execution/src/pipeline.rs
#	src/daft-local-execution/src/sinks/blocking_sink.rs
#	src/daft-local-execution/src/sources/scan_task.rs
Development

Successfully merging this pull request may close these issues.

Add ignore error for Parquet/CSV/lance/iceberg

6 participants