Skip to content

Commit f04de35

Browse files
authored
feat: add ignore_corrupt_files option to read_parquet, read_csv and read_iceberg (#6520)
### Motivation Large data lakes accumulate corrupt or missing files over time. Without a skip option, a single bad file aborts an entire overnight batch job. A simple skip flag, however, is itself a data quality hazard — a job that appears to succeed while quietly dropping data is worse than one that fails loudly. This PR treats observability as a first-class requirement: every skipped file is surfaced as structured data via `df.skipped_files: list[tuple[str, str]]`, available after any executing action. Pipeline code can iterate the `(path, reason)` pairs directly to alert, dead-letter queue, or audit log — skipped files are never silently discarded. The design goal is **errors visible, impact contained, tooling to fix**. ### What gets skipped | Category | Examples | |---|---| | Invalid format | Bad magic bytes, truncated footer, mismatched row/column counts | | Corrupt data | Unreadable row group, bad CSV encoding, wrong field count | | Missing file | Deleted between listing and reading (e.g. concurrent compaction) | Network errors, timeouts, and permission errors are never swallowed — those should be retried or fixed, not silenced. ### Observability - `WARNING` log per skipped file (path + reason) - `df.skipped_files: list[tuple[str, str]]` available after any executing action, for alerting or dead-letter queuing ### What this PR does not do - No row-level `_corrupt_record` column (Parquet is binary columnar — there is no "raw corrupt row string" to preserve; for CSV, file-level skip is the right granularity for the `ignore_corrupt_files` semantic) - No global session config — `ignore_corrupt_files` is a per-call parameter; a session default can be added later if there is demand - Connectors backed by `python_factory_func_scan_task` (Lance, Paimon LSM-merge fallback, etc) are not covered. The `SkippedFilesCollector` lives in the Rust execution context and is unreachable from the Python callsite where those reads happen. A follow-up PR will introduce a sideband mechanism for this. ### Tests `tests/io/test_ignore_corrupt_files.py` — Parquet, CSV, and Iceberg cases covering: corrupt files skipped, default raises, schema inference fallback, correct `COUNT(*)`, and `df.skipped_files` populated/empty as expected. ### Docs `docs/connectors/ignore-corrupt-files.md` ### Related Issues Closes #6468
1 parent deb59f8 commit f04de35

32 files changed

Lines changed: 1177 additions & 72 deletions

File tree

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

daft/daft/__init__.pyi

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ class ParquetSourceConfig:
282282
field_id_mapping: dict[int, PyField] | None = None,
283283
row_groups: list[list[int]] | None = None,
284284
chunk_size: int | None = None,
285+
ignore_corrupt_files: bool = False,
285286
): ...
286287

287288
class CsvSourceConfig:
@@ -308,6 +309,7 @@ class CsvSourceConfig:
308309
comment: str | None,
309310
buffer_size: int | None = None,
310311
chunk_size: int | None = None,
312+
ignore_corrupt_files: bool = False,
311313
): ...
312314

313315
class JsonSourceConfig:
@@ -2420,6 +2422,8 @@ class PyExecutionStats:
24202422
def query_plan(self) -> str | None: ...
24212423
def encode(self) -> bytes: ...
24222424
def to_recordbatch(self) -> PyRecordBatch: ...
2425+
@property
2426+
def skipped_corrupt_files(self) -> list[tuple[str, str, bool]]: ...
24232427

24242428
class PyResultReceiver:
24252429
def __aiter__(self) -> PyResultReceiver: ...

daft/dataframe/dataframe.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,26 @@ def metrics(self) -> RecordBatch | None:
204204
else:
205205
return self._metadata.to_recordbatch() if self._metadata else None
206206

207+
@property
208+
def skipped_corrupt_files(self) -> list[tuple[str, str, bool]]:
209+
"""Files skipped during the last execution due to ignore_corrupt_files=True.
210+
211+
Returns a list of ``(path, reason, partial)`` tuples. ``partial`` is ``True``
212+
when some batches were already emitted before corruption was detected (the file
213+
was not fully skipped). Only available after ``.collect()``.
214+
215+
Example::
216+
217+
df = daft.read_parquet("s3://bucket/data/", ignore_corrupt_files=True)
218+
df.collect()
219+
for path, reason, partial in df.skipped_corrupt_files:
220+
tag = " (partial)" if partial else ""
221+
print(f"Skipped{tag} {path}: {reason}")
222+
"""
223+
if self._result_cache is None:
224+
raise ValueError("skipped_corrupt_files is not available until the DataFrame has been collected")
225+
return self._metadata.skipped_corrupt_files if self._metadata else []
226+
207227
def pipe(
208228
self,
209229
function: Callable[Concatenate["DataFrame", P], T],
@@ -5359,6 +5379,15 @@ def _materialize_results(self) -> None:
53595379
assert result is not None
53605380
result.wait()
53615381
self._metadata.write_mermaid()
5382+
skipped = self._metadata.skipped_corrupt_files if self._metadata else []
5383+
if skipped:
5384+
paths = "\n".join(f" - {path}{' (partial)' if partial else ''}" for path, _, partial in skipped)
5385+
logger.warning(
5386+
"%d file(s) were skipped due to corruption or being missing "
5387+
"(ignore_corrupt_files=True). Use df.skipped_corrupt_files for details.\n%s",
5388+
len(skipped),
5389+
paths,
5390+
)
53625391

53635392
@DataframePublicAPI
53645393
def collect(self, num_preview_rows: int | None = 8) -> "DataFrame":

daft/execution/metadata.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,16 @@ def encode(self) -> bytes:
102102
def to_recordbatch(self) -> RecordBatch:
103103
return RecordBatch._from_pyrecordbatch(self._py.to_recordbatch())
104104

105+
@property
106+
def skipped_corrupt_files(self) -> list[tuple[str, str, bool]]:
107+
"""Files skipped during execution due to ignore_corrupt_files=True.
108+
109+
Returns a list of (path, reason, partial) tuples. ``partial=True`` means
110+
some batches were already emitted before corruption was detected; the file
111+
was not fully skipped.
112+
"""
113+
return self._py.skipped_corrupt_files
114+
105115
def _plan_to_mermaid_string(self) -> str:
106116
"""Convert query_plan dict to mermaid diagram string (bottom-up)."""
107117
metrics = {int(item["id"]): item["stats"] for item in self.to_recordbatch().to_pylist()}

daft/io/_csv.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ def read_csv(
3535
io_config: IOConfig | None = None,
3636
file_path_column: str | None = None,
3737
hive_partitioning: bool = False,
38+
ignore_corrupt_files: bool = False,
3839
_buffer_size: int | None = None,
3940
_chunk_size: int | None = None,
4041
checkpoint: "CheckpointConfig | None" = None,
@@ -57,6 +58,9 @@ def read_csv(
5758
checkpoint: Optional :class:`daft.CheckpointConfig` for progress tracking across runs. Bundles the
5859
checkpoint store, the source key column (``on=``), and optional anti-join tuning. Rows whose key
5960
already exists in the store are skipped on re-run. Requires the Ray runner.
61+
ignore_corrupt_files: If True, corrupt or unreadable CSV files are silently skipped instead
62+
of raising an error. Skipped files are recorded in ``df.skipped_corrupt_files`` after collection.
63+
Defaults to False.
6064
6165
Returns:
6266
DataFrame: parsed DataFrame
@@ -93,6 +97,7 @@ def read_csv(
9397
allow_variable_columns=allow_variable_columns,
9498
buffer_size=_buffer_size,
9599
chunk_size=_chunk_size,
100+
ignore_corrupt_files=ignore_corrupt_files,
96101
)
97102
file_format_config = FileFormatConfig.from_csv_config(csv_config)
98103
storage_config = StorageConfig(True, io_config)

daft/io/_parquet.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ def read_parquet(
3030
file_path_column: str | None = None,
3131
hive_partitioning: bool = False,
3232
coerce_int96_timestamp_unit: str | TimeUnit | None = None,
33+
ignore_corrupt_files: bool = False,
3334
_multithreaded_io: bool | None = None,
3435
_chunk_size: int | None = None, # A hidden parameter for testing purposes.
3536
checkpoint: "CheckpointConfig | None" = None,
@@ -45,6 +46,11 @@ def read_parquet(
4546
file_path_column: Include the source path(s) as a column with this name. Defaults to None.
4647
hive_partitioning: Whether to infer hive_style partitions from file paths and include them as columns in the Dataframe. Defaults to False.
4748
coerce_int96_timestamp_unit: TimeUnit to coerce Int96 TimeStamps to. e.g.: [ns, us, ms], Defaults to None.
49+
ignore_corrupt_files: If True, corrupt or unreadable Parquet files are silently skipped
50+
instead of raising an error. Skipped files are recorded in ``df.skipped_corrupt_files`` after
51+
collection. Only genuine format errors (bad magic bytes, truncated footer, corrupt
52+
row-group data) are ignored; network errors and permission errors are still raised.
53+
Defaults to False.
4854
_multithreaded_io: Whether to use multithreading for IO threads. Setting this to False can be helpful in reducing
4955
the amount of system resources (number of connections and thread contention) when running in the Ray runner.
5056
Defaults to None, which will let Daft decide based on the runner it is currently using.
@@ -94,7 +100,12 @@ def read_parquet(
94100
raise ValueError("row_groups are only supported when reading multiple non-globbed/wildcarded files")
95101

96102
file_format_config = FileFormatConfig.from_parquet_config(
97-
ParquetSourceConfig(coerce_int96_timestamp_unit=pytimeunit, row_groups=row_groups, chunk_size=_chunk_size)
103+
ParquetSourceConfig(
104+
coerce_int96_timestamp_unit=pytimeunit,
105+
row_groups=row_groups,
106+
chunk_size=_chunk_size,
107+
ignore_corrupt_files=ignore_corrupt_files,
108+
)
98109
)
99110
storage_config = StorageConfig(multithreaded_io, io_config)
100111

daft/io/iceberg/_iceberg.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ def read_iceberg(
115115
tag: str | None = None,
116116
io_config: IOConfig | None = None,
117117
checkpoint: "CheckpointConfig | None" = None,
118+
ignore_corrupt_files: bool = False,
118119
) -> DataFrame:
119120
"""Create a DataFrame from an Iceberg table.
120121
@@ -130,6 +131,9 @@ def read_iceberg(
130131
checkpoint: Optional :class:`daft.CheckpointConfig` for progress tracking across runs. Bundles the
131132
checkpoint store, the source key column (``on=``), and optional anti-join tuning. Rows whose key
132133
already exists in the store are skipped on re-run. Requires the Ray runner.
134+
ignore_corrupt_files (bool): If True, silently skip corrupt or unreadable data files
135+
instead of raising an error. Skipped files are recorded in ``df.skipped_corrupt_files``
136+
after collection. Defaults to False.
133137
134138
Returns:
135139
DataFrame: a DataFrame with the schema converted from the specified Iceberg table
@@ -175,7 +179,9 @@ def read_iceberg(
175179
multithreaded_io = runners.get_or_create_runner().name != "ray"
176180
storage_config = StorageConfig(multithreaded_io, io_config)
177181

178-
iceberg_operator = IcebergScanOperator(table, snapshot_id=snapshot_id, storage_config=storage_config)
182+
iceberg_operator = IcebergScanOperator(
183+
table, snapshot_id=snapshot_id, storage_config=storage_config, ignore_corrupt_files=ignore_corrupt_files
184+
)
179185

180186
handle = ScanOperatorHandle.from_python_scan_operator(iceberg_operator)
181187
builder = LogicalPlanBuilder.from_tabular_scan(scan_operator=handle)

daft/io/iceberg/iceberg_scan.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ def __init__(
8787
iceberg_table: Table,
8888
snapshot_id: int | None,
8989
storage_config: StorageConfig,
90+
ignore_corrupt_files: bool = False,
9091
) -> None:
9192
super().__init__()
9293
iceberg_schema = (
@@ -96,6 +97,7 @@ def __init__(
9697
self._iceberg_schema = iceberg_schema
9798
self._snapshot_id = snapshot_id
9899
self._storage_config = storage_config
100+
self._ignore_corrupt_files = ignore_corrupt_files
99101
self._field_id_mapping = visit(iceberg_schema, SchemaFieldIdMappingVisitor())
100102
self._schema = convert_iceberg_schema(iceberg_schema)
101103
self._partition_keys = iceberg_partition_spec_to_fields(iceberg_schema, self._iceberg_table.spec())
@@ -200,7 +202,10 @@ def _create_regular_scan_tasks(self, pushdowns: PyPushdowns) -> Iterator[ScanTas
200202
file_format = file.file_format
201203
if file_format == "PARQUET":
202204
file_format_config = FileFormatConfig.from_parquet_config(
203-
ParquetSourceConfig(field_id_mapping=self._field_id_mapping)
205+
ParquetSourceConfig(
206+
field_id_mapping=self._field_id_mapping,
207+
ignore_corrupt_files=self._ignore_corrupt_files,
208+
)
204209
)
205210
else:
206211
# TODO: Support ORC and AVRO when we can read it

docs/SUMMARY.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
* [Common Crawl](datasets/common-crawl.md)
3434
* Data Connectors
3535
* [Connectors](connectors/index.md)
36+
* [Generic File Source Options](connectors/generic-file-source-options.md)
3637
* [Custom Connectors](connectors/custom.md)
3738
* [Custom Catalogs](connectors/custom-catalogs.md)
3839
* [AWS Glue](connectors/glue.md)
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
# Generic File Source Options
2+
3+
These options apply to `read_parquet`, `read_csv`, and `read_iceberg`. They are not tied to any single connector or format. Other readers (`read_json`, `read_warc`, `read_text`) do not support these options.
4+
5+
## Ignoring Corrupt Files
6+
7+
When reading large collections of files, some files may be unreadable — corrupt, truncated, or deleted between the time Daft lists them and the time it reads them. By default, Daft raises an error and halts the query. The `ignore_corrupt_files` option changes that behavior: qualifying files are silently skipped and the query continues with the remaining data.
8+
9+
### Enabling `ignore_corrupt_files`
10+
11+
Pass `ignore_corrupt_files=True` to any of the supported reader functions:
12+
13+
```python
14+
import daft
15+
16+
# Parquet / CSV (glob-based)
17+
df = daft.read_parquet("s3://my-bucket/data/**/*.parquet", ignore_corrupt_files=True)
18+
df = daft.read_csv("s3://my-bucket/data/**/*.csv", ignore_corrupt_files=True)
19+
20+
# Iceberg
21+
import pyiceberg
22+
table = pyiceberg.table.StaticTable.from_metadata("s3://bucket/iceberg/metadata.json")
23+
df = daft.read_iceberg(table, ignore_corrupt_files=True)
24+
25+
df.collect()
26+
```
27+
28+
### What counts as "corrupt"
29+
30+
Daft skips a file when it encounters a problem that is specific to the file itself and cannot be resolved by retrying:
31+
32+
| Category | Examples |
33+
|---|---|
34+
| **Invalid format** | Bad Parquet magic bytes, truncated footer, mismatched row/column counts |
35+
| **Corrupt data** | Unreadable row group, invalid CSV encoding, wrong field count in a row |
36+
| **Missing file** | File deleted between listing and reading (e.g. concurrent compaction or partition overwrite) |
37+
38+
Daft does **not** skip files for transient infrastructure problems, because those can and should be retried:
39+
40+
| Category | Examples |
41+
|---|---|
42+
| **Network errors** | Connection reset, read timeout, throttled I/O |
43+
| **Permission errors** | Access denied, insufficient credentials |
44+
45+
This distinction matters. Silently retrying a permission error would mask a misconfiguration that needs human attention.
46+
47+
### Observability: knowing what was skipped
48+
49+
`ignore_corrupt_files` is designed around the principle that **errors should be visible, not hidden**. Daft provides two complementary observability mechanisms.
50+
51+
#### Python warning logs
52+
53+
Daft emits a `WARNING`-level log message for every skipped file, including the file path and the reason:
54+
55+
```
56+
WARNING daft.io - Skipping corrupt Parquet file s3://my-bucket/data/bad.parquet: ...
57+
WARNING daft.io - Skipping corrupt CSV chunk in s3://my-bucket/data/partial.csv: ...
58+
```
59+
60+
You can see these with standard Python logging:
61+
62+
```python
63+
import logging
64+
logging.basicConfig(level=logging.WARNING)
65+
```
66+
67+
#### `df.skipped_corrupt_files` — programmatic access
68+
69+
After materializing the dataframe with `.collect()`, the `skipped_corrupt_files` property returns the list of skipped `(path, reason)` pairs as structured data, so your pipeline code can act on them:
70+
71+
```python
72+
df = daft.read_parquet("s3://my-bucket/data/**/*.parquet", ignore_corrupt_files=True)
73+
df.collect()
74+
75+
skipped = df.skipped_corrupt_files # list[tuple[str, str, bool]]
76+
for path, reason, partial in skipped:
77+
tag = " (partial)" if partial else ""
78+
print(f"Skipped{tag}: {path}\n Reason: {reason}")
79+
```
80+
81+
Each entry is a `(path, reason, partial)` tuple. When `partial` is `True`, some batches from the file were already emitted before the corruption was detected — the file was not fully skipped. This can happen when corruption appears in a later row group.
82+
83+
`skipped_corrupt_files` is available after calling `.collect()` on the dataframe. Other execution methods such as `.count_rows()` do not populate this property, because they operate on an internal dataframe rather than materializing the original one.
84+
85+
### Handling skipped files in production
86+
87+
Because `skipped_corrupt_files` is plain Python data, you can plug it directly into your existing alerting or data-quality workflows:
88+
89+
```python
90+
import daft
91+
92+
df = daft.read_parquet("s3://my-bucket/nightly/**/*.parquet", ignore_corrupt_files=True)
93+
df.write_parquet("s3://my-bucket/processed/")
94+
95+
skipped = df.skipped_corrupt_files
96+
if skipped:
97+
# Option 1: send an alert
98+
send_alert(f"{len(skipped)} file(s) skipped during nightly run", details=skipped)
99+
100+
# Option 2: push to a dead-letter queue for later reprocessing
101+
for path, reason, partial in skipped:
102+
dead_letter_queue.put({"path": path, "reason": reason, "partial": partial, "run": TODAY})
103+
```
104+
105+
This pattern — **errors visible, impact contained, tooling to fix** — lets automated batch jobs complete reliably while still surfacing problems for human review.
106+
107+
!!! warning "Do not use `ignore_corrupt_files` as a catch-all"
108+
This option is designed for files that are genuinely unreadable. It should not be used to suppress transient I/O errors (network issues, throttling) — Daft already retries those automatically. If you find yourself needing `ignore_corrupt_files` for a large fraction of your files, investigate the root cause rather than silencing the errors.
109+
110+
### Supported formats
111+
112+
| Format | File-level skip | Within-file error skip |
113+
|---|---|---|
114+
| Parquet (`read_parquet`) | Yes (bad footer, wrong magic bytes, file too small) | Yes (corrupt row group data) |
115+
| CSV (`read_csv`) | Yes (unreadable file, truncated) | Yes (bad encoding, wrong field count in chunk) |
116+
| Iceberg (`read_iceberg`) | Yes (data files go through the Rust Parquet reader) | Yes |
117+
118+
!!! note "Iceberg delete files"
119+
Corruption in Iceberg *delete files* is not covered. If a delete file is unreadable, Daft will raise an error regardless of `ignore_corrupt_files`. Delete files are small metadata structures and corruption there generally indicates a more serious catalog inconsistency.
120+
121+
!!! note "Count pushdown"
122+
When `ignore_corrupt_files` is enabled for Parquet, count pushdown is disabled. This means `df.count()` will read all row-group data instead of using the metadata-only optimization, which may be slower on large datasets.

0 commit comments

Comments
 (0)