Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use datafusion::catalog::{Session, TableProvider};
use datafusion::common::Statistics;
use datafusion::common::stats::Precision;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{FileGroup, FileScanConfigBuilder, ParquetSource};
use datafusion::datasource::source::DataSourceExec;
Expand Down Expand Up @@ -539,6 +541,46 @@ impl TableProvider for DuckLakeTable {
TableType::Base
}

fn statistics(&self) -> Option<Statistics> {
// Aggregate per-file byte sizes from the cached `table_files`. Mirrors
// DuckLake's own `ducklake_table_info` aggregate exactly:
//
// total_byte_size == SUM(data_file.file_size_bytes)
// - SUM(delete_file.file_size_bytes)
//
// The values come from the ducklake catalog, so this is the same
// source of truth `ducklake_table_info` uses — no extra round trips
// and the numbers will match byte-for-byte.
//
// Marked `Precision::Inexact` because DataFusion documents
// `total_byte_size` as the *uncompressed Arrow output* size, while
// the catalog tracks *compressed parquet* bytes. For wide
// column types (List(Float64) embeddings) the two are nearly
// identical; for narrow scalar schemas the on-disk number is 3-5x
// smaller than Arrow output. Reporting compressed bytes Inexact
// gives consumers a useful lower-bound estimate without misleading
// the optimiser into thinking it's exact Arrow size. When
// `record_count` is plumbed through `DuckLakeFileData`, a follow-up
// can populate `num_rows` and use `calculate_total_byte_size` for a
// closer Arrow-side estimate.
let data_bytes: i64 = self
.table_files
.iter()
.map(|f| f.file.file_size_bytes)
.sum();
let delete_bytes: i64 = self
.table_files
.iter()
.filter_map(|f| f.delete_file.as_ref())
.map(|df| df.file_size_bytes)
.sum();
let net_bytes = (data_bytes - delete_bytes).max(0) as usize;

let mut stats = Statistics::new_unknown(&self.schema);
stats.total_byte_size = Precision::Inexact(net_bytes);
Some(stats)
}

fn supports_filters_pushdown(
&self,
filters: &[&Expr],
Expand Down
121 changes: 121 additions & 0 deletions tests/table_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
use std::sync::Arc;

use arrow::array::Int64Array;
use datafusion::common::stats::Precision;
use datafusion::error::Result as DataFusionResult;
use datafusion::prelude::*;
use datafusion_ducklake::metadata_provider::MetadataProvider;
use datafusion_ducklake::{DuckLakeCatalog, DuckdbMetadataProvider};
use tempfile::TempDir;

Expand Down Expand Up @@ -157,3 +159,122 @@ async fn test_empty_table_aggregate() -> DataFusionResult<()> {

Ok(())
}

/// Creates a catalog with a table populated with rows so DuckLake writes a real
/// data file. Used to validate that `DuckLakeTable::statistics()` agrees with
/// the catalog's per-file sizes.
fn create_populated_table_catalog(catalog_path: &std::path::Path) -> anyhow::Result<()> {
let conn = duckdb::Connection::open_in_memory()?;
conn.execute("INSTALL ducklake;", [])?;
conn.execute("LOAD ducklake;", [])?;

let data_dir = catalog_path.with_extension("ducklake.files");
std::fs::create_dir_all(&data_dir)?;

let ducklake_path = format!("ducklake:{}", catalog_path.display());
conn.execute(&format!("ATTACH '{}' AS test_catalog;", ducklake_path), [])?;
conn.execute("CREATE TABLE test_catalog.tbl (a INTEGER, b VARCHAR);", [])?;
// Insert enough rows that DuckLake actually emits a data file.
conn.execute(
"INSERT INTO test_catalog.tbl SELECT i, repeat('x', 100) FROM range(0, 1000) t(i);",
[],
)?;
Ok(())
}

/// Validates that `DuckLakeTable::statistics()` returns the same byte total as
/// directly summing `file_size_bytes - delete_file_size_bytes` from the
/// catalog's per-file metadata — i.e. the same aggregate `ducklake_table_info`
/// produces.
#[tokio::test]
async fn test_statistics_total_byte_size_matches_catalog_aggregate() -> DataFusionResult<()> {
let temp_dir =
TempDir::new().map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
let catalog_path = temp_dir.path().join("stats.ducklake");
create_populated_table_catalog(&catalog_path)
.map_err(|e| datafusion::error::DataFusionError::External(e.into()))?;

let catalog = create_catalog(&catalog_path.to_string_lossy())?;
let schema = datafusion::catalog::CatalogProvider::schema(catalog.as_ref(), "main")
.expect("main schema exists");
let table = schema
.table("tbl")
.await
.map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?
.expect("tbl present in main schema");

// What our impl reports.
let stats = table.statistics().expect("statistics() returned None");
let our_bytes = match stats.total_byte_size {
Precision::Exact(b) | Precision::Inexact(b) => b as i64,
Precision::Absent => {
panic!("total_byte_size was Absent for a populated table")
},
};

// Canonical: sum file_size_bytes - delete_file_size_bytes directly from
// the catalog at the latest snapshot. Same aggregate ducklake_table_info
// computes; same source rows our statistics() impl reads.
let provider = DuckdbMetadataProvider::new(catalog_path.to_string_lossy().to_string())
.map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
let snapshot_id = provider
.get_current_snapshot()
.map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
let schema_meta = provider
.get_schema_by_name("main", snapshot_id)
.map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?
.expect("main schema metadata");
let table_meta = provider
.get_table_by_name(schema_meta.schema_id, "tbl", snapshot_id)
.map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?
.expect("tbl metadata");
let files = provider
.get_table_files_for_select(table_meta.table_id, snapshot_id)
.map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
let canonical_bytes: i64 = files
.iter()
.map(|f| {
let data = f.file.file_size_bytes;
let dels = f.delete_file.as_ref().map_or(0, |d| d.file_size_bytes);
data - dels
})
.sum();

assert!(
our_bytes > 0,
"expected populated table to report non-zero bytes, got {}",
our_bytes
);
assert_eq!(
our_bytes, canonical_bytes,
"statistics().total_byte_size must equal SUM(file_size) - SUM(delete_file_size) \
from the catalog (our_bytes={}, canonical={})",
our_bytes, canonical_bytes
);

// Hold temp_dir to outlive the catalog handle.
std::mem::forget(temp_dir);
Ok(())
}

/// Empty tables — no data files yet — should still return Statistics with
/// total_byte_size == 0, not Absent or None.
#[tokio::test]
async fn test_statistics_zero_for_empty_table() -> DataFusionResult<()> {
let ctx = setup_empty_table_context("stats_empty").await?;
let cat = ctx
.catalog("ducklake")
.expect("ducklake catalog registered");
let schema = cat.schema("main").expect("main schema");
let table = schema
.table("tbl")
.await
.map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?
.expect("tbl present");
let stats = table.statistics().expect("statistics() returned None");
match stats.total_byte_size {
Precision::Exact(0) | Precision::Inexact(0) => {},
other => panic!("expected zero bytes for empty table, got {:?}", other),
}
Ok(())
}
Loading