Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 80 additions & 11 deletions src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,20 @@ use datafusion::execution::parquet_encryption::EncryptionFactory;
pub const DELETE_FILE_PATH_COL: &str = "file_path";
pub const DELETE_POS_COL: &str = "pos";

/// Converts a DuckLake `file_size_bytes` value (an `i64` in the metadata) into a `u64`.
///
/// DuckLake keeps file sizes in signed SQL integers, so a negative number can only come
/// from corrupt or invalid metadata. A plain `as u64` cast would silently wrap such a
/// value (e.g. `-1` becomes `u64::MAX`) and cause confusing errors further downstream,
/// so the conversion is checked here instead.
///
/// # Errors
///
/// Returns [`DataFusionError::Execution`] naming both the offending value and
/// `file_path` when `file_size_bytes` is negative.
pub(crate) fn validated_file_size(file_size_bytes: i64, file_path: &str) -> DataFusionResult<u64> {
    match u64::try_from(file_size_bytes) {
        Ok(size) => Ok(size),
        // `u64::try_from` only rejects negative inputs; its own error carries no detail,
        // so it is replaced with a message that identifies the file.
        Err(_) => Err(DataFusionError::Execution(format!(
            "Invalid file_size_bytes ({}) for file '{}': value must be non-negative",
            file_size_bytes, file_path
        ))),
    }
}

/// Returns the expected schema for DuckLake delete files
///
/// Delete files have a standard schema: (file_path: VARCHAR, pos: INT64)
Expand Down Expand Up @@ -281,10 +295,15 @@ impl DuckLakeTable {
let resolved_delete_path = self.resolve_file_path(delete_file)?;

// Create PartitionedFile with footer size hint if available
let mut pf =
PartitionedFile::new(&resolved_delete_path, delete_file.file_size_bytes as u64);
if let Some(footer_size) = delete_file.footer_size {
pf = pf.with_metadata_size_hint(footer_size as usize);
let mut pf = PartitionedFile::new(
&resolved_delete_path,
validated_file_size(delete_file.file_size_bytes, &resolved_delete_path)?,
);
if let Some(footer_size) = delete_file.footer_size
&& footer_size > 0
&& let Ok(hint) = usize::try_from(footer_size)
{
pf = pf.with_metadata_size_hint(hint);
}

// Create file scan config for the delete file
Expand Down Expand Up @@ -345,13 +364,18 @@ impl DuckLakeTable {
.iter()
.map(|table_file| {
let resolved_path = self.resolve_file_path(&table_file.file)?;
let mut pf =
PartitionedFile::new(&resolved_path, table_file.file.file_size_bytes as u64);
let mut pf = PartitionedFile::new(
&resolved_path,
validated_file_size(table_file.file.file_size_bytes, &resolved_path)?,
);

// Apply footer size hint if available from DuckLake metadata
// This reduces I/O from 2 reads to 1 read per file (especially beneficial for S3/MinIO)
if let Some(footer_size) = table_file.file.footer_size {
pf = pf.with_metadata_size_hint(footer_size as usize);
if let Some(footer_size) = table_file.file.footer_size
&& footer_size > 0
&& let Ok(hint) = usize::try_from(footer_size)
{
pf = pf.with_metadata_size_hint(hint);
}

Ok(pf)
Expand Down Expand Up @@ -424,9 +448,15 @@ impl DuckLakeTable {
let resolved_path = self.resolve_file_path(&table_file.file)?;

// Create PartitionedFile with footer size hint if available
let mut pf = PartitionedFile::new(&resolved_path, table_file.file.file_size_bytes as u64);
if let Some(footer_size) = table_file.file.footer_size {
pf = pf.with_metadata_size_hint(footer_size as usize);
let mut pf = PartitionedFile::new(
&resolved_path,
validated_file_size(table_file.file.file_size_bytes, &resolved_path)?,
);
if let Some(footer_size) = table_file.file.footer_size
&& footer_size > 0
&& let Ok(hint) = usize::try_from(footer_size)
{
pf = pf.with_metadata_size_hint(hint);
}

// Use read_schema (with original Parquet names) for reading
Expand Down Expand Up @@ -663,3 +693,42 @@ fn is_object_store_not_found(err: &DataFusionError) -> bool {
}
false
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Zero and positive sizes, up to `i64::MAX`, convert to the same numeric value.
    #[test]
    fn test_validated_file_size_positive() {
        let cases: [(i64, u64); 3] = [(0, 0), (1024, 1024), (i64::MAX, i64::MAX as u64)];
        for &(input, expected) in cases.iter() {
            assert_eq!(
                validated_file_size(input, "test.parquet").unwrap(),
                expected
            );
        }
    }

    /// A negative size is rejected, and the error names both the value and the file.
    #[test]
    fn test_validated_file_size_negative() {
        let rendered = validated_file_size(-1, "data/test.parquet")
            .unwrap_err()
            .to_string();
        assert!(
            rendered.contains("-1"),
            "Error should contain the negative value: {}",
            rendered
        );
        assert!(
            rendered.contains("data/test.parquet"),
            "Error should contain the file path: {}",
            rendered
        );
    }

    /// The most negative `i64` is rejected too and is reported verbatim.
    #[test]
    fn test_validated_file_size_large_negative() {
        let rendered = validated_file_size(i64::MIN, "bad.parquet")
            .unwrap_err()
            .to_string();
        let expected_value = i64::MIN.to_string();
        assert!(rendered.contains("bad.parquet"));
        assert!(rendered.contains(&expected_value));
    }
}
13 changes: 10 additions & 3 deletions src/table_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use futures::Stream;

use crate::metadata_provider::{DataFileChange, MetadataProvider};
use crate::path_resolver::resolve_path;
use crate::table::validated_file_size;

#[cfg(feature = "encryption")]
use crate::encryption::EncryptionFactoryBuilder;
Expand Down Expand Up @@ -443,9 +444,15 @@ impl TableChangesTable {
.map_err(|e| DataFusionError::External(Box::new(e)))?;

// Create PartitionedFile with footer size hint if available
let mut pf = PartitionedFile::new(&resolved_path, data_file.file_size_bytes as u64);
if let Some(footer_size) = data_file.footer_size {
pf = pf.with_metadata_size_hint(footer_size as usize);
let mut pf = PartitionedFile::new(
&resolved_path,
validated_file_size(data_file.file_size_bytes, &resolved_path)?,
);
if let Some(footer_size) = data_file.footer_size
&& footer_size > 0
&& let Ok(hint) = usize::try_from(footer_size)
{
pf = pf.with_metadata_size_hint(hint);
}

// Determine what to read from Parquet
Expand Down
20 changes: 14 additions & 6 deletions src/table_deletions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use futures::Stream;

use crate::metadata_provider::{DeleteFileChange, MetadataProvider};
use crate::path_resolver::resolve_path;
use crate::table::validated_file_size;

/// Delete file schema: (file_path: VARCHAR, pos: INT64)
fn delete_file_schema() -> SchemaRef {
Expand Down Expand Up @@ -163,9 +164,14 @@ impl TableDeletionsTable {
let resolved_path = resolve_path(&self.table_path, path, is_relative)
.map_err(|e| DataFusionError::External(Box::new(e)))?;

let mut pf = PartitionedFile::new(&resolved_path, size_bytes as u64);
if footer_size > 0 {
pf = pf.with_metadata_size_hint(footer_size as usize);
let mut pf = PartitionedFile::new(
&resolved_path,
validated_file_size(size_bytes, &resolved_path)?,
);
if footer_size > 0
&& let Ok(hint) = usize::try_from(footer_size)
{
pf = pf.with_metadata_size_hint(hint);
}

let builder = FileScanConfigBuilder::new(
Expand All @@ -185,9 +191,11 @@ impl TableDeletionsTable {
size_bytes: i64,
footer_size: i64,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
let mut pf = PartitionedFile::new(path, size_bytes as u64);
if footer_size > 0 {
pf = pf.with_metadata_size_hint(footer_size as usize);
let mut pf = PartitionedFile::new(path, validated_file_size(size_bytes, path)?);
if footer_size > 0
&& let Ok(hint) = usize::try_from(footer_size)
{
pf = pf.with_metadata_size_hint(hint);
}

let builder = FileScanConfigBuilder::new(
Expand Down
141 changes: 141 additions & 0 deletions tests/numeric_metadata_validation_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
#![cfg(feature = "metadata-duckdb")]
//! Tests for numeric metadata validation (#58, #59)
//!
//! Verifies that negative file_size_bytes and footer_size values in catalog
//! metadata are caught early with clear error messages instead of wrapping
//! to huge values via unchecked `as u64`/`as usize` casts.

use std::sync::Arc;

use datafusion::error::Result as DataFusionResult;
use datafusion::prelude::*;
use datafusion_ducklake::{DuckLakeCatalog, DuckdbMetadataProvider};
use tempfile::TempDir;

/// Builds a DuckLake catalog at `catalog_path` holding a two-row `items` table, then runs
/// `corruption_sql` directly against the catalog's metadata database.
///
/// The catalog is created through an in-memory DuckDB connection with the `ducklake`
/// extension attached, and detached again before the metadata database is opened for
/// tampering.
///
/// # Errors
///
/// Propagates any error returned by DuckDB while opening connections or executing
/// statements.
fn create_catalog_with_corrupted_metadata(
    catalog_path: &std::path::Path,
    corruption_sql: &str,
) -> anyhow::Result<()> {
    let conn = duckdb::Connection::open_in_memory()?;
    conn.execute("INSTALL ducklake;", [])?;
    conn.execute("LOAD ducklake;", [])?;

    let ducklake_path = format!("ducklake:{}", catalog_path.display());
    conn.execute(&format!("ATTACH '{}' AS test_catalog;", ducklake_path), [])?;

    conn.execute(
        "CREATE TABLE test_catalog.items (id INT, name VARCHAR);",
        [],
    )?;
    conn.execute(
        "INSERT INTO test_catalog.items VALUES (1, 'Widget'), (2, 'Gadget');",
        [],
    )?;

    // Detach the DuckLake catalog so we can tamper with the raw metadata
    conn.execute("DETACH test_catalog;", [])?;

    // Now open the catalog DB directly and apply the requested corruption
    let meta_conn = duckdb::Connection::open(catalog_path)?;
    meta_conn.execute(corruption_sql, [])?;

    Ok(())
}

/// Creates a catalog with data, then corrupts file_size_bytes to a negative value
fn create_catalog_with_negative_file_size(catalog_path: &std::path::Path) -> anyhow::Result<()> {
    create_catalog_with_corrupted_metadata(
        catalog_path,
        "UPDATE ducklake_data_file SET file_size_bytes = -1;",
    )
}

/// Creates a catalog with data, then corrupts footer_size to a negative value
fn create_catalog_with_negative_footer_size(catalog_path: &std::path::Path) -> anyhow::Result<()> {
    create_catalog_with_corrupted_metadata(
        catalog_path,
        "UPDATE ducklake_data_file SET footer_size = -42;",
    )
}

fn create_catalog(path: &str) -> DataFusionResult<Arc<DuckLakeCatalog>> {
let provider = DuckdbMetadataProvider::new(path)
.map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
let catalog = DuckLakeCatalog::new(provider)
.map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
Ok(Arc::new(catalog))
}

/// A negative `file_size_bytes` in the catalog metadata must fail the query with an
/// error that names the field and the offending value.
#[tokio::test]
async fn test_negative_file_size_produces_clear_error() -> DataFusionResult<()> {
    let temp_dir =
        TempDir::new().map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
    let catalog_path = temp_dir.path().join("neg_size.ducklake");

    create_catalog_with_negative_file_size(&catalog_path)
        .map_err(|e| datafusion::error::DataFusionError::External(e.into()))?;

    let catalog = create_catalog(&catalog_path.to_string_lossy())?;
    let ctx = SessionContext::new();
    ctx.register_catalog("ducklake", catalog);

    // Capture the failure whether it surfaces while planning (`sql`) or while executing
    // (`collect`). Propagating the planning error with `?` would abort the test before
    // the assertions below could inspect the message.
    let result = match ctx.sql("SELECT * FROM ducklake.main.items").await {
        Ok(df) => df.collect().await,
        Err(e) => Err(e),
    };

    assert!(
        result.is_err(),
        "Query should fail with negative file_size_bytes"
    );
    let err_msg = result.unwrap_err().to_string();
    assert!(
        err_msg.contains("Invalid file_size_bytes"),
        "Error should mention invalid file_size_bytes, got: {}",
        err_msg
    );
    assert!(
        err_msg.contains("-1"),
        "Error should contain the negative value, got: {}",
        err_msg
    );

    Ok(())
}

#[tokio::test]
async fn test_negative_footer_size_is_gracefully_skipped() -> DataFusionResult<()> {
let temp_dir =
TempDir::new().map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
let catalog_path = temp_dir.path().join("neg_footer.ducklake");

create_catalog_with_negative_footer_size(&catalog_path)
.map_err(|e| datafusion::error::DataFusionError::External(e.into()))?;

let catalog = create_catalog(&catalog_path.to_string_lossy())?;
let ctx = SessionContext::new();
ctx.register_catalog("ducklake", catalog);

// Negative footer_size should be skipped (not used as hint), query should succeed
let df = ctx.sql("SELECT * FROM ducklake.main.items").await?;
let batches = df.collect().await?;

let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
assert_eq!(
total_rows, 2,
"Should still return all rows when footer_size is negative"
);

Ok(())
}
Loading