diff --git a/src/table.rs b/src/table.rs index 2550710..f9dfc2e 100644 --- a/src/table.rs +++ b/src/table.rs @@ -27,6 +27,8 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use async_trait::async_trait; use datafusion::catalog::{Session, TableProvider}; +use datafusion::common::Statistics; +use datafusion::common::stats::Precision; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::physical_plan::{FileGroup, FileScanConfigBuilder, ParquetSource}; use datafusion::datasource::source::DataSourceExec; @@ -539,6 +541,46 @@ impl TableProvider for DuckLakeTable { TableType::Base } + fn statistics(&self) -> Option { + // Aggregate per-file byte sizes from the cached `table_files`. Mirrors + // DuckLake's own `ducklake_table_info` aggregate exactly: + // + // total_byte_size == SUM(data_file.file_size_bytes) + // - SUM(delete_file.file_size_bytes) + // + // The values come from the ducklake catalog, so this is the same + // source of truth `ducklake_table_info` uses — no extra round trips + // and the numbers will match byte-for-byte. + // + // Marked `Precision::Inexact` because DataFusion documents + // `total_byte_size` as the *uncompressed Arrow output* size, while + // the catalog tracks *compressed parquet* bytes. For wide + // column types (List(Float64) embeddings) the two are nearly + // identical; for narrow scalar schemas the on-disk number is 3-5x + // smaller than Arrow output. Reporting compressed bytes Inexact + // gives consumers a useful lower-bound estimate without misleading + // the optimiser into thinking it's exact Arrow size. When + // `record_count` is plumbed through `DuckLakeFileData`, a follow-up + // can populate `num_rows` and use `calculate_total_byte_size` for a + // closer Arrow-side estimate. + let data_bytes: i64 = self + .table_files + .iter() + .map(|f| f.file.file_size_bytes) + .sum(); + let delete_bytes: i64 = self + .table_files + .iter() + .filter_map(|f| f.delete_file.as_ref()) + .map(|df| df.file_size_bytes) + .sum(); + let net_bytes = (data_bytes - delete_bytes).max(0) as usize; + + let mut stats = Statistics::new_unknown(&self.schema); + stats.total_byte_size = Precision::Inexact(net_bytes); + Some(stats) + } + fn supports_filters_pushdown( &self, filters: &[&Expr], diff --git a/tests/table_tests.rs b/tests/table_tests.rs index db0799b..43bfa1d 100644 --- a/tests/table_tests.rs +++ b/tests/table_tests.rs @@ -6,8 +6,10 @@ use std::sync::Arc; use arrow::array::Int64Array; +use datafusion::common::stats::Precision; use datafusion::error::Result as DataFusionResult; use datafusion::prelude::*; +use datafusion_ducklake::metadata_provider::MetadataProvider; use datafusion_ducklake::{DuckLakeCatalog, DuckdbMetadataProvider}; use tempfile::TempDir; @@ -157,3 +159,122 @@ async fn test_empty_table_aggregate() -> DataFusionResult<()> { Ok(()) } + +/// Creates a catalog with a table populated with rows so DuckLake writes a real +/// data file. Used to validate that `DuckLakeTable::statistics()` agrees with +/// the catalog's per-file sizes. +fn create_populated_table_catalog(catalog_path: &std::path::Path) -> anyhow::Result<()> { + let conn = duckdb::Connection::open_in_memory()?; + conn.execute("INSTALL ducklake;", [])?; + conn.execute("LOAD ducklake;", [])?; + + let data_dir = catalog_path.with_extension("ducklake.files"); + std::fs::create_dir_all(&data_dir)?; + + let ducklake_path = format!("ducklake:{}", catalog_path.display()); + conn.execute(&format!("ATTACH '{}' AS test_catalog;", ducklake_path), [])?; + conn.execute("CREATE TABLE test_catalog.tbl (a INTEGER, b VARCHAR);", [])?; + // Insert enough rows that DuckLake actually emits a data file. + conn.execute( + "INSERT INTO test_catalog.tbl SELECT i, repeat('x', 100) FROM range(0, 1000) t(i);", + [], + )?; + Ok(()) +} + +/// Validates that `DuckLakeTable::statistics()` returns the same byte total as +/// directly summing `file_size_bytes - delete_file_size_bytes` from the +/// catalog's per-file metadata — i.e. the same aggregate `ducklake_table_info` +/// produces. +#[tokio::test] +async fn test_statistics_total_byte_size_matches_catalog_aggregate() -> DataFusionResult<()> { + let temp_dir = + TempDir::new().map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?; + let catalog_path = temp_dir.path().join("stats.ducklake"); + create_populated_table_catalog(&catalog_path) + .map_err(|e| datafusion::error::DataFusionError::External(e.into()))?; + + let catalog = create_catalog(&catalog_path.to_string_lossy())?; + let schema = datafusion::catalog::CatalogProvider::schema(catalog.as_ref(), "main") + .expect("main schema exists"); + let table = schema + .table("tbl") + .await + .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))? + .expect("tbl present in main schema"); + + // What our impl reports. + let stats = table.statistics().expect("statistics() returned None"); + let our_bytes = match stats.total_byte_size { + Precision::Exact(b) | Precision::Inexact(b) => b as i64, + Precision::Absent => { + panic!("total_byte_size was Absent for a populated table") + }, + }; + + // Canonical: sum file_size_bytes - delete_file_size_bytes directly from + // the catalog at the latest snapshot. Same aggregate ducklake_table_info + // computes; same source rows our statistics() impl reads. + let provider = DuckdbMetadataProvider::new(catalog_path.to_string_lossy().to_string()) + .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?; + let snapshot_id = provider + .get_current_snapshot() + .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?; + let schema_meta = provider + .get_schema_by_name("main", snapshot_id) + .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))? + .expect("main schema metadata"); + let table_meta = provider + .get_table_by_name(schema_meta.schema_id, "tbl", snapshot_id) + .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))? + .expect("tbl metadata"); + let files = provider + .get_table_files_for_select(table_meta.table_id, snapshot_id) + .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?; + let canonical_bytes: i64 = files + .iter() + .map(|f| { + let data = f.file.file_size_bytes; + let dels = f.delete_file.as_ref().map_or(0, |d| d.file_size_bytes); + data - dels + }) + .sum(); + + assert!( + our_bytes > 0, + "expected populated table to report non-zero bytes, got {}", + our_bytes + ); + assert_eq!( + our_bytes, canonical_bytes, + "statistics().total_byte_size must equal SUM(file_size) - SUM(delete_file_size) \ + from the catalog (our_bytes={}, canonical={})", + our_bytes, canonical_bytes + ); + + // Hold temp_dir to outlive the catalog handle. + std::mem::forget(temp_dir); + Ok(()) +} + +/// Empty tables — no data files yet — should still return Statistics with +/// total_byte_size == 0, not Absent or None. +#[tokio::test] +async fn test_statistics_zero_for_empty_table() -> DataFusionResult<()> { + let ctx = setup_empty_table_context("stats_empty").await?; + let cat = ctx + .catalog("ducklake") + .expect("ducklake catalog registered"); + let schema = cat.schema("main").expect("main schema"); + let table = schema + .table("tbl") + .await + .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))? + .expect("tbl present"); + let stats = table.statistics().expect("statistics() returned None"); + match stats.total_byte_size { + Precision::Exact(0) | Precision::Inexact(0) => {}, + other => panic!("expected zero bytes for empty table, got {:?}", other), + } + Ok(()) +}