Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/metadata_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,18 @@ pub struct DataFileInfo {

impl DataFileInfo {
/// Create a new data file info with relative path.
///
/// # Panics
///
/// Panics if `record_count` is negative. Record counts originate from
/// `RecordBatch::num_rows()` (always non-negative), so a negative value
/// indicates a programming error.
pub fn new(path: impl Into<String>, file_size_bytes: i64, record_count: i64) -> Self {
assert!(
record_count >= 0,
"record_count must be non-negative, got {}",
record_count
);
Self {
path: path.into(),
path_is_relative: true,
Expand Down Expand Up @@ -300,4 +311,16 @@ mod tests {
let file = DataFileInfo::new("/absolute/path.parquet", 1024, 100).with_absolute_path();
assert!(!file.path_is_relative);
}

#[test]
fn test_data_file_info_zero_record_count() {
let file = DataFileInfo::new("empty.parquet", 0, 0);
assert_eq!(file.record_count, 0);
}

#[test]
#[should_panic(expected = "record_count must be non-negative")]
fn test_data_file_info_negative_record_count_panics() {
DataFileInfo::new("test.parquet", 1024, -1);
}
}
53 changes: 53 additions & 0 deletions src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,20 @@ pub(crate) fn validated_file_size(file_size_bytes: i64, file_path: &str) -> Data
})
}

/// Validate and convert record_count from i64 (as stored in DuckLake metadata) to u64.
///
/// DuckLake stores record counts as signed integers in SQL. A negative value indicates
/// corrupt or invalid metadata. Without this check, a negative record_count would cause
/// incorrect behavior (e.g., empty ranges in full-file deletes, or incorrect row filtering).
pub(crate) fn validated_record_count(record_count: i64, file_path: &str) -> DataFusionResult<u64> {
u64::try_from(record_count).map_err(|_| {
DataFusionError::Execution(format!(
"Invalid record_count ({}) for file '{}': value must be non-negative",
record_count, file_path
))
})
}

/// Returns the expected schema for DuckLake delete files
///
/// Delete files have a standard schema: (file_path: VARCHAR, pos: INT64)
Expand Down Expand Up @@ -731,4 +745,43 @@ mod tests {
assert!(msg.contains("bad.parquet"));
assert!(msg.contains(&i64::MIN.to_string()));
}

#[test]
fn test_validated_record_count_positive() {
assert_eq!(validated_record_count(0, "test.parquet").unwrap(), 0);
assert_eq!(validated_record_count(100, "test.parquet").unwrap(), 100);
assert_eq!(
validated_record_count(i64::MAX, "test.parquet").unwrap(),
i64::MAX as u64
);
}

#[test]
fn test_validated_record_count_negative() {
let err = validated_record_count(-1, "data/test.parquet").unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("-1"),
"Error should contain the negative value: {}",
msg
);
assert!(
msg.contains("data/test.parquet"),
"Error should contain the file path: {}",
msg
);
assert!(
msg.contains("record_count"),
"Error should mention record_count: {}",
msg
);
}

#[test]
fn test_validated_record_count_large_negative() {
let err = validated_record_count(i64::MIN, "bad.parquet").unwrap_err();
let msg = err.to_string();
assert!(msg.contains("bad.parquet"));
assert!(msg.contains(&i64::MIN.to_string()));
}
}
6 changes: 5 additions & 1 deletion src/table_deletions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use futures::Stream;

use crate::metadata_provider::{DeleteFileChange, MetadataProvider};
use crate::path_resolver::resolve_path;
use crate::table::validated_file_size;
use crate::table::{validated_file_size, validated_record_count};

/// Delete file schema: (file_path: VARCHAR, pos: INT64)
fn delete_file_schema() -> SchemaRef {
Expand Down Expand Up @@ -143,6 +143,10 @@ impl TableDeletionsTable {
delete_file.data_file_footer_size,
)?;

// Validate record_count before use — a negative value from corrupt metadata
// would cause incorrect behavior (e.g., empty ranges in full-file deletes).
validated_record_count(delete_file.data_record_count, &delete_file.data_file_path)?;

Ok(Arc::new(DeletedRowsExec::new(
current_delete_exec,
previous_delete_exec,
Expand Down
7 changes: 6 additions & 1 deletion tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,17 @@ use std::sync::Once;

static INSTALL_DUCKLAKE: Once = Once::new();

/// Install the ducklake extension exactly once (thread-safe across parallel tests).
/// Install the ducklake and parquet extensions exactly once (thread-safe across parallel tests).
///
/// Pre-installing parquet avoids race conditions when multiple concurrent tests
/// trigger DuckDB's auto-install simultaneously (observed on macOS CI).
fn ensure_ducklake_installed() {
INSTALL_DUCKLAKE.call_once(|| {
let conn = duckdb::Connection::open_in_memory().expect("open in-memory duckdb");
conn.execute("INSTALL ducklake;", [])
.expect("install ducklake extension");
conn.execute("INSTALL parquet;", [])
.expect("install parquet extension");
});
}

Expand Down
Loading