Skip to content

Commit 72a8e5a

Browse files
committed
ignore corrupt files
1 parent 04a0ab0 commit 72a8e5a

16 files changed

Lines changed: 94 additions & 91 deletions

File tree

daft/daft/__init__.pyi

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2298,7 +2298,7 @@ class PyExecutionStats:
22982298
def encode(self) -> bytes: ...
22992299
def to_recordbatch(self) -> PyRecordBatch: ...
23002300
@property
2301-
def skipped_files(self) -> list[tuple[str, str]]: ...
2301+
def skipped_corrupt_files(self) -> list[tuple[str, str]]: ...
23022302

23032303
class PyResultReceiver:
23042304
def __aiter__(self) -> PyResultReceiver: ...

daft/dataframe/dataframe.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ def metrics(self) -> RecordBatch | None:
186186
return self._metadata.to_recordbatch() if self._metadata else None
187187

188188
@property
189-
def skipped_files(self) -> list[tuple[str, str]]:
189+
def skipped_corrupt_files(self) -> list[tuple[str, str]]:
190190
"""Files skipped during the last execution due to ignore_corrupt_files=True.
191191
192192
Returns a list of ``(path, reason)`` tuples for every file that was skipped.
@@ -196,12 +196,12 @@ def skipped_files(self) -> list[tuple[str, str]]:
196196
197197
df = daft.read_parquet("s3://bucket/data/", ignore_corrupt_files=True)
198198
df.collect()
199-
for path, reason in df.skipped_files:
199+
for path, reason in df.skipped_corrupt_files:
200200
print(f"Skipped {path}: {reason}")
201201
"""
202202
if self._result_cache is None:
203-
raise ValueError("skipped_files is not available until the DataFrame has been collected")
204-
return self._metadata.skipped_files if self._metadata else []
203+
raise ValueError("skipped_corrupt_files is not available until the DataFrame has been collected")
204+
return self._metadata.skipped_corrupt_files if self._metadata else []
205205

206206
def pipe(
207207
self,
@@ -4884,12 +4884,12 @@ def _materialize_results(self) -> None:
48844884
assert result is not None
48854885
result.wait()
48864886
self._metadata.write_mermaid()
4887-
skipped = self._metadata.skipped_files if self._metadata else []
4887+
skipped = self._metadata.skipped_corrupt_files if self._metadata else []
48884888
if skipped:
48894889
paths = "\n".join(f" - {path}" for path, _ in skipped)
48904890
logger.warning(
48914891
"%d file(s) were skipped due to corruption or being missing "
4892-
"(ignore_corrupt_files=True). Use df.skipped_files for details.\n%s",
4892+
"(ignore_corrupt_files=True). Use df.skipped_corrupt_files for details.\n%s",
48934893
len(skipped),
48944894
paths,
48954895
)

daft/execution/metadata.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,12 @@ def to_recordbatch(self) -> RecordBatch:
103103
return RecordBatch._from_pyrecordbatch(self._py.to_recordbatch())
104104

105105
@property
106-
def skipped_files(self) -> list[tuple[str, str]]:
106+
def skipped_corrupt_files(self) -> list[tuple[str, str]]:
107107
"""Files skipped during execution due to ignore_corrupt_files=True.
108108
109109
Returns a list of (path, reason) tuples for every file that was skipped.
110110
"""
111-
return self._py.skipped_files
111+
return self._py.skipped_corrupt_files
112112

113113
def _plan_to_mermaid_string(self) -> str:
114114
"""Convert query_plan dict to mermaid diagram string (bottom-up)."""

daft/io/_csv.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ def read_csv(
5050
file_path_column: Include the source path(s) as a column with this name. Defaults to None.
5151
hive_partitioning: Whether to infer hive_style partitions from file paths and include them as columns in the Dataframe. Defaults to False.
5252
ignore_corrupt_files: If True, corrupt or unreadable CSV files are silently skipped instead
53-
of raising an error. Skipped files are recorded in ``df.skipped_files`` after collection.
53+
of raising an error. Skipped files are recorded in ``df.skipped_corrupt_files`` after collection.
5454
Defaults to False.
5555
5656
Returns:

daft/io/_parquet.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ def read_parquet(
4141
hive_partitioning: Whether to infer hive_style partitions from file paths and include them as columns in the Dataframe. Defaults to False.
4242
coerce_int96_timestamp_unit: TimeUnit to coerce Int96 TimeStamps to. e.g.: [ns, us, ms], Defaults to None.
4343
ignore_corrupt_files: If True, corrupt or unreadable Parquet files are silently skipped
44-
instead of raising an error. Skipped files are recorded in ``df.skipped_files`` after
44+
instead of raising an error. Skipped files are recorded in ``df.skipped_corrupt_files`` after
4545
collection. Only genuine format errors (bad magic bytes, truncated footer, corrupt
4646
row-group data) are ignored; network errors and permission errors are still raised.
4747
Defaults to False.

docs/connectors/ignore-corrupt-files.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,32 +60,32 @@ import logging
6060
logging.basicConfig(level=logging.WARNING)
6161
```
6262

63-
### `df.skipped_files` — programmatic access
63+
### `df.skipped_corrupt_files` — programmatic access
6464

65-
After calling `.collect()`, the `skipped_files` property returns the list of skipped `(path, reason)` pairs as structured data, so your pipeline code can act on them:
65+
After calling `.collect()`, the `skipped_corrupt_files` property returns the list of skipped `(path, reason)` pairs as structured data, so your pipeline code can act on them:
6666

6767
```python
6868
df = daft.read_parquet("s3://my-bucket/data/**/*.parquet", ignore_corrupt_files=True)
6969
df.collect()
7070

71-
skipped = df.skipped_files # list[tuple[str, str]]
71+
skipped = df.skipped_corrupt_files # list[tuple[str, str]]
7272
for path, reason in skipped:
7373
print(f"Skipped: {path}\n Reason: {reason}")
7474
```
7575

76-
`skipped_files` is available after any action that triggers execution (`.collect()`, `.write_parquet()`, etc.).
76+
`skipped_corrupt_files` is available after any action that triggers execution (`.collect()`, `.write_parquet()`, etc.).
7777

7878
## Handling skipped files in production
7979

80-
Because `skipped_files` is plain Python data, you can plug it directly into your existing alerting or data-quality workflows:
80+
Because `skipped_corrupt_files` is plain Python data, you can plug it directly into your existing alerting or data-quality workflows:
8181

8282
```python
8383
import daft
8484

8585
df = daft.read_parquet("s3://my-bucket/nightly/**/*.parquet", ignore_corrupt_files=True)
8686
df.write_parquet("s3://my-bucket/processed/")
8787

88-
skipped = df.skipped_files
88+
skipped = df.skipped_corrupt_files
8989
if skipped:
9090
# Option 1: send an alert
9191
send_alert(f"{len(skipped)} file(s) skipped during nightly run", details=skipped)

src/daft-csv/src/read.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use tokio_util::io::StreamReader;
3030

3131
use crate::{CsvConvertOptions, CsvParseOptions, CsvReadOptions, metadata::read_csv_schema_single};
3232

33-
type SkippedFilesCollector = Option<std::sync::Arc<std::sync::Mutex<Vec<(String, String)>>>>;
33+
type SkippedCorruptFilesCollector = Option<std::sync::Arc<std::sync::Mutex<Vec<(String, String)>>>>;
3434

3535
trait ByteRecordChunkStream: Stream<Item = super::Result<Vec<csv_async::ByteRecord>>> {}
3636
impl<S> ByteRecordChunkStream for S where S: Stream<Item = super::Result<Vec<csv_async::ByteRecord>>>
@@ -171,7 +171,7 @@ pub async fn stream_csv(
171171
io_stats: Option<IOStatsRef>,
172172
max_chunks_in_flight: Option<usize>,
173173
ignore_corrupt_files: bool,
174-
skipped_files: SkippedFilesCollector,
174+
skipped_corrupt_files: SkippedCorruptFilesCollector,
175175
) -> DaftResult<BoxStream<'static, DaftResult<RecordBatch>>> {
176176
let (source_type, _) = parse_url(&uri)?;
177177
let is_compressed = CompressionCodec::from_uri(&uri).is_some();
@@ -207,10 +207,10 @@ pub async fn stream_csv(
207207
// Level 2: filter per-chunk errors that indicate format corruption
208208
// (e.g. bad encoding or wrong field count discovered mid-stream).
209209
let uri_for_warn = uri.clone();
210-
let skipped_files_inner = skipped_files.clone();
210+
let skipped_corrupt_files_inner = skipped_corrupt_files.clone();
211211
let filtered = stream.filter_map(move |result| {
212212
let uri_w = uri_for_warn.clone();
213-
let skipped = skipped_files_inner.clone();
213+
let skipped = skipped_corrupt_files_inner.clone();
214214
futures::future::ready(match result {
215215
Ok(batch) => Some(Ok(batch)),
216216
Err(ref e) if is_csv_corrupt(e) => {
@@ -233,7 +233,7 @@ pub async fn stream_csv(
233233
// Level 1: file-open / schema-inference errors (format error or truncated file).
234234
Err(ref e) if ignore_corrupt_files && is_csv_corrupt(e) => {
235235
log::warn!("Skipping unreadable/corrupt CSV file {uri}: {e}");
236-
if let Some(ref collector) = skipped_files
236+
if let Some(ref collector) = skipped_corrupt_files
237237
&& let Ok(mut v) = collector.lock()
238238
{
239239
v.push((uri, e.to_string()));

src/daft-distributed/src/statistics/mod.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ pub type StatisticsManagerRef = Arc<StatisticsManager>;
100100
pub struct StatisticsManager {
101101
runtime_node_managers: Arc<HashMap<NodeID, RuntimeNodeManager>>,
102102
subscribers: Mutex<Vec<Box<dyn StatisticsSubscriber>>>,
103-
skipped_files: Mutex<Vec<(String, String)>>,
103+
skipped_corrupt_files: Mutex<Vec<(String, String)>>,
104104
}
105105

106106
impl StatisticsManager {
@@ -136,17 +136,17 @@ impl StatisticsManager {
136136
Ok(Arc::new(Self {
137137
runtime_node_managers,
138138
subscribers: Mutex::new(subscribers),
139-
skipped_files: Mutex::new(vec![]),
139+
skipped_corrupt_files: Mutex::new(vec![]),
140140
}))
141141
}
142142

143143
pub fn handle_event(&self, event: TaskEvent) -> DaftResult<()> {
144144
// Accumulate skipped files from completed tasks so they are available in export_metrics().
145145
if let TaskEvent::Completed { ref stats, .. } = event
146-
&& !stats.skipped_files.is_empty()
147-
&& let Ok(mut v) = self.skipped_files.lock()
146+
&& !stats.skipped_corrupt_files.is_empty()
147+
&& let Ok(mut v) = self.skipped_corrupt_files.lock()
148148
{
149-
v.extend(stats.skipped_files.iter().cloned());
149+
v.extend(stats.skipped_corrupt_files.iter().cloned());
150150
}
151151

152152
for node_id in &event.context().node_ids {
@@ -173,11 +173,11 @@ impl StatisticsManager {
173173
.values()
174174
.map(RuntimeNodeManager::export_snapshot)
175175
.collect();
176-
let skipped_files = self
177-
.skipped_files
176+
let skipped_corrupt_files = self
177+
.skipped_corrupt_files
178178
.lock()
179179
.map(|v| v.clone())
180180
.unwrap_or_default();
181-
ExecutionStats::new("".into(), nodes).with_skipped_files(skipped_files)
181+
ExecutionStats::new("".into(), nodes).with_skipped_corrupt_files(skipped_corrupt_files)
182182
}
183183
}

src/daft-local-execution/src/pipeline.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ pub struct BuilderContext {
263263
pub meter: Meter,
264264
context: HashMap<String, String>,
265265
shuffle_server: Option<(Arc<ShuffleFlightServer>, String)>,
266-
pub skipped_files: std::sync::Arc<std::sync::Mutex<Vec<(String, String)>>>,
266+
pub skipped_corrupt_files: std::sync::Arc<std::sync::Mutex<Vec<(String, String)>>>,
267267
}
268268

269269
impl BuilderContext {
@@ -283,7 +283,7 @@ impl BuilderContext {
283283
meter,
284284
context,
285285
shuffle_server,
286-
skipped_files: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
286+
skipped_corrupt_files: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
287287
}
288288
}
289289

@@ -400,7 +400,7 @@ fn physical_plan_to_pipeline(
400400
pushdowns.clone(),
401401
schema.clone(),
402402
cfg,
403-
Some(ctx.skipped_files.clone()),
403+
Some(ctx.skipped_corrupt_files.clone()),
404404
);
405405
SourceNode::new(
406406
Box::new(scan_task_source),

src/daft-local-execution/src/sources/scan_task.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,15 @@ use crate::{
3030
},
3131
};
3232

33-
type SkippedFilesCollector = Option<Arc<std::sync::Mutex<Vec<(String, String)>>>>;
33+
type SkippedCorruptFilesCollector = Option<Arc<std::sync::Mutex<Vec<(String, String)>>>>;
3434

3535
pub struct ScanTaskSource {
3636
receiver: UnboundedReceiver<(InputId, Vec<ScanTaskRef>)>,
3737
source_config: Option<Arc<SourceConfig>>,
3838
pushdowns: Pushdowns,
3939
schema: SchemaRef,
4040
num_parallel_tasks: usize,
41-
skipped_files: SkippedFilesCollector,
41+
skipped_corrupt_files: SkippedCorruptFilesCollector,
4242
}
4343

4444
impl ScanTaskSource {
@@ -48,7 +48,7 @@ impl ScanTaskSource {
4848
pushdowns: Pushdowns,
4949
schema: SchemaRef,
5050
cfg: &DaftExecutionConfig,
51-
skipped_files: SkippedFilesCollector,
51+
skipped_corrupt_files: SkippedCorruptFilesCollector,
5252
) -> Self {
5353
let num_cpus = get_compute_pool_num_threads();
5454
let num_parallel_tasks = if cfg.scantask_max_parallel > 0 {
@@ -62,7 +62,7 @@ impl ScanTaskSource {
6262
pushdowns,
6363
schema,
6464
num_parallel_tasks,
65-
skipped_files,
65+
skipped_corrupt_files,
6666
}
6767
}
6868

@@ -75,7 +75,7 @@ impl ScanTaskSource {
7575
chunk_size: usize,
7676
schema: SchemaRef,
7777
maintain_order: bool,
78-
skipped_files: SkippedFilesCollector,
78+
skipped_corrupt_files: SkippedCorruptFilesCollector,
7979
) -> common_runtime::RuntimeTask<DaftResult<()>> {
8080
let io_runtime = get_io_runtime(true);
8181

@@ -123,7 +123,7 @@ impl ScanTaskSource {
123123
chunk_size,
124124
sender,
125125
input_id,
126-
skipped_files.clone(),
126+
skipped_corrupt_files.clone(),
127127
));
128128
}
129129

@@ -241,7 +241,7 @@ impl Source for ScanTaskSource {
241241
chunk_size,
242242
self.schema.clone(),
243243
maintain_order,
244-
self.skipped_files.clone(),
244+
self.skipped_corrupt_files.clone(),
245245
);
246246
let result_stream = output_receiver.into_stream().map(Ok);
247247
let combined_stream = combine_stream(result_stream, processor_task.map(|x| x?));
@@ -494,7 +494,7 @@ async fn forward_scan_task_stream(
494494
chunk_size: usize,
495495
sender: ScanTaskOutputSender,
496496
input_id: InputId,
497-
skipped_files: SkippedFilesCollector,
497+
skipped_corrupt_files: SkippedCorruptFilesCollector,
498498
) -> DaftResult<InputId> {
499499
let schema = scan_task.materialized_schema();
500500
let mut stream = stream_scan_task(
@@ -503,7 +503,7 @@ async fn forward_scan_task_stream(
503503
delete_map,
504504
maintain_order,
505505
chunk_size,
506-
skipped_files,
506+
skipped_corrupt_files,
507507
)
508508
.await?;
509509
let mut has_data = false;
@@ -557,7 +557,7 @@ async fn stream_scan_task(
557557
delete_map: Option<Arc<HashMap<String, Vec<i64>>>>,
558558
maintain_order: bool,
559559
chunk_size: usize,
560-
skipped_files: SkippedFilesCollector,
560+
skipped_corrupt_files: SkippedCorruptFilesCollector,
561561
) -> DaftResult<impl Stream<Item = DaftResult<MicroPartition>> + Send> {
562562
let pushdown_columns = scan_task
563563
.pushdowns
@@ -613,7 +613,7 @@ async fn stream_scan_task(
613613
delete_map,
614614
maintain_order,
615615
chunk_size,
616-
skipped_files,
616+
skipped_corrupt_files,
617617
)
618618
.await?;
619619

0 commit comments

Comments
 (0)