Skip to content

Commit 308bc0c

Browse files
committed
fix: error on missing delete files instead of silent corruption
When a delete file referenced in catalog metadata is missing from storage, queries previously returned all rows silently (including deleted ones). Now the Parquet read error is caught and wrapped with a clear message identifying the missing delete file and suggesting possible causes. Closes #52
1 parent b3b25a7 commit 308bc0c

2 files changed

Lines changed: 132 additions & 1 deletion

File tree

src/table.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,17 @@ impl DuckLakeTable {
306306
.collect::<Vec<_>>()
307307
.await
308308
.into_iter()
309-
.collect::<DataFusionResult<Vec<_>>>()?;
309+
.collect::<DataFusionResult<Vec<_>>>()
310+
.map_err(|e| {
311+
if is_object_store_not_found(&e) {
312+
DataFusionError::Execution(format!(
313+
"Delete file '{}' referenced in catalog metadata was not found. This may indicate catalog corruption or that the file was deleted outside of DuckLake.",
314+
resolved_delete_path
315+
))
316+
} else {
317+
e
318+
}
319+
})?;
310320

311321
// Extract all positions from all batches
312322
let mut positions = HashSet::new();
@@ -637,3 +647,18 @@ fn extract_deleted_positions_from_batch(
637647

638648
Ok(())
639649
}
650+
651+
/// Check if a DataFusion error is caused by an object store NotFound error.
652+
fn is_object_store_not_found(err: &DataFusionError) -> bool {
653+
if let DataFusionError::ObjectStore(os_err) = err {
654+
return matches!(os_err.as_ref(), object_store::Error::NotFound { .. });
655+
}
656+
let mut source = std::error::Error::source(err);
657+
while let Some(e) = source {
658+
if let Some(os_err) = e.downcast_ref::<object_store::Error>() {
659+
return matches!(os_err, object_store::Error::NotFound { .. });
660+
}
661+
source = e.source();
662+
}
663+
false
664+
}

tests/missing_delete_file_tests.rs

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
#![cfg(feature = "metadata-duckdb")]
2+
//! Tests for missing delete file error handling (issue #52)
3+
4+
mod common;
5+
6+
use std::sync::Arc;
7+
8+
use datafusion::error::Result as DataFusionResult;
9+
use datafusion::prelude::*;
10+
use datafusion_ducklake::{DuckLakeCatalog, DuckdbMetadataProvider};
11+
use tempfile::TempDir;
12+
13+
fn create_catalog(path: &str) -> DataFusionResult<Arc<DuckLakeCatalog>> {
14+
let provider = DuckdbMetadataProvider::new(path)
15+
.map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
16+
let catalog = DuckLakeCatalog::new(provider)
17+
.map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
18+
Ok(Arc::new(catalog))
19+
}
20+
21+
fn remove_delete_files(data_dir: &std::path::Path) -> Vec<std::path::PathBuf> {
22+
let mut removed = Vec::new();
23+
if let Ok(entries) = std::fs::read_dir(data_dir) {
24+
for entry in entries.flatten() {
25+
let path = entry.path();
26+
if path.is_dir() {
27+
removed.extend(remove_delete_files(&path));
28+
} else if let Some(name) = path.file_name().and_then(|n| n.to_str())
29+
&& name.contains("delete")
30+
&& name.ends_with(".parquet")
31+
{
32+
std::fs::remove_file(&path).expect("Failed to remove delete file");
33+
removed.push(path);
34+
}
35+
}
36+
}
37+
removed
38+
}
39+
40+
#[tokio::test]
41+
async fn test_missing_delete_file_returns_error() -> DataFusionResult<()> {
42+
let temp_dir =
43+
TempDir::new().map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
44+
let catalog_path = temp_dir.path().join("missing_delete.ducklake");
45+
common::create_catalog_with_deletes(&catalog_path).map_err(common::to_datafusion_error)?;
46+
let removed = remove_delete_files(temp_dir.path());
47+
assert!(
48+
!removed.is_empty(),
49+
"Should have removed at least one delete file"
50+
);
51+
let catalog = create_catalog(&catalog_path.to_string_lossy())?;
52+
let ctx = SessionContext::new();
53+
ctx.register_catalog("test", catalog);
54+
let df = ctx
55+
.sql("SELECT * FROM test.main.products ORDER BY id")
56+
.await?;
57+
let result = df.collect().await;
58+
assert!(result.is_err(), "Expected error for missing delete file");
59+
let err_msg = result.unwrap_err().to_string();
60+
assert!(
61+
err_msg.contains("not found"),
62+
"Error should mention not found, got: {}",
63+
err_msg
64+
);
65+
assert!(
66+
err_msg.contains("Delete file"),
67+
"Error should mention Delete file, got: {}",
68+
err_msg
69+
);
70+
Ok(())
71+
}
72+
73+
#[tokio::test]
74+
async fn test_delete_files_work_normally() -> DataFusionResult<()> {
75+
let temp_dir =
76+
TempDir::new().map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
77+
let catalog_path = temp_dir.path().join("normal_deletes.ducklake");
78+
common::create_catalog_with_deletes(&catalog_path).map_err(common::to_datafusion_error)?;
79+
let catalog = create_catalog(&catalog_path.to_string_lossy())?;
80+
let ctx = SessionContext::new();
81+
ctx.register_catalog("test", catalog);
82+
let df = ctx
83+
.sql("SELECT id FROM test.main.products ORDER BY id")
84+
.await?;
85+
let results = df.collect().await?;
86+
let total_rows: usize = results.iter().map(|b| b.num_rows()).sum();
87+
assert_eq!(total_rows, 3, "Should have 3 rows after deletes");
88+
Ok(())
89+
}
90+
91+
#[tokio::test]
92+
async fn test_missing_delete_file_count_query_errors() -> DataFusionResult<()> {
93+
let temp_dir =
94+
TempDir::new().map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
95+
let catalog_path = temp_dir.path().join("missing_delete_count.ducklake");
96+
common::create_catalog_with_deletes(&catalog_path).map_err(common::to_datafusion_error)?;
97+
let removed = remove_delete_files(temp_dir.path());
98+
assert!(!removed.is_empty());
99+
let catalog = create_catalog(&catalog_path.to_string_lossy())?;
100+
let ctx = SessionContext::new();
101+
ctx.register_catalog("test", catalog);
102+
let df = ctx.sql("SELECT COUNT(*) FROM test.main.products").await?;
103+
let result = df.collect().await;
104+
assert!(result.is_err(), "COUNT should error on missing delete file");
105+
Ok(())
106+
}

0 commit comments

Comments
 (0)