Skip to content

Commit 007eaaa

Browse files
authored
Merge pull request #66 from hotdata-dev/fix/validate-numeric-metadata
fix: validate file_size_bytes and footer_size from metadata
2 parents 983854d + 02f04f6 commit 007eaaa

4 files changed

Lines changed: 245 additions & 20 deletions

File tree

src/table.rs

Lines changed: 80 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,20 @@ use datafusion::execution::parquet_encryption::EncryptionFactory;
4949
pub const DELETE_FILE_PATH_COL: &str = "file_path";
5050
pub const DELETE_POS_COL: &str = "pos";
5151

52+
/// Validate and convert file_size_bytes from i64 (as stored in DuckLake metadata) to u64.
53+
///
54+
/// DuckLake stores file sizes as signed integers in SQL. A negative value indicates
55+
/// corrupt or invalid metadata. Without this check, a negative i64 cast to u64 would
56+
/// wrap to a huge value (e.g., -1 becomes u64::MAX), causing confusing downstream errors.
57+
pub(crate) fn validated_file_size(file_size_bytes: i64, file_path: &str) -> DataFusionResult<u64> {
58+
u64::try_from(file_size_bytes).map_err(|_| {
59+
DataFusionError::Execution(format!(
60+
"Invalid file_size_bytes ({}) for file '{}': value must be non-negative",
61+
file_size_bytes, file_path
62+
))
63+
})
64+
}
65+
5266
/// Returns the expected schema for DuckLake delete files
5367
///
5468
/// Delete files have a standard schema: (file_path: VARCHAR, pos: INT64)
@@ -281,10 +295,15 @@ impl DuckLakeTable {
281295
let resolved_delete_path = self.resolve_file_path(delete_file)?;
282296

283297
// Create PartitionedFile with footer size hint if available
284-
let mut pf =
285-
PartitionedFile::new(&resolved_delete_path, delete_file.file_size_bytes as u64);
286-
if let Some(footer_size) = delete_file.footer_size {
287-
pf = pf.with_metadata_size_hint(footer_size as usize);
298+
let mut pf = PartitionedFile::new(
299+
&resolved_delete_path,
300+
validated_file_size(delete_file.file_size_bytes, &resolved_delete_path)?,
301+
);
302+
if let Some(footer_size) = delete_file.footer_size
303+
&& footer_size > 0
304+
&& let Ok(hint) = usize::try_from(footer_size)
305+
{
306+
pf = pf.with_metadata_size_hint(hint);
288307
}
289308

290309
// Create file scan config for the delete file
@@ -345,13 +364,18 @@ impl DuckLakeTable {
345364
.iter()
346365
.map(|table_file| {
347366
let resolved_path = self.resolve_file_path(&table_file.file)?;
348-
let mut pf =
349-
PartitionedFile::new(&resolved_path, table_file.file.file_size_bytes as u64);
367+
let mut pf = PartitionedFile::new(
368+
&resolved_path,
369+
validated_file_size(table_file.file.file_size_bytes, &resolved_path)?,
370+
);
350371

351372
// Apply footer size hint if available from DuckLake metadata
352373
// This reduces I/O from 2 reads to 1 read per file (especially beneficial for S3/MinIO)
353-
if let Some(footer_size) = table_file.file.footer_size {
354-
pf = pf.with_metadata_size_hint(footer_size as usize);
374+
if let Some(footer_size) = table_file.file.footer_size
375+
&& footer_size > 0
376+
&& let Ok(hint) = usize::try_from(footer_size)
377+
{
378+
pf = pf.with_metadata_size_hint(hint);
355379
}
356380

357381
Ok(pf)
@@ -424,9 +448,15 @@ impl DuckLakeTable {
424448
let resolved_path = self.resolve_file_path(&table_file.file)?;
425449

426450
// Create PartitionedFile with footer size hint if available
427-
let mut pf = PartitionedFile::new(&resolved_path, table_file.file.file_size_bytes as u64);
428-
if let Some(footer_size) = table_file.file.footer_size {
429-
pf = pf.with_metadata_size_hint(footer_size as usize);
451+
let mut pf = PartitionedFile::new(
452+
&resolved_path,
453+
validated_file_size(table_file.file.file_size_bytes, &resolved_path)?,
454+
);
455+
if let Some(footer_size) = table_file.file.footer_size
456+
&& footer_size > 0
457+
&& let Ok(hint) = usize::try_from(footer_size)
458+
{
459+
pf = pf.with_metadata_size_hint(hint);
430460
}
431461

432462
// Use read_schema (with original Parquet names) for reading
@@ -663,3 +693,42 @@ fn is_object_store_not_found(err: &DataFusionError) -> bool {
663693
}
664694
false
665695
}
696+
697+
#[cfg(test)]
mod tests {
    use super::*;

    /// Zero, a typical size, and the largest representable i64 all convert unchanged.
    #[test]
    fn test_validated_file_size_positive() {
        assert_eq!(validated_file_size(0, "test.parquet").unwrap(), 0);
        assert_eq!(validated_file_size(1024, "test.parquet").unwrap(), 1024);
        assert_eq!(
            validated_file_size(i64::MAX, "test.parquet").unwrap(),
            i64::MAX as u64
        );
    }

    /// A negative size is rejected and the error names both the value and the file.
    #[test]
    fn test_validated_file_size_negative() {
        let err = validated_file_size(-1, "data/test.parquet").unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("-1"),
            "Error should contain the negative value: {}",
            msg
        );
        assert!(
            msg.contains("data/test.parquet"),
            "Error should contain the file path: {}",
            msg
        );
    }

    /// The most negative i64 is rejected as well, and is reported verbatim.
    #[test]
    fn test_validated_file_size_large_negative() {
        let err = validated_file_size(i64::MIN, "bad.parquet").unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("bad.parquet"));
        assert!(msg.contains(&i64::MIN.to_string()));
    }
}

src/table_changes.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use futures::Stream;
3131

3232
use crate::metadata_provider::{DataFileChange, MetadataProvider};
3333
use crate::path_resolver::resolve_path;
34+
use crate::table::validated_file_size;
3435

3536
#[cfg(feature = "encryption")]
3637
use crate::encryption::EncryptionFactoryBuilder;
@@ -443,9 +444,15 @@ impl TableChangesTable {
443444
.map_err(|e| DataFusionError::External(Box::new(e)))?;
444445

445446
// Create PartitionedFile with footer size hint if available
446-
let mut pf = PartitionedFile::new(&resolved_path, data_file.file_size_bytes as u64);
447-
if let Some(footer_size) = data_file.footer_size {
448-
pf = pf.with_metadata_size_hint(footer_size as usize);
447+
let mut pf = PartitionedFile::new(
448+
&resolved_path,
449+
validated_file_size(data_file.file_size_bytes, &resolved_path)?,
450+
);
451+
if let Some(footer_size) = data_file.footer_size
452+
&& footer_size > 0
453+
&& let Ok(hint) = usize::try_from(footer_size)
454+
{
455+
pf = pf.with_metadata_size_hint(hint);
449456
}
450457

451458
// Determine what to read from Parquet

src/table_deletions.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use futures::Stream;
3636

3737
use crate::metadata_provider::{DeleteFileChange, MetadataProvider};
3838
use crate::path_resolver::resolve_path;
39+
use crate::table::validated_file_size;
3940

4041
/// Delete file schema: (file_path: VARCHAR, pos: INT64)
4142
fn delete_file_schema() -> SchemaRef {
@@ -163,9 +164,14 @@ impl TableDeletionsTable {
163164
let resolved_path = resolve_path(&self.table_path, path, is_relative)
164165
.map_err(|e| DataFusionError::External(Box::new(e)))?;
165166

166-
let mut pf = PartitionedFile::new(&resolved_path, size_bytes as u64);
167-
if footer_size > 0 {
168-
pf = pf.with_metadata_size_hint(footer_size as usize);
167+
let mut pf = PartitionedFile::new(
168+
&resolved_path,
169+
validated_file_size(size_bytes, &resolved_path)?,
170+
);
171+
if footer_size > 0
172+
&& let Ok(hint) = usize::try_from(footer_size)
173+
{
174+
pf = pf.with_metadata_size_hint(hint);
169175
}
170176

171177
let builder = FileScanConfigBuilder::new(
@@ -185,9 +191,11 @@ impl TableDeletionsTable {
185191
size_bytes: i64,
186192
footer_size: i64,
187193
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
188-
let mut pf = PartitionedFile::new(path, size_bytes as u64);
189-
if footer_size > 0 {
190-
pf = pf.with_metadata_size_hint(footer_size as usize);
194+
let mut pf = PartitionedFile::new(path, validated_file_size(size_bytes, path)?);
195+
if footer_size > 0
196+
&& let Ok(hint) = usize::try_from(footer_size)
197+
{
198+
pf = pf.with_metadata_size_hint(hint);
191199
}
192200

193201
let builder = FileScanConfigBuilder::new(
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
#![cfg(feature = "metadata-duckdb")]
2+
//! Tests for numeric metadata validation (#58, #59)
3+
//!
4+
//! Verifies that negative file_size_bytes and footer_size values in catalog
5+
//! metadata are caught early with clear error messages instead of wrapping
6+
//! to huge values via unchecked `as u64`/`as usize` casts.
7+
8+
use std::sync::Arc;
9+
10+
use datafusion::error::Result as DataFusionResult;
11+
use datafusion::prelude::*;
12+
use datafusion_ducklake::{DuckLakeCatalog, DuckdbMetadataProvider};
13+
use tempfile::TempDir;
14+
15+
/// Creates a catalog with data, then corrupts file_size_bytes to a negative value
16+
fn create_catalog_with_negative_file_size(catalog_path: &std::path::Path) -> anyhow::Result<()> {
17+
let conn = duckdb::Connection::open_in_memory()?;
18+
conn.execute("INSTALL ducklake;", [])?;
19+
conn.execute("LOAD ducklake;", [])?;
20+
21+
let ducklake_path = format!("ducklake:{}", catalog_path.display());
22+
conn.execute(&format!("ATTACH '{}' AS test_catalog;", ducklake_path), [])?;
23+
24+
conn.execute(
25+
"CREATE TABLE test_catalog.items (id INT, name VARCHAR);",
26+
[],
27+
)?;
28+
conn.execute(
29+
"INSERT INTO test_catalog.items VALUES (1, 'Widget'), (2, 'Gadget');",
30+
[],
31+
)?;
32+
33+
// Detach the DuckLake catalog so we can tamper with the raw metadata
34+
conn.execute("DETACH test_catalog;", [])?;
35+
36+
// Now open the catalog DB directly and corrupt file_size_bytes
37+
let meta_conn = duckdb::Connection::open(catalog_path)?;
38+
meta_conn.execute("UPDATE ducklake_data_file SET file_size_bytes = -1;", [])?;
39+
40+
Ok(())
41+
}
42+
43+
/// Creates a catalog with data, then corrupts footer_size to a negative value
44+
fn create_catalog_with_negative_footer_size(catalog_path: &std::path::Path) -> anyhow::Result<()> {
45+
let conn = duckdb::Connection::open_in_memory()?;
46+
conn.execute("INSTALL ducklake;", [])?;
47+
conn.execute("LOAD ducklake;", [])?;
48+
49+
let ducklake_path = format!("ducklake:{}", catalog_path.display());
50+
conn.execute(&format!("ATTACH '{}' AS test_catalog;", ducklake_path), [])?;
51+
52+
conn.execute(
53+
"CREATE TABLE test_catalog.items (id INT, name VARCHAR);",
54+
[],
55+
)?;
56+
conn.execute(
57+
"INSERT INTO test_catalog.items VALUES (1, 'Widget'), (2, 'Gadget');",
58+
[],
59+
)?;
60+
61+
// Detach the DuckLake catalog so we can tamper with the raw metadata
62+
conn.execute("DETACH test_catalog;", [])?;
63+
64+
// Now open the catalog DB directly and set footer_size to negative
65+
let meta_conn = duckdb::Connection::open(catalog_path)?;
66+
meta_conn.execute("UPDATE ducklake_data_file SET footer_size = -42;", [])?;
67+
68+
Ok(())
69+
}
70+
71+
fn create_catalog(path: &str) -> DataFusionResult<Arc<DuckLakeCatalog>> {
72+
let provider = DuckdbMetadataProvider::new(path)
73+
.map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
74+
let catalog = DuckLakeCatalog::new(provider)
75+
.map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
76+
Ok(Arc::new(catalog))
77+
}
78+
79+
#[tokio::test]
80+
async fn test_negative_file_size_produces_clear_error() -> DataFusionResult<()> {
81+
let temp_dir =
82+
TempDir::new().map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
83+
let catalog_path = temp_dir.path().join("neg_size.ducklake");
84+
85+
create_catalog_with_negative_file_size(&catalog_path)
86+
.map_err(|e| datafusion::error::DataFusionError::External(e.into()))?;
87+
88+
let catalog = create_catalog(&catalog_path.to_string_lossy())?;
89+
let ctx = SessionContext::new();
90+
ctx.register_catalog("ducklake", catalog);
91+
92+
let result = ctx
93+
.sql("SELECT * FROM ducklake.main.items")
94+
.await?
95+
.collect()
96+
.await;
97+
98+
assert!(
99+
result.is_err(),
100+
"Query should fail with negative file_size_bytes"
101+
);
102+
let err_msg = result.unwrap_err().to_string();
103+
assert!(
104+
err_msg.contains("Invalid file_size_bytes"),
105+
"Error should mention invalid file_size_bytes, got: {}",
106+
err_msg
107+
);
108+
assert!(
109+
err_msg.contains("-1"),
110+
"Error should contain the negative value, got: {}",
111+
err_msg
112+
);
113+
114+
Ok(())
115+
}
116+
117+
#[tokio::test]
118+
async fn test_negative_footer_size_is_gracefully_skipped() -> DataFusionResult<()> {
119+
let temp_dir =
120+
TempDir::new().map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
121+
let catalog_path = temp_dir.path().join("neg_footer.ducklake");
122+
123+
create_catalog_with_negative_footer_size(&catalog_path)
124+
.map_err(|e| datafusion::error::DataFusionError::External(e.into()))?;
125+
126+
let catalog = create_catalog(&catalog_path.to_string_lossy())?;
127+
let ctx = SessionContext::new();
128+
ctx.register_catalog("ducklake", catalog);
129+
130+
// Negative footer_size should be skipped (not used as hint), query should succeed
131+
let df = ctx.sql("SELECT * FROM ducklake.main.items").await?;
132+
let batches = df.collect().await?;
133+
134+
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
135+
assert_eq!(
136+
total_rows, 2,
137+
"Should still return all rows when footer_size is negative"
138+
);
139+
140+
Ok(())
141+
}

0 commit comments

Comments
 (0)