diff --git a/kernel/examples/inspect-table/src/main.rs b/kernel/examples/inspect-table/src/main.rs index 67a651ae1..b2df69e3e 100644 --- a/kernel/examples/inspect-table/src/main.rs +++ b/kernel/examples/inspect-table/src/main.rs @@ -1,17 +1,17 @@ use common::LocationArgs; use delta_kernel::actions::visitors::{ - visit_metadata_at, visit_protocol_at, AddVisitor, CdcVisitor, RemoveVisitor, - SetTransactionVisitor, + visit_metadata_at, visit_protocol_at, AddVisitor, CdcVisitor, DomainMetadataVisitor, + RemoveVisitor, SetTransactionVisitor, }; use delta_kernel::actions::{ - get_log_schema, ADD_NAME, CDC_NAME, METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME, - SET_TRANSACTION_NAME, + get_log_schema, ADD_NAME, CDC_NAME, DOMAIN_METADATA_NAME, METADATA_NAME, PROTOCOL_NAME, + REMOVE_NAME, SET_TRANSACTION_NAME, }; use delta_kernel::engine_data::{GetData, RowVisitor, TypedGetData as _}; use delta_kernel::expressions::ColumnName; use delta_kernel::scan::state::{DvInfo, Stats}; use delta_kernel::scan::ScanBuilder; -use delta_kernel::schema::{ColumnNamesAndTypes, DataType}; +use delta_kernel::schema::{ColumnNamesAndTypes, DataType, Schema, SchemaRef}; use delta_kernel::{DeltaResult, Error, ExpressionRef, Snapshot}; use std::collections::HashMap; @@ -46,6 +46,8 @@ enum Commands { /// Show the log in reverse order (default is log replay order -- newest first) #[arg(short, long)] oldest_first: bool, + #[arg(short, long)] + stats_only: bool, }, } @@ -67,19 +69,29 @@ enum Action { Add(delta_kernel::actions::Add), SetTransaction(delta_kernel::actions::SetTransaction), Cdc(delta_kernel::actions::Cdc), + DomainMetadata(delta_kernel::actions::DomainMetadata), } +fn custom_log_schema() -> SchemaRef { + let fields = get_log_schema() + .fields() + .filter(|f| f.name() != "commitInfo") + .cloned(); + Schema::try_new(fields).unwrap().into() +} static NAMES_AND_TYPES: LazyLock = - LazyLock::new(|| get_log_schema().leaves(None)); + LazyLock::new(|| custom_log_schema().leaves(None)); struct LogVisitor { actions: Vec<(Action, usize)>, + stats: HashMap<&'static str, usize>, offsets: HashMap, previous_rows_seen: usize, + stats_only: bool, } impl LogVisitor { - fn new() -> LogVisitor { + fn new(stats_only: bool) -> LogVisitor { // Grab the start offset for each top-level column name, then compute the end offset by // skipping the rest of the leaves for that column. let mut offsets = HashMap::new(); @@ -93,8 +105,10 @@ impl LogVisitor { } LogVisitor { actions: vec![], + stats: HashMap::new(), offsets, previous_rows_seen: 0, + stats_only, } } } @@ -104,7 +118,7 @@ impl RowVisitor for LogVisitor { NAMES_AND_TYPES.as_ref() } fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> { - if getters.len() != 55 { + if getters.len() != 59 { return Err(Error::InternalError(format!( "Wrong number of LogVisitor getters: {}", getters.len() @@ -116,34 +130,47 @@ impl RowVisitor for LogVisitor { let (protocol_start, protocol_end) = self.offsets[PROTOCOL_NAME]; let (txn_start, txn_end) = self.offsets[SET_TRANSACTION_NAME]; let (cdc_start, cdc_end) = self.offsets[CDC_NAME]; + let (dm_start, dm_end) = self.offsets[DOMAIN_METADATA_NAME]; + for i in 0..row_count { - let action = if let Some(path) = getters[add_start].get_opt(i, "add.path")? { + let (action, name) = if let Some(path) = getters[add_start].get_opt(i, "add.path")? { let add = AddVisitor::visit_add(i, path, &getters[add_start..add_end])?; - Action::Add(add) + (Action::Add(add), "add") } else if let Some(path) = getters[remove_start].get_opt(i, "remove.path")? { let remove = RemoveVisitor::visit_remove(i, path, &getters[remove_start..remove_end])?; - Action::Remove(remove) + (Action::Remove(remove), "remove") } else if let Some(metadata) = visit_metadata_at(i, &getters[metadata_start..metadata_end])? { - Action::Metadata(metadata) + (Action::Metadata(metadata), "metadata") } else if let Some(protocol) = visit_protocol_at(i, &getters[protocol_start..protocol_end])? { - Action::Protocol(protocol) + (Action::Protocol(protocol), "protocol") } else if let Some(app_id) = getters[txn_start].get_opt(i, "txn.appId")? { let txn = SetTransactionVisitor::visit_txn(i, app_id, &getters[txn_start..txn_end])?; - Action::SetTransaction(txn) + (Action::SetTransaction(txn), "setTransaction") } else if let Some(path) = getters[cdc_start].get_opt(i, "cdc.path")? { let cdc = CdcVisitor::visit_cdc(i, path, &getters[cdc_start..cdc_end])?; - Action::Cdc(cdc) + (Action::Cdc(cdc), "cdc") + } else if let Some(domain) = getters[dm_start].get_opt(i, "domainMetadata.name")? { + let dm = DomainMetadataVisitor::visit_domain_metadata( + i, + domain, + &getters[dm_start..dm_end], + )?; + (Action::DomainMetadata(dm), "domainMetadata") } else { // TODO: Add CommitInfo support (tricky because all fields are optional) continue; }; - self.actions.push((action, self.previous_rows_seen + i)); + + *self.stats.entry(name).or_default() += 1; + if !self.stats_only { + self.actions.push((action, self.previous_rows_seen + i)); + } } self.previous_rows_seen += row_count; Ok(()) @@ -202,8 +229,11 @@ fn try_main() -> DeltaResult<()> { scan_metadata.visit_scan_files((), print_scan_file)?; } } - Commands::Actions { oldest_first } => { - let log_schema = get_log_schema(); + Commands::Actions { + oldest_first, + stats_only, + } => { + let log_schema = custom_log_schema(); let actions = snapshot.log_segment().read_actions( &engine, log_schema.clone(), @@ -211,11 +241,21 @@ fn try_main() -> DeltaResult<()> { None, )?; - let mut visitor = LogVisitor::new(); + let mut visitor = LogVisitor::new(stats_only); for action in actions { visitor.visit_rows_of(action?.actions())?; } + let mut stats: Vec<_> = visitor + .stats + .iter() + .map(|(&name, &count)| (count, name)) + .collect(); + stats.sort_by(|(count1, _), (count2, _)| count2.cmp(count1)); + for (count, name) in stats { + println!("Found {count} '{name}' actions"); + } + if oldest_first { visitor.actions.reverse(); } @@ -227,6 +267,7 @@ fn try_main() -> DeltaResult<()> { Action::Add(a) => println!("\nAction {row}:\n{a:#?}"), Action::SetTransaction(t) => println!("\nAction {row}:\n{t:#?}"), Action::Cdc(c) => println!("\nAction {row}:\n{c:#?}"), + Action::DomainMetadata(d) => println!("\n Action {row}:\n{d:#?}"), } } } diff --git a/kernel/src/actions/visitors.rs b/kernel/src/actions/visitors.rs index f10ecf1de..4ea5da970 100644 --- a/kernel/src/actions/visitors.rs +++ b/kernel/src/actions/visitors.rs @@ -430,6 +430,7 @@ impl RowVisitor for SidecarVisitor { /// included to only retain the domain metadata for a specific domain (in order to bound memory /// requirements). #[derive(Debug, Default)] +#[internal_api] pub(crate) struct DomainMetadataVisitor { domain_metadatas: DomainMetadataMap, domain_filter: Option, @@ -444,6 +445,7 @@ impl DomainMetadataVisitor { } } + #[internal_api] pub(crate) fn visit_domain_metadata<'a>( row_index: usize, domain: String, diff --git a/kernel/src/engine/arrow_conversion.rs b/kernel/src/engine/arrow_conversion.rs index 42049d26a..42f5db6ce 100644 --- a/kernel/src/engine/arrow_conversion.rs +++ b/kernel/src/engine/arrow_conversion.rs @@ -12,6 +12,7 @@ use itertools::Itertools; use crate::error::Error; use crate::schema::{ ArrayType, DataType, MapType, MetadataValue, PrimitiveType, StructField, StructType, + UNSHREDDED_VARIANT_SCHEMA, }; pub(crate) const LIST_ARRAY_ROOT: &str = "element"; @@ -162,7 +163,7 @@ impl TryFromKernel<&DataType> for ArrowDataType { false, )), DataType::Variant(s) => { - if *t == DataType::unshredded_variant() { + if *t == *UNSHREDDED_VARIANT_SCHEMA { Ok(ArrowDataType::Struct( s.fields() .map(TryIntoArrow::try_into_arrow) @@ -326,11 +327,10 @@ mod tests { #[test] fn test_variant_shredded_type_fail() -> DeltaResult<()> { - let unshredded_variant = DataType::unshredded_variant(); - let unshredded_variant_arrow = ArrowDataType::try_from_kernel(&unshredded_variant)?; + let unshredded_variant_arrow = ArrowDataType::try_from_kernel(&UNSHREDDED_VARIANT_SCHEMA)?; assert!(unshredded_variant_arrow == unshredded_variant_arrow_type()); let shredded_variant = DataType::variant_type([ - StructField::nullable("metadata", DataType::BINARY), + StructField::not_null("metadata", DataType::BINARY), StructField::nullable("value", DataType::BINARY), StructField::nullable("typed_value", DataType::INTEGER), ])?; diff --git a/kernel/src/engine/arrow_utils.rs b/kernel/src/engine/arrow_utils.rs index adb41e75b..f37f3e09a 100644 --- a/kernel/src/engine/arrow_utils.rs +++ b/kernel/src/engine/arrow_utils.rs @@ -7,7 +7,7 @@ use std::sync::{Arc, OnceLock}; use crate::engine::arrow_conversion::{TryFromKernel as _, TryIntoArrow as _}; use crate::engine::ensure_data_types::DataTypeCompat; -use crate::schema::{ColumnMetadataKey, MetadataValue}; +use crate::schema::{ColumnMetadataKey, MetadataValue, UNSHREDDED_VARIANT_SCHEMA}; use crate::{ engine::arrow_data::ArrowEngineData, schema::{DataType, MetadataColumnSpec, Schema, SchemaRef, StructField, StructType}, @@ -429,7 +429,7 @@ fn get_indices( { // If the field is a variant, make sure the parquet schema matches the unshredded variant // representation. This is to ensure that shredded reads are not performed. - if requested_field.data_type == DataType::unshredded_variant() { + if requested_field.data_type == *UNSHREDDED_VARIANT_SCHEMA { validate_parquet_variant(field)?; } match field.data_type() { diff --git a/kernel/src/engine/ensure_data_types.rs b/kernel/src/engine/ensure_data_types.rs index 2d5a660c3..00f85aabe 100644 --- a/kernel/src/engine/ensure_data_types.rs +++ b/kernel/src/engine/ensure_data_types.rs @@ -269,7 +269,7 @@ mod tests { use crate::engine::arrow_conversion::TryFromKernel as _; use crate::engine::arrow_data::unshredded_variant_arrow_type; - use crate::schema::{ArrayType, DataType, MapType, StructField}; + use crate::schema::{ArrayType, DataType, MapType, StructField, UNSHREDDED_VARIANT_SCHEMA}; use crate::utils::test_utils::assert_result_error_with_message; use super::*; @@ -341,14 +341,14 @@ mod tests { } assert!(ensure_data_types( - &DataType::unshredded_variant(), + &UNSHREDDED_VARIANT_SCHEMA, &unshredded_variant_arrow_type(), true ) .is_ok()); assert_result_error_with_message( ensure_data_types( - &DataType::unshredded_variant(), + &UNSHREDDED_VARIANT_SCHEMA, &incorrect_variant_arrow_type(), true, ), diff --git a/kernel/src/engine/parquet_row_group_skipping/tests.rs b/kernel/src/engine/parquet_row_group_skipping/tests.rs index d14764eb1..bca716791 100644 --- a/kernel/src/engine/parquet_row_group_skipping/tests.rs +++ b/kernel/src/engine/parquet_row_group_skipping/tests.rs @@ -2,6 +2,7 @@ use super::*; use crate::expressions::{column_name, column_pred}; use crate::kernel_predicates::DataSkippingPredicateEvaluator as _; use crate::parquet::arrow::arrow_reader::ArrowReaderMetadata; +use crate::schema::UNSHREDDED_VARIANT_SCHEMA; use crate::Predicate; use std::fs::File; @@ -214,10 +215,7 @@ fn test_get_stat_values() { // Read a random column as Variant. The actual read does not need to be performed, as stats on // Variant should always return None. assert_eq!( - filter.get_min_stat( - &column_name!("chrono.date32"), - &DataType::unshredded_variant() - ), + filter.get_min_stat(&column_name!("chrono.date32"), &UNSHREDDED_VARIANT_SCHEMA,), None ); @@ -396,10 +394,7 @@ fn test_get_stat_values() { // Read a random column as Variant. The actual read does not need to be performed, as stats on // Variant should always return None. assert_eq!( - filter.get_max_stat( - &column_name!("chrono.date32"), - &DataType::unshredded_variant() - ), + filter.get_max_stat(&column_name!("chrono.date32"), &UNSHREDDED_VARIANT_SCHEMA,), None ); diff --git a/kernel/src/schema/mod.rs b/kernel/src/schema/mod.rs index 8c1c4302a..a6c86a7cc 100644 --- a/kernel/src/schema/mod.rs +++ b/kernel/src/schema/mod.rs @@ -1324,6 +1324,15 @@ impl From for DataType { } } +/// A static reference to the canonical unshredded variant schema. Equivalent to +/// [`DataType::unshredded_variant()`], but static. +pub static UNSHREDDED_VARIANT_SCHEMA: LazyLock = LazyLock::new(|| { + DataType::Variant(Box::new(StructType::new_unchecked([ + StructField::not_null("metadata", DataType::BINARY), + StructField::not_null("value", DataType::BINARY), + ]))) +}); + /// cbindgen:ignore impl DataType { pub const STRING: Self = DataType::Primitive(PrimitiveType::String); @@ -1361,13 +1370,13 @@ impl DataType { StructType::new_unchecked(fields).into() } - /// Create a new unshredded [`DataType::Variant`]. This data type is a struct of two not-null + /// Creates a new unshredded [`DataType::Variant`]. This data type is a struct of two not-null /// binary fields: `metadata` and `value`. + /// + /// NOTE: Callers who only need a borrowed reference can avoid allocations by using + /// [`UNSHREDDED_VARIANT_SCHEMA`] instead. pub fn unshredded_variant() -> Self { - DataType::Variant(Box::new(StructType::new_unchecked([ - StructField::not_null("metadata", DataType::BINARY), - StructField::not_null("value", DataType::BINARY), - ]))) + UNSHREDDED_VARIANT_SCHEMA.clone() } /// Create a new [`DataType::Variant`] from the provided fields. For unshredded variants, you @@ -1822,7 +1831,7 @@ mod tests { } "#; let field: StructField = serde_json::from_str(data).unwrap(); - assert_eq!(field.data_type, DataType::unshredded_variant()); + assert_eq!(field.data_type, *UNSHREDDED_VARIANT_SCHEMA); let json_str = serde_json::to_string(&field).unwrap(); assert_eq!( diff --git a/kernel/src/schema/variant_utils.rs b/kernel/src/schema/variant_utils.rs index c9bb88bd8..9b5ba2a61 100644 --- a/kernel/src/schema/variant_utils.rs +++ b/kernel/src/schema/variant_utils.rs @@ -45,14 +45,14 @@ pub(crate) fn validate_variant_type_feature_support( mod tests { use super::*; use crate::actions::Protocol; - use crate::schema::{DataType, StructField, StructType}; + use crate::schema::{DataType, StructField, StructType, UNSHREDDED_VARIANT_SCHEMA}; use crate::table_features::{ReaderFeature, WriterFeature}; use crate::utils::test_utils::assert_result_error_with_message; #[test] fn test_is_unshredded_variant() { fn is_unshredded_variant(s: &DataType) -> bool { - s == &DataType::unshredded_variant() + *s == *UNSHREDDED_VARIANT_SCHEMA } assert!(!is_unshredded_variant( &DataType::variant_type([ diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index a46dc5f1b..d7f94f765 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -29,7 +29,9 @@ use serde_json::json; use serde_json::Deserializer; use tempfile::tempdir; -use delta_kernel::schema::{DataType, SchemaRef, StructField, StructType}; +use delta_kernel::schema::{ + DataType, SchemaRef, StructField, StructType, UNSHREDDED_VARIANT_SCHEMA, +}; use test_utils::{ assert_result_error_with_message, create_table, engine_store_setup, setup_test_tables, @@ -951,7 +953,7 @@ async fn test_append_variant() -> Result<(), Box> { let value_nested_v_array = Arc::new(BinaryArray::from(value_nested_v)) as ArrayRef; let metadata_nested_v_array = Arc::new(BinaryArray::from(metadata_nested_v)) as ArrayRef; - let variant_arrow = ArrowDataType::try_from_kernel(&DataType::unshredded_variant()).unwrap(); + let variant_arrow = ArrowDataType::try_from_kernel(&UNSHREDDED_VARIANT_SCHEMA).unwrap(); let variant_arrow_flipped = variant_arrow_type_flipped(); let i_values = vec![31, 32, 33]; @@ -1064,7 +1066,7 @@ async fn test_append_variant() -> Result<(), Box> { Some(null_bitmap), )?); let variant_arrow_type: ArrowDataType = - ArrowDataType::try_from_kernel(&DataType::unshredded_variant()).unwrap(); + ArrowDataType::try_from_kernel(&UNSHREDDED_VARIANT_SCHEMA).unwrap(); let expected_data = RecordBatch::try_new( Arc::new(expected_schema.as_ref().try_into_arrow()?), vec![