diff --git a/src/table.rs b/src/table.rs index 4c4ea93..ed8870a 100644 --- a/src/table.rs +++ b/src/table.rs @@ -49,6 +49,20 @@ use datafusion::execution::parquet_encryption::EncryptionFactory; pub const DELETE_FILE_PATH_COL: &str = "file_path"; pub const DELETE_POS_COL: &str = "pos"; +/// Validate and convert file_size_bytes from i64 (as stored in DuckLake metadata) to u64. +/// +/// DuckLake stores file sizes as signed integers in SQL. A negative value indicates +/// corrupt or invalid metadata. Without this check, a negative i64 cast to u64 would +/// wrap to a huge value (e.g., -1 becomes u64::MAX), causing confusing downstream errors. +pub(crate) fn validated_file_size(file_size_bytes: i64, file_path: &str) -> DataFusionResult<u64> { + u64::try_from(file_size_bytes).map_err(|_| { + DataFusionError::Execution(format!( + "Invalid file_size_bytes ({}) for file '{}': value must be non-negative", + file_size_bytes, file_path + )) + }) +} + /// Returns the expected schema for DuckLake delete files /// /// Delete files have a standard schema: (file_path: VARCHAR, pos: INT64) @@ -281,10 +295,15 @@ impl DuckLakeTable { let resolved_delete_path = self.resolve_file_path(delete_file)?; // Create PartitionedFile with footer size hint if available - let mut pf = - PartitionedFile::new(&resolved_delete_path, delete_file.file_size_bytes as u64); - if let Some(footer_size) = delete_file.footer_size { - pf = pf.with_metadata_size_hint(footer_size as usize); + let mut pf = PartitionedFile::new( + &resolved_delete_path, + validated_file_size(delete_file.file_size_bytes, &resolved_delete_path)?, + ); + if let Some(footer_size) = delete_file.footer_size + && footer_size > 0 + && let Ok(hint) = usize::try_from(footer_size) + { + pf = pf.with_metadata_size_hint(hint); } // Create file scan config for the delete file @@ -345,13 +364,18 @@ impl DuckLakeTable { .iter() .map(|table_file| { let resolved_path = self.resolve_file_path(&table_file.file)?; - let mut pf = - 
PartitionedFile::new(&resolved_path, table_file.file.file_size_bytes as u64); + let mut pf = PartitionedFile::new( + &resolved_path, + validated_file_size(table_file.file.file_size_bytes, &resolved_path)?, + ); // Apply footer size hint if available from DuckLake metadata // This reduces I/O from 2 reads to 1 read per file (especially beneficial for S3/MinIO) - if let Some(footer_size) = table_file.file.footer_size { - pf = pf.with_metadata_size_hint(footer_size as usize); + if let Some(footer_size) = table_file.file.footer_size + && footer_size > 0 + && let Ok(hint) = usize::try_from(footer_size) + { + pf = pf.with_metadata_size_hint(hint); } Ok(pf) @@ -424,9 +448,15 @@ impl DuckLakeTable { let resolved_path = self.resolve_file_path(&table_file.file)?; // Create PartitionedFile with footer size hint if available - let mut pf = PartitionedFile::new(&resolved_path, table_file.file.file_size_bytes as u64); - if let Some(footer_size) = table_file.file.footer_size { - pf = pf.with_metadata_size_hint(footer_size as usize); + let mut pf = PartitionedFile::new( + &resolved_path, + validated_file_size(table_file.file.file_size_bytes, &resolved_path)?, + ); + if let Some(footer_size) = table_file.file.footer_size + && footer_size > 0 + && let Ok(hint) = usize::try_from(footer_size) + { + pf = pf.with_metadata_size_hint(hint); } // Use read_schema (with original Parquet names) for reading @@ -663,3 +693,42 @@ fn is_object_store_not_found(err: &DataFusionError) -> bool { } false } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_validated_file_size_positive() { + assert_eq!(validated_file_size(0, "test.parquet").unwrap(), 0); + assert_eq!(validated_file_size(1024, "test.parquet").unwrap(), 1024); + assert_eq!( + validated_file_size(i64::MAX, "test.parquet").unwrap(), + i64::MAX as u64 + ); + } + + #[test] + fn test_validated_file_size_negative() { + let err = validated_file_size(-1, "data/test.parquet").unwrap_err(); + let msg = err.to_string(); + assert!( + 
msg.contains("-1"), + "Error should contain the negative value: {}", + msg + ); + assert!( + msg.contains("data/test.parquet"), + "Error should contain the file path: {}", + msg + ); + } + + #[test] + fn test_validated_file_size_large_negative() { + let err = validated_file_size(i64::MIN, "bad.parquet").unwrap_err(); + let msg = err.to_string(); + assert!(msg.contains("bad.parquet")); + assert!(msg.contains(&i64::MIN.to_string())); + } +} diff --git a/src/table_changes.rs b/src/table_changes.rs index e05f45e..22886d2 100644 --- a/src/table_changes.rs +++ b/src/table_changes.rs @@ -31,6 +31,7 @@ use futures::Stream; use crate::metadata_provider::{DataFileChange, MetadataProvider}; use crate::path_resolver::resolve_path; +use crate::table::validated_file_size; #[cfg(feature = "encryption")] use crate::encryption::EncryptionFactoryBuilder; @@ -443,9 +444,15 @@ impl TableChangesTable { .map_err(|e| DataFusionError::External(Box::new(e)))?; // Create PartitionedFile with footer size hint if available - let mut pf = PartitionedFile::new(&resolved_path, data_file.file_size_bytes as u64); - if let Some(footer_size) = data_file.footer_size { - pf = pf.with_metadata_size_hint(footer_size as usize); + let mut pf = PartitionedFile::new( + &resolved_path, + validated_file_size(data_file.file_size_bytes, &resolved_path)?, + ); + if let Some(footer_size) = data_file.footer_size + && footer_size > 0 + && let Ok(hint) = usize::try_from(footer_size) + { + pf = pf.with_metadata_size_hint(hint); } // Determine what to read from Parquet diff --git a/src/table_deletions.rs b/src/table_deletions.rs index 9a330d4..9c3b6a4 100644 --- a/src/table_deletions.rs +++ b/src/table_deletions.rs @@ -36,6 +36,7 @@ use futures::Stream; use crate::metadata_provider::{DeleteFileChange, MetadataProvider}; use crate::path_resolver::resolve_path; +use crate::table::validated_file_size; /// Delete file schema: (file_path: VARCHAR, pos: INT64) fn delete_file_schema() -> SchemaRef { @@ -163,9 +164,14 @@ impl 
TableDeletionsTable { let resolved_path = resolve_path(&self.table_path, path, is_relative) .map_err(|e| DataFusionError::External(Box::new(e)))?; - let mut pf = PartitionedFile::new(&resolved_path, size_bytes as u64); - if footer_size > 0 { - pf = pf.with_metadata_size_hint(footer_size as usize); + let mut pf = PartitionedFile::new( + &resolved_path, + validated_file_size(size_bytes, &resolved_path)?, + ); + if footer_size > 0 + && let Ok(hint) = usize::try_from(footer_size) + { + pf = pf.with_metadata_size_hint(hint); } let builder = FileScanConfigBuilder::new( @@ -185,9 +191,11 @@ impl TableDeletionsTable { size_bytes: i64, footer_size: i64, ) -> DataFusionResult> { - let mut pf = PartitionedFile::new(path, size_bytes as u64); - if footer_size > 0 { - pf = pf.with_metadata_size_hint(footer_size as usize); + let mut pf = PartitionedFile::new(path, validated_file_size(size_bytes, path)?); + if footer_size > 0 + && let Ok(hint) = usize::try_from(footer_size) + { + pf = pf.with_metadata_size_hint(hint); } let builder = FileScanConfigBuilder::new( diff --git a/tests/numeric_metadata_validation_tests.rs b/tests/numeric_metadata_validation_tests.rs new file mode 100644 index 0000000..bd72152 --- /dev/null +++ b/tests/numeric_metadata_validation_tests.rs @@ -0,0 +1,141 @@ +#![cfg(feature = "metadata-duckdb")] +//! Tests for numeric metadata validation (#58, #59) +//! +//! Verifies that negative file_size_bytes and footer_size values in catalog +//! metadata are caught early with clear error messages instead of wrapping +//! to huge values via unchecked `as u64`/`as usize` casts. 
+ +use std::sync::Arc; + +use datafusion::error::Result as DataFusionResult; +use datafusion::prelude::*; +use datafusion_ducklake::{DuckLakeCatalog, DuckdbMetadataProvider}; +use tempfile::TempDir; + +/// Creates a catalog with data, then corrupts file_size_bytes to a negative value +fn create_catalog_with_negative_file_size(catalog_path: &std::path::Path) -> anyhow::Result<()> { + let conn = duckdb::Connection::open_in_memory()?; + conn.execute("INSTALL ducklake;", [])?; + conn.execute("LOAD ducklake;", [])?; + + let ducklake_path = format!("ducklake:{}", catalog_path.display()); + conn.execute(&format!("ATTACH '{}' AS test_catalog;", ducklake_path), [])?; + + conn.execute( + "CREATE TABLE test_catalog.items (id INT, name VARCHAR);", + [], + )?; + conn.execute( + "INSERT INTO test_catalog.items VALUES (1, 'Widget'), (2, 'Gadget');", + [], + )?; + + // Detach the DuckLake catalog so we can tamper with the raw metadata + conn.execute("DETACH test_catalog;", [])?; + + // Now open the catalog DB directly and corrupt file_size_bytes + let meta_conn = duckdb::Connection::open(catalog_path)?; + meta_conn.execute("UPDATE ducklake_data_file SET file_size_bytes = -1;", [])?; + + Ok(()) +} + +/// Creates a catalog with data, then corrupts footer_size to a negative value +fn create_catalog_with_negative_footer_size(catalog_path: &std::path::Path) -> anyhow::Result<()> { + let conn = duckdb::Connection::open_in_memory()?; + conn.execute("INSTALL ducklake;", [])?; + conn.execute("LOAD ducklake;", [])?; + + let ducklake_path = format!("ducklake:{}", catalog_path.display()); + conn.execute(&format!("ATTACH '{}' AS test_catalog;", ducklake_path), [])?; + + conn.execute( + "CREATE TABLE test_catalog.items (id INT, name VARCHAR);", + [], + )?; + conn.execute( + "INSERT INTO test_catalog.items VALUES (1, 'Widget'), (2, 'Gadget');", + [], + )?; + + // Detach the DuckLake catalog so we can tamper with the raw metadata + conn.execute("DETACH test_catalog;", [])?; + + // Now open the 
catalog DB directly and set footer_size to negative + let meta_conn = duckdb::Connection::open(catalog_path)?; + meta_conn.execute("UPDATE ducklake_data_file SET footer_size = -42;", [])?; + + Ok(()) +} + +fn create_catalog(path: &str) -> DataFusionResult<Arc<DuckLakeCatalog>> { + let provider = DuckdbMetadataProvider::new(path) + .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?; + let catalog = DuckLakeCatalog::new(provider) + .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?; + Ok(Arc::new(catalog)) +} + +#[tokio::test] +async fn test_negative_file_size_produces_clear_error() -> DataFusionResult<()> { + let temp_dir = + TempDir::new().map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?; + let catalog_path = temp_dir.path().join("neg_size.ducklake"); + + create_catalog_with_negative_file_size(&catalog_path) + .map_err(|e| datafusion::error::DataFusionError::External(e.into()))?; + + let catalog = create_catalog(&catalog_path.to_string_lossy())?; + let ctx = SessionContext::new(); + ctx.register_catalog("ducklake", catalog); + + let result = ctx + .sql("SELECT * FROM ducklake.main.items") + .await? 
+ .collect() + .await; + + assert!( + result.is_err(), + "Query should fail with negative file_size_bytes" + ); + let err_msg = result.unwrap_err().to_string(); + assert!( + err_msg.contains("Invalid file_size_bytes"), + "Error should mention invalid file_size_bytes, got: {}", + err_msg + ); + assert!( + err_msg.contains("-1"), + "Error should contain the negative value, got: {}", + err_msg + ); + + Ok(()) +} + +#[tokio::test] +async fn test_negative_footer_size_is_gracefully_skipped() -> DataFusionResult<()> { + let temp_dir = + TempDir::new().map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?; + let catalog_path = temp_dir.path().join("neg_footer.ducklake"); + + create_catalog_with_negative_footer_size(&catalog_path) + .map_err(|e| datafusion::error::DataFusionError::External(e.into()))?; + + let catalog = create_catalog(&catalog_path.to_string_lossy())?; + let ctx = SessionContext::new(); + ctx.register_catalog("ducklake", catalog); + + // Negative footer_size should be skipped (not used as hint), query should succeed + let df = ctx.sql("SELECT * FROM ducklake.main.items").await?; + let batches = df.collect().await?; + + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!( + total_rows, 2, + "Should still return all rows when footer_size is negative" + ); + + Ok(()) +}