Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rust/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub enum ErrorCode {
DatasetListKeyValues = 44,
DatasetListIndices = 45,
DatasetCreateScalarIndex = 46,
DatasetCalculateDataStats = 47,
}

struct LastError {
Expand Down
153 changes: 153 additions & 0 deletions rust/ffi/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::ptr;
use std::sync::Arc;

use datafusion_sql::unparser::expr_to_sql;
use lance::dataset::statistics::DatasetStatisticsExt;
use lance::dataset::builder::DatasetBuilder;
use lance::Dataset;

Expand All @@ -13,6 +14,23 @@ use crate::runtime;
use super::types::DatasetHandle;
use super::util::{cstr_to_str, parse_optional_filter_ir, slice_from_ptr, FfiError, FfiResult};

/// C-ABI per-field statistics row returned by `lance_dataset_list_field_stats`.
/// Layout must stay in sync with the `LanceFieldStats` typedef in
/// `src/include/lance_ffi.hpp`.
#[repr(C)]
#[derive(Clone, Copy, Debug)]
pub struct LanceFieldStats {
    /// Field id as reported by the dataset's data statistics.
    pub field_id: u32,
    /// Bytes this field occupies on disk.
    pub bytes_on_disk: u64,
}

/// C-ABI per-fragment statistics row returned by `lance_dataset_list_fragment_stats`.
/// Layout must stay in sync with the `LanceFragmentStats` typedef in
/// `src/include/lance_ffi.hpp`.
#[repr(C)]
#[derive(Clone, Copy, Debug)]
pub struct LanceFragmentStats {
    /// Identifier of the fragment within the dataset manifest.
    pub fragment_id: u64,
    /// Number of rows in the fragment. `-1` means unknown.
    pub num_rows: i64,
    /// Sum of known data file sizes in bytes. Missing/unknown sizes are treated as 0.
    pub bytes_on_disk: u64,
}

#[no_mangle]
pub unsafe extern "C" fn lance_open_dataset(path: *const c_char) -> *mut c_void {
match open_dataset_inner(path) {
Expand Down Expand Up @@ -219,6 +237,62 @@ fn dataset_list_fragments_inner(dataset: *mut c_void, out_len: *mut usize) -> Ff
Ok(data)
}

/// C entry point: list per-fragment statistics for an open dataset.
///
/// On success, returns a heap-allocated array whose length is written through
/// `out_len`; the caller must release it with `lance_free_fragment_stats_list`.
/// On failure, records the error via `set_last_error` and returns null.
#[no_mangle]
pub unsafe extern "C" fn lance_dataset_list_fragment_stats(
    dataset: *mut c_void,
    out_len: *mut usize,
) -> *mut LanceFragmentStats {
    let result = dataset_list_fragment_stats_inner(dataset, out_len);
    match result {
        Ok(stats_ptr) => {
            // Success: make sure a stale error from a previous call is not visible.
            clear_last_error();
            stats_ptr
        }
        Err(failure) => {
            set_last_error(failure.code, failure.message);
            ptr::null_mut()
        }
    }
}

/// Collect per-fragment statistics into a leaked boxed slice for FFI transfer.
///
/// Ownership of the returned buffer passes to the caller, who must hand it back
/// to `lance_free_fragment_stats_list` together with the length written into
/// `out_len`.
fn dataset_list_fragment_stats_inner(
    dataset: *mut c_void,
    out_len: *mut usize,
) -> FfiResult<*mut LanceFragmentStats> {
    if out_len.is_null() {
        return Err(FfiError::new(ErrorCode::InvalidArgument, "out_len is null"));
    }
    let handle = unsafe { super::util::dataset_handle(dataset)? };

    let fragments = handle.dataset.fragments();
    let mut stats: Vec<LanceFragmentStats> = Vec::with_capacity(fragments.len());
    for frag in fragments.iter() {
        // Sum only the sizes that are actually recorded; saturate rather than
        // wrap on (pathological) overflow.
        let bytes_on_disk = frag
            .files
            .iter()
            .filter_map(|file| file.file_size_bytes.get())
            .fold(0u64, |total, sz| total.saturating_add(sz.get()));
        // -1 signals "unknown" across the C ABI, including the (practically
        // impossible) case where the count does not fit in i64.
        let num_rows = frag
            .num_rows()
            .and_then(|v| i64::try_from(v).ok())
            .unwrap_or(-1);
        stats.push(LanceFragmentStats {
            fragment_id: frag.id,
            num_rows,
            bytes_on_disk,
        });
    }

    // Leak the boxed slice; the matching free function reconstructs the Box.
    let mut boxed = stats.into_boxed_slice();
    let len = boxed.len();
    let data = boxed.as_mut_ptr();
    std::mem::forget(boxed);

    // SAFETY: out_len was null-checked above; write_unaligned tolerates a
    // caller-provided pointer of any alignment.
    unsafe {
        std::ptr::write_unaligned(out_len, len);
    }
    Ok(data)
}

#[no_mangle]
pub unsafe extern "C" fn lance_free_fragment_list(ptr: *mut u64, len: usize) {
if ptr.is_null() {
Expand All @@ -230,6 +304,85 @@ pub unsafe extern "C" fn lance_free_fragment_list(ptr: *mut u64, len: usize) {
}
}

/// Release a fragment-stats array previously returned by
/// `lance_dataset_list_fragment_stats`. Passing null is a no-op; `len` must be
/// the value written into `out_len` by that call.
#[no_mangle]
pub unsafe extern "C" fn lance_free_fragment_stats_list(ptr: *mut LanceFragmentStats, len: usize) {
    if !ptr.is_null() {
        // SAFETY: (ptr, len) originated from a Box<[LanceFragmentStats]> that
        // was leaked with mem::forget in the list function, so reconstructing
        // and dropping the Box is the matching deallocation.
        unsafe {
            let raw = std::ptr::slice_from_raw_parts_mut(ptr, len);
            drop(Box::<[LanceFragmentStats]>::from_raw(raw));
        }
    }
}

/// C entry point: list per-field on-disk statistics for an open dataset.
///
/// On success, returns a heap-allocated array whose length is written through
/// `out_len`; the caller must release it with `lance_free_field_stats_list`.
/// On failure, records the error via `set_last_error` and returns null.
#[no_mangle]
pub unsafe extern "C" fn lance_dataset_list_field_stats(
    dataset: *mut c_void,
    out_len: *mut usize,
) -> *mut LanceFieldStats {
    let result = dataset_list_field_stats_inner(dataset, out_len);
    match result {
        Ok(stats_ptr) => {
            // Success: make sure a stale error from a previous call is not visible.
            clear_last_error();
            stats_ptr
        }
        Err(failure) => {
            set_last_error(failure.code, failure.message);
            ptr::null_mut()
        }
    }
}

/// Compute per-field data statistics and hand them out as a leaked boxed slice.
///
/// Ownership of the returned buffer passes to the caller, who must hand it back
/// to `lance_free_field_stats_list` together with the length written into
/// `out_len`.
fn dataset_list_field_stats_inner(
    dataset: *mut c_void,
    out_len: *mut usize,
) -> FfiResult<*mut LanceFieldStats> {
    if out_len.is_null() {
        return Err(FfiError::new(ErrorCode::InvalidArgument, "out_len is null"));
    }

    let handle = unsafe { super::util::dataset_handle(dataset)? };

    // Two fallible layers: the runtime must run the future, and the future
    // itself may fail. Map each to its own error code.
    let joined = runtime::block_on(handle.dataset.calculate_data_stats())
        .map_err(|err| FfiError::new(ErrorCode::Runtime, format!("runtime: {err}")))?;
    let stats = joined.map_err(|err| {
        FfiError::new(
            ErrorCode::DatasetCalculateDataStats,
            format!("dataset calculate_data_stats: {err}"),
        )
    })?;

    let rows: Vec<LanceFieldStats> = stats
        .fields
        .into_iter()
        .map(|field| LanceFieldStats {
            field_id: field.id,
            bytes_on_disk: field.bytes_on_disk,
        })
        .collect();

    // Leak the boxed slice; the matching free function reconstructs the Box.
    let mut boxed = rows.into_boxed_slice();
    let len = boxed.len();
    let data = boxed.as_mut_ptr();
    std::mem::forget(boxed);

    // SAFETY: out_len was null-checked above; write_unaligned tolerates a
    // caller-provided pointer of any alignment.
    unsafe {
        std::ptr::write_unaligned(out_len, len);
    }
    Ok(data)
}

/// Release a field-stats array previously returned by
/// `lance_dataset_list_field_stats`. Passing null is a no-op; `len` must be
/// the value written into `out_len` by that call.
#[no_mangle]
pub unsafe extern "C" fn lance_free_field_stats_list(ptr: *mut LanceFieldStats, len: usize) {
    if !ptr.is_null() {
        // SAFETY: (ptr, len) originated from a Box<[LanceFieldStats]> that was
        // leaked with mem::forget in the list function, so reconstructing and
        // dropping the Box is the matching deallocation.
        unsafe {
            let raw = std::ptr::slice_from_raw_parts_mut(ptr, len);
            drop(Box::<[LanceFieldStats]>::from_raw(raw));
        }
    }
}

#[no_mangle]
pub unsafe extern "C" fn lance_dataset_delete(
dataset: *mut c_void,
Expand Down
17 changes: 17 additions & 0 deletions src/include/lance_ffi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,23 @@ int32_t lance_dataset_create_scalar_index(void *dataset, const char *column,

uint64_t *lance_dataset_list_fragments(void *dataset, size_t *out_len);
void lance_free_fragment_list(uint64_t *ptr, size_t len);
// Per-field on-disk statistics. Layout mirrors `LanceFieldStats` in
// rust/ffi/dataset.rs — keep the two definitions in sync.
typedef struct LanceFieldStats {
  uint32_t field_id;
  uint64_t bytes_on_disk;
} LanceFieldStats;

// Per-fragment statistics. Layout mirrors `LanceFragmentStats` in
// rust/ffi/dataset.rs — keep the two definitions in sync.
typedef struct LanceFragmentStats {
  uint64_t fragment_id;
  int64_t num_rows;        // -1 when the row count is unknown
  uint64_t bytes_on_disk;  // sum of known data-file sizes; unknown sizes count as 0
} LanceFragmentStats;

// Returns a heap-allocated array of per-fragment stats (length written to
// *out_len), or nullptr on error. Free with lance_free_fragment_stats_list.
LanceFragmentStats *lance_dataset_list_fragment_stats(void *dataset,
                                                      size_t *out_len);
void lance_free_fragment_stats_list(LanceFragmentStats *ptr, size_t len);

// Returns a heap-allocated array of per-field stats (length written to
// *out_len), or nullptr on error. Free with lance_free_field_stats_list.
LanceFieldStats *lance_dataset_list_field_stats(void *dataset, size_t *out_len);
void lance_free_field_stats_list(LanceFieldStats *ptr, size_t len);
void *lance_create_fragment_stream_ir(void *dataset, uint64_t fragment_id,
const char **columns, size_t columns_len,
const uint8_t *filter_ir,
Expand Down
65 changes: 65 additions & 0 deletions src/lance_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,62 @@
// not call `release` unless the caller initialized them to a valid value.
namespace duckdb {

// Column-statistics callback for the Lance scan. No per-column statistics are
// collected yet, so every in-range column reports "unknown" — this still lets
// the optimizer know the column exists without assuming a value distribution.
static unique_ptr<BaseStatistics>
LanceScanStatistics(ClientContext &context, const FunctionData *bind_data_p,
                    column_t column_id) {
  (void)context;
  if (bind_data_p == nullptr) {
    return nullptr;
  }
  auto &scan_data = bind_data_p->Cast<LanceScanBindData>();
  if (column_id >= scan_data.types.size()) {
    return nullptr;
  }
  auto unknown = BaseStatistics::CreateUnknown(scan_data.types[column_id]);
  return unknown.ToUnique();
}

// Cardinality callback for the Lance scan: report the dataset's exact row
// count as both the estimated and maximum cardinality. Returns nullptr when
// the bind data or dataset handle is missing, or when the count is unknown
// (negative return from the FFI call).
static unique_ptr<NodeStatistics>
LanceScanCardinality(ClientContext &context, const FunctionData *bind_data_p) {
  (void)context;
  if (bind_data_p == nullptr) {
    return nullptr;
  }
  auto &scan_data = bind_data_p->Cast<LanceScanBindData>();
  if (!scan_data.dataset) {
    return nullptr;
  }
  auto row_count = lance_dataset_count_rows(scan_data.dataset);
  if (row_count < 0) {
    return nullptr;
  }
  auto exact = NumericCast<idx_t>(row_count);
  return make_uniq<NodeStatistics>(exact, exact);
}

// Partition-statistics callback for the Lance scan. Exposes the whole dataset
// as a single partition with an exact row count, which lets the planner
// rewrite COUNT(*) without scanning. Returns an empty vector when the bind
// data or dataset handle is missing, or the row count is unknown.
static vector<PartitionStatistics>
LanceScanGetPartitionStats(ClientContext &context,
                           GetPartitionStatsInput &input) {
  (void)context;
  if (!input.bind_data) {
    return {};
  }
  auto &scan_data = input.bind_data->Cast<LanceScanBindData>();
  if (!scan_data.dataset) {
    return {};
  }
  auto row_count = lance_dataset_count_rows(scan_data.dataset);
  if (row_count < 0) {
    return {};
  }
  PartitionStatistics partition;
  partition.row_start = 0;
  partition.count = NumericCast<idx_t>(row_count);
  partition.count_type = CountType::COUNT_EXACT;
  return vector<PartitionStatistics>{partition};
}

LanceScanBindData::~LanceScanBindData() {
if (dataset) {
lance_close_dataset(dataset);
Expand Down Expand Up @@ -911,6 +967,9 @@ static TableFunction LanceTableScanFunction() {
function.projection_pushdown = true;
function.filter_pushdown = true;
function.filter_prune = true;
function.statistics = LanceScanStatistics;
function.cardinality = LanceScanCardinality;
function.get_partition_stats = LanceScanGetPartitionStats;
function.supports_pushdown_type = LanceSupportsPushdownType;
function.pushdown_complex_filter = LancePushdownComplexFilter;
function.to_string = LanceScanToString;
Expand Down Expand Up @@ -1345,6 +1404,9 @@ void RegisterLanceScan(ExtensionLoader &loader) {
lance_scan.projection_pushdown = true;
lance_scan.filter_pushdown = true;
lance_scan.filter_prune = true;
lance_scan.statistics = LanceScanStatistics;
lance_scan.cardinality = LanceScanCardinality;
lance_scan.get_partition_stats = LanceScanGetPartitionStats;
lance_scan.supports_pushdown_type = LanceSupportsPushdownType;
lance_scan.pushdown_complex_filter = LancePushdownComplexFilter;
lance_scan.to_string = LanceScanToString;
Expand All @@ -1365,6 +1427,9 @@ void RegisterLanceScan(ExtensionLoader &loader) {
internal_namespace_scan.projection_pushdown = true;
internal_namespace_scan.filter_pushdown = true;
internal_namespace_scan.filter_prune = true;
internal_namespace_scan.statistics = LanceScanStatistics;
internal_namespace_scan.cardinality = LanceScanCardinality;
internal_namespace_scan.get_partition_stats = LanceScanGetPartitionStats;
internal_namespace_scan.supports_pushdown_type = LanceSupportsPushdownType;
internal_namespace_scan.pushdown_complex_filter = LancePushdownComplexFilter;
internal_namespace_scan.to_string = LanceScanToString;
Expand Down
17 changes: 17 additions & 0 deletions test/sql/statistics_cardinality.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# name: test/sql/statistics_cardinality.test
# description: Optimizer statistics hooks (cardinality/partition stats)
# group: [sql]

require lance

# Estimated cardinality should reflect dataset row count
query II
EXPLAIN (FORMAT JSON) SELECT * FROM 'test/data/test_data.lance';
----
physical_plan <REGEX>:[\s\S]*"Estimated Cardinality": "5"[\s\S]*

# COUNT(*) should be rewritten using exact partition stats (no scan needed)
query II
EXPLAIN (FORMAT JSON) SELECT count(*) FROM 'test/data/test_data.lance';
----
physical_plan <REGEX>:[\s\S]*"name": "COLUMN_DATA_SCAN"[\s\S]*